Java Concurrency Overview Part 2

In this part I’ll give an overview of the java.util.concurrent framework, which provides high-level building blocks for creating concurrent applications. These building blocks are high-performance, thread-safe, widely used in the Java world and, moreover, thoroughly tested.


The reason this framework was introduced in Java 1.5 is that low-level APIs such as wait(), notify() and so on are too primitive and force developers to create their own custom high-level libraries.

If you are not comfortable with low-level threading constructs such as Thread, Runnable, wait(), notify(), or synchronized blocks/methods, I recommend reading Java Concurrency Overview Part 1.

In the sections below I will go over almost all of the components one by one.

Executors

Executors provide a high-level API for starting and managing threads. The API contains interfaces and implementations that can be roughly depicted as follows.

[Diagram: Executor interface and its implementations]

The Executor interface has only one method, execute(), which executes a given task at some time in the future. How the task is executed depends on the concrete implementation (ForkJoinPool, ThreadPoolExecutor, ScheduledThreadPoolExecutor).

ThreadPoolExecutor

Normally you will use the Executors factory, which provides different ways to create a ThreadPoolExecutor. But if you decide to create a pool manually using ThreadPoolExecutor, you need to know some tuning and configuration properties that you can set via the constructor or setters.

Here are some of the core properties:

  1. Core pool size – the number of threads to keep in the pool.
  2. Maximum pool size – the maximum number of threads to allow in the pool.
  3. Keep alive time – when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
  4. Unit – the time unit for the keep alive time property.
  5. Work queue – the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
  6. Thread factory – the factory to use when the executor creates a new thread.
  7. Handler – the handler to use when execution is blocked because the thread bounds and queue capacities are reached.

Once you have created a ThreadPoolExecutor, you can use one of these methods to add a task to the pool:

  1.  invokeAll() – Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.
  2. invokeAny() – Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses.
  3. newTaskFor() – Returns a RunnableFuture for the given runnable and default value.
  4. submit() – Submits a task for execution and returns a Future representing the pending results of the task.

Other useful methods:

  1. execute() – Executes the given task sometime in the future.
  2. remove() – Removes this task from the executor’s internal queue if it is present, thus causing it not to be run if it has not already started.
  3. shutdown() – Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
  4. shutdownNow() – Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
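To tie the constructor properties and these methods together, here is a minimal sketch (the class name PoolDemo and the specific pool sizes are my own arbitrary choices) that configures a ThreadPoolExecutor by hand, submits a task, and shuts the pool down:

```java
import java.util.concurrent.*;

public class PoolDemo {

    // Builds a small pool by hand: 2 core threads, up to 4 threads,
    // 60 s keep-alive for excess threads, a bounded queue of 10 tasks,
    // the default thread factory and the default (abort) rejection policy.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    static int runTask() throws Exception {
        ThreadPoolExecutor pool = newPool();
        // submit() returns a Future; get() blocks until the task completes.
        Future<Integer> future = pool.submit(() -> 21 + 21);
        int result = future.get();
        pool.shutdown();                            // no new tasks accepted
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait for queued tasks
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(PoolDemo.runTask()); // prints 42
    }
}
```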

ScheduledThreadPoolExecutor

This thread pool extends ThreadPoolExecutor, so it includes all properties and methods of its parent and additionally provides the ability to run tasks after a delay or periodically. Again, you will normally use the Executors factory to create a scheduled pool rather than setting up all the properties manually.

The pool implements the ScheduledExecutorService interface, which provides the main methods to manage scheduled tasks:

  1. schedule() – Creates and executes a one-shot action that becomes enabled after the given delay.
  2. scheduleAtFixedRate() – Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions will commence after initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.
  3. scheduleWithFixedDelay() – Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.
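A small sketch of schedule() in action (the class name ScheduleDemo and the 100 ms delay are my own choices): the task becomes enabled only after the delay, and the returned ScheduledFuture behaves like any other Future.

```java
import java.util.concurrent.*;

public class ScheduleDemo {

    // Schedules a one-shot task with a 100 ms delay and measures
    // how long it actually took to start.
    static long runOnce() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        ScheduledFuture<Long> future =
                scheduler.schedule(() -> System.nanoTime() - start,
                                   100, TimeUnit.MILLISECONDS);
        long elapsedNanos = future.get();   // blocks until the task has run
        scheduler.shutdown();
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) throws Exception {
        // The measured delay is at least the requested 100 ms.
        System.out.println(runOnce() + " ms");
    }
}
```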

ForkJoinPool

The Fork/Join framework was introduced in Java 1.7. It is designed for problems that can be solved using the divide-and-conquer principle. In other words, a big task is split into smaller tasks for parallel processing. As a result, your application uses all the available processing power.

The framework is based on a work-stealing algorithm: worker threads that have run out of things to do steal tasks from other worker threads.

To create a task for a fork/join pool you need to extend one of the two abstract implementations of ForkJoinTask.

[Diagram: ForkJoinTask and its subclasses RecursiveTask and RecursiveAction]

 

The difference between RecursiveTask and RecursiveAction is that RecursiveAction doesn’t return a result.

Here go some of the main methods of ForkJoinTask:

  1. fork() – Arranges to asynchronously execute this task in the pool the current task is running in.
  2. join() – Returns the result of the computation when it is done.
  3. invokeAll() – Forks all tasks in the specified collection.

And of course, an example of using RecursiveTask and RecursiveAction:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ThreadLocalRandom;

class MyRecursiveTask extends RecursiveTask<Long> {

    private long[] array;
    private int leftBound;
    private int rightBound;

    MyRecursiveTask(long[] array, int leftBound, int rightBound) {
        this.array = array;
        this.leftBound = leftBound;
        this.rightBound = rightBound;
    }

    @Override
    protected Long compute() {
        // Small enough range: sum it sequentially.
        if (rightBound - leftBound < 500) {
            long result = 0;
            for (int i = leftBound; i < rightBound; i++) {
                result += array[i];
            }
            return result;
        } else {
            int mid = (leftBound + rightBound) / 2;

            // Fork the left half asynchronously, compute the right half
            // in the current thread, then join the left result.
            MyRecursiveTask left = new MyRecursiveTask(array, leftBound, mid);
            left.fork();
            MyRecursiveTask right = new MyRecursiveTask(array, mid, rightBound);

            return right.compute() + left.join();
        }
    }
}

class MyRecursiveAction extends RecursiveAction {

    private long[] array;
    private int leftBound;
    private int rightBound;

    MyRecursiveAction(long[] array, int leftBound, int rightBound) {
        this.array = array;
        this.leftBound = leftBound;
        this.rightBound = rightBound;
    }

    @Override
    protected void compute() {
        if (rightBound - leftBound < 500) {
            for (int i = leftBound; i < rightBound; i++) {
                array[i] = (long) ThreadLocalRandom.current().nextInt(4, 77);
            }
        } else {
            int mid = (leftBound + rightBound) / 2;
            // invokeAll() forks both subtasks and waits for them to finish.
            invokeAll(new MyRecursiveAction(array, leftBound, mid), new MyRecursiveAction(array, mid, rightBound));
        }
    }
}

public class ForkJoinExample {

    public static void main(String[] args) {
        long[] array = new long[231488];
        RecursiveAction fillArrayTask = new MyRecursiveAction(array, 0, array.length);
        ForkJoinPool mainPool = new ForkJoinPool();
        mainPool.invoke(fillArrayTask);

        MyRecursiveTask calcArray = new MyRecursiveTask(array, 0, array.length);
        Long sum = mainPool.invoke(calcArray);
        System.out.println(sum);
    }

}

Executors factory

The factory provides methods for creating thread pools. Every pool implements the ExecutorService interface.

Here are the factory’s main methods:

  1. newSingleThreadExecutor() – Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
  2. newFixedThreadPool() – Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
  3. newCachedThreadPool() – Returns a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
  4. newSingleThreadScheduledExecutor() – Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newScheduledThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
  5. newScheduledThreadPool() – Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
  6. newWorkStealingPool() – Creates a work-stealing thread pool using all available processors as its target parallelism level.
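For instance, a hypothetical FactoryDemo (class and method names are mine) using newFixedThreadPool(): four worker threads process five submitted tasks off the shared queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FactoryDemo {

    // Submits five small tasks to a fixed pool of four threads
    // and sums their results.
    static int sumSquares() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int n = i;                         // effectively final for the lambda
            futures.add(pool.submit(() -> n * n));   // 1, 4, 9, 16, 25
        }
        int sum = 0;
        for (Future<Integer> f : futures) {
            sum += f.get();                          // blocks until each result is ready
        }
        pool.shutdown();
        return sum;                                  // 1 + 4 + 9 + 16 + 25 = 55
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumSquares()); // prints 55
    }
}
```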

Return types

Many methods in the Java concurrent framework operate with a variety of interfaces such as Callable, Future, ScheduledFuture and so on. Now it’s time to understand what is what.

[Diagram: hierarchy of the result types Future, RunnableFuture, ScheduledFuture, RunnableScheduledFuture]

 

Callable – the Runnable interface provides the method run(), which accepts no arguments, returns no value, and cannot throw checked exceptions. So if you need to return a value or throw a checked exception, you can use the Callable interface.

Future – whenever you submit a Callable to a thread pool, you get back an instance of Future. A Future represents the result of an asynchronous computation, which is requested via the get() method. The get() method is the equivalent of join(), because it returns the result only after the task completes.

RunnableFuture – a Future that also extends the Runnable interface. Once the run() method has completed, you can get the result.

ScheduledFuture – usually a ScheduledFuture is the result of scheduling a task via ScheduledThreadPoolExecutor.

RunnableScheduledFuture – a ScheduledFuture that also extends the Runnable interface. Once the run() method has completed, you can get the result.
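A short illustration of Callable and Future together (the class name is mine): call() returns a value and is declared to throw Exception, and get() blocks like join() until that value is available.

```java
import java.util.concurrent.*;

public class CallableDemo {

    public static String compute() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Unlike Runnable.run(), Callable.call() returns a value
        // and may throw a checked exception.
        Callable<String> task = () -> {
            if (System.currentTimeMillis() < 0) {
                throw new Exception("never happens; shows call() may throw");
            }
            return "done";
        };
        Future<String> future = pool.submit(task);
        String result = future.get();   // blocks until the task completes
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute()); // prints done
    }
}
```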

Time unit

Some methods in the java.util.concurrent framework take time as an argument. The unit of time can be specified through the TimeUnit enumeration. A TimeUnit does not hold a particular point in time; it only represents the granularity of a duration.

Here is an example of usage:

 Lock lock = ...;
 if (lock.tryLock(50L, TimeUnit.MILLISECONDS)) ...

Milliseconds can be replaced with one of these time units: days, hours, microseconds, milliseconds, minutes, nanoseconds, seconds.
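TimeUnit also converts between granularities, which is handy when an API takes one unit and you reason in another. A quick sketch:

```java
import java.util.concurrent.TimeUnit;

public class TimeUnitDemo {
    public static void main(String[] args) {
        // Convert 2 minutes to seconds.
        System.out.println(TimeUnit.MINUTES.toSeconds(2));          // 120
        // Conversions to a coarser unit truncate the remainder.
        System.out.println(TimeUnit.MILLISECONDS.toSeconds(1500));  // 1
        // The general form: convert 5000 ms into seconds.
        System.out.println(TimeUnit.SECONDS.convert(5000, TimeUnit.MILLISECONDS)); // 5
    }
}
```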

Atomic variables

The java.util.concurrent.atomic package contains classes that support thread-safe atomic operations on single variables. The majority of the classes contain the following methods:

  1. get() – Gets the current value.
  2. set() – Sets to the given value.
  3. incrementAndGet() – Atomically increments the current value by one. The opposite method is decrementAndGet().
  4. compareAndSet() – Atomically sets the value to the given updated value if the current value == the expected value.
  5. addAndGet() – Atomically adds the given value to the current value and returns the updated value. Its companion getAndAdd() returns the previous value instead.
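A minimal sketch with AtomicInteger (the class name and iteration count are mine): two threads increment a shared counter without any lock, and compareAndSet() then resets it only if the expected total was reached.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {

    static int count() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                counter.incrementAndGet();   // atomic, no synchronization needed
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start(); t2.start();
        t1.join();  t2.join();
        // Succeeds only if the current value equals the expected 20_000.
        boolean swapped = counter.compareAndSet(20_000, 0);
        return swapped ? counter.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(count()); // prints 0: both CAS and the increments worked
    }
}
```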

Locks

Instead of synchronized blocks or methods, the java.util.concurrent package provides several types of locks that can be used in your application. One of the main features of locks is the ability to back out of an attempt to acquire a lock.

[Diagram: lock class hierarchy]

ReentrantLock – implements the Lock interface and is very close to a synchronized block, but with extended capabilities. For instance, besides the lock() and unlock() methods, you can use the tryLock() method, which acquires the lock only if it is free at the time of invocation.

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock();
     }
   }
}

Another interesting property of ReentrantLock is fairness. If fairness is set to true, the lock guarantees first-come-first-served acquisition order. For example, suppose many threads constantly acquire the lock; without fairness, some of them may never manage to acquire it at all.

ReentrantReadWriteLock – implements the ReadWriteLock interface and allows creating read and write locks. It is useful when we have multiple reader threads and a single writer thread. A read lock means: if any other thread holds the write lock, stop and wait until no other thread is writing. A write lock means: if any other thread is reading or writing, stop and wait until no other thread is reading or writing.

StampedLock – allows optimistic locking for read operations. Stamped locks are not reentrant, which means the lock is acquired on a per-invocation basis.
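A sketch of optimistic reading with StampedLock, closely following the pattern from the class’s Javadoc (the PointHolder class name is mine): the reader takes a cheap optimistic stamp, reads the fields, and falls back to a real read lock only if validate() reports that a writer intervened.

```java
import java.util.concurrent.locks.StampedLock;

public class PointHolder {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    void move(double dx, double dy) {
        long stamp = lock.writeLock();          // exclusive write lock
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();  // no blocking, just a stamp
        double curX = x, curY = y;              // read under the optimistic stamp
        if (!lock.validate(stamp)) {            // a writer slipped in: fall back
            stamp = lock.readLock();
            try {
                curX = x;
                curY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(curX * curX + curY * curY);
    }

    public static void main(String[] args) {
        PointHolder p = new PointHolder();
        p.move(3, 4);
        System.out.println(p.distanceFromOrigin()); // prints 5.0
    }
}
```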

Concurrent collections and queues

In addition to the standard collections, Java provides a concurrent collections framework.

BlockingQueue – a queue that is useful when capacity is limited. For example, one thread, the producer, enqueues elements into the queue and another thread, the consumer, dequeues them. If the consumer is too slow, we may get an OutOfMemoryError. To avoid this, the BlockingQueue provides methods that either wait or throw an exception when capacity is exceeded. Another advantage is that the consumer can wait until a new element becomes available. Here are the main implementations:

  1. ArrayBlockingQueue – a blocking queue backed by an array.
  2. LinkedBlockingQueue – a blocking queue backed by a linked list.
  3. DelayQueue – allows taking an element from the queue only after its delay has expired.
  4. PriorityBlockingQueue – a blocking queue that orders elements by priority.
  5. SynchronousQueue – a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
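A minimal producer/consumer sketch with ArrayBlockingQueue (the class name and the poison-pill convention are mine): with a capacity of 2, put() blocks the producer whenever the consumer falls behind, so memory stays bounded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {

    static int consumeSum() throws InterruptedException {
        // Capacity 2: put() blocks when the queue is full,
        // take() blocks when it is empty.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);
                }
                queue.put(-1);                // poison pill: signals the end
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        while (true) {
            int item = queue.take();          // blocks until an element arrives
            if (item == -1) {
                break;
            }
            sum += item;
        }
        producer.join();
        return sum;                           // 1 + 2 + 3 + 4 + 5 = 15
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeSum()); // prints 15
    }
}
```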

ConcurrentMap – a map that supports thread-safe operations. Here are the main implementations:

  1. ConcurrentHashMap – a hash table with thread-safe operations. Unlike Hashtable, ConcurrentHashMap doesn’t lock the whole table: the table is split into segments and only the affected segment is locked.
  2. ConcurrentSkipListMap – a thread-safe analogue of TreeMap.

CopyOnWriteArrayList – an array list in which every mutating method copies the underlying array. As a result, iteration works on a snapshot and you avoid ConcurrentModificationException.

CopyOnWriteArraySet – a set in which every mutating method copies the underlying array. As a result, iteration works on a snapshot and you avoid ConcurrentModificationException.
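A quick sketch showing that a CopyOnWriteArrayList iterator works on a snapshot (the class name is mine): modifying the list mid-iteration is safe, and the iterator still sees only the two elements that existed when it was created.

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowDemo {

    static int snapshotSize() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");
        int seen = 0;
        // The iterator operates on a snapshot of the underlying array,
        // so adding elements while iterating does not throw
        // ConcurrentModificationException (a plain ArrayList would).
        for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
            it.next();
            list.add("c");   // each add copies the array; the snapshot is unaffected
            seen++;
        }
        return seen;         // 2: the snapshot had exactly two elements
    }

    public static void main(String[] args) {
        System.out.println(snapshotSize()); // prints 2
    }
}
```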

Concurrent random

Use ThreadLocalRandom instead of the Random class in concurrent applications. Although both classes are thread-safe, ThreadLocalRandom shows better performance in applications with high contention.

Synchronizers

Java offers five advanced classes that can be used for synchronizing threads:

  1. Semaphore is a classic concurrency tool that can be used to limit the number of threads working in the same critical section. Access to the critical section is controlled by a counter. If the counter is greater than zero, a thread may enter the critical section; otherwise it waits. Whenever a thread acquires a permit to enter the critical section, the counter is decremented; releasing the permit increments it again. You set the initial counter via the constructor.
  2. CountDownLatch is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold. A CountDownLatch is one-shot: the counter cannot be reset.
  3. A CyclicBarrier is a resettable multiway synchronization point useful in some styles of parallel programming. In short, the cyclic barrier puts on hold threads until the number of waiting threads is equal to counter that is set up via constructor.
  4. A Phaser provides a more flexible form of barrier that may be used to control phased computation among multiple threads. It allows changing the number of awaiting parties, and the phaser is reusable.
  5. An Exchanger allows two threads to exchange objects at a rendezvous point, and is useful in several pipeline designs. It works as follows. One thread calls method exchange() and waits until second thread calls the same method. Once the method is called by two threads they exchange data and go on with processing.
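A minimal CountDownLatch sketch (the class name is mine): the main thread blocks in await() until three workers have each called countDown(), which also guarantees it sees all of their writes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    static int runWorkers() throws InterruptedException {
        int workers = 3;
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger total = new AtomicInteger();
        for (int i = 1; i <= workers; i++) {
            final int n = i;
            new Thread(() -> {
                total.addAndGet(n);   // simulated work
                done.countDown();     // signal this worker is finished
            }).start();
        }
        done.await();                 // block until the counter reaches zero
        return total.get();           // 1 + 2 + 3 = 6
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers()); // prints 6
    }
}
```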

 

P.S.: That’s all; that was Java concurrency overview part 2. Please let me know if I missed something or made a mistake.
