Skip to content

Commit

Permalink
Fix multiple calls to dispose in FragmentationDuplexConnection, allow…
Browse files Browse the repository at this point in the history
… fragmentation to be disabled until leaks are fixed
  • Loading branch information
rdegnan committed Aug 2, 2018
1 parent 83335d1 commit 3e89238
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
4 changes: 3 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ public Mono<RSocket> start() {
dataMimeType,
setupPayload);

connection = new FragmentationDuplexConnection(connection, mtu);
if (mtu > 0) {
connection = new FragmentationDuplexConnection(connection, mtu);
}

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, plugins);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public FragmentationDuplexConnection(
NumberUtils.requireNonNegative(maxFragmentSize, "maxFragmentSize must be positive");

this.frameFragmenter = new FrameFragmenter(byteBufAllocator, maxFragmentSize);

delegate
.onClose()
.doFinally(signalType -> frameReassemblers.values().forEach(FrameReassembler::dispose))
.subscribe();
}

@Override
Expand All @@ -102,9 +107,7 @@ public boolean isDisposed() {

@Override
public Mono<Void> onClose() {
return delegate
.onClose()
.doAfterTerminate(() -> frameReassemblers.values().forEach(FrameReassembler::dispose));
return delegate.onClose();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.mockito.ArgumentCaptor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

final class FragmentationDuplexConnectionTest {
Expand Down Expand Up @@ -93,6 +94,7 @@ void reassembleData() {
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(4, 2)));

when(delegate.receive()).thenReturn(Flux.just(fragment1, fragment2, fragment3));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2)
.receive()
Expand Down Expand Up @@ -123,6 +125,7 @@ void reassembleMetadata() {
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, metadata.slice(4, 2), null));

when(delegate.receive()).thenReturn(Flux.just(fragment1, fragment2, fragment3));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2)
.receive()
Expand Down Expand Up @@ -165,6 +168,7 @@ void reassembleMetadataAndData() {

when(delegate.receive())
.thenReturn(Flux.just(fragment1, fragment2, fragment3, fragment4, fragment5));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2)
.receive()
Expand All @@ -181,6 +185,7 @@ void reassembleNonFragment() {
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, (ByteBuf) null, null));

when(delegate.receive()).thenReturn(Flux.just(frame));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2)
.receive()
Expand All @@ -195,6 +200,7 @@ void reassembleNonFragmentableFrame() {
Frame frame = toAbstractionLeakingFrame(DEFAULT, 1, createTestCancelFrame());

when(delegate.receive()).thenReturn(Flux.just(frame));
when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2)
.receive()
Expand Down Expand Up @@ -224,6 +230,8 @@ void sendData() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(4, 2)));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand All @@ -241,6 +249,8 @@ void sendEqualToMaxFragmentLength() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand All @@ -254,6 +264,8 @@ void sendFragment() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, true, true, (ByteBuf) null, null));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand All @@ -267,6 +279,8 @@ void sendLessThanMaxFragmentLength() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(1)));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand Down Expand Up @@ -294,6 +308,8 @@ void sendMetadata() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, metadata.slice(4, 2), null));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand Down Expand Up @@ -336,6 +352,8 @@ void sendMetadataAndData() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(3, 2)));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand All @@ -353,6 +371,8 @@ void sendMetadataAndData() {
void sendNonFragmentable() {
Frame frame = toAbstractionLeakingFrame(DEFAULT, 1, createTestCancelFrame());

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand All @@ -362,6 +382,8 @@ void sendNonFragmentable() {
@DisplayName("send throws NullPointerException with null frames")
@Test
void sendNullFrames() {
when(delegate.onClose()).thenReturn(Mono.never());

assertThatNullPointerException()
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 2).send(null))
.withMessage("frames must not be null");
Expand All @@ -374,6 +396,8 @@ void sendZeroMaxFragmentLength() {
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));

when(delegate.onClose()).thenReturn(Mono.never());

new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame);
verify(delegate).send(publishers.capture());

Expand Down

0 comments on commit 3e89238

Please sign in to comment.