Skip to content

Commit

Permalink
Simplify RSocketClient/Server implementation and avoid unnecessary fl…
Browse files Browse the repository at this point in the history
…atMap (#467)
  • Loading branch information
Ryland Degnan authored and robertroeser committed Feb 5, 2018
1 parent 30d3cf2 commit ec42c83
Show file tree
Hide file tree
Showing 18 changed files with 1,838 additions and 1,480 deletions.
6 changes: 1 addition & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,12 @@ subprojects {
}

repositories {
maven { url 'http://repo.spring.io/milestone' }
maven { url 'https://oss.jfrog.org/libs-snapshot' }
maven { url 'https://dl.bintray.com/rsocket/RSocket' }
maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' }
jcenter()
}

dependencies {
compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
compile "io.netty:netty-buffer:4.1.20.Final"
compile "org.reactivestreams:reactive-streams:1.0.1"
compile "org.slf4j:slf4j-api:1.7.25"
compile "com.google.code.findbugs:jsr305:3.0.2"

Expand Down
3 changes: 1 addition & 2 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public void dispose() {
}
public void dispose() {}

@Override
public Mono<Void> onClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public double availability() {
}

@Override
public void dispose() {
}
public void dispose() {}

@Override
public Mono<Void> onClose() {
Expand Down
4 changes: 2 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/Closeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
public interface Closeable extends Disposable {
/**
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport
* connection is closed.
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying
* transport connection is closed.
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
*/
Expand Down
81 changes: 32 additions & 49 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.util.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.*;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.*;

/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
class RSocketClient implements RSocket {
Expand Down Expand Up @@ -88,36 +87,25 @@ class RSocketClient implements RSocket {
started
.thenMany(Flux.interval(tickPeriod))
.doOnSubscribe(s -> timeLastTickSentMs = System.currentTimeMillis())
.concatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks))
.doOnError(
.subscribe(
i -> sendKeepAlive(ackTimeoutMs, missedAcks),
t -> {
errorConsumer.accept(t);
connection.dispose();
})
.subscribe();
});
}

connection
.onClose()
.doFinally(
signalType -> {
cleanup();
})
.doOnError(errorConsumer)
.subscribe();
connection.onClose().doFinally(signalType -> cleanup()).subscribe(null, errorConsumer);

connection
.send(sendProcessor)
.doOnError(this::handleSendProcessorError)
.doFinally(this::handleSendProcessorCancel)
.subscribe();
.subscribe(null, this::handleSendProcessorError);

connection
.receive()
.doOnSubscribe(subscription -> started.onComplete())
.doOnNext(this::handleIncomingFrames)
.doOnError(errorConsumer)
.subscribe();
.subscribe(this::handleIncomingFrames, errorConsumer);
}

private void handleSendProcessorError(Throwable t) {
Expand Down Expand Up @@ -152,23 +140,20 @@ private void handleSendProcessorCancel(SignalType t) {
}
}

private Mono<Void> sendKeepAlive(long ackTimeoutMs, int missedAcks) {
return Mono.fromRunnable(
() -> {
long now = System.currentTimeMillis();
if (now - timeLastTickSentMs > ackTimeoutMs) {
int count = missedAckCounter.incrementAndGet();
if (count >= missedAcks) {
String message =
String.format(
"Missed %d keep-alive acks with a threshold of %d and a ack timeout of %d ms",
count, missedAcks, ackTimeoutMs);
throw new ConnectionException(message);
}
}
private void sendKeepAlive(long ackTimeoutMs, int missedAcks) {
long now = System.currentTimeMillis();
if (now - timeLastTickSentMs > ackTimeoutMs) {
int count = missedAckCounter.incrementAndGet();
if (count >= missedAcks) {
String message =
String.format(
"Missed %d keep-alive acks with a threshold of %d and a ack timeout of %d ms",
count, missedAcks, ackTimeoutMs);
throw new ConnectionException(message);
}
}

sendProcessor.onNext(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true));
});
sendProcessor.onNext(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true));
}

@Override
Expand Down Expand Up @@ -380,14 +365,12 @@ public Frame apply(Payload payload) {
}
});

requestFrames
.doOnNext(sendProcessor::onNext)
.doOnError(
t -> {
errorConsumer.accept(t);
receiver.dispose();
})
.subscribe();
requestFrames.subscribe(
sendProcessor::onNext,
t -> {
errorConsumer.accept(t);
receiver.dispose();
});
} else {
sendOneFrame(Frame.RequestN.from(streamId, l));
}
Expand Down Expand Up @@ -415,10 +398,10 @@ private boolean contains(int streamId) {

protected void cleanup() {
try {
for (UnicastProcessor<Payload> subscriber: receivers.values()) {
for (UnicastProcessor<Payload> subscriber : receivers.values()) {
cleanUpSubscriber(subscriber);
}
for (LimitableRequestPublisher p: senders.values()) {
for (LimitableRequestPublisher p : senders.values()) {
cleanUpLimitableRequestPublisher(p);
}

Expand Down
Loading

0 comments on commit ec42c83

Please sign in to comment.