Skip to content

Commit

Permalink
Update dependencies (#533)
Browse files Browse the repository at this point in the history
Update to Californium-RELEASE
  • Loading branch information
rdegnan authored and yschimke committed Oct 2, 2018
1 parent b75d68f commit 9b6609f
Show file tree
Hide file tree
Showing 25 changed files with 198 additions and 191 deletions.
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ subprojects {

dependencyManagement {
imports {
// TODO: Upgrade to latest version
mavenBom 'io.projectreactor:reactor-bom:Bismuth-SR11'
mavenBom 'io.projectreactor:reactor-bom:Californium-RELEASE'
}

dependencies {
Expand Down
1 change: 0 additions & 1 deletion rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ dependencies {
api 'io.netty:netty-buffer'
api 'io.projectreactor:reactor-core'

implementation 'io.projectreactor.addons:reactor-extra'
implementation 'org.jctools:jctools-core'
implementation 'org.slf4j:slf4j-api'

Expand Down
5 changes: 2 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,9 @@ private void handleStreamZero(FrameType type, Frame frame) {
lifecycle.terminate(error);
errorConsumer.accept(error);
connection.dispose();
break;
case LEASE:
{
break;
}
break;
case KEEPALIVE:
if (keepAliveHandler != null) {
keepAliveHandler.receive(frame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static io.rsocket.fragmentation.FrameReassembler.createFrameReassembler;
import static io.rsocket.util.AbstractionLeakingFrameUtils.toAbstractionLeakingFrame;
import static reactor.function.TupleUtils.function;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
Expand Down Expand Up @@ -115,7 +114,7 @@ public Flux<Frame> receive() {
return delegate
.receive()
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
.concatMap(function(this::toReassembledFrames));
.concatMap(t2 -> toReassembledFrames(t2.getT1(), t2.getT2()));
}

@Override
Expand All @@ -125,7 +124,7 @@ public Mono<Void> send(Publisher<Frame> frames) {
return delegate.send(
Flux.from(frames)
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
.concatMap(function(this::toFragmentedFrames)));
.concatMap(t2 -> toFragmentedFrames(t2.getT1(), t2.getT2())));
}

private Flux<Frame> toFragmentedFrames(int streamId, io.rsocket.framing.Frame frame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.test.TestSubscriber;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
Expand Down Expand Up @@ -83,7 +83,7 @@ public Mono<Payload> requestResponse(Payload payload) {
};
}

private NettyContextCloseable server;
private CloseableChannel server;
private RSocket client;
private AtomicInteger requestCount;
private CountDownLatch disconnectionCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.rsocket.RSocketFactory;
import io.rsocket.test.SlowTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
Expand All @@ -23,7 +23,7 @@ public class InteractionsLoadTest {
public void channel() {
TcpServerTransport serverTransport = TcpServerTransport.create(0);

NettyContextCloseable server =
CloseableChannel server =
RSocketFactory.receive()
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
.transport(serverTransport)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
Expand All @@ -42,7 +42,7 @@
public class TcpIntegrationTest {
private AbstractRSocket handler;

private NettyContextCloseable server;
private CloseableChannel server;

@Before
public void startup() {
Expand Down
9 changes: 8 additions & 1 deletion rsocket-transport-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ plugins {
id 'maven-publish'
id 'com.jfrog.artifactory'
id 'com.jfrog.bintray'
id "com.google.osdetector" version "1.4.0"
}

def os_suffix = ""
if (osdetector.classifier in ["linux-x86_64"] || ["osx-x86_64"] || ["windows-x86_64"]) {
os_suffix = ":" + osdetector.classifier
}

dependencies {
api project(':rsocket-core')
api 'io.projectreactor.ipc:reactor-netty'
api 'io.projectreactor.netty:reactor-netty'

compileOnly 'com.google.code.findbugs:jsr305'

Expand All @@ -35,6 +41,7 @@ dependencies {

testRuntimeOnly 'ch.qos.logback:logback-classic'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'io.netty:netty-tcnative-boringssl-static:2.0.14.Final' + os_suffix
}

description = 'Reactor Netty RSocket transport implementations (TCP, Websocket)'
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,42 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.netty.Connection;

import java.util.Objects;

/** An implementation of {@link DuplexConnection} that connects via TCP. */
public final class TcpDuplexConnection implements DuplexConnection {

private final NettyContext context;

private final NettyInbound in;

private final NettyOutbound out;
private final Connection connection;

/**
* Creates a new instance
*
* @param in the {@link NettyInbound} to listen on
* @param out the {@link NettyOutbound} to send with
* @param context the {@link NettyContext} to for managing the server
* @param connection the {@link Connection} to for managing the server
*/
public TcpDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = in;
this.out = out;
this.context = context;
public TcpDuplexConnection(Connection connection) {
this.connection = Objects.requireNonNull(connection, "connection must not be null");
}

@Override
public void dispose() {
context.dispose();
connection.dispose();
}

@Override
public boolean isDisposed() {
return context.isDisposed();
return connection.isDisposed();
}

@Override
public Mono<Void> onClose() {
return context.onClose();
return connection.onDispose();
}

@Override
public Flux<Frame> receive() {
return in.receive().map(buf -> Frame.from(buf.retain()));
return connection.inbound().receive().map(buf -> Frame.from(buf.retain()));
}

@Override
Expand All @@ -74,6 +66,6 @@ public Mono<Void> send(Publisher<Frame> frames) {

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(frame.content()).then();
return connection.outbound().sendObject(frame.content()).then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import reactor.ipc.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpServer;

/**
* An implementation of {@link UriHandler} that creates {@link TcpClientTransport}s and {@link
Expand Down Expand Up @@ -53,6 +53,9 @@ public Optional<ServerTransport> buildServer(URI uri) {
return Optional.empty();
}

return Optional.of(TcpServerTransport.create(TcpServer.create(uri.getHost(), uri.getPort())));
return Optional.of(TcpServerTransport.create(
TcpServer.create()
.host(uri.getHost())
.port(uri.getPort())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.netty.Connection;

/**
* An implementation of {@link DuplexConnection} that connects via a Websocket.
Expand All @@ -41,46 +39,38 @@
*/
public final class WebsocketDuplexConnection implements DuplexConnection {

private final NettyContext context;

private final NettyInbound in;

private final NettyOutbound out;
private final Connection connection;

/**
* Creates a new instance
*
* @param in the {@link NettyInbound} to listen on
* @param out the {@link NettyOutbound} to send with
* @param context the {@link NettyContext} to for managing the server
* @param connection the {@link Connection} to for managing the server
*/
public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = Objects.requireNonNull(in, "in must not be null");
this.out = Objects.requireNonNull(out, "out must not be null");
this.context = Objects.requireNonNull(context, "context must not be null");
public WebsocketDuplexConnection(Connection connection) {
this.connection = Objects.requireNonNull(connection, "connection must not be null");
}

@Override
public void dispose() {
context.dispose();
connection.dispose();
}

@Override
public boolean isDisposed() {
return context.isDisposed();
return connection.isDisposed();
}

@Override
public Mono<Void> onClose() {
return context.onClose();
return connection.onDispose();
}

@Override
public Flux<Frame> receive() {
return in.receive()
return connection.inbound().receive()
.map(
buf -> {
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
composite.addComponents(true, length, buf.retain());
Expand All @@ -95,7 +85,7 @@ public Mono<Void> send(Publisher<Frame> frames) {

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
return connection.outbound().sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.net.InetSocketAddress;
import java.util.Objects;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpClient;

/**
* An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} via TCP.
Expand All @@ -44,7 +44,7 @@ private TcpClientTransport(TcpClient client) {
* @return a new instance
*/
public static TcpClientTransport create(int port) {
TcpClient tcpClient = TcpClient.create(port);
TcpClient tcpClient = TcpClient.create().port(port);
return create(tcpClient);
}

Expand All @@ -59,7 +59,7 @@ public static TcpClientTransport create(int port) {
public static TcpClientTransport create(String bindAddress, int port) {
Objects.requireNonNull(bindAddress, "bindAddress must not be null");

TcpClient tcpClient = TcpClient.create(bindAddress, port);
TcpClient tcpClient = TcpClient.create().host(bindAddress).port(port);
return create(tcpClient);
}

Expand All @@ -73,7 +73,7 @@ public static TcpClientTransport create(String bindAddress, int port) {
public static TcpClientTransport create(InetSocketAddress address) {
Objects.requireNonNull(address, "address must not be null");

TcpClient tcpClient = TcpClient.create(address.getHostString(), address.getPort());
TcpClient tcpClient = TcpClient.create().addressSupplier(() -> address);
return create(tcpClient);
}

Expand All @@ -92,20 +92,9 @@ public static TcpClientTransport create(TcpClient client) {

@Override
public Mono<DuplexConnection> connect() {
return Mono.create(
sink ->
client
.newHandler(
(in, out) -> {
in.context().addHandler(new RSocketLengthCodec());

TcpDuplexConnection connection =
new TcpDuplexConnection(in, out, in.context());

sink.success(connection);
return connection.onClose();
})
.doOnError(sink::error)
.subscribe());
return client
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
.connect()
.map(TcpDuplexConnection::new);
}
}
Loading

0 comments on commit 9b6609f

Please sign in to comment.