LinkedBlockingQueue Example

0

LinkedBlockingQueue is a linked node list that contains an initial capacity, header node and a tail node. The capacity serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to maximum value of integer. As we add node to the linked list, they get added at the tail end. One can keep adding till the queue reaches its capacity. Once the capacity is reached, any further insert may result in either waiting or in an unsuccessful return, based on the API used.

When an item is to be retrieved, the immediate next node from the head of this queue is removed and the node’s value is returned. The retrieve operation might result in waiting, if there are no items in the queue.

Linked Node List

Linked list has a header and a tail node. Head always points to the first node in the linked list. Tail always points to a node in the linked list.

Linked List

Linked List

Head is a dummy item thus always points to null item. The last node initially point to a null item as we haven’t added any node yet.

Last and Head nodes

Last and Head nodes

As we add new node, the header node’s next link points to the newly added node. The actual value is encapsulated in a node and added to the list. Te last node in queue will have no successor so next will be null.

Node structure

Node structure

Two lock queue algorithm

Two-lock queue algorithm is used to allow enqueue and dequeue proceed concurrently. The algorithm uses a separate head lock and tail lock to separate synchronization of enqueueing and dequeueing threads.

As we keep dequeuing, the queue finally becomes empty and thereafter any further ‘retrieve’ operation will result in waiting till a node is added to the queue. The ‘Taker’ thread in waiting will be un-blocked only by a ‘Put’ operation. When a put notices that it has enabled at least one take,  it signals the taker thread.

No items in queue

No items in queue

As queue reaches its capacity, any further ‘Put’ operation will result in waiting. A ‘Taker’ operation in turn will signal the ‘Put’ thread waiting for an item.

Items reached capacity

Items reached capacity

In the below example, we start a ‘Taker’ thread first which is put on wait as there are no elements yet. We then start a ‘Put’ thread which puts in an element which in turn wakes up the ‘Taker’ thread and the element put is retrieved.

LinkedBlockQueueExample:

package com.javarticles.concurrency;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Concurrent offer/take example
 */
public class LinkedBlockQueueExample {
    private static AtomicLong l = new AtomicLong(1);

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();
        TakeRunnable takeRunnable = new TakeRunnable(queue);
        Thread takeThread = new Thread(takeRunnable, "TakeThread");
        takeThread.start();

        Thread.sleep(5000);

