Skip to content

Commit

Permalink
Replace close() method with dispose()/isDisposed() (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryland Degnan authored and robertroeser committed Dec 23, 2017
1 parent c6fd3f4 commit 9bd625f
Show file tree
Hide file tree
Showing 41 changed files with 307 additions and 319 deletions.
14 changes: 9 additions & 5 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static class Input {

static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));

static final Payload HELLO_PAYLOAD = new DefaultPayload(HELLO);
static final Payload HELLO_PAYLOAD = DefaultPayload.create(HELLO);

static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();
Expand Down Expand Up @@ -121,8 +121,7 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return Mono.empty();
public void dispose() {
}

@Override
Expand All @@ -140,8 +139,13 @@ public Mono<Void> onClose() {
MonoProcessor<Void> onClose = MonoProcessor.create();

@Override
public Mono<Void> close() {
return Mono.empty().doFinally(s -> onClose.onComplete()).then();
public void dispose() {
onClose.onComplete();
}

@Override
public boolean isDisposed() {
return onClose.isDisposed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return Mono.empty();
public void dispose() {
}

@Override
Expand Down
16 changes: 7 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
/**
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
*
* <p>{@link #close()} returns a {@code Publisher} that immediately terminates. That same Publisher
* is returned by the {@link #onClose()} method.
*/
public abstract class AbstractRSocket implements RSocket {

Expand Down Expand Up @@ -58,12 +55,13 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return Mono.defer(
() -> {
onClose.onComplete();
return onClose;
});
public void dispose() {
onClose.onComplete();
}

@Override
public boolean isDisposed() {
return onClose.isDisposed();
}

@Override
Expand Down
16 changes: 3 additions & 13 deletions rsocket-core/src/main/java/io/rsocket/Closeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,14 @@

package io.rsocket;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/** */
public interface Closeable {
/**
* Close this {@code RSocket} upon subscribing to the returned {@code Publisher}
*
* <p><em>This method is idempotent and hence can be called as many times at any point with same
* outcome.</em>
*
* @return A {@code Publisher} that triggers the close when subscribed to and that completes when
* this {@code RSocket} close is complete.
*/
Mono<Void> close();

public interface Closeable extends Disposable {
/**
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
* RSocket} can be closed by explicitly calling {@link #close()} or when the underlying transport
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport
* connection is closed.
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
Expand Down
5 changes: 5 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ default Mono<Void> sendOne(Frame frame) {
* @return Stream of all {@code Frame}s received.
*/
Flux<Frame> receive();

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
}
}
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ public interface RSocket extends Availability, Closeable {

@Override
default double availability() {
return 0.0;
return isDisposed() ? 0.0 : 1.0;
}
}
21 changes: 13 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RSocketClient implements RSocket {
.doOnError(
t -> {
errorConsumer.accept(t);
connection.close().subscribe();
connection.dispose();
})
.subscribe();
}
Expand Down Expand Up @@ -234,8 +234,13 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return connection.close();
public void dispose() {
connection.dispose();
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
}

@Override
Expand All @@ -260,25 +265,25 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
return receiver
.doOnRequest(
l -> {
if (first.compareAndSet(false, true) && !receiver.isTerminated()) {
if (first.compareAndSet(false, true) && !receiver.isDisposed()) {
final Frame requestFrame =
Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, l);
payload.release();
sendProcessor.onNext(requestFrame);
} else if (contains(streamId) && !receiver.isTerminated()) {
} else if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
sendProcessor.drain();
})
.doOnError(
t -> {
if (contains(streamId) && !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
})
.doOnCancel(
() -> {
if (contains(streamId) && !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
Expand Down Expand Up @@ -326,7 +331,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType
boolean firstRequest = true;

boolean isValidToSendFrame() {
return contains(streamId) && !receiver.isTerminated();
return contains(streamId) && !receiver.isDisposed();
}

void sendOneFrame(Frame frame) {
Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private Mono<? extends Void> processSetupFrame(
return multiplexer
.asStreamZeroConnection()
.sendOne(Frame.Error.from(0, error))
.then(multiplexer.close());
.doFinally(signalType -> multiplexer.dispose());
}

ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
Expand Down
11 changes: 8 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,13 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return connection.close();
public void dispose() {
connection.dispose();
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
}

@Override
Expand All @@ -213,7 +218,7 @@ private void cleanup() {
cleanUpSendingSubscriptions();
cleanUpChannelProcessors();

requestHandler.close().subscribe();
requestHandler.dispose();
}

private synchronized void cleanUpSendingSubscriptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,13 @@ public Flux<Frame> receive() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

private synchronized FrameReassembler getFrameReassembler(Frame frame) {
return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
}

private synchronized FrameReassembler removeFrameReassembler(int streamId) {
return frameReassemblers.remove(streamId);
}

private synchronized boolean frameReassemblersContain(int streamId) {
return frameReassemblers.containsKey(streamId);
@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand All @@ -114,4 +107,16 @@ public Mono<Void> onClose() {
}
});
}

private synchronized FrameReassembler getFrameReassembler(Frame frame) {
return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
}

private synchronized FrameReassembler removeFrameReassembler(int streamId) {
return frameReassemblers.remove(streamId);
}

private synchronized boolean frameReassemblersContain(int streamId) {
return frameReassemblers.containsKey(streamId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.rsocket.internal;

import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.FrameType;
Expand All @@ -41,7 +42,7 @@
* even. Even IDs are for the streams initiated by server and odds are for streams initiated by the
* client.
*/
public class ClientServerInputMultiplexer {
public class ClientServerInputMultiplexer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");

private final DuplexConnection streamZeroConnection;
Expand Down Expand Up @@ -112,8 +113,19 @@ public DuplexConnection asStreamZeroConnection() {
return streamZeroConnection;
}

public Mono<Void> close() {
return source.close();
@Override
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
public Mono<Void> onClose() {
return source.onClose();
}

private static class InternalDuplexConnection implements DuplexConnection {
Expand Down Expand Up @@ -158,8 +170,13 @@ public Flux<Frame> receive() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand Down
29 changes: 0 additions & 29 deletions rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java

This file was deleted.

9 changes: 7 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand Down
Loading

0 comments on commit 9bd625f

Please sign in to comment.