From d876c77d817c3d2fad981129e92ebdd66c46ae80 Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 20:19:17 -0400 Subject: [PATCH 1/6] Refactor: Extract sequence logic into getNextSequence for better readability --- .../java/com/lmax/disruptor/MultiProducerSequencer.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java index b773920e0..8499c729c 100644 --- a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java +++ b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java @@ -156,6 +156,13 @@ public long tryNext(final int n) throws InsufficientCapacityException throw new IllegalArgumentException("n must be > 0"); } + long next = getNextSequence(n); + + return next; + } + + private long getNextSequence(final int n) throws InsufficientCapacityException + { long current; long next; From 34ad34707bd6dec20496fba0a5076727310c9d3d Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 20:50:02 -0400 Subject: [PATCH 2/6] Refactor: Rename getNextSequence to calculateNextSequence for better clarity --- src/main/java/com/lmax/disruptor/MultiProducerSequencer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java index 8499c729c..2cddd71ac 100644 --- a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java +++ b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java @@ -156,12 +156,12 @@ public long tryNext(final int n) throws InsufficientCapacityException throw new IllegalArgumentException("n must be > 0"); } - long next = getNextSequence(n); + long next = calculateNextSequence(n); return next; } - private long getNextSequence(final int n) throws InsufficientCapacityException + private long calculateNextSequence(final int n) throws InsufficientCapacityException { long current; long next; From b3bb837209445aaa4ba91b1272054a6cfada4f28 Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 21:01:55 -0400 Subject: [PATCH 3/6] Refactor: Decompose onEvent switch-case into separate methods for clarity and maintainability --- .../support/FunctionEventHandler.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/perftest/java/com/lmax/disruptor/support/FunctionEventHandler.java b/src/perftest/java/com/lmax/disruptor/support/FunctionEventHandler.java index 0d636cf62..8531d2ebf 100644 --- a/src/perftest/java/com/lmax/disruptor/support/FunctionEventHandler.java +++ b/src/perftest/java/com/lmax/disruptor/support/FunctionEventHandler.java @@ -50,18 +50,16 @@ public void onEvent(final FunctionEvent event, final long sequence, final boolea switch (functionStep) { case ONE: - event.setStepOneResult(event.getOperandOne() + event.getOperandTwo()); + + handleStepOne(event); break; case TWO: - event.setStepTwoResult(event.getStepOneResult() + 3L); + handleStepTwo(event); break; case THREE: - if ((event.getStepTwoResult() & 4L) == 4L) - { - stepThreeCounter.set(stepThreeCounter.get() + 1L); - } + handleStepThree(event); break; } @@ -70,4 +68,22 @@ public void onEvent(final FunctionEvent event, final long sequence, final boolea latch.countDown(); } } + + private void handleStepOne(final FunctionEvent event) + { + event.setStepOneResult(event.getOperandOne() + event.getOperandTwo()); + } + + private void handleStepTwo(final FunctionEvent event) + { + event.setStepTwoResult(event.getStepOneResult() + 3L); + } + + private void handleStepThree(final FunctionEvent event) + { + if ((event.getStepTwoResult() & 4L) == 4L) + { + stepThreeCounter.set(stepThreeCounter.get() + 1L); + } + } } From 92f7b17f3f396bfe4608d8d122c46a9bd5b9467c Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 23:14:46 -0400 Subject: [PATCH 4/6] Refactor: Pull down variable to improve encapsulation and readability --- .../disruptor/SequenceStressVarHandle.java | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/src/jcstress/java/com/lmax/disruptor/SequenceStressVarHandle.java b/src/jcstress/java/com/lmax/disruptor/SequenceStressVarHandle.java index 7cbabb9c7..a5ed571d9 100644 --- a/src/jcstress/java/com/lmax/disruptor/SequenceStressVarHandle.java +++ b/src/jcstress/java/com/lmax/disruptor/SequenceStressVarHandle.java @@ -17,6 +17,11 @@ public class SequenceStressVarHandle { + public static abstract class AbstractStressTest + { + protected SequenceVarHandle sequence = new SequenceVarHandle(0); + } + /** * `SequenceVarHandle::incrementAndGet` is atomic and should never lose an update, even with multiple threads racing. */ @@ -24,10 +29,8 @@ public class SequenceStressVarHandle @Outcome(id = "1", expect = FORBIDDEN, desc = "One update lost.") @Outcome(id = "2", expect = ACCEPTABLE, desc = "Both updates.") @State - public static class IncrementAndGet + public static class IncrementAndGet extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void actor1() { @@ -54,10 +57,8 @@ public void arbiter(final J_Result r) @Outcome(id = {"true, false, 10", "false, true, 20"}, expect = ACCEPTABLE, desc = "Either updated.") @Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.") @State - public static class CompareAndSet + public static class CompareAndSet extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void actor1(final ZZJ_Result r) { @@ -85,10 +86,8 @@ public void arbiter(final ZZJ_Result r) @Outcome(id = "20", expect = FORBIDDEN, desc = "One update lost.") @Outcome(id = "30", expect = ACCEPTABLE, desc = "Both updates.") @State - public static class AddAndGet + public static class AddAndGet extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void actor1() { @@ -119,10 +118,8 @@ public void arbiter(final J_Result r) @Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.") @Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7") @State - public static class LongFullSet + public static class LongFullSet extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void writer() { @@ -146,10 +143,8 @@ public void reader(final J_Result r) @Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.") @Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7") @State - public static class LongFullSetVolatile + public static class LongFullSetVolatile extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void writer() { @@ -173,10 +168,8 @@ public void reader(final J_Result r) @Outcome(expect = FORBIDDEN, desc = "Other cases are forbidden.") @Ref("https://docs.oracle.com/javase/specs/jls/se11/html/jls-17.html#jls-17.7") @State - public static class LongFullCompareAndSet + public static class LongFullCompareAndSet extends AbstractStressTest { - SequenceVarHandle sequence = new SequenceVarHandle(0); - @Actor public void writer() { @@ -190,7 +183,6 @@ public void reader(final J_Result r) } } - /** * In absence of synchronization, the order of independent reads is undefined. * In our case, the value in SequenceVarHandle is volatile which mandates the writes to the same @@ -202,7 +194,7 @@ public void reader(final J_Result r) @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Doing first read early, not surprising.") @Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Violates coherence.") @State - public static class SameVolatileRead + public static class SameVolatileRead extends AbstractStressTest { private final Holder h1 = new Holder(); private final Holder h2 = h1; @@ -229,7 +221,6 @@ public void actor2(final JJ_Result r) } } - /** * The value field in SequenceVarHandle is volatile so we should never see an update to it without seeing the update to a * previously set value also. @@ -243,7 +234,7 @@ public void actor2(final JJ_Result r) @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Caught in the middle: $x is visible, $y is not.") @Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Seeing $y, but not $x!") @State - public static class SetVolatileGuard + public static class SetVolatileGuard extends AbstractStressTest { long x = 0; SequenceVarHandle y = new SequenceVarHandle(0); @@ -278,7 +269,7 @@ public void actor2(final JJ_Result r) @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Caught in the middle: $x is visible, $y is not.") @Outcome(id = "1, 0", expect = FORBIDDEN, desc = "Seeing $y, but not $x!") @State - public static class SetGuard + public static class SetGuard extends AbstractStressTest { long x = 0; SequenceVarHandle y = new SequenceVarHandle(0); @@ -298,7 +289,6 @@ public void actor2(final JJ_Result r) } } - /** * Volatile setting will experience total ordering. */ @@ -306,7 +296,7 @@ public void actor2(final JJ_Result r) @Outcome(id = {"0, 1", "1, 0", "1, 1"}, expect = ACCEPTABLE, desc = "Trivial under sequential consistency") @Outcome(id = "0, 0", expect = FORBIDDEN, desc = "Violates sequential consistency") @State - public static class SetVolatileDekker + public static class SetVolatileDekker extends AbstractStressTest { SequenceVarHandle x = new SequenceVarHandle(0); SequenceVarHandle y = new SequenceVarHandle(0); @@ -323,7 +313,6 @@ public void actor2(final JJ_Result r) { y.setVolatile(1); r.r2 = x.get(); - } } @@ -334,7 +323,7 @@ public void actor2(final JJ_Result r) @Outcome(id = {"0, 1", "1, 0", "1, 1"}, expect = ACCEPTABLE, desc = "Trivial under sequential consistency") @Outcome(id = "0, 0", expect = ACCEPTABLE_INTERESTING, desc = "Violates sequential consistency") @State - public static class SetDekker + public static class SetDekker extends AbstractStressTest { SequenceVarHandle x = new SequenceVarHandle(0); SequenceVarHandle y = new SequenceVarHandle(0); @@ -353,4 +342,4 @@ public void actor2(final JJ_Result r) r.r2 = x.get(); } } -} +} \ No newline at end of file From a7c15ad117e696b4479b89bb07e9d2e211072609 Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 23:29:27 -0400 Subject: [PATCH 5/6] Refactor: Replace magic literal with named constant RUNNING --- .../lmax/disruptor/support/ValueMutationQueueProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/perftest/java/com/lmax/disruptor/support/ValueMutationQueueProcessor.java b/src/perftest/java/com/lmax/disruptor/support/ValueMutationQueueProcessor.java index b76af8a0e..95bfb6321 100644 --- a/src/perftest/java/com/lmax/disruptor/support/ValueMutationQueueProcessor.java +++ b/src/perftest/java/com/lmax/disruptor/support/ValueMutationQueueProcessor.java @@ -20,6 +20,7 @@ public final class ValueMutationQueueProcessor implements Runnable { + private static final boolean RUNNING = true; private volatile boolean running; private long value; private long sequence; @@ -57,7 +58,7 @@ public void halt() @Override public void run() { - running = true; + running = RUNNING; while (true) { try From aa4cbed333e894aaf80d20d1c58c6e3370b25c97 Mon Sep 17 00:00:00 2001 From: saikumarmamidala Date: Wed, 27 Nov 2024 23:46:21 -0400 Subject: [PATCH 6/6] Refactor: Move common fields to AbstractQueueProcessor superclass --- .../support/FunctionQueueProcessor.java | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/perftest/java/com/lmax/disruptor/support/FunctionQueueProcessor.java b/src/perftest/java/com/lmax/disruptor/support/FunctionQueueProcessor.java index e6af4d77b..473aa8801 100644 --- a/src/perftest/java/com/lmax/disruptor/support/FunctionQueueProcessor.java +++ b/src/perftest/java/com/lmax/disruptor/support/FunctionQueueProcessor.java @@ -18,7 +18,30 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -public final class FunctionQueueProcessor implements Runnable +public abstract class AbstractQueueProcessor implements Runnable +{ + protected volatile boolean running; + protected long stepThreeCounter; + protected long sequence; + protected CountDownLatch latch; + + public void reset(final CountDownLatch latch) + { + stepThreeCounter = 0L; + sequence = 0L; + this.latch = latch; + } + + public void halt() + { + running = false; + } + + @Override + public abstract void run(); +} + +public final class FunctionQueueProcessor extends AbstractQueueProcessor { private final FunctionStep functionStep; private final BlockingQueue stepOneQueue; @@ -26,11 +49,6 @@ public final class FunctionQueueProcessor implements Runnable private final BlockingQueue stepThreeQueue; private final long count; - private volatile boolean running; - private long stepThreeCounter; - private long sequence; - private CountDownLatch latch; - public FunctionQueueProcessor( final FunctionStep functionStep, final BlockingQueue stepOneQueue, @@ -50,18 +68,6 @@ public long getStepThreeCounter() return stepThreeCounter; } - public void reset(final CountDownLatch latch) - { - stepThreeCounter = 0L; - sequence = 0L; - this.latch = latch; - } - - public void halt() - { - running = false; - } - @Override public void run() { @@ -73,21 +79,14 @@ public void run() switch (functionStep) { case ONE: - { long[] values = stepOneQueue.take(); stepTwoQueue.put(Long.valueOf(values[0] + values[1])); break; - } - case TWO: - { Long value = stepTwoQueue.take(); stepThreeQueue.put(Long.valueOf(value.longValue() + 3)); break; - } - case THREE: - { Long value = stepThreeQueue.take(); long testValue = value.longValue(); if ((testValue & 4L) == 4L) @@ -95,7 +94,6 @@ public void run() ++stepThreeCounter; } break; - } } if (null != latch && sequence++ == count)