Skip to content

Commit

Permalink
Release 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka committed May 12, 2020
2 parents f1c3198 + 8cb9e74 commit c019173
Show file tree
Hide file tree
Showing 212 changed files with 5,384 additions and 9,315 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ repositories {
mavenCentral()
}
dependencies {
implementation 'io.rsocket:rsocket-core:1.0.0-RC6'
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC6'
implementation 'io.rsocket:rsocket-core:1.0.0-RC7'
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC7'
}
```

Expand All @@ -40,8 +40,8 @@ repositories {
maven { url 'https://oss.jfrog.org/oss-snapshot-local' }
}
dependencies {
implementation 'io.rsocket:rsocket-core:1.0.0-RC7-SNAPSHOT'
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC7-SNAPSHOT'
implementation 'io.rsocket:rsocket-core:1.0.0-RC8-SNAPSHOT'
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC8-SNAPSHOT'
}
```

Expand Down Expand Up @@ -120,7 +120,6 @@ RSocket clientRSocket =
// Enable Zero Copy
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create(7878))
.start()
.block();
```

Expand Down
6 changes: 5 additions & 1 deletion benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ task jmhBaseline(type: JmhExecTask, description: 'Executing JMH baseline benchma
classpath = sourceSets.main.runtimeClasspath + configurations.baseline
}

clean {
delete "${projectDir}/src/main/generated"
}

class JmhExecTask extends JavaExec {

private String include;
Expand Down Expand Up @@ -160,4 +164,4 @@ class JmhExecTask extends JavaExec {

super.exec();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package io.rsocket;

package io.rsocket.core;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.PayloadsMaxPerfSubscriber;
import io.rsocket.PayloadsPerfSubscriber;
import io.rsocket.RSocket;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
Expand Down Expand Up @@ -59,9 +65,7 @@ public void awaitToBeConsumed() {
@Setup
public void setUp() throws NoSuchFieldException, IllegalAccessException {
server =
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(
RSocketServer.create(
(setup, sendingSocket) ->
Mono.just(
new AbstractRSocket() {
Expand Down Expand Up @@ -89,16 +93,14 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads);
}
}))
.transport(LocalServerTransport.create("server"))
.start()
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(LocalServerTransport.create("server"))
.block();

client =
RSocketFactory.connect()
.singleSubscriberRequester()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(LocalClientTransport.create("server"))
.start()
RSocketConnector.create()
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(LocalClientTransport.create("server"))
.block();

Field sendProcessorField = RSocketRequester.class.getDeclaredField("sendProcessor");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package io.rsocket;
package io.rsocket.core;

import io.netty.util.collection.IntObjectMap;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@BenchmarkMode(Mode.Throughput)
Expand Down
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ subprojects {

dependencies {
dependency "ch.qos.logback:logback-classic:${ext['logback.version']}"
dependency "com.google.code.findbugs:jsr305:${ext['findbugs.version']}"
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
version=1.0.0-RC7
perfBaselineVersion=1.0.0-RC6
version=1.0.0
perfBaselineVersion=1.0.0-RC7
2 changes: 0 additions & 2 deletions rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ dependencies {

implementation 'org.slf4j:slf4j-api'

compileOnly 'com.google.code.findbugs:jsr305'

testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter-api'
Expand Down
35 changes: 4 additions & 31 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,21 @@

package io.rsocket;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/**
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
*
* @deprecated as of 1.0 in favor of implementing {@link RSocket} directly which has default
* methods.
*/
@Deprecated
public abstract class AbstractRSocket implements RSocket {

private final MonoProcessor<Void> onClose = MonoProcessor.create();

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
}

@Override
public Mono<Void> metadataPush(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
}

@Override
public void dispose() {
onClose.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.rsocket.core.DefaultConnectionSetupPayload;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* Exposes information from the {@code SETUP} frame to a server, as well as to client responders.
Expand Down
37 changes: 32 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/RSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public interface RSocket extends Availability, Closeable {
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
*/
Mono<Void> fireAndForget(Payload payload);
default Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented."));
}

/**
* Request-Response interaction model of {@code RSocket}.
Expand All @@ -42,23 +45,31 @@ public interface RSocket extends Availability, Closeable {
* @return {@code Publisher} containing at most a single {@code Payload} representing the
* response.
*/
Mono<Payload> requestResponse(Payload payload);
default Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

/**
* Request-Stream interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
*/
Flux<Payload> requestStream(Payload payload);
default Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

/**
* Request-Channel interaction model of {@code RSocket}.
*
* @param payloads Stream of request payloads.
* @return Stream of response payloads.
*/
Flux<Payload> requestChannel(Publisher<Payload> payloads);
default Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
}

/**
* Metadata-Push interaction model of {@code RSocket}.
Expand All @@ -67,10 +78,26 @@ public interface RSocket extends Availability, Closeable {
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
*/
Mono<Void> metadataPush(Payload payload);
default Mono<Void> metadataPush(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
}

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
}

@Override
default void dispose() {}

@Override
default boolean isDisposed() {
return false;
}

@Override
default Mono<Void> onClose() {
return Mono.never();
}
}
20 changes: 14 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
private Resume resume;

public ClientRSocketFactory() {
this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace));
this(RSocketConnector.create());
}

public ClientRSocketFactory(RSocketConnector connector) {
Expand Down Expand Up @@ -393,9 +393,13 @@ public ClientRSocketFactory fragment(int mtu) {
return this;
}

/** @deprecated this is deprecated with no replacement. */
/**
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
* order to observe errors, it is recommended to add error handler using {@code doOnError}
* on the specific logical stream. In order to observe connection, or RSocket terminal
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
*/
public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
connector.errorConsumer(errorConsumer);
return this;
}

Expand All @@ -417,7 +421,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor {
private Resume resume;

public ServerRSocketFactory() {
this(RSocketServer.create().errorConsumer(Throwable::printStackTrace));
this(RSocketServer.create());
}

public ServerRSocketFactory(RSocketServer server) {
Expand Down Expand Up @@ -497,9 +501,13 @@ public ServerRSocketFactory fragment(int mtu) {
return this;
}

/** @deprecated this is deprecated with no replacement. */
/**
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
* order to observe errors, it is recommended to add error handler using {@code doOnError}
* on the specific logical stream. In order to observe connection, or RSocket terminal
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
*/
public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
server.errorConsumer(errorConsumer);
return this;
}

Expand Down
5 changes: 5 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.rsocket;

import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
* channel.
*
* @deprecated as of 1.0 RC7 in favor of using {@link RSocket#requestChannel(Publisher)} with {@link
* Flux#switchOnFirst(BiFunction)}
*/
@Deprecated
public interface ResponderRSocket extends RSocket {
/**
* Implement this method to peak at the first payload of the incoming request stream without
Expand Down
Loading

0 comments on commit c019173

Please sign in to comment.