Queue / BlockingQueue / TransferQueue review notes

Overview

These are review notes on the Collections Framework interface Queue and its subinterfaces BlockingQueue and TransferQueue. In particular, I wrote sample code for LinkedBlockingQueue, an implementation class of BlockingQueue, and LinkedTransferQueue, an implementation class of TransferQueue.

The implementation classes of the BlockingQueue and TransferQueue interfaces make it easy to implement the Producer-Consumer pattern.

Quoted from the BlockingQueue JavaDoc:

BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface.

Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.

review

Interface Queue<E>

Introduced in: 1.5

In addition to the basic Collection operations, queues provide additional insertion, extraction, and inspection operations. Each of these methods exists in two forms: one throws an exception if the operation fails, and the other returns a special value (either null or false, depending on the operation).

Queue operation method (throws an exception on failure)

Operation | Method signature | Exception thrown on failure
Insert | boolean add(E e) | IllegalStateException
Get / Delete | E remove() | NoSuchElementException
Get | E element() | NoSuchElementException

add: Adds an element to the queue and returns true; throws IllegalStateException if no space is currently available.
remove: Retrieves and removes the head of the queue; throws NoSuchElementException if the queue is empty.
element: Retrieves, but does not remove, the head of the queue; throws NoSuchElementException if the queue is empty.

Queue operation method (returns a special value)

Operation | Method signature | Value returned
Insert | boolean offer(E e) | false if the element could not be added
Get / Delete | E poll() | null if the queue is empty
Get | E peek() | null if the queue is empty

offer: Returns true if the element was added to the queue, false otherwise.
poll: Retrieves and removes the head of the queue; returns null if the queue is empty.
peek: Retrieves, but does not remove, the head of the queue; returns null if the queue is empty.
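As a minimal sketch of the two forms, the following runs both against a full queue and an empty queue. The class name QueueFormsSketch and the use of ArrayBlockingQueue with a capacity of 1 are my own choices for illustration; any capacity-restricted Queue should behave the same way.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class QueueFormsSketch {

	// Tries both insert forms against a queue that is already full.
	static String insertIntoFullQueue() {
		Queue<String> queue = new ArrayBlockingQueue<>(1); // capacity 1 is an arbitrary choice
		queue.add("a"); // succeeds; the queue is now full
		boolean offered = queue.offer("b"); // special-value form: returns false
		String added;
		try {
			added = String.valueOf(queue.add("b")); // exception form
		} catch (IllegalStateException e) {
			added = "IllegalStateException";
		}
		return "offer=" + offered + ", add=" + added;
	}

	// Tries all four retrieval forms against an empty queue.
	static String readFromEmptyQueue() {
		Queue<String> queue = new ArrayBlockingQueue<>(1);
		String polled = queue.poll(); // special-value form: returns null
		String peeked = queue.peek(); // special-value form: returns null
		String removed;
		try {
			removed = queue.remove(); // exception form
		} catch (NoSuchElementException e) {
			removed = "NoSuchElementException";
		}
		String head;
		try {
			head = queue.element(); // exception form
		} catch (NoSuchElementException e) {
			head = "NoSuchElementException";
		}
		return "poll=" + polled + ", peek=" + peeked + ", remove=" + removed + ", element=" + head;
	}

	public static void main(String[] args) {
		System.out.println(insertIntoFullQueue());
		System.out.println(readFromEmptyQueue());
	}
}
```

The special-value forms are usually the better fit for capacity-restricted queues, where a failed insert is a normal occurrence rather than an error.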

Interface BlockingQueue<E>

Introduced in: 1.5

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

Blocking operation

Operation | Method signature
Insert | void put(E e) throws InterruptedException
Get / Delete | E take() throws InterruptedException
Get | -

put: Adds an element to the queue, waiting if necessary until space becomes available; throws InterruptedException if the thread is interrupted while waiting.
take: Retrieves and removes the head of the queue, waiting if necessary until an element becomes available; throws InterruptedException if the thread is interrupted while waiting.
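The blocking pair can be sketched with one producer thread and the calling thread acting as the consumer. The class name PutTakeSketch, the capacity of 1, and the three elements are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PutTakeSketch {

	static List<Integer> run() {
		// Capacity 1 forces the producer to wait for the consumer on every element
		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);

		Thread producer = new Thread(() -> {
			try {
				for (int i = 1; i <= 3; i++) {
					queue.put(i); // blocks while the queue is full
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});
		producer.start();

		List<Integer> received = new ArrayList<>();
		try {
			for (int i = 0; i < 3; i++) {
				received.add(queue.take()); // blocks while the queue is empty
			}
			producer.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return received;
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints [1, 2, 3]
	}
}
```

Because there is a single producer and the queue is FIFO, the elements arrive in insertion order regardless of how the two threads are scheduled.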

Operation that times out

Operation | Method signature
Insert | boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
Get / Delete | E poll(long timeout, TimeUnit unit) throws InterruptedException
Get | -

offer: Inserts an element into the queue, waiting up to the specified wait time for space to become available; returns false if the wait time elapses first.
poll: Retrieves and removes the head of the queue, waiting up to the specified wait time for an element to become available; returns null if the wait time elapses first.
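A small single-threaded sketch of the timed forms follows. The class name TimedOfferPollSketch, the capacity of 1, and the 100 ms timeout are arbitrary values picked for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferPollSketch {

	static String run() {
		BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
		try {
			// Space is available, so this returns true without waiting
			boolean first = queue.offer("a", 100, TimeUnit.MILLISECONDS);
			// The queue is full and nobody removes anything, so this gives up after 100 ms
			boolean second = queue.offer("b", 100, TimeUnit.MILLISECONDS);
			// An element is available, so this returns it without waiting
			String head = queue.poll(100, TimeUnit.MILLISECONDS);
			// The queue is empty and nobody inserts anything, so this returns null after 100 ms
			String none = queue.poll(100, TimeUnit.MILLISECONDS);
			return first + " " + second + " " + head + " " + none;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return "interrupted";
		}
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints: true false a null
	}
}
```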

Check the free space in the queue

Operation | Method signature
Check free space | int remainingCapacity()

remainingCapacity: Returns the number of additional elements that the queue can accept without blocking. Returns Integer.MAX_VALUE if there is no intrinsic limit.
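For example, with LinkedBlockingQueue as the implementation (the class name RemainingCapacitySketch and the capacity of 3 are assumptions for this sketch), a bounded queue reports the space that is left, while a queue created without a capacity reports Integer.MAX_VALUE:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RemainingCapacitySketch {

	// Remaining capacity of a queue bounded at 3 after inserting one element.
	static int boundedAfterOneInsert() {
		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
		queue.offer(1);
		return queue.remainingCapacity();
	}

	// Remaining capacity of an empty queue created without a capacity argument.
	static int unbounded() {
		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
		return queue.remainingCapacity();
	}

	public static void main(String[] args) {
		System.out.println(boundedAfterOneInsert()); // prints 2
		System.out.println(unbounded() == Integer.MAX_VALUE); // prints true
	}
}
```

Note that in a multithreaded program the returned value can be stale by the time it is used, so it should not be relied on to guarantee that a following insert succeeds.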

Class LinkedBlockingQueue<E>

Introduced in: 1.5

An optionally bounded blocking queue based on linked nodes. This queue orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue for the longest time.

Class PriorityBlockingQueue<E>

Introduced in: 1.5

An unbounded blocking queue that uses the same ordering rules as the class PriorityQueue and provides blocking retrieval operations.

constructor

Pass a Comparator to the constructor to specify how the elements in the queue are ordered.

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
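A short sketch of the comparator constructor: the class name PriorityOrderSketch, the initial capacity of 11, and the reverse-order comparator are choices made for this example. The elements come out in comparator order, not insertion order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderSketch {

	static List<Integer> drainDescending() {
		// The initial capacity is only a sizing hint; the queue itself is unbounded
		PriorityBlockingQueue<Integer> queue =
				new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
		queue.add(3);
		queue.add(10);
		queue.add(7);

		// poll() always returns the head according to the comparator
		List<Integer> drained = new ArrayList<>();
		Integer e;
		while ((e = queue.poll()) != null) {
			drained.add(e);
		}
		return drained;
	}

	public static void main(String[] args) {
		System.out.println(drainDescending()); // prints [10, 7, 3]
	}
}
```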

Interface TransferQueue<E>

Introduced in: 1.7

A BlockingQueue in which producers may wait for consumers to receive elements.

Blocking operation

Operation | Method signature
Insert | void transfer(E e) throws InterruptedException

transfer: Transfers the element immediately if a consumer is already waiting to receive it; otherwise waits until a consumer receives it.
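The hand-off can be sketched with one consumer thread calling take while the calling thread calls transfer. The class name TransferSketch and the element "hello" are assumptions for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicReference;

class TransferSketch {

	static String run() {
		TransferQueue<String> queue = new LinkedTransferQueue<>();
		AtomicReference<String> received = new AtomicReference<>();

		Thread consumer = new Thread(() -> {
			try {
				received.set(queue.take()); // waits for the producer's element
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});
		consumer.start();

		try {
			// Returns only after the consumer has received the element
			queue.transfer("hello");
			consumer.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return received.get() + ", queue empty: " + queue.isEmpty();
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints: hello, queue empty: true
	}
}
```

Unlike put, which returns as soon as the element is stored, transfer gives the producer confirmation that a consumer has actually taken the element.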

Operation that returns immediately

Operation | Method signature
Insert | boolean tryTransfer(E e)

tryTransfer: Transfers the element immediately if a consumer is already waiting to receive it; otherwise returns false without enqueuing the element.

Operation that times out

Operation | Method signature
Insert | boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException

tryTransfer: Transfers the element immediately if a consumer is already waiting to receive it; otherwise waits up to the specified wait time for a consumer to receive it, and returns false if the wait time elapses first.
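As a sketch of both tryTransfer variants when no consumer is waiting (the class name TryTransferSketch and the 100 ms timeout are assumptions for this example), neither call succeeds and neither leaves its element in the queue:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class TryTransferSketch {

	static String run() {
		TransferQueue<String> queue = new LinkedTransferQueue<>();
		try {
			// No consumer is waiting, so this returns false at once
			boolean immediate = queue.tryTransfer("a");
			// Waits up to 100 ms for a consumer, then gives up and returns false
			boolean timed = queue.tryTransfer("b", 100, TimeUnit.MILLISECONDS);
			// Failed transfers do not leave their elements enqueued
			return immediate + " " + timed + " empty=" + queue.isEmpty();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return "interrupted";
		}
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints: false false empty=true
	}
}
```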

Check which consumers are waiting

Operation | Method signature
Check for waiting consumers | boolean hasWaitingConsumer()
Count waiting consumers | int getWaitingConsumerCount()

hasWaitingConsumer: Returns true if at least one consumer is waiting to receive an element via BlockingQueue.take or timed poll.
getWaitingConsumerCount: Returns an estimate of the number of consumers waiting to receive an element via BlockingQueue.take or timed poll.
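A minimal sketch of the two monitoring methods, assuming a single consumer thread blocked in take (the class name WaitingConsumerSketch and the 10 ms polling interval are my own choices). The count is documented as an estimate, so it is best treated as monitoring information rather than as a basis for synchronization.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

class WaitingConsumerSketch {

	static String run() {
		TransferQueue<String> queue = new LinkedTransferQueue<>();
		boolean before = queue.hasWaitingConsumer(); // no consumer has started yet

		Thread consumer = new Thread(() -> {
			try {
				queue.take(); // blocks until an element is transferred
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});
		consumer.start();

		try {
			// Poll until the consumer thread is actually blocked in take()
			while (!queue.hasWaitingConsumer()) {
				Thread.sleep(10);
			}
			int count = queue.getWaitingConsumerCount();
			queue.transfer("done"); // releases the consumer
			consumer.join();
			return "before=" + before + ", count=" + count + ", after=" + queue.hasWaitingConsumer();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return "interrupted";
		}
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints: before=false, count=1, after=false
	}
}
```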

Class LinkedTransferQueue<E>

Introduced in: 1.7

An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first in, first out) with respect to any given producer. The head of the queue is the element that has been in the queue for the longest time for some producer.

Sample code

LinkedBlockingQueue sample code

In this sample, one producer periodically adds elements (sequential Integer values) to the BlockingQueue, and three consumers periodically retrieve elements from it. Rather than running indefinitely, the program exits once the producer has added 100 elements to the queue.

Also, although it is not directly related to the subject of these notes, the producer and consumers are implemented with ScheduledExecutorService.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingDemo {

	private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		BlockingDemo demo = new BlockingDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		// The producer starts after 2 seconds, then runs at 1-second intervals
		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

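		// Consumers start after 20 seconds; by then the producer has filled the queue (capacity 10) and is blocked in put
		println("create consumer task");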
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main end");
	}

	class ProducerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.put(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
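				// Restore the interrupt status instead of swallowing the interrupt
				Thread.currentThread().interrupt();
				e.printStackTrace();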
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(BlockingQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				//Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
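				// Restore the interrupt status instead of swallowing the interrupt
				Thread.currentThread().interrupt();
				e.printStackTrace();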
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		println("shutdown start");

		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}

		println("shutdown end");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

constructor

You can specify the capacity of the queue in the constructor. In this sample, the maximum number of elements that the queue can hold is set to 10.

private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

If no capacity is specified, the capacity defaults to Integer.MAX_VALUE.

private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

Insert an element into the queue

The sample uses the put method, which blocks indefinitely until space becomes available in the queue. If the thread is interrupted while put is blocked, an InterruptedException is thrown.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.put(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
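	// Restore the interrupt status instead of swallowing the interrupt
	Thread.currentThread().interrupt();
	e.printStackTrace();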
}
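The interrupt behavior of put can be sketched as follows. The class name InterruptedPutSketch and the capacity of 1 are assumptions for this example: a thread tries to put into a full queue and is then interrupted by the calling thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptedPutSketch {

	static boolean run() {
		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
		queue.offer(1); // the queue is now full and nothing will ever remove the element

		AtomicBoolean sawInterruptedException = new AtomicBoolean(false);
		Thread producer = new Thread(() -> {
			try {
				queue.put(2); // cannot complete because the queue stays full
			} catch (InterruptedException e) {
				sawInterruptedException.set(true);
			}
		});
		producer.start();
		producer.interrupt(); // put throws whether the interrupt arrives before or during the wait

		try {
			producer.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return sawInterruptedException.get();
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints true
	}
}
```

This is also why shutdownNow can stop a task that is stuck in put: it interrupts the worker thread, and the blocked call ends with an InterruptedException.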

Remove elements from the queue

The sample uses the poll method, which waits up to 1 second for an element to become available and returns null if none arrives in that time. The take method, which is commented out here, would instead block indefinitely until an element can be retrieved.

try {
	//Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
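	// Restore the interrupt status instead of swallowing the interrupt
	Thread.currentThread().interrupt();
	e.printStackTrace();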
}

LinkedTransferQueue sample code

In this sample, one producer periodically adds elements (sequential Integer values) to the TransferQueue, and three consumers periodically retrieve elements from it. This sample also exits once the producer has added 100 elements to the queue.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TransferDemo {

	private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		TransferDemo demo = new TransferDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

		// Consumers start after 10 seconds; until then the producer's first transfer call blocks
		println("create consumer task");
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main stop");
	}

	class ProducerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.transfer(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
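				// Restore the interrupt status instead of swallowing the interrupt
				Thread.currentThread().interrupt();
				e.printStackTrace();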
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(TransferQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				// Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
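				// Restore the interrupt status instead of swallowing the interrupt
				Thread.currentThread().interrupt();
				e.printStackTrace();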
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					System.err.println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}
		System.out.println("shutdown");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

constructor

LinkedTransferQueue is unbounded, so the constructor takes no capacity argument.

private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();

Insert an element into the queue

The sample uses the transfer method, which blocks indefinitely until the element is received by a consumer. If the thread is interrupted while transfer is blocked, an InterruptedException is thrown.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.transfer(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
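	// Restore the interrupt status instead of swallowing the interrupt
	Thread.currentThread().interrupt();
	e.printStackTrace();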
}

Remove elements from the queue

The sample uses the poll method, which waits up to 1 second for an element to become available and returns null if none arrives in that time. The take method, which is commented out here, would instead block indefinitely until an element can be retrieved.

try {
	// Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
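	// Restore the interrupt status instead of swallowing the interrupt
	Thread.currentThread().interrupt();
	e.printStackTrace();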
}
