From 58a8d8fad8d914b68afb7ad355891f3b10571c30 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 29 Oct 2018 01:16:29 +0200 Subject: [PATCH 01/10] fix issue when current and future streams are not terminated after connection dispose (#541) Signed-off-by: Maksym Ostroverkhov --- .../main/java/io/rsocket/RSocketClient.java | 57 ++++++++++------- .../rsocket/RSocketClientTerminationTest.java | 64 +++++++++++++++++++ .../java/io/rsocket/RSocketClientTest.java | 2 +- 3 files changed, 98 insertions(+), 25 deletions(-) create mode 100644 rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 44460a938..8f0941285 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -21,8 +21,11 @@ import io.rsocket.framing.FrameType; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; + +import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import org.jctools.maps.NonBlockingHashMapLong; @@ -72,7 +75,7 @@ class RSocketClient implements RSocket { // DO NOT Change the order here. The Send processor must be subscribed to before receiving this.sendProcessor = new UnboundedProcessor<>(); - connection.onClose().doFinally(signalType -> cleanup()).subscribe(null, errorConsumer); + connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer); connection .send(sendProcessor) @@ -92,7 +95,9 @@ class RSocketClient implements RSocket { keepAlive -> { String message = String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis()); - errorConsumer.accept(new ConnectionErrorException(message)); + ConnectionErrorException err = new ConnectionErrorException(message); + lifecycle.terminate(err); + errorConsumer.accept(err); connection.dispose(); }); keepAliveHandler.send().subscribe(sendProcessor::onNext); @@ -157,12 +162,7 @@ public Flux requestChannel(Publisher payloads) { @Override public Mono metadataPush(Payload payload) { - return Mono.fromRunnable( - () -> { - final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); - payload.release(); - sendProcessor.onNext(requestFrame); - }); + return handleMetadataPush(payload); } @Override @@ -187,7 +187,7 @@ public Mono onClose() { private Mono handleFireAndForget(Payload payload) { return lifecycle - .started() + .active() .then( Mono.fromRunnable( () -> { @@ -201,7 +201,7 @@ private Mono handleFireAndForget(Payload payload) { private Flux handleRequestStream(final Payload payload) { return lifecycle - .started() + .active() .thenMany( Flux.defer( () -> { @@ -247,7 +247,7 @@ private Flux handleRequestStream(final Payload payload) { private Mono handleRequestResponse(final Payload payload) { return lifecycle - .started() + .active() .then( Mono.defer( () -> { @@ -274,7 +274,7 @@ private Mono handleRequestResponse(final Payload payload) { private Flux handleChannel(Flux request) { return lifecycle - .started() + .active() .thenMany( Flux.defer( () -> { @@ -365,11 +365,25 @@ private Flux handleChannel(Flux request) { })); } + private Mono handleMetadataPush(Payload payload) { + return lifecycle + .active() + .then(Mono.fromRunnable( + () -> { + final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); + payload.release(); + sendProcessor.onNext(requestFrame); + })); + } + private boolean contains(int streamId) { return receivers.containsKey(streamId); } - protected void cleanup() { + protected void terminate() { + + lifecycle.terminate(new ClosedChannelException()); + if (keepAliveHandler != null) { keepAliveHandler.dispose(); } @@ -397,13 +411,8 @@ private synchronized void cleanUpLimitableRequestPublisher( } private synchronized void cleanUpSubscriber(UnicastProcessor subscriber) { - Throwable err = lifecycle.terminationError(); try { - if (err != null) { - subscriber.onError(err); - } else { - subscriber.cancel(); - } + subscriber.onError(lifecycle.terminationError()); } catch (Throwable t) { errorConsumer.accept(t); } @@ -519,12 +528,12 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame private static class Lifecycle { - private volatile Throwable terminationError; + private final AtomicReference terminationError = new AtomicReference<>(); - public Mono started() { + public Mono active() { return Mono.create( sink -> { - Throwable err = terminationError; + Throwable err = terminationError(); if (err == null) { sink.success(); } else { @@ -534,11 +543,11 @@ public Mono started() { } public void terminate(Throwable err) { - this.terminationError = err; + this.terminationError.compareAndSet(null, err); } public Throwable terminationError() { - return terminationError; + return terminationError.get(); } } } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java new file mode 100644 index 000000000..a8e60d02b --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java @@ -0,0 +1,64 @@ +package io.rsocket; + +import io.rsocket.RSocketClientTest.ClientSocketRule; +import io.rsocket.util.EmptyPayload; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Function; + +@RunWith(Parameterized.class) +public class RSocketClientTerminationTest { + + @Rule + public final ClientSocketRule rule = new ClientSocketRule(); + private Function> interaction; + + public RSocketClientTerminationTest(Function> interaction) { + this.interaction = interaction; + } + + @Test + public void testCurrentStreamIsTerminatedOnConnectionClose() { + RSocketClient rSocket = rule.socket; + + Mono.delay(Duration.ofSeconds(1)) + .doOnNext(v -> rule.connection.dispose()) + .subscribe(); + + StepVerifier.create(interaction.apply(rSocket)) + .expectError(ClosedChannelException.class) + .verify(Duration.ofSeconds(5)); + } + + @Test + public void testSubsequentStreamIsTerminatedAfterConnectionClose() { + RSocketClient rSocket = rule.socket; + + rule.connection.dispose(); + StepVerifier.create(interaction.apply(rSocket)) + .expectError(ClosedChannelException.class) + .verify(Duration.ofSeconds(5)); + } + + @Parameterized.Parameters + public static Iterable>> rsocketInteractions() { + EmptyPayload payload = EmptyPayload.INSTANCE; + Publisher payloadStream = Flux.just(payload); + + Function> resp = rSocket -> rSocket.requestResponse(payload); + Function> stream = rSocket -> rSocket.requestStream(payload); + Function> channel = rSocket -> rSocket.requestChannel(payloadStream); + + return Arrays.asList(resp, stream, channel); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 4fcb65751..a153e2f63 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -215,7 +215,7 @@ protected RSocketClient newRSocket() { throwable -> errors.add(throwable), StreamIdSupplier.clientSupplier(), Duration.ofMillis(100), - Duration.ofMillis(100), + Duration.ofMillis(10_000), 4); } From 398db15737a394921db0861dd28c87096b3f3d39 Mon Sep 17 00:00:00 2001 From: kbahr Date: Mon, 29 Oct 2018 13:48:59 -0700 Subject: [PATCH 02/10] update reactor-bom dependency (#542) Signed-off-by: Kyle Bahr --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 381ed7236..41e604488 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ plugins { subprojects { apply plugin: 'io.spring.dependency-management' - ext['reactor-bom.version'] = 'Californium-BUILD-SNAPSHOT' + ext['reactor-bom.version'] = 'Californium-SR2' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' ext['netty.version'] = '4.1.29.Final' From f0ba135d23f1b8b73e2276316ca070df476ee0e4 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 29 Oct 2018 23:12:45 +0200 Subject: [PATCH 03/10] unapply bintray and artifactory plugins Signed-off-by: Oleh Dokuka --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 41e604488..7626acef0 100644 --- a/build.gradle +++ b/build.gradle @@ -18,8 +18,8 @@ plugins { id 'com.gradle.build-scan' version '1.16' id 'com.github.sherter.google-java-format' version '0.7.1' - id 'com.jfrog.artifactory' version '4.7.3' - id 'com.jfrog.bintray' version '1.8.4' + id 'com.jfrog.artifactory' version '4.7.3' apply false + id 'com.jfrog.bintray' version '1.8.4' apply false id 'me.champeau.gradle.jmh' version '0.4.7' apply false id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false id 'io.morethan.jmhreport' version '0.9.0' apply false From af6628c7823952a91f3b7b3d37054373be267482 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 29 Oct 2018 23:45:02 +0200 Subject: [PATCH 04/10] included a few fixes for SwitchTransform Signed-off-by: Oleh Dokuka --- .../rsocket/internal/SwitchTransformFlux.java | 507 ++++++++++++++---- .../internal/SwitchTransformFluxTest.java | 370 +++++++++++-- 2 files changed, 734 insertions(+), 143 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index 6b4626f9b..6a36cf090 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -16,18 +16,21 @@ package io.rsocket.internal; -import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; + +import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; public final class SwitchTransformFlux extends Flux { @@ -35,7 +38,7 @@ public final class SwitchTransformFlux extends Flux { final BiFunction, Publisher> transformer; public SwitchTransformFlux( - Publisher source, BiFunction, Publisher> transformer) { + Publisher source, BiFunction, Publisher> transformer) { this.source = Objects.requireNonNull(source, "source"); this.transformer = Objects.requireNonNull(transformer, "transformer"); } @@ -46,30 +49,47 @@ public int getPrefetch() { } @Override + @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber actual) { - source.subscribe(new SwitchTransformMain<>(actual, transformer)); + if (actual instanceof Fuseable.ConditionalSubscriber) { + source.subscribe(new SwitchTransformConditionalOperator<>((Fuseable.ConditionalSubscriber) actual, transformer)); + return; + } + source.subscribe(new SwitchTransformOperator<>(actual, transformer)); } - static final class SwitchTransformMain implements CoreSubscriber, Scannable { + static final class SwitchTransformOperator extends Flux + implements CoreSubscriber, Subscription, Scannable { - final CoreSubscriber actual; + final CoreSubscriber outer; final BiFunction, Publisher> transformer; - final SwitchTransformInner inner; Subscription s; + Throwable throwable; - volatile int once; + volatile boolean done; + volatile T first; + volatile CoreSubscriber inner; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformMain.class, "once"); + static final AtomicReferenceFieldUpdater INNER = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformOperator.class, CoreSubscriber.class, "inner"); - SwitchTransformMain( - CoreSubscriber actual, - BiFunction, Publisher> transformer) { - this.actual = actual; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); + + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); + + SwitchTransformOperator( + CoreSubscriber outer, + BiFunction, Publisher> transformer) { + this.outer = outer; this.transformer = transformer; - this.inner = new SwitchTransformInner<>(this); } @Override @@ -81,6 +101,44 @@ public Object scanUnsafe(Attr key) { return null; } + @Override + public Context currentContext() { + CoreSubscriber actual = inner; + + if (actual != null) { + return actual.currentContext(); + } + + return outer.currentContext(); + } + + @Override + public void cancel() { + if (s != Operators.cancelledSubscription()) { + Subscription s = this.s; + this.s = Operators.cancelledSubscription(); + ReferenceCountUtil.safeRelease(first); + + if (WIP.getAndIncrement(this) == 0) { + INNER.lazySet(this, null); + first = null; + } + + s.cancel(); + } + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + INNER.lazySet(this, actual); + actual.onSubscribe(this); + } + else { + Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + } + @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { @@ -91,161 +149,428 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (isCanceled()) { + if (done) { + Operators.onNextDropped(t, currentContext()); return; } - if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + CoreSubscriber i = inner; + + if (i == null) { try { - inner.first = t; + first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, inner), "The transformer returned a null value"); - result.subscribe(actual); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); return; - } catch (Throwable e) { - onError(Operators.onOperatorError(s, e, t, actual.currentContext())); + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; } } - inner.onNext(t); + i.onNext(t); } @Override public void onError(Throwable t) { - if (isCanceled()) { + if (done) { + Operators.onErrorDropped(t, currentContext()); return; } - if (once != 0) { - inner.onError(t); - } else { - actual.onSubscribe(Operators.emptySubscription()); - actual.onError(t); + throwable = t; + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.error(outer, t); } } @Override public void onComplete() { - if (isCanceled()) { + if (done) { return; } - if (once != 0) { - inner.onComplete(); - } else { - actual.onSubscribe(Operators.emptySubscription()); - actual.onComplete(); + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.complete(outer); } } - boolean isCanceled() { - return s == Operators.cancelledSubscription(); + @Override + public void request(long n) { + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + n = Operators.addCap(n, -1); + if (n > 0) { + s.request(n); + } + } + else { + s.request(n); + } } - void cancel() { - s.cancel(); - s = Operators.cancelledSubscription(); + boolean drainRegular() { + if (WIP.getAndIncrement(this) != 0) { + return false; + } + + T f = first; + int m = 1; + boolean sent = false; + Subscription s = this.s; + CoreSubscriber a = inner; + + for (;;) { + if (f != null) { + first = null; + ReferenceCountUtil.safeRelease(f); + + if (s == Operators.cancelledSubscription()) { + Operators.onNextDropped(f, a.currentContext()); + return true; + } + + a.onNext(f); + f = null; + sent = true; + } + + if (s == Operators.cancelledSubscription()) { + return sent; + } + + if (done) { + Throwable t = throwable; + if (t != null) { + a.onError(t); + } + else { + a.onComplete(); + } + return sent; + } + + + m = WIP.addAndGet(this, -m); + + if (m == 0) { + return sent; + } + } } } - static final class SwitchTransformInner extends Flux implements Scannable, Subscription { - final SwitchTransformMain parent; + static final class SwitchTransformConditionalOperator extends Flux + implements Fuseable.ConditionalSubscriber, Subscription, Scannable { - volatile CoreSubscriber actual; + final Fuseable.ConditionalSubscriber outer; + final BiFunction, Publisher> transformer; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater( - SwitchTransformInner.class, CoreSubscriber.class, "actual"); + Subscription s; + Throwable throwable; - volatile V first; + volatile boolean done; + volatile T first; + volatile Fuseable.ConditionalSubscriber inner; @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater FIRST = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, Object.class, "first"); + static final AtomicReferenceFieldUpdater INNER = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, Fuseable.ConditionalSubscriber.class, "inner"); - volatile int once; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); + volatile int once; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformInner.class, "once"); + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); - SwitchTransformInner(SwitchTransformMain parent) { - this.parent = parent; + SwitchTransformConditionalOperator( + Fuseable.ConditionalSubscriber outer, + BiFunction, Publisher> transformer) { + this.outer = outer; + this.transformer = transformer; } - public void onNext(V t) { - CoreSubscriber a = actual; + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); + if (key == Attr.PREFETCH) return 1; - if (a != null) { - a.onNext(t); - } + return null; } - public void onError(Throwable t) { - CoreSubscriber a = actual; + @Override + public Context currentContext() { + CoreSubscriber actual = inner; - if (a != null) { - a.onError(t); + if (actual != null) { + return actual.currentContext(); } + + return outer.currentContext(); } - public void onComplete() { - CoreSubscriber a = actual; + @Override + public void cancel() { + if (s != Operators.cancelledSubscription()) { + Subscription s = this.s; + this.s = Operators.cancelledSubscription(); + ReferenceCountUtil.safeRelease(first); + + if (WIP.getAndIncrement(this) == 0) { + INNER.lazySet(this, null); + first = null; + } - if (a != null) { - a.onComplete(); + s.cancel(); } } @Override - public void subscribe(CoreSubscriber actual) { + @SuppressWarnings("unchecked") + public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { - ACTUAL.lazySet(this, actual); + if (actual instanceof Fuseable.ConditionalSubscriber) { + INNER.lazySet(this, (Fuseable.ConditionalSubscriber) actual); + } + else { + INNER.lazySet(this, new ConditionalSubscriberAdapter<>(actual)); + } actual.onSubscribe(this); - } else { - actual.onError(new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + else { + Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + s.request(1); + } + } + + @Override + public void onNext(T t) { + if (done) { + Operators.onNextDropped(t, currentContext()); + return; + } + + CoreSubscriber i = inner; + + if (i == null) { + try { + first = t; + Publisher result = + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); + return; + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); + ReferenceCountUtil.safeRelease(t); + return; + } + } + + i.onNext(t); + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + Operators.onNextDropped(t, currentContext()); + return false; + } + + Fuseable.ConditionalSubscriber i = inner; + + if (i == null) { + try { + first = t; + Publisher result = + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); + return true; + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); + ReferenceCountUtil.safeRelease(t); + return false; + } + } + + return i.tryOnNext(t); + } + + @Override + public void onError(Throwable t) { + if (done) { + Operators.onErrorDropped(t, currentContext()); + return; + } + + throwable = t; + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.error(outer, t); + } + } + + @Override + public void onComplete() { + if (done) { + return; + } + + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.complete(outer); } } @Override public void request(long n) { - V f = first; + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + if (--n > 0) { + s.request(n); + } + } + else { + s.request(n); + } + } + + boolean drainRegular() { + if (WIP.getAndIncrement(this) != 0) { + return false; + } - if (f != null && FIRST.compareAndSet(this, f, null)) { - actual.onNext(f); + T f = first; + int m = 1; + boolean sent = false; + Subscription s = this.s; + CoreSubscriber a = inner; + + for (;;) { + if (f != null) { + first = null; + ReferenceCountUtil.safeRelease(f); + + if (s == Operators.cancelledSubscription()) { + Operators.onNextDropped(f, a.currentContext()); + return true; + } + + a.onNext(f); + f = null; + sent = true; + } + + if (s == Operators.cancelledSubscription()) { + return sent; + } - long r = Operators.addCap(n, -1); - if (r > 0) { - parent.s.request(r); + if (done) { + Throwable t = throwable; + if (t != null) { + a.onError(t); + } + else { + a.onComplete(); + } + return sent; + } + + + m = WIP.addAndGet(this, -m); + + if (m == 0) { + return sent; } - } else { - parent.s.request(n); } } + } + + static final class ConditionalSubscriberAdapter implements Fuseable.ConditionalSubscriber { + + final CoreSubscriber delegate; + + ConditionalSubscriberAdapter(CoreSubscriber delegate) { + this.delegate = delegate; + } @Override - public void cancel() { - actual = null; - first = null; - parent.cancel(); + public Context currentContext() { + return delegate.currentContext(); } @Override - @Nullable - public Object scanUnsafe(Attr key) { - if (key == Attr.PARENT) return parent; - if (key == Attr.ACTUAL) return actual(); + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } - return null; + @Override + public void onNext(T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); } - public CoreSubscriber actual() { - return actual; + @Override + public void onComplete() { + delegate.onComplete(); + } + + @Override + public boolean tryOnNext(T t) { + delegate.onNext(t); + return true; } } } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java index 9159641e3..09240e724 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -1,61 +1,327 @@ package io.rsocket.internal; import java.time.Duration; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; +import reactor.test.util.RaceTestUtils; +import reactor.util.context.Context; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; public class SwitchTransformFluxTest { @Test - public void backpressureTest() { + public void shouldBeAbleToCancelSubscription() throws InterruptedException { + for (int j = 0; j < 10; j++) { + ArrayList capturedElements = new ArrayList<>(); + ArrayList capturedCompletions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + TestPublisher publisher = TestPublisher.createCold(); + AtomicLong captureElement = new AtomicLong(0L); + AtomicBoolean captureCompletion = new AtomicBoolean(false); + AtomicLong requested = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(1); + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .doOnCancel(latch::countDown) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux)); + + publisher.next(1L); + + switchTransformed.subscribe(captureElement::set, + __ -> { + }, + () -> captureCompletion.set(true), + s -> new Thread(() -> RaceTestUtils.race(publisher::complete, + () -> RaceTestUtils.race(s::cancel, + () -> s.request(1), + Schedulers.parallel()), + Schedulers.parallel())).start()); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(requested.get(), 1L); + capturedElements.add(captureElement.get()); + capturedCompletions.add(captureCompletion.get()); + } + + Assume.assumeThat(capturedElements, hasItem(equalTo(0L))); + Assume.assumeThat(capturedCompletions, hasItem(equalTo(false))); + } + } + + @Test + public void shouldRequestExpectedAmountOfElements() throws InterruptedException { TestPublisher publisher = TestPublisher.createCold(); + AtomicLong capture = new AtomicLong(); + AtomicLong requested = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(1); + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux)); + + publisher.next(1L); + + switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, + s -> { + for (int i = 0; i < 10000; i++) { + RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); + } + RaceTestUtils.race(publisher::complete, publisher::complete); + }); + + latch.await(5, TimeUnit.SECONDS); - Flux switchTransformed = - publisher + Assert.assertEquals(capture.get(), 1L); + Assert.assertEquals(requested.get(), 20000L); + } + + @Test + public void shouldReturnCorrectContextOnEmptySource() { + Flux switchTransformed = Flux + .empty() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); + + StepVerifier.create(switchTransformed, 0) + .expectSubscription() + .thenRequest(1) + .expectAccessibleContext() + .contains("a", "c") + .contains("c", "d") + .then() + .expectComplete() + .verify(); + } + + @Test + public void shouldNotFailOnIncorrectPublisherBehavior() { + TestPublisher publisher = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); + Flux switchTransformed = publisher .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux + .subscriberContext(Context.of("a", "b")) + )); + + StepVerifier.create(new Flux() { + @Override + public void subscribe(CoreSubscriber actual) { + switchTransformed.subscribe(actual); + publisher.next(1L); + } + }, 0) + .thenRequest(1) + .expectNext(1L) + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext(2L) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .expectError() + .verifyThenAssertThat() + .hasDroppedErrors(3) + .tookLessThan(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + +// @Test +// public void shouldNotFailOnIncorrePu + + @Test + public void shouldBeAbleToAccessUpstreamContext() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher + .flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .subscriberContext(Context.of("a", "b")) + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .expectAccessibleContext() + .contains("a", "b") + .contains("c", "d") + .then() + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); } @Test - public void shouldErrorOnOverflowTest() { + public void shouldNotHangWhenOneElementUpstream() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = - publisher + Flux switchTransformed = publisher .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .subscriberContext(Context.of("a", "b")) + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); + + publisher.next(1L); + publisher.complete(); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .expectComplete() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + + @Test + public void backpressureTest() { + TestPublisher publisher = TestPublisher.createCold(); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .then(() -> publisher.next(2L)) - .expectError() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + + Assert.assertEquals(2L, requested.get()); + } + + @Test + public void backpressureConditionalTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf))) + .filter(e -> false); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(2L, requested.get()); + } + + @Test + public void backpressureHiddenConditionalTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .hide())) + .filter(e -> false); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(10001L, requested.get()); + } + + @Test + public void backpressureDrawbackOnConditionalInTransformTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .filter(e -> false))); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(10001L, requested.get()); + } + + @Test + public void shouldErrorOnOverflowTest() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher.flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); + + publisher.next(1L); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .then(() -> publisher.next(2L)) + .expectError() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -63,44 +329,44 @@ public void shouldErrorOnOverflowTest() { @Test public void shouldPropagateonCompleteCorrectly() { - Flux switchTransformed = - Flux.empty() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); - - StepVerifier.create(switchTransformed).expectComplete().verify(Duration.ofSeconds(10)); + Flux switchTransformed = Flux.empty() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); + + StepVerifier.create(switchTransformed) + .expectComplete() + .verify(Duration.ofSeconds(10)); } @Test public void shouldPropagateErrorCorrectly() { - Flux switchTransformed = - Flux.error(new RuntimeException("hello")) - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = Flux.error(new RuntimeException("hello")) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); StepVerifier.create(switchTransformed) - .expectErrorMessage("hello") - .verify(Duration.ofSeconds(10)); + .expectErrorMessage("hello") + .verify(Duration.ofSeconds(10)); } @Test public void shouldBeAbleToBeCancelledProperly() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = - publisher - .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = publisher.flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); - publisher.emit(1, 2, 3, 4, 5); + publisher.next(1); - StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); + StepVerifier.create(switchTransformed, 0) + .thenCancel() + .verify(Duration.ofSeconds(10)); publisher.assertCancelled(); publisher.assertWasRequested(); From 48459fa075f6efd44eed0d010d54519ba7f860cc Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 29 Oct 2018 23:48:16 +0200 Subject: [PATCH 05/10] fix for switchTransform Signed-off-by: Oleh Dokuka --- .../rsocket/internal/SwitchTransformFlux.java | 122 +++--- .../internal/SwitchTransformFluxTest.java | 369 ++++++++++-------- 2 files changed, 256 insertions(+), 235 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index 6a36cf090..b4710e541 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -16,12 +16,11 @@ package io.rsocket.internal; +import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; - -import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -38,7 +37,7 @@ public final class SwitchTransformFlux extends Flux { final BiFunction, Publisher> transformer; public SwitchTransformFlux( - Publisher source, BiFunction, Publisher> transformer) { + Publisher source, BiFunction, Publisher> transformer) { this.source = Objects.requireNonNull(source, "source"); this.transformer = Objects.requireNonNull(transformer, "transformer"); } @@ -52,42 +51,48 @@ public int getPrefetch() { @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber actual) { if (actual instanceof Fuseable.ConditionalSubscriber) { - source.subscribe(new SwitchTransformConditionalOperator<>((Fuseable.ConditionalSubscriber) actual, transformer)); + source.subscribe( + new SwitchTransformConditionalOperator<>( + (Fuseable.ConditionalSubscriber) actual, transformer)); return; } source.subscribe(new SwitchTransformOperator<>(actual, transformer)); } static final class SwitchTransformOperator extends Flux - implements CoreSubscriber, Subscription, Scannable { + implements CoreSubscriber, Subscription, Scannable { final CoreSubscriber outer; final BiFunction, Publisher> transformer; Subscription s; - Throwable throwable; + Throwable throwable; volatile boolean done; - volatile T first; + volatile T first; volatile CoreSubscriber inner; + @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater INNER = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformOperator.class, CoreSubscriber.class, "inner"); + AtomicReferenceFieldUpdater.newUpdater( + SwitchTransformOperator.class, CoreSubscriber.class, "inner"); volatile int wip; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); SwitchTransformOperator( - CoreSubscriber outer, - BiFunction, Publisher> transformer) { + CoreSubscriber outer, + BiFunction, Publisher> transformer) { this.outer = outer; this.transformer = transformer; } @@ -133,9 +138,9 @@ public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { INNER.lazySet(this, actual); actual.onSubscribe(this); - } - else { - Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } else { + Operators.error( + actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); } } @@ -160,12 +165,11 @@ public void onNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; @@ -190,8 +194,7 @@ public void onError(Throwable t) { if (first == null) { drainRegular(); } - } - else { + } else { Operators.error(outer, t); } } @@ -209,8 +212,7 @@ public void onComplete() { if (first == null) { drainRegular(); } - } - else { + } else { Operators.complete(outer); } } @@ -222,8 +224,7 @@ public void request(long n) { if (n > 0) { s.request(n); } - } - else { + } else { s.request(n); } } @@ -239,7 +240,7 @@ boolean drainRegular() { Subscription s = this.s; CoreSubscriber a = inner; - for (;;) { + for (; ; ) { if (f != null) { first = null; ReferenceCountUtil.safeRelease(f); @@ -262,14 +263,12 @@ boolean drainRegular() { Throwable t = throwable; if (t != null) { a.onError(t); - } - else { + } else { a.onComplete(); } return sent; } - m = WIP.addAndGet(this, -m); if (m == 0) { @@ -279,37 +278,44 @@ boolean drainRegular() { } } - static final class SwitchTransformConditionalOperator extends Flux - implements Fuseable.ConditionalSubscriber, Subscription, Scannable { + implements Fuseable.ConditionalSubscriber, Subscription, Scannable { final Fuseable.ConditionalSubscriber outer; final BiFunction, Publisher> transformer; Subscription s; - Throwable throwable; + Throwable throwable; volatile boolean done; - volatile T first; + volatile T first; volatile Fuseable.ConditionalSubscriber inner; + @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater INNER = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, Fuseable.ConditionalSubscriber.class, "inner"); + static final AtomicReferenceFieldUpdater< + SwitchTransformConditionalOperator, Fuseable.ConditionalSubscriber> + INNER = + AtomicReferenceFieldUpdater.newUpdater( + SwitchTransformConditionalOperator.class, + Fuseable.ConditionalSubscriber.class, + "inner"); volatile int wip; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); SwitchTransformConditionalOperator( - Fuseable.ConditionalSubscriber outer, - BiFunction, Publisher> transformer) { + Fuseable.ConditionalSubscriber outer, + BiFunction, Publisher> transformer) { this.outer = outer; this.transformer = transformer; } @@ -356,14 +362,13 @@ public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { if (actual instanceof Fuseable.ConditionalSubscriber) { INNER.lazySet(this, (Fuseable.ConditionalSubscriber) actual); - } - else { + } else { INNER.lazySet(this, new ConditionalSubscriberAdapter<>(actual)); } actual.onSubscribe(this); - } - else { - Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } else { + Operators.error( + actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); } } @@ -388,12 +393,11 @@ public void onNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; @@ -416,12 +420,11 @@ public boolean tryOnNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return true; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return false; @@ -446,8 +449,7 @@ public void onError(Throwable t) { if (first == null) { drainRegular(); } - } - else { + } else { Operators.error(outer, t); } } @@ -465,8 +467,7 @@ public void onComplete() { if (first == null) { drainRegular(); } - } - else { + } else { Operators.complete(outer); } } @@ -477,8 +478,7 @@ public void request(long n) { if (--n > 0) { s.request(n); } - } - else { + } else { s.request(n); } } @@ -494,7 +494,7 @@ boolean drainRegular() { Subscription s = this.s; CoreSubscriber a = inner; - for (;;) { + for (; ; ) { if (f != null) { first = null; ReferenceCountUtil.safeRelease(f); @@ -517,14 +517,12 @@ boolean drainRegular() { Throwable t = throwable; if (t != null) { a.onError(t); - } - else { + } else { a.onComplete(); } return sent; } - m = WIP.addAndGet(this, -m); if (m == 0) { diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java index 09240e724..e4b897409 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -1,12 +1,14 @@ package io.rsocket.internal; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; + import java.time.Duration; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -18,9 +20,6 @@ import reactor.test.util.RaceTestUtils; import reactor.util.context.Context; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; - public class SwitchTransformFluxTest { @Test @@ -34,24 +33,30 @@ public void shouldBeAbleToCancelSubscription() throws InterruptedException { AtomicBoolean captureCompletion = new AtomicBoolean(false); AtomicLong requested = new AtomicLong(); CountDownLatch latch = new CountDownLatch(1); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .doOnCancel(latch::countDown) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux)); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .doOnCancel(latch::countDown) + .transform( + flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)); publisher.next(1L); - switchTransformed.subscribe(captureElement::set, - __ -> { - }, - () -> captureCompletion.set(true), - s -> new Thread(() -> RaceTestUtils.race(publisher::complete, - () -> RaceTestUtils.race(s::cancel, - () -> s.request(1), - Schedulers.parallel()), - Schedulers.parallel())).start()); + switchTransformed.subscribe( + captureElement::set, + __ -> {}, + () -> captureCompletion.set(true), + s -> + new Thread( + () -> + RaceTestUtils.race( + publisher::complete, + () -> + RaceTestUtils.race( + s::cancel, () -> s.request(1), Schedulers.parallel()), + Schedulers.parallel())) + .start()); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertEquals(requested.get(), 1L); @@ -70,21 +75,24 @@ public void shouldRequestExpectedAmountOfElements() throws InterruptedException AtomicLong capture = new AtomicLong(); AtomicLong requested = new AtomicLong(); CountDownLatch latch = new CountDownLatch(1); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux)); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)); publisher.next(1L); - switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, - s -> { - for (int i = 0; i < 10000; i++) { - RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); - } - RaceTestUtils.race(publisher::complete, publisher::complete); - }); + switchTransformed.subscribe( + capture::set, + __ -> {}, + latch::countDown, + s -> { + for (int i = 0; i < 10000; i++) { + RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); + } + RaceTestUtils.race(publisher::complete, publisher::complete); + }); latch.await(5, TimeUnit.SECONDS); @@ -94,94 +102,97 @@ public void shouldRequestExpectedAmountOfElements() throws InterruptedException @Test public void shouldReturnCorrectContextOnEmptySource() { - Flux switchTransformed = Flux - .empty() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux - )) + Flux switchTransformed = + Flux.empty() + .transform(flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); StepVerifier.create(switchTransformed, 0) - .expectSubscription() - .thenRequest(1) - .expectAccessibleContext() - .contains("a", "c") - .contains("c", "d") - .then() - .expectComplete() - .verify(); + .expectSubscription() + .thenRequest(1) + .expectAccessibleContext() + .contains("a", "c") + .contains("c", "d") + .then() + .expectComplete() + .verify(); } @Test public void shouldNotFailOnIncorrectPublisherBehavior() { - TestPublisher publisher = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); - Flux switchTransformed = publisher + TestPublisher publisher = + TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux - .subscriberContext(Context.of("a", "b")) - )); - - StepVerifier.create(new Flux() { - @Override - public void subscribe(CoreSubscriber actual) { - switchTransformed.subscribe(actual); - publisher.next(1L); - } - }, 0) - .thenRequest(1) - .expectNext(1L) - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext(2L) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .expectError() - .verifyThenAssertThat() - .hasDroppedErrors(3) - .tookLessThan(Duration.ofSeconds(10)); + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.subscriberContext(Context.of("a", "b")))); + + StepVerifier.create( + new Flux() { + @Override + public void subscribe(CoreSubscriber actual) { + switchTransformed.subscribe(actual); + publisher.next(1L); + } + }, + 0) + .thenRequest(1) + .expectNext(1L) + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext(2L) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .expectError() + .verifyThenAssertThat() + .hasDroppedErrors(3) + .tookLessThan(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); } -// @Test -// public void shouldNotFailOnIncorrePu + // @Test + // public void shouldNotFailOnIncorrePu @Test public void shouldBeAbleToAccessUpstreamContext() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .subscriberContext(Context.of("a", "b")) - )) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> + innerFlux.map(String::valueOf).subscriberContext(Context.of("a", "b")))) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .expectAccessibleContext() - .contains("a", "b") - .contains("c", "d") - .then() - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .expectAccessibleContext() + .contains("a", "b") + .contains("c", "d") + .then() + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -191,13 +202,15 @@ public void shouldBeAbleToAccessUpstreamContext() { public void shouldNotHangWhenOneElementUpstream() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .subscriberContext(Context.of("a", "b")) - )) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> + innerFlux.map(String::valueOf).subscriberContext(Context.of("a", "b")))) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); @@ -205,10 +218,10 @@ public void shouldNotHangWhenOneElementUpstream() { publisher.complete(); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -219,23 +232,26 @@ public void backpressureTest() { TestPublisher publisher = TestPublisher.createCold(); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -248,17 +264,19 @@ public void backpressureConditionalTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf))) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))) .filter(e -> false); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(2L, requested.get()); } @@ -268,18 +286,19 @@ public void backpressureHiddenConditionalTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .hide())) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf).hide())) .filter(e -> false); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(10001L, requested.get()); } @@ -289,17 +308,19 @@ public void backpressureDrawbackOnConditionalInTransformTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .filter(e -> false))); + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf).filter(e -> false))); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(10001L, requested.get()); } @@ -308,20 +329,22 @@ public void backpressureDrawbackOnConditionalInTransformTest() { public void shouldErrorOnOverflowTest() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher.flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .then(() -> publisher.next(2L)) - .expectError() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .then(() -> publisher.next(2L)) + .expectError() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -329,44 +352,44 @@ public void shouldErrorOnOverflowTest() { @Test public void shouldPropagateonCompleteCorrectly() { - Flux switchTransformed = Flux.empty() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); - - StepVerifier.create(switchTransformed) - .expectComplete() - .verify(Duration.ofSeconds(10)); + Flux switchTransformed = + Flux.empty() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + + StepVerifier.create(switchTransformed).expectComplete().verify(Duration.ofSeconds(10)); } @Test public void shouldPropagateErrorCorrectly() { - Flux switchTransformed = Flux.error(new RuntimeException("hello")) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + Flux.error(new RuntimeException("hello")) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); StepVerifier.create(switchTransformed) - .expectErrorMessage("hello") - .verify(Duration.ofSeconds(10)); + .expectErrorMessage("hello") + .verify(Duration.ofSeconds(10)); } @Test public void shouldBeAbleToBeCancelledProperly() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher.flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1); - StepVerifier.create(switchTransformed, 0) - .thenCancel() - .verify(Duration.ofSeconds(10)); + StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); publisher.assertCancelled(); publisher.assertWasRequested(); From 553dd253eeba8815ac19e712fbdabe00218563cb Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 30 Oct 2018 00:35:22 +0200 Subject: [PATCH 06/10] enable artifacts overriding Signed-off-by: Oleh Dokuka --- bintray.gradle | 1 + .../io/rsocket/RSocketClientTerminationTest.java | 16 ++++++---------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/bintray.gradle b/bintray.gradle index fd3207af7..6fe0db84b 100644 --- a/bintray.gradle +++ b/bintray.gradle @@ -25,6 +25,7 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey') && publications = ['maven'] publish = true + override = true pkg { repo = 'RSocket' diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java index a8e60d02b..ae3bfc489 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java @@ -2,6 +2,10 @@ import io.rsocket.RSocketClientTest.ClientSocketRule; import io.rsocket.util.EmptyPayload; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Function; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -11,16 +15,10 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.nio.channels.ClosedChannelException; -import java.time.Duration; -import java.util.Arrays; -import java.util.function.Function; - @RunWith(Parameterized.class) public class RSocketClientTerminationTest { - @Rule - public final ClientSocketRule rule = new ClientSocketRule(); + @Rule public final ClientSocketRule rule = new ClientSocketRule(); private Function> interaction; public RSocketClientTerminationTest(Function> interaction) { @@ -31,9 +29,7 @@ public RSocketClientTerminationTest(Function> in public void testCurrentStreamIsTerminatedOnConnectionClose() { RSocketClient rSocket = rule.socket; - Mono.delay(Duration.ofSeconds(1)) - .doOnNext(v -> rule.connection.dispose()) - .subscribe(); + Mono.delay(Duration.ofSeconds(1)).doOnNext(v -> rule.connection.dispose()).subscribe(); StepVerifier.create(interaction.apply(rSocket)) .expectError(ClosedChannelException.class) From 5e00f4f7c4a561890bd12c633735be05b11aa199 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 30 Oct 2018 01:09:14 +0200 Subject: [PATCH 07/10] prepare to next development iteration Signed-off-by: Oleh Dokuka --- build.gradle | 2 +- gradle.properties | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 7626acef0..35c4a408f 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ plugins { subprojects { apply plugin: 'io.spring.dependency-management' - ext['reactor-bom.version'] = 'Californium-SR2' + ext['reactor-bom.version'] = 'Californium-BUILD-SNAPSHOT' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' ext['netty.version'] = '4.1.29.Final' diff --git a/gradle.properties b/gradle.properties index 6f0fa213d..f99c8c544 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.11.10.BUILD-SNAPSHOT \ No newline at end of file +version=0.11.12.BUILD-SNAPSHOT \ No newline at end of file From 0fe099547cc882b7ec1cf6e51e2472c56b334347 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 30 Oct 2018 23:12:54 +0200 Subject: [PATCH 08/10] fix release first Signed-off-by: Oleh Dokuka --- .../main/java/io/rsocket/internal/SwitchTransformFlux.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index b4710e541..55866ed92 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -243,7 +243,6 @@ boolean drainRegular() { for (; ; ) { if (f != null) { first = null; - ReferenceCountUtil.safeRelease(f); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(f, a.currentContext()); @@ -251,6 +250,7 @@ boolean drainRegular() { } a.onNext(f); + ReferenceCountUtil.safeRelease(f); f = null; sent = true; } @@ -497,7 +497,6 @@ boolean drainRegular() { for (; ; ) { if (f != null) { first = null; - ReferenceCountUtil.safeRelease(f); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(f, a.currentContext()); @@ -505,6 +504,7 @@ boolean drainRegular() { } a.onNext(f); + ReferenceCountUtil.safeRelease(f); f = null; sent = true; } From aa3e8ed7e4df218488c74129b3633e751b335f77 Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Tue, 30 Oct 2018 15:57:42 -0700 Subject: [PATCH 09/10] Remove extraneous setup.release() in PingHandler --- rsocket-test/src/main/java/io/rsocket/test/PingHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/PingHandler.java b/rsocket-test/src/main/java/io/rsocket/test/PingHandler.java index 9ef1f394b..2f54ddb50 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/PingHandler.java +++ b/rsocket-test/src/main/java/io/rsocket/test/PingHandler.java @@ -41,7 +41,6 @@ public PingHandler(byte[] data) { @Override public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { - setup.release(); return Mono.just( new AbstractRSocket() { @Override From 1b32df01a9eb2bbc30a26e94003f8d3d6552c82e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 31 Oct 2018 15:54:16 +0200 Subject: [PATCH 10/10] removed safeRelease Signed-off-by: Oleh Dokuka --- .../rsocket/internal/SwitchTransformFlux.java | 47 ++++++++++--------- .../internal/SwitchTransformFluxTest.java | 47 +++++++++++++++++++ 2 files changed, 73 insertions(+), 21 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index 55866ed92..b8ec5b863 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -16,7 +16,6 @@ package io.rsocket.internal; -import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -122,11 +121,15 @@ public void cancel() { if (s != Operators.cancelledSubscription()) { Subscription s = this.s; this.s = Operators.cancelledSubscription(); - ReferenceCountUtil.safeRelease(first); if (WIP.getAndIncrement(this) == 0) { INNER.lazySet(this, null); - first = null; + + T f = first; + if (f != null) { + first = null; + Operators.onDiscard(f, currentContext()); + } } s.cancel(); @@ -171,7 +174,6 @@ public void onNext(T t) { return; } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); - ReferenceCountUtil.safeRelease(t); return; } } @@ -219,13 +221,14 @@ public void onComplete() { @Override public void request(long n) { - if (first != null && drainRegular() && n != Long.MAX_VALUE) { - n = Operators.addCap(n, -1); - if (n > 0) { + if (Operators.validate(n)) { + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + if (--n > 0) { + s.request(n); + } + } else { s.request(n); } - } else { - s.request(n); } } @@ -245,12 +248,11 @@ boolean drainRegular() { first = null; if (s == Operators.cancelledSubscription()) { - Operators.onNextDropped(f, a.currentContext()); + Operators.onDiscard(f, a.currentContext()); return true; } a.onNext(f); - ReferenceCountUtil.safeRelease(f); f = null; sent = true; } @@ -345,11 +347,15 @@ public void cancel() { if (s != Operators.cancelledSubscription()) { Subscription s = this.s; this.s = Operators.cancelledSubscription(); - ReferenceCountUtil.safeRelease(first); if (WIP.getAndIncrement(this) == 0) { INNER.lazySet(this, null); - first = null; + + T f = first; + if (f != null) { + first = null; + Operators.onDiscard(f, currentContext()); + } } s.cancel(); @@ -399,7 +405,6 @@ public void onNext(T t) { return; } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); - ReferenceCountUtil.safeRelease(t); return; } } @@ -426,7 +431,6 @@ public boolean tryOnNext(T t) { return true; } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); - ReferenceCountUtil.safeRelease(t); return false; } } @@ -474,12 +478,14 @@ public void onComplete() { @Override public void request(long n) { - if (first != null && drainRegular() && n != Long.MAX_VALUE) { - if (--n > 0) { + if (Operators.validate(n)) { + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + if (--n > 0) { + s.request(n); + } + } else { s.request(n); } - } else { - s.request(n); } } @@ -499,12 +505,11 @@ boolean drainRegular() { first = null; if (s == Operators.cancelledSubscription()) { - Operators.onNextDropped(f, a.currentContext()); + Operators.onDiscard(f, a.currentContext()); return true; } a.onNext(f); - ReferenceCountUtil.safeRelease(f); f = null; sent = true; } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java index e4b897409..2297d6bfa 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -394,4 +394,51 @@ public void shouldBeAbleToBeCancelledProperly() { publisher.assertCancelled(); publisher.assertWasRequested(); } + + @Test + public void shouldBeAbleToCatchDiscardedElement() { + TestPublisher publisher = TestPublisher.createCold(); + Integer[] discarded = new Integer[1]; + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))) + .doOnDiscard(Integer.class, e -> discarded[0] = e); + + publisher.next(1); + + StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); + + publisher.assertCancelled(); + publisher.assertWasRequested(); + + Assert.assertArrayEquals(new Integer[] {1}, discarded); + } + + @Test + public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() { + TestPublisher publisher = TestPublisher.createCold(); + Integer[] discarded = new Integer[1]; + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))) + .filter(t -> true) + .doOnDiscard(Integer.class, e -> discarded[0] = e); + + publisher.next(1); + + StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); + + publisher.assertCancelled(); + publisher.assertWasRequested(); + + Assert.assertArrayEquals(new Integer[] {1}, discarded); + } }