BlockingQueue






Java 5 added a number of classes to the Collections Framework for use in concurrent applications. Most of these are implementations of the Queue subinterface BlockingQueue (see Figure), designed primarily to be used in producer-consumer queues.

BlockingQueue


One common example of the use of producer-consumer queues is in systems that perform print spooling; client processes add print jobs to the spool queue, to be processed by one or more print service processes, each of which repeatedly "consumes" the task at the head of the queue.

The key facilities that BlockingQueue provides to such systems are, as its name implies, enqueuing and dequeuing methods that do not return until they have executed successfully. So, for example, a print server does not need to constantly poll the queue to discover whether any print jobs are waiting; it need only call the poll method, supplying a timeout, and the system will suspend it until either a queue element becomes available or the timeout expires. BlockingQueue defines seven new methods, in three groups:

Adding an Element

boolean offer(E e, long timeout, TimeUnit unit)
                // insert e, waiting up to the timeout
void put(E e)   // add e, waiting as long as necessary

The nonblocking overload of offer defined in Queue will return false if it cannot immediately insert the element. This new overload waits for a time specified using java.util.concurrent.TimeUnit, an enum that allows timeouts to be defined in units such as milliseconds or seconds.

Taking these methods together with those inherited from Queue, there are four ways in which the methods for adding elements to a BlockingQueue can behave: offer returns false if it does not succeed immediately, blocking offer returns false if it does not succeed within its timeout, add throws an exception if it does not succeed immediately, and put blocks until it succeeds.
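The behaviors just described can be observed on a full queue. The following is a minimal sketch, not part of the original example set; the class name AddingDemo and the method tryToAddToFullQueue are hypothetical, and a capacity-1 ArrayBlockingQueue is assumed simply because it is easy to fill.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: tries each non-waiting or timed way of adding
// an element to a queue that is already full.
class AddingDemo {
    static String tryToAddToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("first");  // succeeds: the queue had room for one element
        StringBuilder result = new StringBuilder();

        // nonblocking offer: returns false immediately
        result.append("offer=").append(queue.offer("second"));

        // timed offer: waits up to 50 ms, then returns false
        try {
            result.append(" timedOffer=")
                  .append(queue.offer("second", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // add: throws IllegalStateException because the queue is full
        try {
            queue.add("second");
            result.append(" add=ok");
        } catch (IllegalStateException e) {
            result.append(" add=IllegalStateException");
        }

        // queue.put("second") would block here until another thread
        // removed "first", so it is not called in this single-threaded sketch
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(tryToAddToFullQueue());
    }
}
```

Only put is left out, because in a single-threaded program it would never return.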

Removing an Element

E poll(long timeout, TimeUnit unit)
                // retrieve and remove the head, waiting up to the timeout
E take()        // retrieve and remove the head of this queue, waiting
                // as long as necessary

Again taking these methods together with those inherited from Queue, there are four ways in which the methods for removing elements from a BlockingQueue can behave: poll returns null if it does not succeed immediately, blocking poll returns null if it does not succeed within its timeout, remove throws an exception if it does not succeed immediately, and take blocks until it succeeds.
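The same comparison can be made for removal from an empty queue. This is a sketch under the same assumptions as before: RemovingDemo and tryToRemoveFromEmptyQueue are hypothetical names, and an unbounded LinkedBlockingQueue is used only because it starts out empty.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: tries each non-waiting or timed way of removing
// an element from a queue that is empty.
class RemovingDemo {
    static String tryToRemoveFromEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        StringBuilder result = new StringBuilder();

        // nonblocking poll: returns null immediately
        result.append("poll=").append(queue.poll());

        // timed poll: waits up to 50 ms, then returns null
        try {
            result.append(" timedPoll=")
                  .append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // remove: throws NoSuchElementException because the queue is empty
        try {
            queue.remove();
            result.append(" remove=ok");
        } catch (NoSuchElementException e) {
            result.append(" remove=NoSuchElementException");
        }

        // queue.take() would block here until another thread added an element
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(tryToRemoveFromEmptyQueue());
    }
}
```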

Retrieving or Querying the Contents of the Queue

int drainTo(Collection<? super E> c)
                // clear the queue into c
int drainTo(Collection<? super E> c, int maxElements)
                // clear at most the specified number of elements into c
int remainingCapacity()
                // return the number of elements that would be accepted
                // without blocking, or Integer.MAX_VALUE if unbounded

The drainTo methods perform atomically and efficiently, so the second overload is useful in situations in which you know that you have processing capability available immediately for a certain number of elements, and the first is useful, for example, when all producer threads have stopped working. Their return value is the number of elements transferred. The method remainingCapacity reports the spare capacity of the queue, although, as with any such value in multi-threaded contexts, the result of a call should not be used as part of a test-then-act sequence; between the test (the call of remainingCapacity) and the action (adding an element to the queue) of one thread, another thread might have intervened to add or remove elements.
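As a small illustration of the two drainTo overloads and of remainingCapacity, consider the following single-threaded sketch. The names DrainDemo and drainInTwoSteps are hypothetical, and the capacity of 10 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: drains a bounded queue first in a limited batch,
// then completely.
class DrainDemo {
    // returns { spare capacity before draining, size of first batch,
    //           size of second batch, elements left in the queue }
    static int[] drainInTwoSteps() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        for (int i = 1; i <= 6; i++) {
            queue.add(i);
        }
        int spare = queue.remainingCapacity();      // 10 - 6 elements

        List<Integer> batch = new ArrayList<Integer>();
        int firstCount = queue.drainTo(batch, 2);   // at most two elements

        List<Integer> rest = new ArrayList<Integer>();
        int secondCount = queue.drainTo(rest);      // everything that is left

        return new int[] { spare, firstCount, secondCount, queue.size() };
    }

    public static void main(String[] args) {
        int[] r = drainInTwoSteps();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```

With only one thread involved, the value returned by remainingCapacity is reliable here; in a concurrent program it is only a snapshot.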

BlockingQueue guarantees that the queue operations of its implementations will be thread-safe and atomic. But this guarantee doesn't extend to the bulk operations inherited from Collection (addAll, containsAll, retainAll, and removeAll) unless the individual implementation provides it. So it is possible, for example, for addAll to fail, throwing an exception, after adding only some of the elements in a collection.

Using the Methods of BlockingQueue

A to-do manager that works for just one person at a time is very limited; we really need a cooperative solution, one that will allow us to share both the production and the processing of tasks. Figure shows StoppableTaskQueue, a simple version of a concurrent task manager based on PriorityBlockingQueue, that will allow its users (us) to independently add tasks to the task queue as we discover the need for them, and to take them off for processing as we find the time. The class StoppableTaskQueue has three methods: addTask, getTask, and shutDown. A StoppableTaskQueue is either working or stopped. The method addTask returns a boolean value indicating whether it successfully added a task; this value will be true unless the StoppableTaskQueue is stopped. The method getTask returns the head task from the queue. If no task is available, it does not block but returns null. The method shutDown stops the StoppableTaskQueue, waits until all pending addTask operations are completed, then drains the StoppableTaskQueue and returns its contents.

A concurrent queue-based task manager

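import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;

public class StoppableTaskQueue {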
  private final int MAXIMUM_PENDING_OFFERS = Integer.MAX_VALUE;
  private final BlockingQueue<PriorityTask> taskQueue =
          new PriorityBlockingQueue<PriorityTask>();
  private boolean isStopped = false;
  private Semaphore semaphore = new Semaphore(MAXIMUM_PENDING_OFFERS);

  // return true if the task was successfully placed on the queue, false
  // if the queue has been shut down.
  public boolean addTask(PriorityTask task) {
    synchronized (this) {
      if (isStopped) return false;
      if (! semaphore.tryAcquire()) throw new Error("too many threads");
    }
    try {
      return taskQueue.offer(task);
    } finally {
      semaphore.release();
    }
  }

  // return the head task from the queue, or null if no task is available
  public PriorityTask getTask() {
    return taskQueue.poll();
  }

  // stop the queue, wait for producers to finish, then return the contents
  public Collection<PriorityTask> shutDown() {
    synchronized(this) { isStopped = true; }
    semaphore.acquireUninterruptibly(MAXIMUM_PENDING_OFFERS);
    Set<PriorityTask> returnCollection = new HashSet<PriorityTask>();
    taskQueue.drainTo(returnCollection);
    return returnCollection;
  }
}

In this example, as in most uses of the java.util.concurrent collections, the collection itself takes care of the problems arising from the interaction of different threads in adding or removing items from the queue. Most of the code of Figure is instead solving the problem of providing an orderly shutdown mechanism. The reason for this emphasis is that when we go on to use the class StoppableTaskQueue as a component in a larger system, we will need to be able to stop daily task queues without losing task information. Achieving graceful shutdown can often be a problem in concurrent systems: for more detail, see Chapter 7 of Java Concurrency in Practice by Brian Goetz et al. (Addison-Wesley).

The larger system will model each day's scheduled tasks over the next year, allowing consumers to process tasks from each day's queue. An implicit assumption of the example of this section is that if there are no remaining tasks scheduled for this day, a consumer will not wait for one to become available, but will immediately go on to look for a task in the next day's queue. (In the real world, we would go home at this point, or more likely go out to celebrate.) This assumption simplifies the example, as we don't need to invoke any of the blocking methods of PriorityBlockingQueue, though we will use one method, drainTo, from the BlockingQueue interface.

There are a number of ways of shutting down a producer-consumer queue such as this; in the one we've chosen for this example, the manager exposes a shutDown method that can be called by a "supervisor" thread in order to stop producers writing to the queue, to drain it, and to return the result. The shutDown method sets a boolean flag, isStopped, which task-producing threads will read before trying to put a task on to the queue. Task-consuming threads simply poll the queue, returning null if no tasks are available. The problem with this simple idea is that a producer thread might read the isStopped flag, find it false, but then be suspended for some time before it places its value on the queue. We have to prevent this by ensuring that the shutDown method, having stopped the queue, will wait until all the pending values have been inserted before draining it.

Figure achieves this using a semaphore, a thread-safe object that maintains a fixed number of permits. Semaphores are usually used to regulate access to a finite set of resources: a pool of database connections, for example. The permits the semaphore has available at any time represent the resources not currently in use. A thread requiring a resource acquires a permit from the semaphore, and releases it when it releases the resource. If all the resources are in use, the semaphore will have no permits available; at that point, a thread attempting to acquire a permit will block until some other thread returns one.
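The conventional use of a semaphore can be sketched in a few lines. This is an illustration only: the class PermitDemo, its method, and the idea of a pool of two connections are hypothetical, and tryAcquire is used instead of acquire so that the example never blocks.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: a semaphore guarding an imaginary pool of
// two resources.
class PermitDemo {
    // returns the outcome of four attempts to obtain a permit
    static boolean[] acquireThreeFromPoolOfTwo() {
        Semaphore connections = new Semaphore(2);      // two permits, two resources

        boolean first = connections.tryAcquire();      // a permit is available
        boolean second = connections.tryAcquire();     // the last permit
        boolean third = connections.tryAcquire();      // none left: returns false
                                                       // (acquire() would block here)
        connections.release();                         // one resource handed back
        boolean afterRelease = connections.tryAcquire();

        return new boolean[] { first, second, third, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = acquireThreeFromPoolOfTwo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```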

The semaphore in this example is used differently. We don't want to restrict producer threads from writing to the queue; it's an unbounded concurrent queue, after all, quite capable of handling concurrent access without help from us. We just want to keep a count of the writes currently in progress. So we create the semaphore with the largest possible number of permits, which in practice will never all be required. The producer method addTask checks to see if the queue has been stopped (in which case its contract says it should return false) and, if not, it acquires a permit using the semaphore method tryAcquire, which does not block (unlike the more commonly used blocking method acquire, tryAcquire returns false immediately if no permits are available). This test-then-act sequence is made atomic to ensure that at any point visible to another thread the program maintains its invariant: the number of unwritten values is no greater than the number of permits currently held by producers, that is, the number of permits unavailable.

The shutDown method sets the isStopped flag in a synchronized block (remember that, unless a variable is declared volatile, writes to it made by one thread are guaranteed to be visible to reads by another only if both the writes and the reads take place within blocks synchronized on the same lock). Now the addTask method cannot acquire any more permits, and shutDown just has to wait until all the permits previously acquired have been returned. To do that, it calls acquireUninterruptibly, specifying that it needs all the permits; that call will block until they are all released by the producer threads. At that point, the invariant guarantees that there are no tasks still to be written to the queue, and shutdown can be completed.

Implementing BlockingQueue

The Collections Framework provides five implementations of BlockingQueue.

LinkedBlockingQueue

This class is a thread-safe, FIFO-ordered queue, based on a linked node structure. It is the implementation of choice whenever you need an unbounded blocking queue. Even for bounded use, it may still be better than ArrayBlockingQueue (linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications).

The two standard collection constructors create a thread-safe blocking queue with a capacity of Integer.MAX_VALUE. You can specify a lower capacity using a third constructor:

LinkedBlockingQueue(int capacity)

The ordering imposed by LinkedBlockingQueue is FIFO. Queue insertion and removal are executed in constant time; operations such as contains which require traversal of the linked list require linear time. The iterators are weakly consistent.
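The difference between the default and the explicit capacity can be seen in a short sketch. The names LinkedCapacityDemo and compareCapacities are hypothetical, and the capacity of 2 is arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: compares a default-capacity queue with a
// bounded one.
class LinkedCapacityDemo {
    // returns "<unbounded is MAX_VALUE>,<third offer accepted>,<spare capacity>"
    static String compareCapacities() {
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<String>(2);

        bounded.offer("a");
        bounded.offer("b");
        boolean thirdAccepted = bounded.offer("c");   // queue is full: false

        boolean effectivelyUnbounded =
                unbounded.remainingCapacity() == Integer.MAX_VALUE;

        return effectivelyUnbounded + "," + thirdAccepted + ","
                + bounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(compareCapacities());
    }
}
```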

ArrayBlockingQueue

This implementation is based on a circular array, a linear structure in which the first and last elements are logically adjacent. Figure(a) shows the idea. The position labeled "head" indicates the head of the queue; each time the head element is removed from the queue, the head index is advanced. Similarly, each new element is added at the tail position, resulting in that index being advanced. When either index needs to be advanced past the last element of the array, it gets the value 0. If the two indices have the same value, the queue is either full or empty, so an implementation must separately keep track of the count of elements in the queue.

A circular array in which the head and tail can be continuously advanced in this way is better as a queue implementation than a noncircular one (e.g., the standard implementation of ArrayList, which we cover in Section 15.2) in which removing the head element requires changing the position of all the remaining elements so that the new head is at position 0. Notice, though, that only the elements at the ends of the queue can be inserted and removed in constant time. If an element is to be removed from near the middle, which can be done for queues via the method Iterator.remove, then all the elements from one end must be moved along to maintain a compact representation. Figure(b) shows the element at index 6 being removed from the queue. As a result, insertion and removal of elements in the middle of the queue have time complexity O(n).
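The index arithmetic described above can be sketched as follows. This is a simplified, single-threaded illustration and not the code of ArrayBlockingQueue itself; the class CircularArrayQueue and its members are hypothetical, and there is no locking, blocking, or removal from the middle.

```java
// Hypothetical, non-thread-safe circular array queue showing how the head
// and tail indices wrap around and why a separate count is needed.
class CircularArrayQueue<E> {
    private final Object[] elements;
    private int head = 0;   // index of the next element to be removed
    private int tail = 0;   // index at which the next element will be stored
    private int count = 0;  // tells full from empty when head == tail

    CircularArrayQueue(int capacity) {
        elements = new Object[capacity];
    }

    // add at the tail; returns false if the queue is full
    boolean offer(E e) {
        if (count == elements.length) {
            return false;
        }
        elements[tail] = e;
        tail = (tail + 1) % elements.length;  // wrap past the last slot to 0
        count++;
        return true;
    }

    // remove from the head; returns null if the queue is empty
    @SuppressWarnings("unchecked")
    E poll() {
        if (count == 0) {
            return null;
        }
        E e = (E) elements[head];
        elements[head] = null;                // release the reference
        head = (head + 1) % elements.length;  // wrap past the last slot to 0
        count--;
        return e;
    }

    int size() {
        return count;
    }
}
```

Both operations touch a single array slot and a couple of integers, which is why insertion at the tail and removal at the head take constant time.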

A circular array


Constructors for array-backed collection classes generally have a single configuration parameter, the initial length of the array. For fixed-size classes like ArrayBlockingQueue, this parameter is necessary in order to define the capacity of the collection. (For variable-size classes like ArrayList, a default initial capacity can be used, so constructors are provided that don't require the capacity.) For ArrayBlockingQueue, the three constructors are:

ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

The Collection parameter to the last of these allows an ArrayBlockingQueue to be initialized with the contents of the specified collection, added in the traversal order of the collection's iterator. For this constructor, the specified capacity must be at least as great as the size of the supplied collection, or be at least 1 if the supplied collection is empty.

Besides configuring the backing array, the last two constructors also require a boolean argument to control how the queue will handle multiple blocked requests. These will occur when multiple threads attempt to remove items from an empty queue or enqueue items on to a full one. When the queue becomes able to service one of these requests, which one should it choose? The alternatives are to require a guarantee that the queue will choose the one that has been waiting longest (that is, to implement a fair scheduling policy) or to allow the implementation to choose one. Fair scheduling sounds like the better alternative, since it avoids the possibility that an unlucky thread might be delayed indefinitely but, in practice, the benefits it provides are rarely important enough to justify incurring the large overhead that it imposes on a queue's operation. If fair scheduling is not specified, ArrayBlockingQueue will normally approximate fair operation, but with no guarantees.

The ordering imposed by ArrayBlockingQueue is FIFO. Queue insertion and removal are executed in constant time; operations such as contains which require traversal of the array require linear time. The iterators are weakly consistent.
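The three-argument constructor and its capacity requirement can be tried out with a sketch like the following. The class and method names are hypothetical, as are the job names; the fair flag is set to true in one call purely to show where it goes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: constructing an ArrayBlockingQueue from an
// existing collection.
class ArrayQueueConstructionDemo {
    // the initial contents come out in the traversal order of the collection
    static List<String> drainInitialContents() {
        List<String> jobs = Arrays.asList("job1", "job2", "job3");
        BlockingQueue<String> queue =
                new ArrayBlockingQueue<String>(5, true, jobs);  // capacity 5, fair
        List<String> drained = new ArrayList<String>();
        queue.drainTo(drained);
        return drained;
    }

    // a capacity smaller than the collection's size is rejected
    static boolean capacityTooSmallIsRejected() {
        try {
            new ArrayBlockingQueue<String>(2, false,
                    Arrays.asList("job1", "job2", "job3"));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(drainInitialContents());
        System.out.println(capacityTooSmallIsRejected());
    }
}
```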

PriorityBlockingQueue

This implementation is a thread-safe, blocking version of PriorityQueue (see Section 14.2), with similar ordering and performance characteristics. In current JDK releases its iterators are documented as weakly consistent: they do not throw ConcurrentModificationException, but they are not guaranteed to traverse the elements in priority order, and they may not reflect changes made after the iterator was created. To process the elements in priority order, transfer them to an array and sort it, or remove them from the queue one at a time.

DelayQueue

This is a specialized priority queue, in which the ordering is based on the delay time for each element: the time remaining before the element will be ready to be taken from the queue. If all elements have a positive delay time (that is, none of their associated delay times has expired), an attempt to poll the queue will return null (although peek will still allow you to see the first unexpired element). If one or more elements have an expired delay time, the one with the longest-expired delay time will be at the head of the queue. The elements of a DelayQueue belong to a class that implements java.util.concurrent.Delayed:

interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

The getDelay method of a Delayed object returns the remaining delay associated with that object. The compareTo method (see Section 3.1) of Comparable must be defined to give results that are consistent with the delays of the objects being compared. This means that it will rarely be compatible with equals, so Delayed objects are not suitable for use with implementations of SortedSet and SortedMap.

For example, in our to-do manager we are likely to need reminder tasks, to ensure that we follow up e-mail and phone messages that have gone unanswered. We could define a new class DelayedTask as in Figure, and use it to implement a reminder queue.

The class DelayedTask

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {
  public final static long MILLISECONDS_IN_DAY = 60 * 60 * 24 * 1000;
  private long endTime;     // in milliseconds after January 1, 1970
  private Task task;
  DelayedTask(Task t, int daysDelay) {
    endTime = System.currentTimeMillis() + daysDelay * MILLISECONDS_IN_DAY;
    task = t;
  }
  public long getDelay(TimeUnit unit) {
    long remainingTime = endTime - System.currentTimeMillis();
    return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
  }
  public int compareTo(Delayed t) {
    long thisDelay = getDelay(TimeUnit.MILLISECONDS);
    long otherDelay = t.getDelay(TimeUnit.MILLISECONDS);
    return (thisDelay < otherDelay) ? -1 : (thisDelay > otherDelay) ? 1 : 0;
  }
  Task getTask() { return task; }
}

BlockingQueue<DelayedTask> reminderQueue = new DelayQueue<DelayedTask>();
reminderQueue.offer(new DelayedTask (databaseCode, 1));
reminderQueue.offer(new DelayedTask (interfaceCode, 2));
...
// now get the first reminder task that is ready to be processed
DelayedTask t1 = reminderQueue.poll();
if (t1 == null) {
  // no reminders ready yet
} else {
  // process t1
}

Most queue operations respect delay values and will treat a queue with no expired elements as if it were empty. The exceptions are peek, which, perhaps surprisingly, will allow you to examine the head element of a DelayQueue whether or not it is expired, and remove(Object), which will remove an element whether or not it is expired. Like them and unlike the other methods of Queue, collection operations on a DelayQueue do not respect delay values. For example, here are two ways of copying the elements of reminderQueue into a set:

Set<DelayedTask> delayedTaskSet1 = new HashSet<DelayedTask>();
delayedTaskSet1.addAll(reminderQueue);
Set<DelayedTask> delayedTaskSet2 = new HashSet<DelayedTask>();
reminderQueue.drainTo(delayedTaskSet2);

The set delayedTaskSet1 will contain all the reminders in the queue, whereas the set delayedTaskSet2 will contain only those ready to be used.
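The way poll, peek, and size treat expired and unexpired elements can be checked without waiting for real delays to elapse. The sketch below assumes a hypothetical element class, Reminder, written along the same lines as DelayedTask but taking its delay in milliseconds, so that one element can be created already overdue and another an hour away.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one expired and one unexpired element in a
// DelayQueue.
class ExpiryDemo {
    // hypothetical element type, modeled on DelayedTask
    static class Reminder implements Delayed {
        final String name;
        final long endTimeMillis;

        Reminder(String name, long delayMillis) {
            this.name = name;
            this.endTimeMillis = System.currentTimeMillis() + delayMillis;
        }

        public long getDelay(TimeUnit unit) {
            long remaining = endTimeMillis - System.currentTimeMillis();
            return unit.convert(remaining, TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    static String pollTwice() {
        DelayQueue<Reminder> queue = new DelayQueue<Reminder>();
        queue.offer(new Reminder("later", 3600000L));  // due in one hour
        queue.offer(new Reminder("overdue", -1000L));  // expired a second ago

        Reminder first = queue.poll();   // the expired element
        Reminder second = queue.poll();  // null: "later" has not expired
        Reminder head = queue.peek();    // peek ignores the delay
        int size = queue.size();         // size counts unexpired elements too

        return first.name + "," + second + "," + head.name + "," + size;
    }

    public static void main(String[] args) {
        System.out.println(pollTwice());
    }
}
```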

DelayQueue shares the performance characteristics of the PriorityQueue on which it is based. In current JDK releases its iterators are documented as weakly consistent; they traverse expired and unexpired elements alike, in no particular order.

SynchronousQueue

At first sight, you might think there is little point to a queue with no internal capacity, which is a short description of SynchronousQueue. But, in fact, it can be very useful; a thread that wants to add an element to a SynchronousQueue must wait until another thread is ready to simultaneously take it off, and the same is true, in reverse, for a thread that wants to take an element off the queue. So SynchronousQueue has the function that its name suggests, that of a rendezvous: a mechanism for synchronizing two threads. (Don't confuse the concept of synchronizing threads in this way, allowing them to cooperate by exchanging data, with Java's keyword synchronized, which prevents simultaneous execution of code by different threads.) There are two constructors for SynchronousQueue:

SynchronousQueue()
SynchronousQueue(boolean fair)

A common application for SynchronousQueue is in work-sharing systems where the design ensures that there are enough consumer threads to ensure that producer threads can hand tasks over without having to wait. In this situation, it allows safe transfer of task data between threads without incurring the BlockingQueue overhead of enqueuing, then dequeuing, each task being transferred.

As far as the Collection methods are concerned, a SynchronousQueue behaves like an empty Collection; Queue and BlockingQueue methods behave as you would expect for a queue with zero capacity, which is therefore always empty. The iterator method returns an empty iterator, in which hasNext always returns false.
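A hand-off between two threads can be sketched as follows. The names HandOffDemo and handOff are hypothetical; the producer thread blocks in put until the main thread arrives at take, and a nonblocking offer made while no consumer is waiting simply fails.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class: passing one element from a producer thread to
// the calling thread through a SynchronousQueue.
class HandOffDemo {
    static String handOff() {
        final SynchronousQueue<String> channel = new SynchronousQueue<String>();

        // no thread is waiting to take, so a nonblocking offer fails
        boolean offeredWithoutConsumer = channel.offer("nobody waiting");

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    channel.put("task");   // blocks until a consumer takes it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        try {
            String received = channel.take();  // rendezvous with the producer
            producer.join();
            // the queue never holds elements, so its size is always 0
            return offeredWithoutConsumer + "," + received + "," + channel.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff());
    }
}
```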


