From 9a504a882a6d43bb26298660c5b0b7a2f71caeb2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Mon, 28 Mar 2022 18:16:07 +0300 Subject: [PATCH] fixes `block()` in MetadataPushRequesterMono/FnfRequesterMono (#1044) --- .../core/FireAndForgetRequesterMono.java | 5 ++ .../core/MetadataPushRequesterMono.java | 12 +++- .../io/rsocket/core/RSocketRequesterTest.java | 63 ++++++++++++++++++- 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java b/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java index eceb0976c..a5d527f5c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java @@ -185,6 +185,11 @@ public Void block(Duration m) { return block(); } + /** + * This method is deliberately non-blocking regardless it is named as `.block`. The main intent to + * keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes + * with a default block method implementation. + */ @Override @Nullable public Void block() { diff --git a/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java b/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java index 226e9a0af..e2512e995 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java @@ -120,6 +120,11 @@ public Void block(Duration m) { return block(); } + /** + * This method is deliberately non-blocking regardless it is named as `.block`. The main intent to + * keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes + * with a default block method implementation. + */ @Override @Nullable public Void block() { @@ -133,15 +138,16 @@ public Void block() { try { final boolean hasMetadata = p.hasMetadata(); metadata = p.metadata(); - if (hasMetadata) { + if (!hasMetadata) { lazyTerminate(STATE, this); p.release(); - throw new IllegalArgumentException("Metadata push does not support metadata field"); + throw new IllegalArgumentException("Metadata push should have metadata field present"); } if (!isValidMetadata(this.maxFrameLength, metadata)) { lazyTerminate(STATE, this); p.release(); - throw new IllegalArgumentException("Too Big Payload size"); + throw new IllegalArgumentException( + String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)); } } catch (IllegalReferenceCountException e) { lazyTerminate(STATE, this); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 327172255..183785d2f 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -33,6 +33,7 @@ import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -81,7 +82,6 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; -import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -169,7 +169,7 @@ protected void hookOnSubscribe(Subscription subscription) { public void testHandleSetupException() { rule.connection.addToReceivedBuffer( ErrorFrameCodec.encode(rule.alloc(), 0, new RejectedSetupException("boom"))); - Assertions.assertThatThrownBy(() -> rule.socket.onClose().block()) + assertThatThrownBy(() -> rule.socket.onClose().block()) .isInstanceOf(RejectedSetupException.class); rule.assertHasNoLeaks(); } @@ -373,6 +373,65 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen }); } + @ParameterizedTest + @ValueSource(ints = {128, 256, FRAME_LENGTH_MASK}) + public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmentation1( + int maxFrameLength) { + rule.setMaxFrameLength(maxFrameLength); + prepareCalls() + .forEach( + generator -> { + byte[] metadata = new byte[maxFrameLength]; + byte[] data = new byte[maxFrameLength]; + ThreadLocalRandom.current().nextBytes(metadata); + ThreadLocalRandom.current().nextBytes(data); + + assertThatThrownBy( + () -> { + final Publisher source = + generator.apply(rule.socket, DefaultPayload.create(data, metadata)); + + if (source instanceof Mono) { + ((Mono) source).block(); + } else { + ((Flux) source).blockLast(); + } + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(String.format(INVALID_PAYLOAD_ERROR_MESSAGE, maxFrameLength)); + + rule.assertHasNoLeaks(); + }); + } + + @Test + public void shouldRejectCallOfNoMetadataPayload() { + final ByteBuf data = rule.allocator.buffer(10); + final Payload payload = ByteBufPayload.create(data); + StepVerifier.create(rule.socket.metadataPush(payload)) + .expectSubscription() + .expectErrorSatisfies( + t -> + assertThat(t) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Metadata push should have metadata field present")) + .verify(); + PayloadAssert.assertThat(payload).isReleased(); + rule.assertHasNoLeaks(); + } + + @Test + public void shouldRejectCallOfNoMetadataPayloadBlocking() { + final ByteBuf data = rule.allocator.buffer(10); + final Payload payload = ByteBufPayload.create(data); + + assertThatThrownBy(() -> rule.socket.metadataPush(payload).block()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Metadata push should have metadata field present"); + PayloadAssert.assertThat(payload).isReleased(); + rule.assertHasNoLeaks(); + } + static Stream>> prepareCalls() { return Stream.of( RSocket::fireAndForget,