Багатопотоковість і паралелізм в Java - 24 Жовтня 2011 - Вчимо java разом!

Пакет java.util.concurrent містить три Executor-інтерфейси:

  • Executor
  • ExecutorService
  • ScheduledExecutorService

Цей інтерфейс був введений для відділення самого процесу відправлення завдання на виконання від механізму виконання кожної конкретної задачі. Класи, що реалізують даний інтерфейс, представляють об'єкти, які і визначають, як виконувати кожну конкретну задачу.

Цей інтерфейс є розширенням інтерфейсу Executor і додає такі корисні можливості:

  • Можливість зупинити виконуваний процес
  • Можливість виконання не тільки Runnable об'єктів, а й java.util.concurrent.Callable. Основна їхня відмінність від Runnable об'єктів - можливість повертати значення потоку, з якого робився виклик.
  • Можливість повертати об'єкт, що викликав потоку java.util.concurrent.Future, який містить серед іншого і повертається значення.

ScheduledExecutorService

Даний інтерфейс по суті справи є тим самим ExecutorService, але з можливістю відкладати початок виконання завдань на певний проміжок часу, або планувати виконання завдань через заданий тимчасовий інтервал.

Приклад використання

public class CallableImpl implements Callable public Integer call() //… return new Integer(someValue); > > //… Callable callable = new CallableImpl(); ExecutorService executor = Executors.newFixedThreadPool(5); Future future = executor.submit(callable); >try System.out.println( "Future value: " + future.get()); > catch (Exception e) e.printStackTrace(); >

Шляхом виклику Executors.newFixedThreadPool був створений пул на 5 потоків. Тимсамим, у разі необхідності створення великої кількості потоків заощаджуватиметься час на створення нового потоку шляхом використання існуючих потоків з пулу. Виклик методу get у об'єкта типу Future привів до очікування поточним потоком значення, що повертається.

Атомарні змінні

Атомарні класи надають можливість атомарного виконання операцій основних примітивних і посилальних типів. У якості прикладу розглянемо клас AtomicInteger та його основні методи. Як зрозуміло з назви, даний клас є обгорткою навколо примітивного типу int, що надає можливість атомарно оновлювати його значення.

Розглянемо основні методи цього класу:

Приклад використання 1

private AtomicInteger someValue; //… int previousBits, newBits; do previousValue = someValue.get(); newValue = changeValue(previousValue); > while (!someValue.compareAndSet(previousValue, newValue));

У наведеному вище прикладі робиться спроба оновлення деякого значення, доки оновлення не буде виконано успішно. Завдяки використанню атомарної змінної, ми уникнули необхідності використовувати синхронізацію.

AtomicReferenceFieldUpdater

Досить часто виникають ситуації, коли атомарне оновлення необхідне лише в одному з багатьох випадків використання об'єкта. Безумовно, у такій ситуації не хочеться обтяжувати себе роботою з атомарними типами і не мати можливості працювати з об'єктом безпосередньо. Для цих цілей чудово підійде AtomicReferenceFieldUpdater. Саме поле класу, яке буде використане спільно з AtomicReferenceFieldUpdater, має бути оголошене з ключовим словом volatile. З розглянутого нижче прикладу стане ясно, якимчином це можна зробити.

Приклад використання 2

public class Node private volatile InnerNode next; //. > //… private static final AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, InnerNode.class, "next"); //… nextUpdater.compareAndSet(currentNode, expectedInnerNode, newInnerNode);

Синхронізатори

До синхронізаторів можна віднести різного роду структури, які відповідають за координацію потік. Розглянемо деякі такі структури, які були додані в пакеті java.util.concurrency:

У 1968 р. Е. Дейкстра запропонував зручну форму механізму захоплення/звільнення ресурсів, яку він назвав операціями P і V над семафорами, що вважаються. Вважаючим семафором називають цілісну змінну, що виконує ті ж функції, що і байт блокування. Однак на відміну від останнього вона може приймати крім "0" і "1" та інші цілі позитивні значення. Як можна зрозуміти з написаного вище, семафори використовуються для обмеження числа потоків, які використовують певний ресурс. Для виконання операцій P і V в Java класі java.util.concurrent.Semaphore існують спеціальні методи. tryAcquire - намагається отримати доступ до ресурсу в момент виклику без блокування поточного потоку. release - звільнення ресурсу. Слід зазначити, що у Семафора може бути різною сама стратегія отримання ресурсу, що звільнився.

Приклад використання

private final Semaphore available = New Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException available.acquire(); returngetNextAvailableItem(); >

public void putItem(Object x) if (markAsUnused(x)) available.release(); >

Бар'єр це засіб синхронізації, який використовується для того, щоб деяка кількість потоків очікувала закінчення один одного в деякому місці, що є бар'єром або точкою синхронізації. Після того, як всі потоки досягли точки синхронізації, вони розблокуються і можуть продовжувати виконання. На практиці бар'єри використовуються для збору результатів виконання деякої розпаралеленої задачі. У якості прикладу можна розглянути задачу множення матриць. При розпаралелювання даного завдання кожному потоку буде доручено множення певних рядків на певні стовпці. У точці синхронізації ж отримані результати збираються з усіх потоків, і будується результуюча матриця. У пакеті java.util.concurrent клас CyclicBarrier є реалізацією бар'єру. Розглянемо приклад його використання нижче.

Приклад використання

class Worker extends Thread //… @Override public void run() < // Деяка дія try < barrier.await(); > catch (InterruptedException e) < e.printStackTrace(); > catch (BrokenBarrierException e) < e.printStackTrace(); > > > //… barrier = new CyclicBarrier(N, new Runnable() public void run() < // Дії, що виконуються при досягненні бар'єру всіма потоками > ;); for (int i = 0; i exchanger = new Exchanger (); class Loop1 implements Runnable public void run() < My > loop1Value = exchanger.exchange(loop1Value ); //… < > class Loop2 implements Runnable public void run() < My <loop2Value = exchanger>> >

Клямка - засіб синхронізації, який використовується для того, щоб один або кілька потоків могли дочекатися виконання певної кількості операцій в інших потоках. переліченими вище властивостями. Цей клас працює за принципом таймера. Відбувається ініціалізація його деяким початковим значенням і зворотний відлік. При викликі методу await даного класу будь-яким потоком , він переходить у стан очікування моменту досягнення лічильником таймера значення 0. На практиці даний клас зручно використовувати для координації моменту початку і закінчення певної кількості потоків . Це означає наступне:

  • можна зробити так, щоб певна кількість потоків починалося в той самий момент часу;
  • можна відстежити момент закінчення кількох потоків.

Приклад використання 1

У першому прикладі розглянемо перший випадок з перерахованих вище, тобто. початок виконання кількох потоків одночасно.