        OfferRunnable offerRunnable = new OfferRunnable(queue);
        Thread putThread = new Thread(offerRunnable, "PutThread");
        putThread.start();
    }

    private static class OfferRunnable implements Runnable {
        LinkedBlockingQueue<Long> queue;

        OfferRunnable(LinkedBlockingQueue<Long> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            print("Offer item to queue");
            queue.offer(l.getAndIncrement());
        }
    }

    private static class TakeRunnable implements Runnable {
        LinkedBlockingQueue<Long> queue;

        TakeRunnable(LinkedBlockingQueue<Long> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                print("Waiting item from queue");
                Object o = queue.take();
                print("Item is " + o);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void print(String p) {
        System.out.println(Thread.currentThread().getName() + ": " + p);
    }
}

Output:

TakeThread: Waiting item from queue
PutThread: Offer item to queue
TakeThread: Item is 1

LinkedBlockingQueue follows FIFO (first-in-first-out). In the next couple of sections we see enqueue and dequeue algorithm, how the node structure changes as enqueue and dequeue.

Enqueue

Nodes are only inserted after the last node in the linked list.  First the green node is inserted and then then the yellow one.

Adding new items to queue

Adding new items to queue

Dequeue

Nodes are only deleted from the beginning of the linked list. that is, from the head of queue.

  1. The item that header node points to is null. When we dequeue, the blue node from the head of the queue is removed. The item (Item1) that node ‘p’ points to returned.
  2. Item1 is the value that node p holds.
  3. The node dequeued becomes the new header node. The old header node’s next points to itself to help GC cleanup the de-linked header.
  4. The node ‘p’ dequeued becomes the new header.
Drain To Algorithm

Dequeue

In the below example, we start multiple ‘Put’ threads and ‘Taker’ Threads. The capacity of the queue is two and since the ‘Taker’ threads are lesser than the ‘Put’ threads, few of the ‘Put’ threads end up in waiting state as the queue count reaches its capacity.

LinkedBlockQueueMultipleOffersExample:

package com.javarticles.concurrency;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Concurrent put/take operations.
 */
public class LinkedBlockQueueMultipleOffersExample {
    private static AtomicLong l = new AtomicLong(1);

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>(2);
        int putThreadCount = 5;
        int takeThreadCount = 2;
        Thread[] putThreads = new Thread[putThreadCount];
        for (int i = 0; i < putThreadCount; i++) {
            PutRunnable putRunnable = new PutRunnable(queue);
            putThreads[i]= new Thread(putRunnable, "PutThread" + i);
            putThreads[i].start();
        }
        Thread.sleep(1000);
        for (int i = 0; i < takeThreadCount; i++) {
            TakeRunnable takeRunnable = new TakeRunnable(queue);
            Thread takeThread = new Thread(takeRunnable, "TakeThread" + i);
            takeThread.start();
        }
        Thread.sleep(1000);
        for (int i = 0; i < putThreadCount; i++) {
            putThreads[i].interrupt();
        }
    }

    private static class PutRunnable implements Runnable {
        LinkedBlockingQueue<Long> queue;
        PutRunnable(LinkedBlockingQueue<Long> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            long item = l.getAndIncrement();
            print("Put item " + item + " into queue");
            try {
                queue.put(item);
                print("done");
            } catch (InterruptedException e) {
                print("Interrupted");
            }
        }
    }

    private static class TakeRunnable implements Runnable {
        LinkedBlockingQueue<Long> queue;
        TakeRunnable(LinkedBlockingQueue<Long> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            try {
                print("take item from the queue");
                Object o = queue.take();
                print("Item de-queued is " + o);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void print(String p) {
        System.out.println(Thread.currentThread().getName() + ": " + p);
    }
}

Output:

PutThread0: Put item 1 into queue
PutThread4: Put item 5 into queue
PutThread3: Put item 4 into queue
PutThread2: Put item 3 into queue
PutThread1: Put item 2 into queue
PutThread2: done
PutThread1: done
TakeThread0: take item from the queue
TakeThread1: take item from the queue
TakeThread0: Item de-queued is 3
PutThread3: done
TakeThread1: Item de-queued is 2
PutThread4: done
PutThread0: Interrupted

Take Operation

As we keep dequeuing, the queue finally becomes empty and thereafter any further ‘retrieve’ operation will result in waiting till a node is added to the queue. The ‘Taker’ thread in waiting will be un-blocked only by a ‘Put’ operation. The ‘Taker’ thread wakes up, dequeues, decrements the count and verifies if more items have been entered since the signal in which case it will signals other ‘Taker’ threads in wait. After signalling other takers, the ‘take’ lock is released and any ‘Put’ threads in waiting are signaled to indicate that the space is now available for them to insert.

Take algorithm

Take algorithm

Put Operation

Put operation inserts the specified element at the tail of this queue. First the put lock is acquired. If the queue has reached the capacity, the ‘Put’ thread will be put on an indefinite wait till space becomes available. If one wants to wait for a specified amount of time then the overloaded offer() API with the time parameter should be used instead of put().

Put algorithm

Put algorithm

Offer Operation

Inserts the specified element at the tail of this queue. If the queue is already full, it will return false instead of waiting.

Offer algorthm

Offer algorithm

Poll Operation

It is similar to take() operation. It retrieves and removes the head of this queue. Only difference is that the API will return null if the queue is empty.

Poll algorithm

Poll algorithm

Drain Operation

We can use drainTo() to remove elements in batch. A collection is passed in for the queue elements to be transferred into and the number of elements to be transferred.

Drain To Algorithm

Drain To Algorithm

In the below example, we start a thread that tries to retrieve the elements in batches. Once retrieved it delegates it to a handler to process them. We first call queue.take() and then queue.drainTo() as take() forces the thread to wait till an element is available.

LinkedBlockQueueConsumer:

package com.javarticles.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Producer/Consumer example
 */
public class LinkedBlockQueueConsumer {
    private final LinkedBlockingQueue<String> queue;
    private final ExecutorService qExecutor;

    private LinkedBlockQueueConsumer() {
        queue = new LinkedBlockingQueue<String>();
        qExecutor = Executors.newSingleThreadExecutor();
        qExecutor.submit(new ExecutorRunner(queue, this));
    }


    private static class ExecutorRunner implements Runnable {
        static final int BATCH_SIZE = 4;
        ExecutorRunner(final LinkedBlockingQueue<String> queue,
                       final LinkedBlockQueueConsumer consumer) {
            this.queue = queue;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            for (; ; ) {
                final List<String> commands = new ArrayList<String>();
                try {
                    commands.add(queue.take());
                    queue.drainTo(commands, BATCH_SIZE -1);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted while waiting for command..." + e);
                    queue.clear();
                    break;
                }

                try {
                    consumer.consumeQueue(commands);
                } catch (final Throwable t) {
                    System.out.println("Exception thrown by session consumeQueue()." + t);
                }
            }
        }

        private final LinkedBlockingQueue<String> queue;
        private final LinkedBlockQueueConsumer consumer;
    }

    private void consumeQueue(List<String> commands) {
        System.out.println("consumeQueue: " + commands);
    }

    private void post(final String command) {
        System.out.println("Posting : " + command);
        queue.add(command);
    }

    public static void main(String[] args) {
        LinkedBlockQueueConsumer consumer = new LinkedBlockQueueConsumer();
        for (int i = 0; i < 10; i++) {
            consumer.post("C" + i);
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumer.qExecutor.shutdown();
        try {
            if (!consumer.qExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
                consumer.qExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output:

main: Posting : C0
main: Posting : C1
main: Posting : C2
main: Posting : C3
main: Posting : C4
main: Posting : C5
main: Posting : C6
main: Posting : C7
main: Posting : C8
pool-1-thread-1: consumeQueue: [C0, C1]
main: Posting : C9
pool-1-thread-1: consumeQueue: [C2, C3, C4, C5]
pool-1-thread-1: consumeQueue: [C6, C7, C8, C9]
pool-1-thread-1: Interrupted while waiting for command...java.lang.InterruptedException

Download the source code

This was an example of LinkedBlockingQueue.

You can download the source code here: javalinkedblockqueueexample.zip

About Author

Ram's expertise lies in test driven development and re-factoring. He is passionate about open source technologies and loves blogging on various java and open-source technologies like spring. You can reach him at rsatish.m@gmail.com

Comments are closed.