Skip to content

Commit

Permalink
Bi-Directional Requester/Responder
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Sep 18, 2015
1 parent 9f0b516 commit 7b1e789
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 83 deletions.
258 changes: 221 additions & 37 deletions src/main/java/io/reactivesocket/ReactiveSocket.java

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions src/main/java/io/reactivesocket/internal/BooleanDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.reactivesocket.internal;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import io.reactivesocket.observable.Disposable;

public final class BooleanDisposable implements Disposable {
volatile Runnable run;

static final AtomicReferenceFieldUpdater<BooleanDisposable, Runnable> RUN =
AtomicReferenceFieldUpdater.newUpdater(BooleanDisposable.class, Runnable.class, "run");

static final Runnable DISPOSED = () -> { };

public BooleanDisposable() {
this(() -> { });
}

public BooleanDisposable(Runnable run) {
RUN.lazySet(this, run);
}

@Override
public void dispose() {
Runnable r = run;
if (r != DISPOSED) {
r = RUN.getAndSet(this, DISPOSED);
if (r != DISPOSED) {
r.run();
}
}
}

public boolean isDisposed() {
return run == DISPOSED;
}
}
85 changes: 85 additions & 0 deletions src/main/java/io/reactivesocket/internal/CompositeCompletable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.reactivesocket.internal;

import java.util.HashSet;
import java.util.Set;

import io.reactivesocket.Completable;

/**
* A Completable container that can hold onto multiple other Completables.
*/
public final class CompositeCompletable implements Completable {

// protected by synchronized
private boolean completed = false;
private Throwable error = null;
final Set<Completable> resources = new HashSet<>();

public CompositeCompletable() {

}

public void add(Completable d) {
boolean terminal = false;
synchronized (this) {
if (error != null || completed) {
terminal = true;
} else {
resources.add(d);
}
}
if (terminal) {
if (error != null) {
d.error(error);
} else {
d.success();
}
}
}

public void remove(Completable d) {
synchronized (this) {
resources.remove(d);
}
}

public void clear() {
synchronized (this) {
resources.clear();
}
}

@Override
public void success() {
Completable[] cs = null;
synchronized (this) {
if (error == null) {
completed = true;
cs = resources.toArray(new Completable[] {});
resources.clear();
}
}
if (cs != null) {
for (Completable c : cs) {
c.success();
}
}
}

@Override
public void error(Throwable e) {
Completable[] cs = null;
synchronized (this) {
if (error == null && !completed) {
error = e;
cs = resources.toArray(new Completable[] {});
resources.clear();
}
}
if (cs != null) {
for (Completable c : cs) {
c.error(e);
}
}
}
}
61 changes: 61 additions & 0 deletions src/main/java/io/reactivesocket/internal/CompositeDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.reactivesocket.internal;

import java.util.HashSet;
import java.util.Set;

import io.reactivesocket.Completable;
import io.reactivesocket.observable.Disposable;

