Get started with an ArrayBlockingQueue

An ArrayBlockingQueue is a FIFO blocking queue with an array used by the underlying implementation. At construction, you specify the maximum space used by the queue. Attempting to put more elements in the queue than is allowed by its capacity constraint will result in a blocking wait. Similarly, attempting to take an element from an empty queue will block.

The following example starts up two threads, a consumer and a producer, that communicate with each other with an ArrayBlockingQueue and generic Message objects. The capacity is 5 elements. The producer tries to put 10 elements on the queue, but blocks until space in the queue is freed up by the consumer.

Main.java:

import java.util.concurrent.*;
 
public class Main
{
   public static void main(String []args) {
      BlockingQueue blockingQueue = new ArrayBlockingQueue<Message>(5);
       
      Producer p = new Producer(blockingQueue);
      new Thread(p).start();
 
      // wait a bit before consuming to allow the queue to fill up
      // and force a blocking wait
      try { Thread.sleep(1000); } catch(InterruptedException e) { }
      Consumer c = new Consumer(blockingQueue);
      new Thread(c).start();
   }
}
 
class Producer implements Runnable
{
   private BlockingQueue<Message> blockingQueue;
 
   public Producer(BlockingQueue<Message> blockingQueue) {
      this.blockingQueue = blockingQueue;
   }
 
   public void run() {
      for (int i=0; i<10; i++) {
         Message<String> m = new Message<String>("message contents #" + i);
         System.out.println("Producing '" + m + "'");
         try {
            blockingQueue.put(m);
         }
         catch(InterruptedException e) {
            return;
         }  
      }
   }
}
 
class Consumer implements Runnable
{
   private BlockingQueue<Message> blockingQueue;
   
   public Consumer(BlockingQueue<Message> blockingQueue) {
      this.blockingQueue = blockingQueue;
   }
    
   public void run() {
    
      while (true) {
         try {
            Message m = blockingQueue.take();
            System.out.println("tConsuming '" + m + "'");
         }
         catch(InterruptedException e) { } 
      }
   }
}
 
class Message<T> {
   private T contents;
    
   public Message(T contents) {
      this.contents = contents;
   }
    
   public T getContents() {
      return contents;
   }
    
   public String toString() {
      return ""+contents;
   }
}

outputs:

Producing 'message contents #0'
Producing 'message contents #1'
Producing 'message contents #2'
Producing 'message contents #3'
Producing 'message contents #4'
Producing 'message contents #5'
Producing 'message contents #6'
        Consuming 'message contents #0'
Producing 'message contents #7'
        Consuming 'message contents #1'
Producing 'message contents #8'
        Consuming 'message contents #2'
Producing 'message contents #9'
        Consuming 'message contents #3'
        Consuming 'message contents #4'
        Consuming 'message contents #5'
        Consuming 'message contents #6'
        Consuming 'message contents #7'
        Consuming 'message contents #8'
        Consuming 'message contents #9'

Notice that the following warnings are generated when compiling the above code:

Note: Main.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

The reason for this is that the ArrayBlockingQueue is cast to a BlockingQueue without a specified type parameter (Message). This means that type safety may be broken. In fact, we could simply insert the following line of code to produce a ClassCastException at runtime:

      BlockingQueue blockingQueue = new ArrayBlockingQueue<Message>(5);
 
      try { blockingQueue.put("hello"); } catch(InterruptedException e) { }

Solve this by specifying the type parameter:

      BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(5);

An example of the other methods of the ArrayBlockingQueue can be found here.