diff --git a/build.gradle b/build.gradle
index 679b93135..342a94954 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,8 +30,7 @@ subprojects {
dependencyManagement {
imports {
- // TODO: Upgrade to latest version
- mavenBom 'io.projectreactor:reactor-bom:Bismuth-SR11'
+ mavenBom 'io.projectreactor:reactor-bom:Californium-RELEASE'
}
dependencies {
diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle
index 0cdb6103c..7743715a3 100644
--- a/rsocket-core/build.gradle
+++ b/rsocket-core/build.gradle
@@ -27,7 +27,6 @@ dependencies {
api 'io.netty:netty-buffer'
api 'io.projectreactor:reactor-core'
- implementation 'io.projectreactor.addons:reactor-extra'
implementation 'org.jctools:jctools-core'
implementation 'org.slf4j:slf4j-api'
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
index 052b5b0e5..44460a938 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
@@ -430,10 +430,9 @@ private void handleStreamZero(FrameType type, Frame frame) {
lifecycle.terminate(error);
errorConsumer.accept(error);
connection.dispose();
+ break;
case LEASE:
- {
- break;
- }
+ break;
case KEEPALIVE:
if (keepAliveHandler != null) {
keepAliveHandler.receive(frame);
diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
index e765d54e2..e70f45d9f 100644
--- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
+++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
@@ -18,7 +18,6 @@
import static io.rsocket.fragmentation.FrameReassembler.createFrameReassembler;
import static io.rsocket.util.AbstractionLeakingFrameUtils.toAbstractionLeakingFrame;
-import static reactor.function.TupleUtils.function;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
@@ -115,7 +114,7 @@ public Flux receive() {
return delegate
.receive()
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
- .concatMap(function(this::toReassembledFrames));
+ .concatMap(t2 -> toReassembledFrames(t2.getT1(), t2.getT2()));
}
@Override
@@ -125,7 +124,7 @@ public Mono send(Publisher frames) {
return delegate.send(
Flux.from(frames)
.map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame)
- .concatMap(function(this::toFragmentedFrames)));
+ .concatMap(t2 -> toFragmentedFrames(t2.getT1(), t2.getT2())));
}
private Flux toFragmentedFrames(int streamId, io.rsocket.framing.Frame frame) {
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
index bd9231e43..627b1d7da 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
@@ -31,7 +31,7 @@
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.test.TestSubscriber;
import io.rsocket.transport.netty.client.TcpClientTransport;
-import io.rsocket.transport.netty.server.NettyContextCloseable;
+import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
@@ -83,7 +83,7 @@ public Mono requestResponse(Payload payload) {
};
}
- private NettyContextCloseable server;
+ private CloseableChannel server;
private RSocket client;
private AtomicInteger requestCount;
private CountDownLatch disconnectionCounter;
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java
index 1c488db4f..6c8f0e8fa 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java
@@ -6,7 +6,7 @@
import io.rsocket.RSocketFactory;
import io.rsocket.test.SlowTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
-import io.rsocket.transport.netty.server.NettyContextCloseable;
+import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
@@ -23,7 +23,7 @@ public class InteractionsLoadTest {
public void channel() {
TcpServerTransport serverTransport = TcpServerTransport.create(0);
- NettyContextCloseable server =
+ CloseableChannel server =
RSocketFactory.receive()
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
.transport(serverTransport)
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
index 9aadb491d..f5d048508 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
@@ -24,7 +24,7 @@
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
-import io.rsocket.transport.netty.server.NettyContextCloseable;
+import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
@@ -42,7 +42,7 @@
public class TcpIntegrationTest {
private AbstractRSocket handler;
- private NettyContextCloseable server;
+ private CloseableChannel server;
@Before
public void startup() {
diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle
index f50aa9752..71d0d5088 100644
--- a/rsocket-transport-netty/build.gradle
+++ b/rsocket-transport-netty/build.gradle
@@ -19,11 +19,17 @@ plugins {
id 'maven-publish'
id 'com.jfrog.artifactory'
id 'com.jfrog.bintray'
+ id "com.google.osdetector" version "1.4.0"
+}
+
+def os_suffix = ""
+if (osdetector.classifier in ["linux-x86_64"] || ["osx-x86_64"] || ["windows-x86_64"]) {
+ os_suffix = ":" + osdetector.classifier
}
dependencies {
api project(':rsocket-core')
- api 'io.projectreactor.ipc:reactor-netty'
+ api 'io.projectreactor.netty:reactor-netty'
compileOnly 'com.google.code.findbugs:jsr305'
@@ -35,6 +41,7 @@ dependencies {
testRuntimeOnly 'ch.qos.logback:logback-classic'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+ testRuntimeOnly 'io.netty:netty-tcnative-boringssl-static:2.0.14.Final' + os_suffix
}
description = 'Reactor Netty RSocket transport implementations (TCP, Websocket)'
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
index dca183bef..6ac2dbe0a 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java
@@ -21,50 +21,42 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.ipc.netty.NettyContext;
-import reactor.ipc.netty.NettyInbound;
-import reactor.ipc.netty.NettyOutbound;
+import reactor.netty.Connection;
+
+import java.util.Objects;
/** An implementation of {@link DuplexConnection} that connects via TCP. */
public final class TcpDuplexConnection implements DuplexConnection {
- private final NettyContext context;
-
- private final NettyInbound in;
-
- private final NettyOutbound out;
+ private final Connection connection;
/**
* Creates a new instance
*
- * @param in the {@link NettyInbound} to listen on
- * @param out the {@link NettyOutbound} to send with
- * @param context the {@link NettyContext} to for managing the server
+ * @param connection the {@link Connection} to for managing the server
*/
- public TcpDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
- this.in = in;
- this.out = out;
- this.context = context;
+ public TcpDuplexConnection(Connection connection) {
+ this.connection = Objects.requireNonNull(connection, "connection must not be null");
}
@Override
public void dispose() {
- context.dispose();
+ connection.dispose();
}
@Override
public boolean isDisposed() {
- return context.isDisposed();
+ return connection.isDisposed();
}
@Override
public Mono onClose() {
- return context.onClose();
+ return connection.onDispose();
}
@Override
public Flux receive() {
- return in.receive().map(buf -> Frame.from(buf.retain()));
+ return connection.inbound().receive().map(buf -> Frame.from(buf.retain()));
}
@Override
@@ -74,6 +66,6 @@ public Mono send(Publisher frames) {
@Override
public Mono sendOne(Frame frame) {
- return out.sendObject(frame.content()).then();
+ return connection.outbound().sendObject(frame.content()).then();
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpUriHandler.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpUriHandler.java
index 9821f5fa7..952a1e398 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpUriHandler.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpUriHandler.java
@@ -24,7 +24,7 @@
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
-import reactor.ipc.netty.tcp.TcpServer;
+import reactor.netty.tcp.TcpServer;
/**
* An implementation of {@link UriHandler} that creates {@link TcpClientTransport}s and {@link
@@ -53,6 +53,9 @@ public Optional buildServer(URI uri) {
return Optional.empty();
}
- return Optional.of(TcpServerTransport.create(TcpServer.create(uri.getHost(), uri.getPort())));
+ return Optional.of(TcpServerTransport.create(
+ TcpServer.create()
+ .host(uri.getHost())
+ .port(uri.getPort())));
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
index 7c71d2692..efb47c2a5 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
@@ -28,9 +28,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.ipc.netty.NettyContext;
-import reactor.ipc.netty.NettyInbound;
-import reactor.ipc.netty.NettyOutbound;
+import reactor.netty.Connection;
/**
* An implementation of {@link DuplexConnection} that connects via a Websocket.
@@ -41,46 +39,38 @@
*/
public final class WebsocketDuplexConnection implements DuplexConnection {
- private final NettyContext context;
-
- private final NettyInbound in;
-
- private final NettyOutbound out;
+ private final Connection connection;
/**
* Creates a new instance
*
- * @param in the {@link NettyInbound} to listen on
- * @param out the {@link NettyOutbound} to send with
- * @param context the {@link NettyContext} to for managing the server
+ * @param connection the {@link Connection} to for managing the server
*/
- public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
- this.in = Objects.requireNonNull(in, "in must not be null");
- this.out = Objects.requireNonNull(out, "out must not be null");
- this.context = Objects.requireNonNull(context, "context must not be null");
+ public WebsocketDuplexConnection(Connection connection) {
+ this.connection = Objects.requireNonNull(connection, "connection must not be null");
}
@Override
public void dispose() {
- context.dispose();
+ connection.dispose();
}
@Override
public boolean isDisposed() {
- return context.isDisposed();
+ return connection.isDisposed();
}
@Override
public Mono onClose() {
- return context.onClose();
+ return connection.onDispose();
}
@Override
public Flux receive() {
- return in.receive()
+ return connection.inbound().receive()
.map(
buf -> {
- CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
+ CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
composite.addComponents(true, length, buf.retain());
@@ -95,7 +85,7 @@ public Mono send(Publisher frames) {
@Override
public Mono sendOne(Frame frame) {
- return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
+ return connection.outbound().sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
.then();
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java
index 36829c907..aca238d31 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java
@@ -24,7 +24,7 @@
import java.net.InetSocketAddress;
import java.util.Objects;
import reactor.core.publisher.Mono;
-import reactor.ipc.netty.tcp.TcpClient;
+import reactor.netty.tcp.TcpClient;
/**
* An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} via TCP.
@@ -44,7 +44,7 @@ private TcpClientTransport(TcpClient client) {
* @return a new instance
*/
public static TcpClientTransport create(int port) {
- TcpClient tcpClient = TcpClient.create(port);
+ TcpClient tcpClient = TcpClient.create().port(port);
return create(tcpClient);
}
@@ -59,7 +59,7 @@ public static TcpClientTransport create(int port) {
public static TcpClientTransport create(String bindAddress, int port) {
Objects.requireNonNull(bindAddress, "bindAddress must not be null");
- TcpClient tcpClient = TcpClient.create(bindAddress, port);
+ TcpClient tcpClient = TcpClient.create().host(bindAddress).port(port);
return create(tcpClient);
}
@@ -73,7 +73,7 @@ public static TcpClientTransport create(String bindAddress, int port) {
public static TcpClientTransport create(InetSocketAddress address) {
Objects.requireNonNull(address, "address must not be null");
- TcpClient tcpClient = TcpClient.create(address.getHostString(), address.getPort());
+ TcpClient tcpClient = TcpClient.create().addressSupplier(() -> address);
return create(tcpClient);
}
@@ -92,20 +92,9 @@ public static TcpClientTransport create(TcpClient client) {
@Override
public Mono connect() {
- return Mono.create(
- sink ->
- client
- .newHandler(
- (in, out) -> {
- in.context().addHandler(new RSocketLengthCodec());
-
- TcpDuplexConnection connection =
- new TcpDuplexConnection(in, out, in.context());
-
- sink.success(connection);
- return connection.onClose();
- })
- .doOnError(sink::error)
- .subscribe());
+ return client
+ .doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
+ .connect()
+ .map(TcpDuplexConnection::new);
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java
index 01851f82a..8f4dac7eb 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java
@@ -30,8 +30,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
+
import reactor.core.publisher.Mono;
-import reactor.ipc.netty.http.client.HttpClient;
+import reactor.netty.Connection;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.tcp.TcpClient;
/**
* An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} via a
@@ -57,8 +60,8 @@ private WebsocketClientTransport(HttpClient client, String path) {
* @return a new instance
*/
public static WebsocketClientTransport create(int port) {
- HttpClient httpClient = HttpClient.create(port);
- return create(httpClient, "/");
+ TcpClient client = TcpClient.create().port(port);
+ return create(client);
}
/**
@@ -72,8 +75,8 @@ public static WebsocketClientTransport create(int port) {
public static WebsocketClientTransport create(String bindAddress, int port) {
Objects.requireNonNull(bindAddress, "bindAddress must not be null");
- HttpClient httpClient = HttpClient.create(bindAddress, port);
- return create(httpClient, "/");
+ TcpClient client = TcpClient.create().host(bindAddress).port(port);
+ return create(client);
}
/**
@@ -86,7 +89,8 @@ public static WebsocketClientTransport create(String bindAddress, int port) {
public static WebsocketClientTransport create(InetSocketAddress address) {
Objects.requireNonNull(address, "address must not be null");
- return create(address.getHostName(), address.getPort());
+ TcpClient client = TcpClient.create().addressSupplier(() -> address);
+ return create(client);
}
/**
@@ -99,8 +103,21 @@ public static WebsocketClientTransport create(InetSocketAddress address) {
public static WebsocketClientTransport create(URI uri) {
Objects.requireNonNull(uri, "uri must not be null");
- HttpClient httpClient = createClient(uri);
- return create(httpClient, uri.getPath());
+ TcpClient client = createClient(uri);
+ return create(HttpClient.from(client), uri.getPath());
+ }
+
+ /**
+ * Creates a new instance
+ *
+ * @param client the {@link TcpClient} to use
+ * @return a new instance
+ * @throws NullPointerException if {@code client} or {@code path} is {@code null}
+ */
+ public static WebsocketClientTransport create(TcpClient client) {
+ Objects.requireNonNull(client, "client must not be null");
+
+ return create(HttpClient.from(client), "/");
}
/**
@@ -120,21 +137,12 @@ public static WebsocketClientTransport create(HttpClient client, String path) {
@Override
public Mono connect() {
- return Mono.create(
- sink ->
- client
- .ws(path, hb -> transportHeaders.get().forEach(hb::set))
- .flatMap(
- response ->
- response.receiveWebsocket(
- (in, out) -> {
- WebsocketDuplexConnection connection =
- new WebsocketDuplexConnection(in, out, in.context());
- sink.success(connection);
- return connection.onClose();
- }))
- .doOnError(sink::error)
- .subscribe());
+ return client
+ .headers(headers -> transportHeaders.get().forEach(headers::set))
+ .websocket()
+ .uri(path)
+ .connect()
+ .map(WebsocketDuplexConnection::new);
}
@Override
@@ -143,16 +151,16 @@ public void setTransportHeaders(Supplier