/**
* A Disposable container that can hold onto multiple other Disposables.
*/
public final class CompositeDisposable implements Disposable {

// protected by synchronized
private boolean disposed = false;
final Set<Disposable> resources = new HashSet<>();

public CompositeDisposable() {

}

public void add(Disposable d) {
boolean isDisposed = false;
synchronized (this) {
if (disposed) {
isDisposed = true;
} else {
resources.add(d);
}
}
if (isDisposed) {
d.dispose();
}
}

public void remove(Completable d) {
synchronized (this) {
resources.remove(d);
}
}

public void clear() {
synchronized (this) {
resources.clear();
}
}

@Override
public void dispose() {
Disposable[] cs = null;
synchronized (this) {
disposed = true;
cs = resources.toArray(new Disposable[] {});
resources.clear();
}
for (Disposable d : cs) {
d.dispose();
}
}

}
66 changes: 35 additions & 31 deletions src/main/java/io/reactivesocket/internal/Requester.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,38 +753,42 @@ private void start(Completable onComplete) {
connection.getInput().subscribe(new Observer<Frame>() {
public void onSubscribe(Disposable d) {
if (connectionSubscription.compareAndSet(null, d)) {
// now that we are connected, send SETUP frame (asynchronously, other messages can continue being written after this)
connection.addOutput(PublisherUtils.just(Frame.Setup.from(setupPayload.getFlags(), KEEPALIVE_INTERVAL_MS, 0, setupPayload.metadataMimeType(), setupPayload.dataMimeType(), setupPayload)),
new Completable() {

@Override
public void success() {
onComplete.success();
requesterStarted = true;
}

@Override
public void error(Throwable e) {
onComplete.error(e);
tearDown(e);
}

});

connection.addOutput(PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS),
new Completable()
{
public void success()
{
}

public void error(Throwable e)
if(isServer) {
requesterStarted = true;
onComplete.success();
} else {
// now that we are connected, send SETUP frame (asynchronously, other messages can continue being written after this)
connection.addOutput(PublisherUtils.just(Frame.Setup.from(setupPayload.getFlags(), KEEPALIVE_INTERVAL_MS, 0, setupPayload.metadataMimeType(), setupPayload.dataMimeType(), setupPayload)),
new Completable() {

@Override
public void success() {
requesterStarted = true;
onComplete.success();
}

@Override
public void error(Throwable e) {
onComplete.error(e);
tearDown(e);
}

});

connection.addOutput(PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS),
new Completable()
{
onComplete.error(e);
tearDown(e);
}
});

public void success()
{
}

public void error(Throwable e)
{
onComplete.error(e);
tearDown(e);
}
});
}
} else {
// means we already were cancelled
d.dispose();
Expand Down
33 changes: 26 additions & 7 deletions src/main/java/io/reactivesocket/internal/Responder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@
*/
public class Responder {
private final DuplexConnection connection;
private final ConnectionSetupHandler connectionHandler;
private final ConnectionSetupHandler connectionHandler; // for server
private final RequestHandler clientRequestHandler; // for client
private final Consumer<Throwable> errorStream;
private volatile LeaseGovernor leaseGovernor;
private long timeOfLastKeepalive;
private final Consumer<ConnectionSetupPayload> setupCallback;
private final boolean isServer;

private Responder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream) {
private Responder(boolean isServer, DuplexConnection connection, ConnectionSetupHandler connectionHandler, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Consumer<ConnectionSetupPayload> setupCallback) {
this.isServer = isServer;
this.connection = connection;
this.connectionHandler = connectionHandler;
this.clientRequestHandler = requestHandler;
this.leaseGovernor = leaseGovernor;
this.errorStream = errorStream;
this.timeOfLastKeepalive = System.nanoTime();
this.setupCallback = setupCallback;
}

/**
Expand All @@ -62,8 +68,18 @@ private Responder(DuplexConnection connection, ConnectionSetupHandler connection
* This include fireAndForget which ONLY emit errors server-side via this mechanism.
* @return responder instance
*/
public static <T> Responder create(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
Responder responder = new Responder(connection, connectionHandler, leaseGovernor, errorStream);
public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable, Consumer<ConnectionSetupPayload> setupCallback) {
Responder responder = new Responder(true, connection, connectionHandler, null, leaseGovernor, errorStream, setupCallback);
responder.start(responderCompletable);
return responder;
}

public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
return createServerResponder(connection, connectionHandler, leaseGovernor, errorStream, responderCompletable, s -> {});
}

public static <T> Responder createClientResponder(DuplexConnection connection, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
Responder responder = new Responder(false, connection, null, requestHandler, leaseGovernor, errorStream, s -> {});
responder.start(responderCompletable);
return responder;
}
Expand Down Expand Up @@ -123,12 +139,12 @@ public void onSubscribe(Disposable d) {
}
}

volatile RequestHandler requestHandler = null; // null until after first Setup frame

volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null; // null until after first Setup frame
@Override
public void onNext(Frame requestFrame) {
final int streamId = requestFrame.getStreamId();
if (requestHandler == null) {
if (requestHandler == null) { // this will only happen when isServer==true
if (childTerminated.get()) {
// already terminated, but still receiving latent messages ... ignore them while shutdown occurs
return;
Expand All @@ -141,6 +157,9 @@ public void onNext(Frame requestFrame) {
throw new SetupException("unsupported protocol version: " + Frame.Setup.version(requestFrame));
}

// accept setup for ReactiveSocket/Requester usage
setupCallback.accept(connectionSetupPayload);
// handle setup
requestHandler = connectionHandler.apply(connectionSetupPayload);
} catch (SetupException setupException) {
setupErrorAndTearDown(connection, setupException);
Expand Down
Loading

0 comments on commit 7b1e789

Please sign in to comment.