Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Improve readability, maintainability, and reusability #488

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
45 changes: 17 additions & 28 deletions src/jcstress/java/com/lmax/disruptor/SequenceStressVarHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

public class SequenceStressVarHandle
{
public static abstract class AbstractStressTest
{
protected SequenceVarHandle sequence = new SequenceVarHandle(0);
}

/**
* `SequenceVarHandle::incrementAndGet` is atomic and should never lose an update, even with multiple threads racing.
*/
@JCStressTest
@Outcome(id = "1", expect = FORBIDDEN, desc = "One update lost.")
@Outcome(id = "2", expect = ACCEPTABLE, desc = "Both updates.")
@State
public static class IncrementAndGet
public static class IncrementAndGet extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void actor1()
{
Expand All @@ -54,10 +57,8 @@ public void arbiter(final J_Result r)
@Outcome(id = {"true, false, 10", "false, true, 20"}, expect = ACCEPTABLE, desc = "Either updated.")
@Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.")
@State
public static class CompareAndSet
public static class CompareAndSet extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void actor1(final ZZJ_Result r)
{
Expand Down Expand Up @@ -85,10 +86,8 @@ public void arbiter(final ZZJ_Result r)
@Outcome(id = "20", expect = FORBIDDEN, desc = "One update lost.")
@Outcome(id = "30", expect = ACCEPTABLE, desc = "Both updates.")
@State
public static class AddAndGet
public static class AddAndGet extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void actor1()
{
Expand Down Expand Up @@ -119,10 +118,8 @@ public void arbiter(final J_Result r)
@Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.")
@Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7")
@State
public static class LongFullSet
public static class LongFullSet extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void writer()
{
Expand All @@ -146,10 +143,8 @@ public void reader(final J_Result r)
@Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.")
@Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7")
@State
public static class LongFullSetVolatile
public static class LongFullSetVolatile extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void writer()
{
Expand All @@ -173,10 +168,8 @@ public void reader(final J_Result r)
@Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.")
@Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7")
@State
public static class LongFullCompareAndSet
public static class LongFullCompareAndSet extends AbstractStressTest
{
SequenceVarHandle sequence = new SequenceVarHandle(0);

@Actor
public void writer()
{
Expand All @@ -190,7 +183,6 @@ public void reader(final J_Result r)
}
}


/**
* In absence of synchronization, the order of independent reads is undefined.
* In our case, the value in SequenceVarHandle is volatile which mandates the writes to the same
Expand All @@ -202,7 +194,7 @@ public void reader(final J_Result r)
@Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Doing first read early, not surprising.")
@Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Violates coherence.")
@State
public static class SameVolatileRead
public static class SameVolatileRead extends AbstractStressTest
{
private final Holder h1 = new Holder();
private final Holder h2 = h1;
Expand All @@ -229,7 +221,6 @@ public void actor2(final JJ_Result r)
}
}


/**
* The value field in SequenceVarHandle is volatile so we should never see an update to it without seeing the update to a
* previously set value also.
Expand All @@ -243,7 +234,7 @@ public void actor2(final JJ_Result r)
@Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Caught in the middle: $x is visible, $y is not.")
@Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Seeing $y, but not $x!")
@State
public static class SetVolatileGuard
public static class SetVolatileGuard extends AbstractStressTest
{
long x = 0;
SequenceVarHandle y = new SequenceVarHandle(0);
Expand Down Expand Up @@ -278,7 +269,7 @@ public void actor2(final JJ_Result r)
@Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Caught in the middle: $x is visible, $y is not.")
@Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Seeing $y, but not $x!")
@State
public static class SetGuard
public static class SetGuard extends AbstractStressTest
{
long x = 0;
SequenceVarHandle y = new SequenceVarHandle(0);
Expand All @@ -298,15 +289,14 @@ public void actor2(final JJ_Result r)
}
}


/**
* Volatile setting will experience total ordering.
*/
@JCStressTest
@Outcome(id = {"0, 1", "1, 0", "1, 1"}, expect = ACCEPTABLE, desc = "Trivial under sequential consistency")
@Outcome(id = "0, 0", expect = FORBIDDEN, desc = "Violates sequential consistency")
@State
public static class SetVolatileDekker
public static class SetVolatileDekker extends AbstractStressTest
{
SequenceVarHandle x = new SequenceVarHandle(0);
SequenceVarHandle y = new SequenceVarHandle(0);
Expand All @@ -323,7 +313,6 @@ public void actor2(final JJ_Result r)
{
y.setVolatile(1);
r.r2 = x.get();

}
}

Expand All @@ -334,7 +323,7 @@ public void actor2(final JJ_Result r)
@Outcome(id = {"0, 1", "1, 0", "1, 1"}, expect = ACCEPTABLE, desc = "Trivial under sequential consistency")
@Outcome(id = "0, 0", expect = ACCEPTABLE_INTERESTING, desc = "Violates sequential consistency")
@State
public static class SetDekker
public static class SetDekker extends AbstractStressTest
{
SequenceVarHandle x = new SequenceVarHandle(0);
SequenceVarHandle y = new SequenceVarHandle(0);
Expand All @@ -353,4 +342,4 @@ public void actor2(final JJ_Result r)
r.r2 = x.get();
}
}
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/lmax/disruptor/MultiProducerSequencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ public long tryNext(final int n) throws InsufficientCapacityException
throw new IllegalArgumentException("n must be > 0");
}

long next = calculateNextSequence(n);

return next;
}

private long calculateNextSequence(final int n) throws InsufficientCapacityException
{
long current;
long next;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,16 @@ public void onEvent(final FunctionEvent event, final long sequence, final boolea
switch (functionStep)
{
case ONE:
event.setStepOneResult(event.getOperandOne() + event.getOperandTwo());

handleStepOne(event);
break;

case TWO:
event.setStepTwoResult(event.getStepOneResult() + 3L);
handleStepTwo(event);
break;

case THREE:
if ((event.getStepTwoResult() & 4L) == 4L)
{
stepThreeCounter.set(stepThreeCounter.get() + 1L);
}
handleStepThree(event);
break;
}

Expand All @@ -70,4 +68,22 @@ public void onEvent(final FunctionEvent event, final long sequence, final boolea
latch.countDown();
}
}

private void handleStepOne(final FunctionEvent event)
{
event.setStepOneResult(event.getOperandOne() + event.getOperandTwo());
}

private void handleStepTwo(final FunctionEvent event)
{
event.setStepTwoResult(event.getStepOneResult() + 3L);
}

private void handleStepThree(final FunctionEvent event)
{
if ((event.getStepTwoResult() & 4L) == 4L)
{
stepThreeCounter.set(stepThreeCounter.get() + 1L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public final class FunctionQueueProcessor implements Runnable
public abstract class AbstractQueueProcessor implements Runnable
{
protected volatile boolean running;
protected long stepThreeCounter;
protected long sequence;
protected CountDownLatch latch;

public void reset(final CountDownLatch latch)
{
stepThreeCounter = 0L;
sequence = 0L;
this.latch = latch;
}

public void halt()
{
running = false;
}

@Override
public abstract void run();
}

public final class FunctionQueueProcessor extends AbstractQueueProcessor
{
private final FunctionStep functionStep;
private final BlockingQueue<long[]> stepOneQueue;
private final BlockingQueue<Long> stepTwoQueue;
private final BlockingQueue<Long> stepThreeQueue;
private final long count;

private volatile boolean running;
private long stepThreeCounter;
private long sequence;
private CountDownLatch latch;

public FunctionQueueProcessor(
final FunctionStep functionStep,
final BlockingQueue<long[]> stepOneQueue,
Expand All @@ -50,18 +68,6 @@ public long getStepThreeCounter()
return stepThreeCounter;
}

public void reset(final CountDownLatch latch)
{
stepThreeCounter = 0L;
sequence = 0L;
this.latch = latch;
}

public void halt()
{
running = false;
}

@Override
public void run()
{
Expand All @@ -73,29 +79,21 @@ public void run()
switch (functionStep)
{
case ONE:
{
long[] values = stepOneQueue.take();
stepTwoQueue.put(Long.valueOf(values[0] + values[1]));
break;
}

case TWO:
{
Long value = stepTwoQueue.take();
stepThreeQueue.put(Long.valueOf(value.longValue() + 3));
break;
}

case THREE:
{
Long value = stepThreeQueue.take();
long testValue = value.longValue();
if ((testValue & 4L) == 4L)
{
++stepThreeCounter;
}
break;
}
}

if (null != latch && sequence++ == count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

public final class ValueMutationQueueProcessor implements Runnable
{
private static final boolean RUNNING = true;
private volatile boolean running;
private long value;
private long sequence;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void halt()
@Override
public void run()
{
running = true;
running = RUNNING;
while (true)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line of code looks so interested to me. It takes me back to 2014 when I saw my friend create a constant ZERO = 0. So, in my point of view, we can keep this running = true following KISS

{
try
Expand Down