Skip to content

Commit

Permalink
Ensure consumer visibility by waiting for producer addition to comple…
Browse files Browse the repository at this point in the history
…te (fixed #9)
  • Loading branch information
ben-manes committed Apr 18, 2015
1 parent be22113 commit df9ac13
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 166 deletions.
6 changes: 0 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ buildscript {
apply from: "${rootDir}/gradle/dependencies.gradle"

repositories {
mavenCentral()
jcenter()
}

Expand Down Expand Up @@ -77,11 +76,6 @@ subprojects { proj ->
}
}

repositories {
mavenCentral()
jcenter()
}

// Only report code coverage for projects that are distrubuted
def publishedProjects = subprojects.findAll {
!it.path.startsWith(':simulator') && (it.path != ':tracing')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -55,7 +54,7 @@
* concurrently. This approach avoids contention on the queue by combining colliding operations
* that have identical semantics. When a pair of producers collide, the task of performing the
* combined set of operations is delegated to one of the threads and the other thread optionally
* waits for its operation to be performed. This decision of whether to wait for completion is
* waits for its operation to be completed. This decision of whether to wait for completion is
* determined by constructing either a <em>linearizable</em> or <em>optimistic</em> queue.
* <p>
* Iterators are <i>weakly consistent</i>, returning elements reflecting the state of the queue at
Expand Down Expand Up @@ -133,16 +132,6 @@ static int ceilingNextPowerOfTwo(int x) {
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
}

/** Returns the arena index for the current thread. */
static final int index() {
int probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
if (probe == 0) {
ThreadLocalRandom.current(); // force initialization
probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
return (probe & ARENA_MASK);
}

final AtomicReference<Node<E>>[] arena;
final Function<E, Node<E>> factory;

Expand All @@ -161,7 +150,8 @@ private SingleConsumerQueue(Function<E, Node<E>> factory) {
/**
* Creates a queue that with an optimistic backoff strategy. A thread completes its operation
* without waiting after it successfully hands off the additional element(s) to another producing
* thread for batch insertion.
* thread for batch insertion. This optimistic behavior may result in additions not appearing in
* FIFO order due to the backoff strategy trying to compensate for stack contention.
*
* @param <E> the type of elements held in this collection
* @return a new queue where producers complete their operation immediately if combined with
Expand Down Expand Up @@ -191,12 +181,16 @@ public boolean isEmpty() {

@Override
public int size() {
Node<E> h = head;
Node<E> cursor = h.getNextRelaxed();
Node<E> cursor = head;
Node<E> t = tail;
int size = 0;
while (cursor != null) {
cursor = cursor.getNextRelaxed();
while ((cursor != t) && (size != Integer.MAX_VALUE)) {
Node<E> next = cursor.getNextRelaxed();
if (next == null) {
while ((next = cursor.next) == null) {}
}
size++;
cursor = next;
}
return size;
}
Expand All @@ -206,51 +200,23 @@ public void clear() {
lazySetHead(tail);
}

@Override
public boolean contains(@Nullable Object o) {
if (o == null) {
return false;
}

Node<E> cursor = head.getNextRelaxed();
while (cursor != null) {
if (o.equals(cursor.value)) {
return true;
}
cursor = cursor.next;
}
return false;
}

@Override
public boolean containsAll(Collection<?> c) {
Objects.requireNonNull(c);
for (Object e : c) {
if (!contains(e)) {
return false;
}
}
return true;
}

@Override
public E peek() {
Node<E> next = head.getNextRelaxed();
return (next == null) ? null : next.value;
}

@Override
public E element() {
E e = peek();
if (e == null) {
throw new NoSuchElementException();
Node<E> h = head;
Node<E> t = tail;
if (h == t) {
return null;
}
return e;
Node<E> next = h.getNextRelaxed();
if (next == null) {
while ((next = h.next) == null) {}
}
return next.value;
}

@Override
public boolean offer(E e) {
Objects.requireNonNull(e);
requireNonNull(e);

Node<E> node = factory.apply(e);
append(node, node);
Expand All @@ -259,9 +225,15 @@ public boolean offer(E e) {

@Override
public E poll() {
Node<E> next = head.getNextRelaxed();
Node<E> h = head;
Node<E> next = h.getNextRelaxed();
if (next == null) {
return null;
Node<E> t = tail;
if (h == t) {
return null;
} else {
while ((next = h.next) == null) {}
}
}
lazySetHead(next);
E e = next.value;
Expand All @@ -276,7 +248,7 @@ public boolean add(E e) {

@Override
public boolean addAll(Collection<? extends E> c) {
Objects.requireNonNull(c);
requireNonNull(c);

Node<E> first = null;
Node<E> last = null;
Expand All @@ -286,7 +258,7 @@ public boolean addAll(Collection<? extends E> c) {
first = factory.apply(e);
last = first;
} else {
Node<E> newLast = factory.apply(e);
Node<E> newLast = new Node<E>(e);
last.lazySetNext(newLast);
last = newLast;
}
Expand All @@ -299,7 +271,7 @@ public boolean addAll(Collection<? extends E> c) {
}

/** Adds the linked list of nodes to the queue. */
void append(Node<E> first, Node<E> last) {
void append(@Nonnull Node<E> first, @Nonnull Node<E> last) {
for (;;) {
Node<E> t = tail;
if (casTail(t, last)) {
Expand All @@ -323,13 +295,13 @@ void append(Node<E> first, Node<E> last) {
}

/**
* Attempts to transfer the linked list to a waiting consumer or receive a linked list from a
* waiting producer.
* Attempts to receive a linked list from a waiting producer or transfer the specified linked list
* to an arriving producer.
*
* @param first the first node in the linked list to try to transfer
* @param last the last node in the linked list to try to transfer
* @return either {@code null} if the element was transferred, the first node if neither a
* transfer or receive were successful, or the received last element from a producer
* transfer nor receive were successful, or the received last element from a producer
*/
@Nullable Node<E> transferOrCombine(@Nonnull Node<E> first, Node<E> last) {
int index = index();
Expand Down Expand Up @@ -362,92 +334,31 @@ void append(Node<E> first, Node<E> last) {
}
}

/** Returns the arena index for the current thread. */
static final int index() {
int probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
if (probe == 0) {
ThreadLocalRandom.current(); // force initialization
probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
return (probe & ARENA_MASK);
}

/** Returns the last node in the linked list. */
static <E> Node<E> findLast(Node<E> node) {
@Nonnull static <E> Node<E> findLast(@Nonnull Node<E> node) {
Node<E> next;
while ((next = node.getNextRelaxed()) != null) {
node = next;
}
return node;
}

@Override
public E remove() {
E e = poll();
if (e == null) {
throw new NoSuchElementException();
}
return e;
}

@Override
public boolean remove(Object o) {
Objects.requireNonNull(o);

Node<E> t = tail;
Node<E> prev = getHeadRelaxed();
Node<E> cursor = prev.getNextRelaxed();
while (cursor != null) {
Node<E> next = cursor.getNextRelaxed();
if (o.equals(cursor.value)) {
if ((t == cursor) && !casTail(t, prev) && (next == null)) {
next = t.next;
}
prev.lazySetNext(next);
return true;
}
prev = cursor;
cursor = next;
}
return false;
}

@Override
public boolean removeAll(Collection<?> c) {
return removeByPresentce(c, false);
}

@Override
public boolean retainAll(Collection<?> c) {
return removeByPresentce(c, true);
}

/**
* Removes elements based on whether they are also present in the provided collection.
*
* @param c collection containing elements to keep or discard
* @param retain whether to retain only or remove only elements present in both collections
*/
boolean removeByPresentce(Collection<?> c, boolean retain) {
Objects.requireNonNull(c);

Node<E> t = tail;
Node<E> prev = getHeadRelaxed();
Node<E> cursor = prev.getNextRelaxed();
boolean modified = false;
while (cursor != null) {
boolean present = c.contains(cursor.value);
Node<E> next = cursor.getNextRelaxed();
if (present != retain) {
if ((t == cursor) && !casTail(t, prev) && (next == null)) {
next = t.next;
}
prev.lazySetNext(next);
modified = true;
} else {
prev = cursor;
}
cursor = prev.getNextRelaxed();
}
return modified;
}

@Override
public Iterator<E> iterator() {
return new Iterator<E>() {
Node<E> t = tail;
Node<E> prev = null;
Node<E> cursor = getHeadRelaxed();
Node<E> cursor = head;
boolean failOnRemoval = true;

@Override
Expand All @@ -466,6 +377,10 @@ public E next() {
}

private void advance() {
if (cursor.getNextRelaxed() == null) {
while (cursor.next == null) {}
}

if ((prev == null) || !failOnRemoval) {
prev = cursor;
}
Expand All @@ -487,21 +402,6 @@ public void remove() {
};
}

@Override
public Object[] toArray() {
return stream().toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return stream().collect(Collectors.toList()).toArray(a);
}

@Override
public String toString() {
return stream().map(Object::toString).collect(Collectors.joining(", ", "[", "]"));
}

/* ---------------- Serialization Support -------------- */

static final long serialVersionUID = 1;
Expand Down Expand Up @@ -596,22 +496,17 @@ boolean isDone() {
}
}

abstract class PadHead {
abstract class PadHead<E> extends AbstractQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}

/** Enforces a memory layout to avoid false sharing by padding the head node. */
abstract class HeadRef<E> extends PadHead {
abstract class HeadRef<E> extends PadHead<E> {
static final long HEAD_OFFSET = UnsafeAccess.objectFieldOffset(HeadRef.class, "head");

volatile Node<E> head;

@SuppressWarnings("unchecked")
Node<E> getHeadRelaxed() {
return (Node<E>) UnsafeAccess.UNSAFE.getObject(this, HEAD_OFFSET);
}

void lazySetHead(Node<E> next) {
UnsafeAccess.UNSAFE.putOrderedObject(this, HEAD_OFFSET, next);
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/jmh.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tasks.jmh {
}

jmh {
jmhVersion = '1.8'
jmhVersion = '1.9'

if (project.hasProperty('includePattern')) {
include = project.includePattern
Expand Down

0 comments on commit df9ac13

Please sign in to comment.