Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Closes #198: Implement the BlockingQueue as a wrapper around the Queu…
Browse files Browse the repository at this point in the history
…eFile

Switched to an ObjectQueue<T> as the backing store and delegate closable.
  • Loading branch information
pschichtel committed Oct 25, 2018
1 parent db76821 commit c799abe
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -1,41 +1,64 @@
package com.squareup.tape2;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* This is a simple and unbounded @{@link BlockingQueue} implementation as a wrapper around a @{@link QueueFile}.
* Thread safety is implemented using a single lock around all operations to the backing @{@link QueueFile}.
* This is a simple and unbounded {@link BlockingQueue} implementation as a wrapper
* around a {@link QueueFile}. Thread safety is implemented using a single lock around all
* operations to the backing {@link QueueFile}.
*
* @param <E> the element type
*/
public class BlockingFileQueue implements BlockingQueue<byte[]> {
public class BlockingObjectQueue<E> implements BlockingQueue<E>, Closeable {

private final Lock lock = new ReentrantLock();

private final Condition nonEmpty = lock.newCondition();

private final QueueFile queue;
private final ObjectQueue<E> queue;

private BlockingObjectQueue(ObjectQueue<E> queue) {
this.queue = queue;
}

/**
* Creates a new @{@link BlockingQueue} of type {@code byte[]} backed by the given @{@link QueueFile}.
* Creates a new {@link BlockingQueue} of type {@code T} backed by the given
* {@link ObjectQueue} of type {@code T}.
*
* @param queue the queue file which should not be shared to other places
* */
public BlockingFileQueue(QueueFile queue) {
this.queue = queue;
* @param qf the queue file which should not be shared to other places
* @param conv the converter used to convert from and to byte arrays
* @param <T> the element type
* @return a BlockObjectQueue implementation
*/
public static <T> BlockingObjectQueue<T> create(QueueFile qf, ObjectQueue.Converter<T> conv) {
return new BlockingObjectQueue<>(new FileObjectQueue<>(qf, conv));
}

@Override public boolean add(byte[] bytes) {
/**
* Creates a new {@link BlockingQueue} of type {@code T} backed by the given
* {@link ObjectQueue} of type {@code T}.
*
* @param qf the queue file which should not be shared to other places
* @return a BlockObjectQueue implementation
*/
public static BlockingObjectQueue<byte[]> create(QueueFile qf) {
return create(qf, ByteArrayConverter.INSTANCE);
}

@Override public boolean add(E element) {
lock.lock();
try {
queue.add(bytes);
queue.add(element);
nonEmpty.signal();
return true;
} catch (IOException e) {
Expand All @@ -45,25 +68,25 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public boolean offer(byte[] bytes) {
return add(bytes);
@Override public boolean offer(E element) {
return add(element);
}

@Override public void put(byte[] bytes) {
add(bytes);
@Override public void put(E element) {
add(element);
}

@Override public boolean offer(byte[] bytes, long timeout, TimeUnit unit) {
return add(bytes);
@Override public boolean offer(E element, long timeout, TimeUnit unit) {
return add(element);
}

@Override public byte[] take() throws InterruptedException {
@Override public E take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
nonEmpty.await();
}
byte[] peek = queue.peek();
E peek = queue.peek();
if (peek == null) {
throw new IllegalStateException("Queue empty!");
}
Expand All @@ -76,14 +99,14 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException {
@Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
long timeoutNanos = unit.toNanos(timeout);
while (queue.isEmpty() && timeoutNanos > 0) {
timeoutNanos = nonEmpty.awaitNanos(timeoutNanos);
}
byte[] peek = queue.peek();
E peek = queue.peek();
if (peek == null) {
throw new NoSuchElementException();
}
Expand All @@ -100,23 +123,22 @@ public BlockingFileQueue(QueueFile queue) {
return Integer.MAX_VALUE; // as per BlockingQueue javadoc for unbounded queues
}

/** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */
/**
* The underlying {@link QueueFile} only supports removing the head, so this will only work if
* head matches.
* */
@Override public boolean remove(Object o) {
if (o == null) {
return false;
}
if (o.getClass() != byte[].class && o.getClass() != Byte[].class) {
return false;
}
byte[] remove = (byte[])o;
lock.lock();
try {
if (queue.isEmpty()) {
return false;
}
Iterator<byte[]> it = queue.iterator();
Iterator<E> it = queue.iterator();
while (it.hasNext()) {
if (Arrays.equals(it.next(), remove)) {
if (Objects.deepEquals(it.next(), o)) {
it.remove();
return true;
}
Expand All @@ -128,17 +150,10 @@ public BlockingFileQueue(QueueFile queue) {
}

@Override public boolean contains(Object o) {
if (o == null) {
return false;
}
if (o.getClass() != byte[].class && o.getClass() != Byte[].class) {
return false;
}
byte[] check = (byte[])o;
lock.lock();
try {
for (byte[] entry : queue) {
if (Arrays.equals(entry, check)) {
for (E entry : queue) {
if (Objects.deepEquals(entry, o)) {
return true;
}
}
Expand All @@ -148,11 +163,11 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public int drainTo(Collection<? super byte[]> c) {
@Override public int drainTo(Collection<? super E> c) {
lock.lock();
try {
int size = queue.size();
Iterator<byte[]> it = queue.iterator();
Iterator<E> it = queue.iterator();
while (it.hasNext()) {
c.add(it.next());
it.remove();
Expand All @@ -163,13 +178,13 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public int drainTo(Collection<? super byte[]> c, int maxElements) {
@Override public int drainTo(Collection<? super E> c, int maxElements) {
if (maxElements == 0) {
return 0;
}
lock.lock();
try {
Iterator<byte[]> it = queue.iterator();
Iterator<E> it = queue.iterator();
int i = 0;
while (it.hasNext() && i < maxElements) {
c.add(it.next());
Expand All @@ -182,13 +197,13 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public byte[] remove() {
@Override public E remove() {
lock.lock();
try {
if (queue.isEmpty()) {
throw new NoSuchElementException();
}
byte[] peek = queue.peek();
E peek = queue.peek();
queue.remove();
return peek;
} catch (IOException e) {
Expand All @@ -198,13 +213,13 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public byte[] poll() {
@Override public E poll() {
lock.lock();
try {
if (queue.isEmpty()) {
return null;
}
byte[] peek = queue.peek();
E peek = queue.peek();
queue.remove();
return peek;
} catch (IOException e) {
Expand All @@ -214,7 +229,7 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public byte[] element() {
@Override public E element() {
lock.lock();
try {
if (queue.isEmpty()) {
Expand All @@ -228,7 +243,7 @@ public BlockingFileQueue(QueueFile queue) {
}
}

@Override public byte[] peek() {
@Override public E peek() {
lock.lock();
try {
if (queue.isEmpty()) {
Expand All @@ -243,23 +258,23 @@ public BlockingFileQueue(QueueFile queue) {
}

/**
* This overload is an addition to the @{@link BlockingQueue} interface similar to the {@link #poll(long, TimeUnit)}
* method, a blocking peek operation with a timeout.
* This overload is an addition to the {@link BlockingQueue} interface similar to the
* {@link #poll(long, TimeUnit)} method, a blocking peek operation with a timeout.
*
* @param timeout the timeout
* @param unit the time unit of the timeout
* @return the head of this queue, or {@code null} if this queue is empty
* @see #peek() for more information
* @throws InterruptedException if interrupted while waiting
*/
public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
public E peek(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
long timeoutNanos = unit.toNanos(timeout);
while (queue.isEmpty() && timeoutNanos > 0) {
timeoutNanos = nonEmpty.awaitNanos(timeoutNanos);
}
byte[] peek = queue.peek();
E peek = queue.peek();
if (peek == null) {
throw new NoSuchElementException();
}
Expand Down Expand Up @@ -289,7 +304,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
}
}

@Override public Iterator<byte[]> iterator() {
@Override public Iterator<E> iterator() {
lock.lock();
try {
return queue.iterator();
Expand All @@ -303,7 +318,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
try {
Object[] out = new Object[queue.size()];
int i = 0;
for (byte[] e : queue) {
for (E e : queue) {
out[i++] = e;
}
return out;
Expand All @@ -330,13 +345,13 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
}
}

@Override public boolean addAll(Collection<? extends byte[]> c) {
@Override public boolean addAll(Collection<? extends E> c) {
if (c.isEmpty()) {
return false;
}
lock.lock();
try {
for (byte[] e : c) {
for (E e : c) {
queue.add(e);
}
nonEmpty.signal();
Expand All @@ -348,7 +363,10 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
}
}

/** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */
/**
* The underlying {@link QueueFile} only supports removing the head, so this will only work if
* head matches.
* */
@Override public boolean removeAll(Collection<?> c) {
lock.lock();
boolean changed = false;
Expand All @@ -364,17 +382,19 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException {
}
}

private static boolean arrayContained(Collection<?> haystack, byte[] needle) {
private static boolean contains(Iterable<?> haystack, Object needle) {
for (Object o : haystack) {
byte[] byteArray = (byte[])o;
if (Arrays.equals(byteArray, needle)) {
if (Objects.deepEquals(o, needle)) {
return true;
}
}
return false;
}

/** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */
/**
* The underlying {@link QueueFile} only supports removing the head, so this will only work if
* head matches.
* */
@Override public boolean retainAll(Collection<?> c) {
lock.lock();
if (c.isEmpty()) {
Expand All @@ -387,9 +407,9 @@ private static boolean arrayContained(Collection<?> haystack, byte[] needle) {
}
try {
boolean changed = false;
Iterator<byte[]> it = queue.iterator();
Iterator<E> it = queue.iterator();
while (it.hasNext()) {
if (!arrayContained(c, it.next())) {
if (!contains(c, it.next())) {
it.remove();
changed = true;
}
Expand All @@ -410,4 +430,8 @@ private static boolean arrayContained(Collection<?> haystack, byte[] needle) {
lock.unlock();
}
}

@Override public void close() throws IOException {
queue.close();
}
}
20 changes: 20 additions & 0 deletions tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.squareup.tape2;

import java.io.IOException;
import java.io.OutputStream;

/**
* This is a simple "NO-OP" converter for byte[] queues which don't actually need a conversion.
*/
public class ByteArrayConverter implements ObjectQueue.Converter<byte[]> {

public static final ByteArrayConverter INSTANCE = new ByteArrayConverter();

@Override public byte[] from(byte[] source) {
return source;
}

@Override public void toStream(byte[] value, OutputStream sink) throws IOException {
sink.write(value);
}
}
Loading

0 comments on commit c799abe

Please sign in to comment.