diff --git a/.travis.yml b/.travis.yml index c1e802935..116d2d2ff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ dist: trusty matrix: include: - - jdk: oraclejdk8 + - jdk: openjdk8 - jdk: openjdk11 env: SKIP_RELEASE=true - jdk: openjdk12 diff --git a/AUTHORS b/AUTHORS index 89f6e3696..ef7dd9dda 100644 --- a/AUTHORS +++ b/AUTHORS @@ -18,3 +18,4 @@ somasun = somasun stevegury = Steve Gury tmontgomery = Todd L. Montgomery yschimke = Yuri Schimke +OlegDokuka = Oleh Dokuka diff --git a/README.md b/README.md index 69e0e65e2..7b7bc94be 100644 --- a/README.md +++ b/README.md @@ -23,10 +23,10 @@ Example: ```groovy dependencies { - implementation 'io.rsocket:rsocket-core:0.12.2-RC4' - implementation 'io.rsocket:rsocket-transport-netty:0.12.2-RC4' -// implementation 'io.rsocket:rsocket-core:1.0.0-RC1-SNAPSHOT' -// implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC1-SNAPSHOT' + implementation 'io.rsocket:rsocket-core:1.0.0-RC3' + implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC3' +// implementation 'io.rsocket:rsocket-core:1.0.0-RC4-SNAPSHOT' +// implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC4-SNAPSHOT' } ``` diff --git a/build.gradle b/build.gradle index 3ce473f9c..ed66c41cc 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,13 @@ subprojects { googleJavaFormat { toolVersion = '1.6' } - + + ext { + if (project.hasProperty('versionSuffix')) { + project.version += project.getProperty('versionSuffix') + } + } + dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:${ext['reactor-bom.version']}" @@ -177,6 +183,16 @@ subprojects { name 'Yuri Schimke' email 'yuri@schimke.ee' } + developer { + id 'OlegDokuka' + name 'Oleh Dokuka' + email 'oleh@netifi.com' + } + developer { + id 'mostroverkhov' + name 'Maksym Ostroverkhov' + email 'm.ostroverkhov@gmail.com' + } } scm { diff --git a/ci/travis.sh b/ci/travis.sh index 372c01070..9154da33b 100755 --- a/ci/travis.sh +++ b/ci/travis.sh @@ -11,6 +11,7 @@ elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ] && [ "$bin ./gradlew \ -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \ -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \ + -PversionSuffix="-SNAPSHOT" \ build artifactoryPublish --stacktrace elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ] && [ "$bintrayUser" != "" ] ; then diff --git a/gradle.properties b/gradle.properties index c6d02bbef..433d1c709 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=1.0.0-RC2 +version=1.0.0-RC3 diff --git a/rsocket-bom/build.gradle b/rsocket-bom/build.gradle new file mode 100755 index 000000000..046b8f30d --- /dev/null +++ b/rsocket-bom/build.gradle @@ -0,0 +1,121 @@ +/* + * Copyright 2015-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +plugins { + id 'java-library' + id 'maven' + id 'maven-publish' + id 'com.jfrog.artifactory' + id 'com.jfrog.bintray' + id 'io.morethan.jmhreport' + id 'me.champeau.gradle.jmh' +} + +description = 'Bill of materials to make sure a consistent set of versions is used for RSocket-Java.' +configurations.archives.artifacts.clear() + +build.doLast { + pom { + customizePom(it, project) + } +} + +dependencies { + compile project(':rsocket-core') + compile project(':rsocket-test') + compile project(':rsocket-load-balancer') + compile project(':rsocket-micrometer') + compile project(':rsocket-transport-local') + compile project(':rsocket-transport-netty') +} + +def customizePom(generatedPom, gradleProject) { + //make sure that dependencies are under + generatedPom.withXml { + if (generatedPom.generatedDependencies.size > 0) { + asNode().appendNode('dependencyManagement', asNode().dependencies) + asNode().dependencies.replaceNode {} + } + } + + generatedPom.project { + name = 'RSocket-Java Release Train - BOM' + description = gradleProject.description + url = 'http://rsocket.io' + groupId = group + packaging = "pom" + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + developers { + developer { + id 'robertroeser' + name 'Robert Roeser' + email 'robert@netifi.com' + } + developer { + id 'rdegnan' + name 'Ryland Degnan' + email 'ryland@netifi.com' + } + developer { + id 'yschimke' + name 'Yuri Schimke' + email 'yuri@schimke.ee' + } + developer { + id 'OlegDokuka' + name 'Oleh Dokuka' + email 'oleh@netifi.com' + } + developer { + id 'mostroverkhov' + name 'Maksym Ostroverkhov' + email 'm.ostroverkhov@gmail.com' + } + } + scm { + connection = 'scm:git:git://github.com/rsocket/rsocket-java.git' + developerConnection = 'scm:git:ssh://github.com/rsocket/rsocket-java.git' + url = 'http://github.com/rsocket/rsocket-java/' + } + issueManagement { + system = "GitHub Issues" + url = "https://github.com/rsocket/rsocket-java/issues" + } + } + + generatedPom.writeTo("$buildDir/poms/rsocket-bom-${version}.xml") +} +plugins.withType(MavenPublishPlugin) { + publishing { + publications { + mavenJava(MavenPublication) { + pom.withXml { + def sb = asString() + sb.setLength 0 + sb.append file("$buildDir/poms/rsocket-bom-${version}.xml").text + println(sb.toString()) + } + } + } + } +} + +publish.dependsOn("build") +publishToMavenLocal.dependsOn("build") \ No newline at end of file diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 8a3e3ae25..b6c268464 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -31,11 +31,11 @@ import io.rsocket.internal.ClientSetup; import io.rsocket.internal.ServerSetup; import io.rsocket.keepalive.KeepAliveHandler; -import io.rsocket.lease.*; -import io.rsocket.plugins.DuplexConnectionInterceptor; -import io.rsocket.plugins.PluginRegistry; -import io.rsocket.plugins.Plugins; -import io.rsocket.plugins.RSocketInterceptor; +import io.rsocket.lease.LeaseStats; +import io.rsocket.lease.Leases; +import io.rsocket.lease.RequesterLeaseHandler; +import io.rsocket.lease.ResponderLeaseHandler; +import io.rsocket.plugins.*; import io.rsocket.resume.*; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; @@ -44,7 +44,10 @@ import io.rsocket.util.MultiSubscriberRSocket; import java.time.Duration; import java.util.Objects; -import java.util.function.*; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import reactor.core.publisher.Mono; /** Factory for creating RSocket clients and servers. */ @@ -93,10 +96,7 @@ default Start transport(ServerTransport transport) { public static class ClientRSocketFactory implements ClientTransportAcceptor { private static final String CLIENT_TAG = "client"; - private Supplier> acceptor = - () -> rSocket -> new AbstractRSocket() {}; - - private BiFunction biAcceptor; + private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {}); private Consumer errorConsumer = Throwable::printStackTrace; private int mtu = 0; @@ -161,6 +161,11 @@ public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) { return this; } + public ClientRSocketFactory addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) { + plugins.addSocketAcceptorPlugin(interceptor); + return this; + } + /** * Deprecated as Keep-Alive is not optional according to spec * @@ -268,18 +273,25 @@ public Start transport(Supplier transportClient) { } public ClientTransportAcceptor acceptor(Function acceptor) { - this.acceptor = () -> acceptor; - return StartClient::new; + return acceptor(() -> acceptor); } public ClientTransportAcceptor acceptor(Supplier> acceptor) { - this.acceptor = acceptor; - return StartClient::new; + return acceptor( + (SocketAcceptor) + (setup, sendingSocket) -> Mono.just(acceptor.get().apply(sendingSocket))); } + @Deprecated public ClientTransportAcceptor acceptor( BiFunction biAcceptor) { - this.biAcceptor = biAcceptor; + return acceptor( + (SocketAcceptor) + (setup, sendingSocket) -> Mono.just(biAcceptor.apply(setup, sendingSocket))); + } + + public ClientTransportAcceptor acceptor(SocketAcceptor acceptor) { + this.acceptor = acceptor; return StartClient::new; } @@ -346,6 +358,8 @@ public Mono start() { rSocketRequester = new MultiSubscriberRSocket(rSocketRequester); } + RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester); + ByteBuf setupFrame = SetupFrameFlyweight.encode( allocator, @@ -357,34 +371,38 @@ public Mono start() { dataMimeType, setupPayload); - RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester); - - RSocket rSocketHandler; - if (biAcceptor != null) { - ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame); - rSocketHandler = biAcceptor.apply(setup, wrappedRSocketRequester); - } else { - rSocketHandler = acceptor.get().apply(wrappedRSocketRequester); - } - - RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler); - - ResponderLeaseHandler responderLeaseHandler = - isLeaseEnabled - ? new ResponderLeaseHandler.Impl<>( - CLIENT_TAG, allocator, leases.sender(), errorConsumer, leases.stats()) - : ResponderLeaseHandler.None; - - RSocket rSocketResponder = - new RSocketResponder( - allocator, - multiplexer.asServerConnection(), - wrappedRSocketHandler, - payloadDecoder, - errorConsumer, - responderLeaseHandler); + ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame); + + return plugins + .applySocketAcceptorInterceptor(acceptor) + .accept(setup, wrappedRSocketRequester) + .flatMap( + rSocketHandler -> { + RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler); + + ResponderLeaseHandler responderLeaseHandler = + isLeaseEnabled + ? new ResponderLeaseHandler.Impl<>( + CLIENT_TAG, + allocator, + leases.sender(), + errorConsumer, + leases.stats()) + : ResponderLeaseHandler.None; + + RSocket rSocketResponder = + new RSocketResponder( + allocator, + multiplexer.asServerConnection(), + wrappedRSocketHandler, + payloadDecoder, + errorConsumer, + responderLeaseHandler); - return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester); + return wrappedConnection + .sendOne(setupFrame) + .thenReturn(wrappedRSocketRequester); + }); }); } @@ -476,6 +494,11 @@ public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) { return this; } + public ServerRSocketFactory addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) { + plugins.addSocketAcceptorPlugin(interceptor); + return this; + } + public ServerTransportAcceptor acceptor(SocketAcceptor acceptor) { this.acceptor = acceptor; return new ServerStart<>(); @@ -644,7 +667,8 @@ private Mono acceptSetup( } RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester); - return acceptor + return plugins + .applySocketAcceptorInterceptor(acceptor) .accept(setupPayload, wrappedRSocketRequester) .onErrorResume( err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err))) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java index ec4c8cec2..f921365da 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java @@ -27,7 +27,7 @@ import io.rsocket.exceptions.Exceptions; import io.rsocket.frame.*; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.internal.LimitableRequestPublisher; +import io.rsocket.internal.RateLimitableRequestPublisher; import io.rsocket.internal.SynchronizedIntObjectHashMap; import io.rsocket.internal.UnboundedProcessor; import io.rsocket.internal.UnicastMonoProcessor; @@ -47,6 +47,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.*; +import reactor.util.concurrent.Queues; /** * Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer @@ -60,7 +61,7 @@ class RSocketRequester implements RSocket { private final PayloadDecoder payloadDecoder; private final Consumer errorConsumer; private final StreamIdSupplier streamIdSupplier; - private final IntObjectMap senders; + private final IntObjectMap senders; private final IntObjectMap> receivers; private final UnboundedProcessor sendProcessor; private final RequesterLeaseHandler leaseHandler; @@ -131,7 +132,7 @@ private void handleSendProcessorError(Throwable t) { } }); - senders.values().forEach(LimitableRequestPublisher::cancel); + senders.values().forEach(RateLimitableRequestPublisher::cancel); } private void handleSendProcessorCancel(SignalType t) { @@ -150,7 +151,7 @@ private void handleSendProcessorCancel(SignalType t) { } }); - senders.values().forEach(LimitableRequestPublisher::cancel); + senders.values().forEach(RateLimitableRequestPublisher::cancel); } @Override @@ -343,8 +344,8 @@ public void accept(long n) { request .transform( f -> { - LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f); + RateLimitableRequestPublisher wrapped = + RateLimitableRequestPublisher.wrap(f, Queues.SMALL_BUFFER_SIZE); // Need to set this to one for first the frame wrapped.request(1); senders.put(streamId, wrapped); @@ -421,7 +422,7 @@ protected void hookOnError(Throwable t) { .doFinally( s -> { receivers.remove(streamId); - LimitableRequestPublisher sender = senders.remove(streamId); + RateLimitableRequestPublisher sender = senders.remove(streamId); if (sender != null) { sender.cancel(); } @@ -489,7 +490,7 @@ private void setTerminationError(Throwable error) { } private synchronized void cleanUpLimitableRequestPublisher( - LimitableRequestPublisher limitableRequestPublisher) { + RateLimitableRequestPublisher limitableRequestPublisher) { try { limitableRequestPublisher.cancel(); } catch (Throwable t) { @@ -561,7 +562,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { break; case CANCEL: { - LimitableRequestPublisher sender = senders.remove(streamId); + RateLimitableRequestPublisher sender = senders.remove(streamId); if (sender != null) { sender.cancel(); } @@ -572,7 +573,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { break; case REQUEST_N: { - LimitableRequestPublisher sender = senders.get(streamId); + RateLimitableRequestPublisher sender = senders.get(streamId); if (sender != null) { int n = RequestNFrameFlyweight.requestN(frame); sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java index 3bd221d64..490b00967 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java @@ -23,7 +23,7 @@ import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.frame.*; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.internal.LimitableRequestPublisher; +import io.rsocket.internal.RateLimitableRequestPublisher; import io.rsocket.internal.SynchronizedIntObjectHashMap; import io.rsocket.internal.UnboundedProcessor; import io.rsocket.lease.ResponderLeaseHandler; @@ -35,6 +35,7 @@ import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.publisher.*; +import reactor.util.concurrent.Queues; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ class RSocketResponder implements ResponderRSocket { @@ -46,7 +47,7 @@ class RSocketResponder implements ResponderRSocket { private final Consumer errorConsumer; private final ResponderLeaseHandler leaseHandler; - private final IntObjectMap sendingLimitableSubscriptions; + private final IntObjectMap sendingLimitableSubscriptions; private final IntObjectMap sendingSubscriptions; private final IntObjectMap> channelProcessors; @@ -435,8 +436,8 @@ private void handleStream(int streamId, Flux response, int initialReque response .transform( frameFlux -> { - LimitableRequestPublisher payloads = - LimitableRequestPublisher.wrap(frameFlux); + RateLimitableRequestPublisher payloads = + RateLimitableRequestPublisher.wrap(frameFlux, Queues.SMALL_BUFFER_SIZE); sendingLimitableSubscriptions.put(streamId, payloads); payloads.request( initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); diff --git a/rsocket-core/src/main/java/io/rsocket/SocketAcceptor.java b/rsocket-core/src/main/java/io/rsocket/SocketAcceptor.java index 0f6b99d0e..85c731eea 100644 --- a/rsocket-core/src/main/java/io/rsocket/SocketAcceptor.java +++ b/rsocket-core/src/main/java/io/rsocket/SocketAcceptor.java @@ -20,20 +20,21 @@ import reactor.core.publisher.Mono; /** - * {@code RSocket} is a full duplex protocol where a client and server are identical in terms of - * both having the capability to initiate requests to their peer. This interface provides the - * contract where a server accepts a new {@code RSocket} for sending requests to the peer and - * returns a new {@code RSocket} that will be used to accept requests from it's peer. + * RSocket is a full duplex protocol where a client and server are identical in terms of both having + * the capability to initiate requests to their peer. This interface provides the contract where a + * client or server handles the {@code setup} for a new connection and creates a responder {@code + * RSocket} for accepting requests from the remote peer. */ public interface SocketAcceptor { /** - * Accepts a new {@code RSocket} used to send requests to the peer and returns another {@code - * RSocket} that is used for accepting requests from the peer. + * Handle the {@code SETUP} frame for a new connection and create a responder {@code RSocket} for + * handling requests from the remote peer. * - * @param setup Setup as sent by the client. - * @param sendingSocket Socket used to send requests to the peer. - * @return Socket to accept requests from the peer. + * @param setup the {@code setup} received from a client in a server scenario, or in a client + * scenario this is the setup about to be sent to the server. + * @param sendingSocket socket for sending requests to the remote peer. + * @return {@code RSocket} to accept requests with. * @throws SetupException If the acceptor needs to reject the setup of this socket. */ Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java b/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java new file mode 100755 index 000000000..cdb0d0c0c --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java @@ -0,0 +1,242 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.internal; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.annotation.Nullable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Operators; + +/** */ +public class RateLimitableRequestPublisher extends Flux implements Subscription { + + private static final int NOT_CANCELED_STATE = 0; + private static final int CANCELED_STATE = 1; + + private final Publisher source; + + private volatile int canceled; + private static final AtomicIntegerFieldUpdater CANCELED = + AtomicIntegerFieldUpdater.newUpdater(RateLimitableRequestPublisher.class, "canceled"); + + private final long prefetch; + private final long limit; + + private long externalRequested; // need sync + private int pendingToFulfil; // need sync since should be checked/zerroed in onNext + // and increased in request + private int deliveredElements; // no need to sync since increased zerroed only in + // the request method + + private boolean subscribed; + + private @Nullable Subscription internalSubscription; + + private RateLimitableRequestPublisher(Publisher source, long prefetch) { + this.source = source; + this.prefetch = prefetch; + this.limit = prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : (prefetch - (prefetch >> 2)); + } + + public static RateLimitableRequestPublisher wrap(Publisher source, long prefetch) { + return new RateLimitableRequestPublisher<>(source, prefetch); + } + + @Override + public void subscribe(CoreSubscriber destination) { + synchronized (this) { + if (subscribed) { + throw new IllegalStateException("only one subscriber at a time"); + } + + subscribed = true; + } + final InnerOperator s = new InnerOperator(destination); + + source.subscribe(s); + destination.onSubscribe(s); + } + + @Override + public void request(long n) { + synchronized (this) { + long requested = externalRequested; + if (requested == Long.MAX_VALUE) { + return; + } + externalRequested = Operators.addCap(n, requested); + } + + requestN(); + } + + private void requestN() { + final long r; + final Subscription s; + + synchronized (this) { + s = internalSubscription; + if (s == null) { + return; + } + + final long er = externalRequested; + final long p = prefetch; + final int pendingFulfil = pendingToFulfil; + + if (er != Long.MAX_VALUE || p != Integer.MAX_VALUE) { + // shortcut + if (pendingFulfil == p) { + return; + } + + r = Math.min(p - pendingFulfil, er); + if (er != Long.MAX_VALUE) { + externalRequested -= r; + } + if (p != Integer.MAX_VALUE) { + pendingToFulfil += r; + } + } else { + r = Long.MAX_VALUE; + } + } + + if (r > 0) { + s.request(r); + } + } + + public void cancel() { + if (!isCanceled() && CANCELED.compareAndSet(this, NOT_CANCELED_STATE, CANCELED_STATE)) { + Subscription s; + + synchronized (this) { + s = internalSubscription; + internalSubscription = null; + subscribed = false; + } + + if (s != null) { + s.cancel(); + } + } + } + + private boolean isCanceled() { + return canceled == CANCELED_STATE; + } + + private class InnerOperator implements CoreSubscriber, Subscription { + final Subscriber destination; + + private InnerOperator(Subscriber destination) { + this.destination = destination; + } + + @Override + public void onSubscribe(Subscription s) { + synchronized (RateLimitableRequestPublisher.this) { + RateLimitableRequestPublisher.this.internalSubscription = s; + + if (isCanceled()) { + s.cancel(); + subscribed = false; + RateLimitableRequestPublisher.this.internalSubscription = null; + } + } + + requestN(); + } + + @Override + public void onNext(T t) { + try { + destination.onNext(t); + + if (prefetch == Integer.MAX_VALUE) { + return; + } + + final long l = limit; + int d = deliveredElements + 1; + + if (d == l) { + d = 0; + final long r; + final Subscription s; + + synchronized (RateLimitableRequestPublisher.this) { + long er = externalRequested; + s = internalSubscription; + + if (s == null) { + return; + } + + if (er >= l) { + er -= l; + // keep pendingToFulfil as is since it is eq to prefetch + r = l; + } else { + pendingToFulfil -= l; + if (er > 0) { + r = er; + er = 0; + pendingToFulfil += r; + } else { + r = 0; + } + } + + externalRequested = er; + } + + if (r > 0) { + s.request(r); + } + } + + deliveredElements = d; + } catch (Throwable e) { + onError(e); + } + } + + @Override + public void onError(Throwable t) { + destination.onError(t); + } + + @Override + public void onComplete() { + destination.onComplete(); + } + + @Override + public void request(long n) {} + + @Override + public void cancel() { + RateLimitableRequestPublisher.this.cancel(); + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/metadata/RoutingMetadata.java b/rsocket-core/src/main/java/io/rsocket/metadata/RoutingMetadata.java new file mode 100644 index 000000000..d1f2643dc --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/metadata/RoutingMetadata.java @@ -0,0 +1,18 @@ +package io.rsocket.metadata; + +import io.netty.buffer.ByteBuf; + +/** + * Routing Metadata extension from + * https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md + * + * @author linux_china + */ +public class RoutingMetadata extends TaggingMetadata { + private static final WellKnownMimeType ROUTING_MIME_TYPE = + WellKnownMimeType.MESSAGE_RSOCKET_ROUTING; + + public RoutingMetadata(ByteBuf content) { + super(ROUTING_MIME_TYPE.getString(), content); + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadata.java b/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadata.java new file mode 100644 index 000000000..e22d97106 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadata.java @@ -0,0 +1,64 @@ +package io.rsocket.metadata; + +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Tagging metadata from https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md + * + * @author linux_china + */ +public class TaggingMetadata implements Iterable, CompositeMetadata.Entry { + /** Tag max length in bytes */ + private static int TAG_LENGTH_MAX = 0xFF; + + private String mimeType; + private ByteBuf content; + + public TaggingMetadata(String mimeType, ByteBuf content) { + this.mimeType = mimeType; + this.content = content; + } + + public Stream stream() { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + iterator(), Spliterator.DISTINCT | Spliterator.NONNULL | Spliterator.ORDERED), + false); + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return content.readerIndex() < content.capacity(); + } + + @Override + public String next() { + int tagLength = TAG_LENGTH_MAX & content.readByte(); + if (tagLength > 0) { + return content.readSlice(tagLength).toString(StandardCharsets.UTF_8); + } else { + return ""; + } + } + }; + } + + @Override + public ByteBuf getContent() { + return this.content; + } + + @Override + public String getMimeType() { + return this.mimeType; + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadataFlyweight.java new file mode 100644 index 000000000..c7870bf0d --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/metadata/TaggingMetadataFlyweight.java @@ -0,0 +1,76 @@ +package io.rsocket.metadata; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import java.nio.charset.StandardCharsets; +import java.util.Collection; + +/** + * A flyweight class that can be used to encode/decode tagging metadata information to/from {@link + * ByteBuf}. This is intended for low-level efficient manipulation of such buffers. See {@link + * TaggingMetadata} for an Iterator-like approach to decoding entries. + * + * @author linux_china + */ +public class TaggingMetadataFlyweight { + /** Tag max length in bytes */ + private static int TAG_LENGTH_MAX = 0xFF; + + /** + * create routing metadata + * + * @param allocator the {@link ByteBufAllocator} to use to create intermediate buffers as needed. + * @param tags tag values + * @return routing metadata + */ + public static RoutingMetadata createRoutingMetadata( + ByteBufAllocator allocator, Collection tags) { + return new RoutingMetadata(createTaggingContent(allocator, tags)); + } + + /** + * create tagging metadata from composite metadata entry + * + * @param entry composite metadata entry + * @return tagging metadata + */ + public static TaggingMetadata createTaggingMetadata(CompositeMetadata.Entry entry) { + return new TaggingMetadata(entry.getMimeType(), entry.getContent()); + } + + /** + * create tagging metadata + * + * @param allocator the {@link ByteBufAllocator} to use to create intermediate buffers as needed. + * @param knownMimeType the {@link WellKnownMimeType} to encode. + * @param tags tag values + * @return Tagging Metadata + */ + public static TaggingMetadata createTaggingMetadata( + ByteBufAllocator allocator, String knownMimeType, Collection tags) { + return new TaggingMetadata(knownMimeType, createTaggingContent(allocator, tags)); + } + + /** + * create tagging content + * + * @param allocator the {@link ByteBufAllocator} to use to create intermediate buffers as needed. + * @param tags tag values + * @return tagging content + */ + public static ByteBuf createTaggingContent(ByteBufAllocator allocator, Collection tags) { + CompositeByteBuf taggingContent = allocator.compositeBuffer(); + for (String key : tags) { + int length = ByteBufUtil.utf8Bytes(key); + if (length == 0 || length > TAG_LENGTH_MAX) { + continue; + } + ByteBuf byteBuf = allocator.buffer().writeByte(length); + byteBuf.writeCharSequence(key, StandardCharsets.UTF_8); + taggingContent.addComponent(true, byteBuf); + } + return taggingContent; + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java b/rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java index 676cfc19c..e3a19367c 100644 --- a/rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java +++ b/rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java @@ -18,6 +18,7 @@ import io.rsocket.DuplexConnection; import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; import java.util.ArrayList; import java.util.List; @@ -25,6 +26,7 @@ public class PluginRegistry { private List connections = new ArrayList<>(); private List requesters = new ArrayList<>(); private List responders = new ArrayList<>(); + private List socketAcceptorInterceptors = new ArrayList<>(); public PluginRegistry() {} @@ -58,6 +60,10 @@ public void addResponderPlugin(RSocketInterceptor interceptor) { responders.add(interceptor); } + public void addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) { + socketAcceptorInterceptors.add(interceptor); + } + /** Deprecated. Use {@link #applyRequester(RSocket)} instead */ @Deprecated public RSocket applyClient(RSocket rSocket) { @@ -86,6 +92,14 @@ public RSocket applyResponder(RSocket rSocket) { return rSocket; } + public SocketAcceptor applySocketAcceptorInterceptor(SocketAcceptor acceptor) { + for (SocketAcceptorInterceptor i : socketAcceptorInterceptors) { + acceptor = i.apply(acceptor); + } + + return acceptor; + } + public DuplexConnection applyConnection( DuplexConnectionInterceptor.Type type, DuplexConnection connection) { for (DuplexConnectionInterceptor i : connections) { diff --git a/rsocket-core/src/main/java/io/rsocket/plugins/SocketAcceptorInterceptor.java b/rsocket-core/src/main/java/io/rsocket/plugins/SocketAcceptorInterceptor.java new file mode 100644 index 000000000..c9201ca5b --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/plugins/SocketAcceptorInterceptor.java @@ -0,0 +1,29 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.plugins; + +import io.rsocket.SocketAcceptor; +import java.util.function.Function; + +/** + * Contract to decorate a {@link SocketAcceptor}, providing access to connection {@code setup} + * information and the ability to also decorate the sockets for requesting and responding. + * + *

This can be used as an alternative to individual requester and responder {@link + * RSocketInterceptor} plugins. + */ +public @FunctionalInterface interface SocketAcceptorInterceptor + extends Function {} diff --git a/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java b/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java new file mode 100644 index 000000000..af4c528e9 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java @@ -0,0 +1,140 @@ +package io.rsocket.internal; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +class RateLimitableRequestPublisherTest { + + @Test + public void testThatRequest1WillBePropagatedUpstream() { + Flux source = + Flux.just(1) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then(() -> rateLimitableRequestPublisher.request(1)) + .expectNext(1) + .expectComplete() + .verify(Duration.ofMillis(1000)); + } + + @Test + public void testThatRequest256WillBePropagatedToUpstreamWithLimitedRate() { + Flux source = + Flux.range(0, 256) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then(() -> rateLimitableRequestPublisher.request(256)) + .expectNextCount(256) + .expectComplete() + .verify(Duration.ofMillis(1000)); + } + + @Test + public void testThatRequest256WillBePropagatedToUpstreamWithLimitedRateInFewSteps() { + Flux source = + Flux.range(0, 256) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then(() -> rateLimitableRequestPublisher.request(10)) + .expectNextCount(5) + .then(() -> rateLimitableRequestPublisher.request(128)) + .expectNextCount(133) + .expectNoEvent(Duration.ofMillis(10)) + .then(() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE)) + .expectNextCount(118) + .expectComplete() + .verify(Duration.ofMillis(1000)); + } + + @Test + public void testThatRequestInRandomFashionWillBePropagatedToUpstreamWithLimitedRateInFewSteps() { + Flux source = + Flux.range(0, 10000000) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then( + () -> + Flux.interval(Duration.ofMillis(1000)) + .onBackpressureDrop() + .subscribe( + new Consumer() { + int count = 10000000; + + @Override + public void accept(Long __) { + int random = ThreadLocalRandom.current().nextInt(1, 512); + + long request = Math.min(random, count); + + count -= request; + + rateLimitableRequestPublisher.request(count); + } + })) + .expectNextCount(10000000) + .expectComplete() + .verify(Duration.ofMillis(30000)); + } + + @Test + public void testThatRequestLongMaxValueWillBeDeliveredInSeparateChunks() { + Flux source = + Flux.range(0, 10000000) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then(() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE)) + .expectNextCount(10000000) + .expectComplete() + .verify(Duration.ofMillis(30000)); + } + + @Test + public void testThatRequestLongMaxWithIntegerMaxValuePrefetchWillBeDeliveredAsLongMaxValue() { + Flux source = + Flux.range(0, 10000000) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isEqualTo(Long.MAX_VALUE)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, Integer.MAX_VALUE); + + StepVerifier.create(rateLimitableRequestPublisher) + .then(() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE)) + .expectNextCount(10000000) + .expectComplete() + .verify(Duration.ofMillis(30000)); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/metadata/TaggingMetadataTest.java b/rsocket-core/src/test/java/io/rsocket/metadata/TaggingMetadataTest.java new file mode 100644 index 000000000..d1fbb50b0 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/metadata/TaggingMetadataTest.java @@ -0,0 +1,47 @@ +package io.rsocket.metadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.netty.buffer.ByteBufAllocator; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; + +/** + * Tagging metadata test + * + * @author linux_china + */ +public class TaggingMetadataTest { + private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; + + @Test + public void testParseTags() { + List tags = + Arrays.asList( + "ws://localhost:8080/rsocket", String.join("", Collections.nCopies(129, "x"))); + TaggingMetadata taggingMetadata = + TaggingMetadataFlyweight.createTaggingMetadata( + byteBufAllocator, "message/x.rsocket.routing.v0", tags); + TaggingMetadata taggingMetadataCopy = + new TaggingMetadata("message/x.rsocket.routing.v0", taggingMetadata.getContent()); + assertThat(tags) + .containsExactlyElementsOf(taggingMetadataCopy.stream().collect(Collectors.toList())); + } + + @Test + public void testEmptyTagAndOverLengthTag() { + List tags = + Arrays.asList( + "ws://localhost:8080/rsocket", "", String.join("", Collections.nCopies(256, "x"))); + TaggingMetadata taggingMetadata = + TaggingMetadataFlyweight.createTaggingMetadata( + byteBufAllocator, "message/x.rsocket.routing.v0", tags); + TaggingMetadata taggingMetadataCopy = + new TaggingMetadata("message/x.rsocket.routing.v0", taggingMetadata.getContent()); + assertThat(tags.subList(0, 1)) + .containsExactlyElementsOf(taggingMetadataCopy.stream().collect(Collectors.toList())); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index fd48cd9d3..6298b0c3a 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -40,7 +41,9 @@ public class TestDuplexConnection implements DuplexConnection { private final LinkedBlockingQueue sent; private final DirectProcessor sentPublisher; + private final FluxSink sendSink; private final DirectProcessor received; + private final FluxSink receivedSink; private final MonoProcessor onClose; private final ConcurrentLinkedQueue> sendSubscribers; private volatile double availability = 1; @@ -49,7 +52,9 @@ public class TestDuplexConnection implements DuplexConnection { public TestDuplexConnection() { sent = new LinkedBlockingQueue<>(); received = DirectProcessor.create(); + receivedSink = received.sink(); sentPublisher = DirectProcessor.create(); + sendSink = sentPublisher.sink(); sendSubscribers = new ConcurrentLinkedQueue<>(); onClose = MonoProcessor.create(); } @@ -65,7 +70,7 @@ public Mono send(Publisher frames) { .doOnNext( frame -> { sent.offer(frame); - sentPublisher.onNext(frame); + sendSink.next(frame); }) .doOnError(throwable -> logger.error("Error in send stream on test connection.", throwable)) .subscribe(subscriber); @@ -116,7 +121,7 @@ public Publisher getSentAsPublisher() { public void addToReceivedBuffer(ByteBuf... received) { for (ByteBuf frame : received) { - this.received.onNext(frame); + this.receivedSink.next(frame); } } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java index 627b1d7da..19c29061b 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java @@ -29,6 +29,7 @@ import io.rsocket.RSocketFactory; import io.rsocket.plugins.DuplexConnectionInterceptor; import io.rsocket.plugins.RSocketInterceptor; +import io.rsocket.plugins.SocketAcceptorInterceptor; import io.rsocket.test.TestSubscriber; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; @@ -48,34 +49,52 @@ public class IntegrationTest { - private static final RSocketInterceptor clientPlugin; - private static final RSocketInterceptor serverPlugin; + private static final RSocketInterceptor requesterPlugin; + private static final RSocketInterceptor responderPlugin; + private static final SocketAcceptorInterceptor clientAcceptorPlugin; + private static final SocketAcceptorInterceptor serverAcceptorPlugin; private static final DuplexConnectionInterceptor connectionPlugin; - public static volatile boolean calledClient = false; - public static volatile boolean calledServer = false; + public static volatile boolean calledRequester = false; + public static volatile boolean calledResponder = false; + public static volatile boolean calledClientAcceptor = false; + public static volatile boolean calledServerAcceptor = false; public static volatile boolean calledFrame = false; static { - clientPlugin = + requesterPlugin = reactiveSocket -> new RSocketProxy(reactiveSocket) { @Override public Mono requestResponse(Payload payload) { - calledClient = true; + calledRequester = true; return reactiveSocket.requestResponse(payload); } }; - serverPlugin = + responderPlugin = reactiveSocket -> new RSocketProxy(reactiveSocket) { @Override public Mono requestResponse(Payload payload) { - calledServer = true; + calledResponder = true; return reactiveSocket.requestResponse(payload); } }; + clientAcceptorPlugin = + acceptor -> + (setup, sendingSocket) -> { + calledClientAcceptor = true; + return acceptor.accept(setup, sendingSocket); + }; + + serverAcceptorPlugin = + acceptor -> + (setup, sendingSocket) -> { + calledServerAcceptor = true; + return acceptor.accept(setup, sendingSocket); + }; + connectionPlugin = (type, connection) -> { calledFrame = true; @@ -95,11 +114,12 @@ public void startup() { requestCount = new AtomicInteger(); disconnectionCounter = new CountDownLatch(1); - TcpServerTransport serverTransport = TcpServerTransport.create(0); + TcpServerTransport serverTransport = TcpServerTransport.create("localhost", 0); server = RSocketFactory.receive() - .addServerPlugin(serverPlugin) + .addResponderPlugin(responderPlugin) + .addSocketAcceptorPlugin(serverAcceptorPlugin) .addConnectionPlugin(connectionPlugin) .errorConsumer( t -> { @@ -138,7 +158,8 @@ public Flux requestChannel(Publisher payloads) { client = RSocketFactory.connect() - .addClientPlugin(clientPlugin) + .addRequesterPlugin(requesterPlugin) + .addSocketAcceptorPlugin(clientAcceptorPlugin) .addConnectionPlugin(connectionPlugin) .transport(TcpClientTransport.create(server.address())) .start() @@ -154,8 +175,10 @@ public void teardown() { public void testRequest() { client.requestResponse(DefaultPayload.create("REQUEST", "META")).block(); assertThat("Server did not see the request.", requestCount.get(), is(1)); - assertTrue(calledClient); - assertTrue(calledServer); + assertTrue(calledRequester); + assertTrue(calledResponder); + assertTrue(calledClientAcceptor); + assertTrue(calledServerAcceptor); assertTrue(calledFrame); } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java index 6c8f0e8fa..7a30a7fd1 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java @@ -21,7 +21,7 @@ public class InteractionsLoadTest { @Test @SlowTest public void channel() { - TcpServerTransport serverTransport = TcpServerTransport.create(0); + TcpServerTransport serverTransport = TcpServerTransport.create("localhost", 0); CloseableChannel server = RSocketFactory.receive() diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java index 41e437fee..9e7f5b0a7 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java @@ -46,7 +46,7 @@ public class TcpIntegrationTest { @Before public void startup() { - TcpServerTransport serverTransport = TcpServerTransport.create(0); + TcpServerTransport serverTransport = TcpServerTransport.create("localhost", 0); server = RSocketFactory.receive() .acceptor((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler))) diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/FragmentTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/FragmentTest.java index 62d7da336..575993c18 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/FragmentTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/FragmentTest.java @@ -57,7 +57,7 @@ public void startup() { this.responseMessage = responseMessage.toString(); this.metaData = metaData.toString(); - TcpServerTransport serverTransport = TcpServerTransport.create(randomPort); + TcpServerTransport serverTransport = TcpServerTransport.create("localhost", randomPort); server = RSocketFactory.receive() .fragment(frameSize) diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java index 53b164247..b40f35e51 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java @@ -38,7 +38,7 @@ public static void main(String... args) { serverRSocketFactory .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor(new PingHandler()) - .transport(TcpServerTransport.create(port)) + .transport(TcpServerTransport.create("localhost", port)) .start() .block() .onClose() diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java index 84c185e26..b6cbfea34 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java @@ -70,7 +70,7 @@ void createNullTcpClient() { @DisplayName("creates server with port") @Test void createPort() { - assertThat(TcpServerTransport.create(8000)).isNotNull(); + assertThat(TcpServerTransport.create("localhost", 8000)).isNotNull(); } @DisplayName("creates client with TcpServer") @@ -97,7 +97,7 @@ void start() { @Test void startNullAcceptor() { assertThatNullPointerException() - .isThrownBy(() -> TcpServerTransport.create(8000).start(null, 0)) + .isThrownBy(() -> TcpServerTransport.create("localhost", 8000).start(null, 0)) .withMessage("acceptor must not be null"); } } diff --git a/settings.gradle b/settings.gradle index 16630076a..625633774 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,3 +23,4 @@ include 'rsocket-micrometer' include 'rsocket-test' include 'rsocket-transport-local' include 'rsocket-transport-netty' +include 'rsocket-bom'