From 9172b4751752b7c56d4a839ea53b910586b5346a Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 29 Oct 2018 01:16:29 +0200 Subject: [PATCH 1/5] fix issue when current and future streams are not terminated after connection dispose (#541) Signed-off-by: Maksym Ostroverkhov Signed-off-by: Kyle Bahr --- .../main/java/io/rsocket/RSocketClient.java | 57 ++++++++++------- .../rsocket/RSocketClientTerminationTest.java | 64 +++++++++++++++++++ .../java/io/rsocket/RSocketClientTest.java | 2 +- 3 files changed, 98 insertions(+), 25 deletions(-) create mode 100644 rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 44460a938..8f0941285 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -21,8 +21,11 @@ import io.rsocket.framing.FrameType; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; + +import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import org.jctools.maps.NonBlockingHashMapLong; @@ -72,7 +75,7 @@ class RSocketClient implements RSocket { // DO NOT Change the order here. The Send processor must be subscribed to before receiving this.sendProcessor = new UnboundedProcessor<>(); - connection.onClose().doFinally(signalType -> cleanup()).subscribe(null, errorConsumer); + connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer); connection .send(sendProcessor) @@ -92,7 +95,9 @@ class RSocketClient implements RSocket { keepAlive -> { String message = String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis()); - errorConsumer.accept(new ConnectionErrorException(message)); + ConnectionErrorException err = new ConnectionErrorException(message); + lifecycle.terminate(err); + errorConsumer.accept(err); connection.dispose(); }); keepAliveHandler.send().subscribe(sendProcessor::onNext); @@ -157,12 +162,7 @@ public Flux requestChannel(Publisher payloads) { @Override public Mono metadataPush(Payload payload) { - return Mono.fromRunnable( - () -> { - final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); - payload.release(); - sendProcessor.onNext(requestFrame); - }); + return handleMetadataPush(payload); } @Override @@ -187,7 +187,7 @@ public Mono onClose() { private Mono handleFireAndForget(Payload payload) { return lifecycle - .started() + .active() .then( Mono.fromRunnable( () -> { @@ -201,7 +201,7 @@ private Mono handleFireAndForget(Payload payload) { private Flux handleRequestStream(final Payload payload) { return lifecycle - .started() + .active() .thenMany( Flux.defer( () -> { @@ -247,7 +247,7 @@ private Flux handleRequestStream(final Payload payload) { private Mono handleRequestResponse(final Payload payload) { return lifecycle - .started() + .active() .then( Mono.defer( () -> { @@ -274,7 +274,7 @@ private Mono handleRequestResponse(final Payload payload) { private Flux handleChannel(Flux request) { return lifecycle - .started() + .active() .thenMany( Flux.defer( () -> { @@ -365,11 +365,25 @@ private Flux handleChannel(Flux request) { })); } + private Mono handleMetadataPush(Payload payload) { + return lifecycle + .active() + .then(Mono.fromRunnable( + () -> { + final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); + payload.release(); + sendProcessor.onNext(requestFrame); + })); + } + private boolean contains(int streamId) { return receivers.containsKey(streamId); } - protected void cleanup() { + protected void terminate() { + + lifecycle.terminate(new ClosedChannelException()); + if (keepAliveHandler != null) { keepAliveHandler.dispose(); } @@ -397,13 +411,8 @@ private synchronized void cleanUpLimitableRequestPublisher( } private synchronized void cleanUpSubscriber(UnicastProcessor subscriber) { - Throwable err = lifecycle.terminationError(); try { - if (err != null) { - subscriber.onError(err); - } else { - subscriber.cancel(); - } + subscriber.onError(lifecycle.terminationError()); } catch (Throwable t) { errorConsumer.accept(t); } @@ -519,12 +528,12 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame private static class Lifecycle { - private volatile Throwable terminationError; + private final AtomicReference terminationError = new AtomicReference<>(); - public Mono started() { + public Mono active() { return Mono.create( sink -> { - Throwable err = terminationError; + Throwable err = terminationError(); if (err == null) { sink.success(); } else { @@ -534,11 +543,11 @@ public Mono started() { } public void terminate(Throwable err) { - this.terminationError = err; + this.terminationError.compareAndSet(null, err); } public Throwable terminationError() { - return terminationError; + return terminationError.get(); } } } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java new file mode 100644 index 000000000..a8e60d02b --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java @@ -0,0 +1,64 @@ +package io.rsocket; + +import io.rsocket.RSocketClientTest.ClientSocketRule; +import io.rsocket.util.EmptyPayload; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Function; + +@RunWith(Parameterized.class) +public class RSocketClientTerminationTest { + + @Rule + public final ClientSocketRule rule = new ClientSocketRule(); + private Function> interaction; + + public RSocketClientTerminationTest(Function> interaction) { + this.interaction = interaction; + } + + @Test + public void testCurrentStreamIsTerminatedOnConnectionClose() { + RSocketClient rSocket = rule.socket; + + Mono.delay(Duration.ofSeconds(1)) + .doOnNext(v -> rule.connection.dispose()) + .subscribe(); + + StepVerifier.create(interaction.apply(rSocket)) + .expectError(ClosedChannelException.class) + .verify(Duration.ofSeconds(5)); + } + + @Test + public void testSubsequentStreamIsTerminatedAfterConnectionClose() { + RSocketClient rSocket = rule.socket; + + rule.connection.dispose(); + StepVerifier.create(interaction.apply(rSocket)) + .expectError(ClosedChannelException.class) + .verify(Duration.ofSeconds(5)); + } + + @Parameterized.Parameters + public static Iterable>> rsocketInteractions() { + EmptyPayload payload = EmptyPayload.INSTANCE; + Publisher payloadStream = Flux.just(payload); + + Function> resp = rSocket -> rSocket.requestResponse(payload); + Function> stream = rSocket -> rSocket.requestStream(payload); + Function> channel = rSocket -> rSocket.requestChannel(payloadStream); + + return Arrays.asList(resp, stream, channel); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 4fcb65751..a153e2f63 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -215,7 +215,7 @@ protected RSocketClient newRSocket() { throwable -> errors.add(throwable), StreamIdSupplier.clientSupplier(), Duration.ofMillis(100), - Duration.ofMillis(100), + Duration.ofMillis(10_000), 4); } From efc2e88f5ea6c0ed85d3890ac3b0bafdd512ca3f Mon Sep 17 00:00:00 2001 From: Kyle Bahr Date: Wed, 31 Oct 2018 12:37:51 -0700 Subject: [PATCH 2/5] change reactor snapshot to SR1 Signed-off-by: Kyle Bahr --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 381ed7236..f8b537d6b 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ plugins { subprojects { apply plugin: 'io.spring.dependency-management' - ext['reactor-bom.version'] = 'Californium-BUILD-SNAPSHOT' + ext['reactor-bom.version'] = 'Californium-SR1' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' ext['netty.version'] = '4.1.29.Final' From 95f026a7e0930405025c409907a148debf9a9822 Mon Sep 17 00:00:00 2001 From: Kyle Bahr Date: Wed, 31 Oct 2018 19:49:30 -0700 Subject: [PATCH 3/5] uprev to 11.13 Signed-off-by: Kyle Bahr --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 6f0fa213d..9008d313b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.11.10.BUILD-SNAPSHOT \ No newline at end of file +version=0.11.13.BUILD-SNAPSHOT \ No newline at end of file From 655094e2693c2a49c466965c2383f5703d0ab1ec Mon Sep 17 00:00:00 2001 From: Kyle Bahr Date: Fri, 2 Nov 2018 12:31:58 -0700 Subject: [PATCH 4/5] create final artifact version and minor gradle fixes from master Signed-off-by: Kyle Bahr --- bintray.gradle | 1 + build.gradle | 4 ++-- gradle.properties | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bintray.gradle b/bintray.gradle index fd3207af7..6fe0db84b 100644 --- a/bintray.gradle +++ b/bintray.gradle @@ -25,6 +25,7 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey') && publications = ['maven'] publish = true + override = true pkg { repo = 'RSocket' diff --git a/build.gradle b/build.gradle index f8b537d6b..e841a03f0 100644 --- a/build.gradle +++ b/build.gradle @@ -18,8 +18,8 @@ plugins { id 'com.gradle.build-scan' version '1.16' id 'com.github.sherter.google-java-format' version '0.7.1' - id 'com.jfrog.artifactory' version '4.7.3' - id 'com.jfrog.bintray' version '1.8.4' + id 'com.jfrog.artifactory' version '4.7.3' apply false + id 'com.jfrog.bintray' version '1.8.4' apply false id 'me.champeau.gradle.jmh' version '0.4.7' apply false id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false id 'io.morethan.jmhreport' version '0.9.0' apply false diff --git a/gradle.properties b/gradle.properties index 9008d313b..a40e6afdf 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.11.13.BUILD-SNAPSHOT \ No newline at end of file +version=0.11.13 \ No newline at end of file From b9117e8c5c11c4c3e7276ce3585a5bd369122ab6 Mon Sep 17 00:00:00 2001 From: Kyle Bahr Date: Fri, 2 Nov 2018 12:53:33 -0700 Subject: [PATCH 5/5] set back to snapshot for PR Signed-off-by: Kyle Bahr --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index a40e6afdf..9008d313b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.11.13 \ No newline at end of file +version=0.11.13.BUILD-SNAPSHOT \ No newline at end of file