Implement Bounded Blocking Queue
Write a multithreaded bounded blocking queue, that is, a queue whose capacity is limited. Implement the size, add, remove, and peek methods. There can be multiple producer and consumer threads.
Producers add elements to the queue; if the queue is full, they should wait.
Consumers take elements from the queue; if the queue is empty, they should wait.
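For orientation, the blocking contract described above is the one the JDK's java.util.concurrent.ArrayBlockingQueue already implements. The minimal sketch below uses that JDK class (not the class built in this post) with timed offer and poll, so the waiting behaviour can be observed from a single thread without hanging. The class name BlockingContractDemo, its two helper methods, and the 50 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingContractDemo {
    // Fills a queue of the given capacity, then reports whether one more
    // timed offer succeeds. A full queue makes the producer wait, so the
    // offer is expected to time out and return false.
    static boolean offerWhenFull(int capacity) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            q.put(i);
        }
        return q.offer(99, 50, TimeUnit.MILLISECONDS);
    }

    // Polls an empty queue with a timeout. An empty queue makes the consumer
    // wait, so the poll is expected to time out and return null.
    static Integer pollWhenEmpty() throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        return q.poll(50, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer on a full queue succeeded: " + offerWhenFull(2));
        System.out.println("poll on an empty queue returned: " + pollWhenEmpty());
    }
}
```

The untimed put and take methods of the same JDK class wait indefinitely instead of timing out, which is the behaviour the add and remove methods in this exercise are asked to provide.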
Solution
To make adding an element to the underlying queue, or removing one from it, thread-safe, we need to either use a lock or mark the relevant blocks as synchronized.
Here is an implementation with synchronized. One shortcoming of synchronized is that it allows only one thread to access the queue at a time, whether consumer or producer.
Also, we need to use notifyAll instead of notify, since there could be multiple waiting producers and consumers, and notify wakes up a single arbitrary thread, which could be a producer or a consumer. If it wakes a thread of the wrong kind (say, another consumer while the queue is still empty), that thread goes straight back to waiting and the thread that could have made progress is never woken. This stackoverflow post gives a detailed example to explain notify vs. notifyAll.
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedBlockingQueue<E> {
    private final Queue<E> queue = new LinkedList<E>();
    private final int capacity;
    // Atomic so that the unsynchronized size() and the fast path in peek() see a current value.
    private final AtomicInteger count = new AtomicInteger(0);

    public BoundedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("The capacity of the queue must be > 0.");
        this.capacity = capacity;
    }

    public int size() {
        return count.get();
    }

    // wait() can be interrupted, so the checked InterruptedException must be declared.
    public synchronized void add(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException("Null element is not allowed.");
        while (count.get() == capacity) wait(); // queue is full: block until an element is removed
        queue.add(e);
        int oldCount = count.getAndIncrement();
        if (oldCount == 0) {
            notifyAll(); // queue was empty: notify other waiting threads (could be producers or consumers)
        }
    }

    public synchronized E remove() throws InterruptedException {
        while (count.get() == 0) wait(); // queue is empty: block until an element is added
        E e = queue.remove();
        int oldCount = count.getAndDecrement();
        if (oldCount == this.capacity) {
            notifyAll(); // queue was full: notify other waiting threads (could be producers or consumers)
        }
        return e;
    }

    /* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
    public E peek() {
        if (count.get() == 0) return null;
        synchronized (this) {
            return queue.peek();
        }
    }
}
Notice that if the queue was empty before add or full before remove, we need to notify other waiting threads to unblock them. We only need to emit such notifications in these two cases, since otherwise no thread can be blocked on the condition that just changed: consumers wait only when the queue is empty, and producers only when it is full.
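To see the synchronized approach run end to end, here is a condensed, self-contained sketch. SimpleBoundedQueue is a hypothetical cut-down variant of the class above: it keeps the same wait and notifyAll pattern but reads queue.size() under the monitor instead of keeping a separate AtomicInteger, which is safe here only because every access to the queue is synchronized. ProducerConsumerDemo and its transfer helper are likewise names made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class SimpleBoundedQueue<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final int capacity;

    SimpleBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void add(E e) throws InterruptedException {
        while (queue.size() == capacity) wait(); // full: the producer blocks
        boolean wasEmpty = queue.isEmpty();
        queue.add(e);
        if (wasEmpty) notifyAll();               // consumers may be waiting
    }

    synchronized E remove() throws InterruptedException {
        while (queue.isEmpty()) wait();          // empty: the consumer blocks
        boolean wasFull = queue.size() == capacity;
        E e = queue.remove();
        if (wasFull) notifyAll();                // producers may be waiting
        return e;
    }
}

class ProducerConsumerDemo {
    // One producer thread pushes 1..n through a queue of the given capacity
    // while the calling thread consumes; returns the sum of what was received.
    static long transfer(int n, int capacity) throws InterruptedException {
        SimpleBoundedQueue<Integer> q = new SimpleBoundedQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.add(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        producer.start();
        long sum = 0;
        for (int i = 0; i < n; i++) sum += q.remove();
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100, 3)); // 1 + 2 + ... + 100 = 5050
    }
}
```

With a capacity of 3 and 100 elements, the producer repeatedly runs into a full queue and the consumer into an empty one, so both waiting paths are exercised.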
Here is an implementation with locks.
We use two reentrant locks (ReentrantLock) in place of the synchronized methods. With separate locks for put and take, a consumer and a producer can access the queue at the same time (if it is neither empty nor full). A ReentrantLock provides the same basic behavior as the implicit monitor lock used by synchronized methods and statements. Beyond that, it is owned by the thread that last locked it successfully and has not yet unlocked it, so when the same thread invokes lock() again, the call returns immediately instead of blocking.
Together with the locks, we use Condition to replace the object monitor methods (wait and notifyAll). A Condition instance is intrinsically bound to a lock, and a thread must hold that lock to wait on or signal the condition. Even better, multiple Condition instances can be associated with one single lock, each with its own set of waiting threads, which means that instead of waking up all threads waiting on a monitor, we can wake up just a chosen subset of them. Similar to wait(), Condition.await() atomically releases the associated lock and suspends the current thread.
We use an AtomicInteger for the count of elements in the queue, since producers and consumers update it while holding different locks and the updates must therefore be atomic.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {
    // java.util.LinkedList is not thread-safe, and with separate put and take
    // locks a producer and a consumer can touch the list at the same time.
    // So we keep our own singly linked list with a dummy head node, the layout
    // java.util.concurrent.LinkedBlockingQueue uses: producers only touch tail,
    // consumers only touch head.
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private Node<E> head = new Node<E>(null); // dummy node; guarded by takeLock
    private Node<E> tail = head;              // last node; guarded by putLock
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final Condition notEmpty = takeLock.newCondition();

    public BoundedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("The capacity of the queue must be > 0.");
        this.capacity = capacity;
    }

    public int size() {
        return count.get();
    }

    // await() can be interrupted, so the checked InterruptedException must be declared.
    public void add(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException("Null element is not allowed.");
        int oldCount = -1;
        Node<E> node = new Node<E>(e);
        putLock.lock();
        try {
            // we use count as a wait condition although count isn't protected by a lock
            // since at this point all other put threads are blocked, count can only
            // decrease (via some take thread).
            while (count.get() == capacity) notFull.await();
            tail.next = node; // link the new node at the tail
            tail = node;
            oldCount = count.getAndIncrement();
            if (oldCount + 1 < capacity) {
                notFull.signal(); // notify other producers for count change
            }
        } finally {
            putLock.unlock();
        }
        // notify other waiting consumers
        if (oldCount == 0) {
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    }

    public E remove() throws InterruptedException {
        E e;
        int oldCount = -1;
        takeLock.lock();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> first = head.next; // non-null because count > 0
            e = first.item;
            first.item = null;         // first becomes the new dummy head
            head = first;
            oldCount = count.getAndDecrement();
            if (oldCount > 1) {
                notEmpty.signal(); // notify other consumers for count change
            }
        } finally {
            takeLock.unlock();
        }
        // notify other waiting producers
        if (oldCount == capacity) {
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
        return e;
    }

    /* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
    public E peek() {
        if (count.get() == 0) return null;
        takeLock.lock();
        try {
            // re-check under the lock: a consumer may have emptied the queue in the meantime
            return (count.get() > 0) ? head.next.item : null;
        } finally {
            takeLock.unlock();
        }
    }
}
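The earlier remark that one lock can carry several Condition instances can be sketched with a single-lock variant, modelled on the bounded-buffer example in the Condition Javadoc. This is not the two-lock queue built in this post: the class name OneLockBuffer and its put, take, and size methods are illustrative only, and an ArrayDeque is assumed as storage because a single lock guards every access to it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OneLockBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    // Two wait sets on the same lock: producers and consumers park separately.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    OneLockBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock while waiting and reacquires it before returning
            while (items.size() == capacity) notFull.await();
            items.addLast(e);
            notEmpty.signal(); // wakes one waiting consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) notEmpty.await();
            E e = items.removeFirst();
            notFull.signal(); // wakes one waiting producer, never another consumer
            return e;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Because each condition has its own wait set, signal() is enough here and notifyAll-style broadcasts are not needed. The trade-off against the two-lock design is that a producer and a consumer can never run at the same time.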
Thanks for the nice post. In your first implementation, since you use synchronized, i.e. only one thread can call the method at a time, can we just use queue.size() to check the size instead of using an AtomicInteger?
How to run this code?