Skip to content

Commit

Permalink
Update dependencies, cleanup RSocketClient/Server implementation (#466)
Browse files Browse the repository at this point in the history
* Delegate dispose/isDisposed to underlying channel in NettyDuplexConnection

* Update dependencies

* Update NonBlockingHashMap

* Avoid unhandled exception in ClientServerInputMultiplexer
  • Loading branch information
Ryland Degnan authored and robertroeser committed Feb 2, 2018
1 parent 353115f commit 30d3cf2
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 300 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ subprojects {
}

dependencies {
compile "io.projectreactor:reactor-core:3.1.2.RELEASE"
compile "io.netty:netty-buffer:4.1.17.Final"
compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
compile "io.netty:netty-buffer:4.1.20.Final"
compile "org.reactivestreams:reactive-streams:1.0.1"
compile "org.slf4j:slf4j-api:1.7.25"
compile "com.google.code.findbugs:jsr305:3.0.2"
Expand All @@ -90,7 +90,7 @@ subprojects {
testCompile "org.mockito:mockito-core:2.10.0"
testCompile "org.hamcrest:hamcrest-library:1.3"
testCompile "org.slf4j:slf4j-log4j12:1.7.25"
testCompile "io.projectreactor:reactor-test:3.1.2.RELEASE"
testCompile "io.projectreactor:reactor-test:3.1.3.RELEASE"
}

publishing {
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
# limitations under the License.
#

mavenversion=0.9-SNAPSHOT
mavenversion=0.10-SNAPSHOT
release.scope=patch
release.version=0.9-SNAPSHOT
release.version=0.10-SNAPSHOT
100 changes: 26 additions & 74 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -122,22 +121,15 @@ class RSocketClient implements RSocket {
}

private void handleSendProcessorError(Throwable t) {
Collection<UnicastProcessor<Payload>> values;
Collection<LimitableRequestPublisher> values1;
synchronized (RSocketClient.this) {
values = receivers.values();
values1 = senders.values();
}

for (Subscriber subscriber : values) {
for (Subscriber subscriber : receivers.values()) {
try {
subscriber.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
}
}

for (LimitableRequestPublisher p : values1) {
for (LimitableRequestPublisher p : senders.values()) {
p.cancel();
}
}
Expand All @@ -146,22 +138,16 @@ private void handleSendProcessorCancel(SignalType t) {
if (SignalType.ON_ERROR == t) {
return;
}
Collection<UnicastProcessor<Payload>> values;
Collection<LimitableRequestPublisher> values1;
synchronized (RSocketClient.this) {
values = receivers.values();
values1 = senders.values();
}

for (Subscriber subscriber : values) {
for (Subscriber subscriber : receivers.values()) {
try {
subscriber.onError(new Throwable("closed connection"));
} catch (Throwable e) {
errorConsumer.accept(e);
}
}

for (LimitableRequestPublisher p : values1) {
for (LimitableRequestPublisher p : senders.values()) {
p.cancel();
}
}
Expand Down Expand Up @@ -255,10 +241,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
int streamId = streamIdSupplier.nextStreamId();

UnicastProcessor<Payload> receiver = UnicastProcessor.create();

synchronized (this) {
receivers.put(streamId, receiver);
}
receivers.put(streamId, receiver);

AtomicBoolean first = new AtomicBoolean(false);

Expand Down Expand Up @@ -289,7 +272,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
})
.doFinally(
s -> {
removeReceiver(streamId);
receivers.remove(streamId);
});
}));
}
Expand All @@ -304,10 +287,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
payload.release();

UnicastProcessor<Payload> receiver = UnicastProcessor.create();

synchronized (this) {
receivers.put(streamId, receiver);
}
receivers.put(streamId, receiver);

sendProcessor.onNext(requestFrame);

Expand All @@ -317,7 +297,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
.doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId)))
.doFinally(
s -> {
removeReceiver(streamId);
receivers.remove(streamId);
});
}));
}
Expand Down Expand Up @@ -364,10 +344,8 @@ public Flux<Payload> get() {
LimitableRequestPublisher.wrap(f);
// Need to set this to one for first the frame
wrapped.increaseRequestLimit(1);
synchronized (RSocketClient.this) {
senders.put(streamId, wrapped);
receivers.put(streamId, receiver);
}
senders.put(streamId, wrapped);
receivers.put(streamId, receiver);

return wrapped;
})
Expand Down Expand Up @@ -424,39 +402,32 @@ public Frame apply(Payload payload) {
})
.doFinally(
s -> {
removeReceiver(streamId);
removeSender(streamId);
receivers.remove(streamId);
senders.remove(streamId);
});
}
}));
}

private boolean contains(int streamId) {
synchronized (RSocketClient.this) {
return receivers.containsKey(streamId);
}
return receivers.containsKey(streamId);
}

protected void cleanup() {
try {
Collection<UnicastProcessor<Payload>> subscribers;
Collection<LimitableRequestPublisher> publishers;
synchronized (RSocketClient.this) {
subscribers = receivers.values();
publishers = senders.values();
for (UnicastProcessor<Payload> subscriber: receivers.values()) {
cleanUpSubscriber(subscriber);
}
for (LimitableRequestPublisher p: senders.values()) {
cleanUpLimitableRequestPublisher(p);
}

subscribers.forEach(this::cleanUpSubscriber);
publishers.forEach(this::cleanUpLimitableRequestPublisher);

if (null != keepAliveSendSub) {
keepAliveSendSub.dispose();
}
} finally {
synchronized (this) {
senders.clear();
receivers.clear();
}
senders.clear();
receivers.clear();
}
}

Expand Down Expand Up @@ -513,29 +484,23 @@ private void handleStreamZero(FrameType type, Frame frame) {
}

private void handleFrame(int streamId, FrameType type, Frame frame) {
Subscriber<Payload> receiver;
synchronized (this) {
receiver = receivers.get(streamId);
}
Subscriber<Payload> receiver = receivers.get(streamId);
if (receiver == null) {
handleMissingResponseProcessor(streamId, type, frame);
} else {
switch (type) {
case ERROR:
receiver.onError(Exceptions.from(frame));
removeReceiver(streamId);
receivers.remove(streamId);
break;
case NEXT_COMPLETE:
receiver.onNext(frameDecoder.apply(frame));
receiver.onComplete();
break;
case CANCEL:
{
LimitableRequestPublisher sender;
synchronized (this) {
sender = senders.remove(streamId);
removeReceiver(streamId);
}
LimitableRequestPublisher sender = senders.remove(streamId);
receivers.remove(streamId);
if (sender != null) {
sender.cancel();
}
Expand All @@ -546,10 +511,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
break;
case REQUEST_N:
{
LimitableRequestPublisher sender;
synchronized (this) {
sender = senders.get(streamId);
}
LimitableRequestPublisher sender = senders.get(streamId);
if (sender != null) {
int n = Frame.RequestN.requestN(frame);
sender.increaseRequestLimit(n);
Expand All @@ -559,9 +521,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
}
case COMPLETE:
receiver.onComplete();
synchronized (this) {
receivers.remove(streamId);
}
receivers.remove(streamId);
break;
default:
throw new IllegalStateException(
Expand Down Expand Up @@ -593,12 +553,4 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
// receiving a frame after a given stream has been cancelled/completed,
// so ignore (cancellation is async so there is a race condition)
}

private synchronized void removeReceiver(int streamId) {
receivers.remove(streamId);
}

private synchronized void removeSender(int streamId) {
senders.remove(streamId);
}
}
Loading

0 comments on commit 30d3cf2

Please sign in to comment.