From 30d3cf2fb189099801c0e34631fe791721ba7ddb Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Thu, 1 Feb 2018 22:24:11 -0800 Subject: [PATCH] Update dependencies, cleanup RSocketClient/Server implementation (#466) * Delegate dispose/isDisposed to underlying channel in NettyDuplexConnection * Update dependencies * Update NonBlockingHashMap * Avoid unhandled exception in ClientServerInputMultiplexer --- build.gradle | 6 +- gradle.properties | 4 +- .../main/java/io/rsocket/RSocketClient.java | 100 ++------ .../main/java/io/rsocket/RSocketServer.java | 81 ++---- .../FragmentationDuplexConnection.java | 16 +- .../ClientServerInputMultiplexer.java | 14 +- .../io/rsocket/util/NonBlockingHashMap.java | 233 ++++++++++-------- rsocket-test/build.gradle | 2 +- rsocket-transport-netty/build.gradle | 6 +- .../netty/NettyDuplexConnection.java | 18 +- .../netty/WebsocketDuplexConnection.java | 18 +- .../netty/client/TcpClientTransport.java | 3 - 12 files changed, 201 insertions(+), 300 deletions(-) diff --git a/build.gradle b/build.gradle index bd892435b..00b468478 100644 --- a/build.gradle +++ b/build.gradle @@ -80,8 +80,8 @@ subprojects { } dependencies { - compile "io.projectreactor:reactor-core:3.1.2.RELEASE" - compile "io.netty:netty-buffer:4.1.17.Final" + compile "io.projectreactor:reactor-core:3.1.3.RELEASE" + compile "io.netty:netty-buffer:4.1.20.Final" compile "org.reactivestreams:reactive-streams:1.0.1" compile "org.slf4j:slf4j-api:1.7.25" compile "com.google.code.findbugs:jsr305:3.0.2" @@ -90,7 +90,7 @@ subprojects { testCompile "org.mockito:mockito-core:2.10.0" testCompile "org.hamcrest:hamcrest-library:1.3" testCompile "org.slf4j:slf4j-log4j12:1.7.25" - testCompile "io.projectreactor:reactor-test:3.1.2.RELEASE" + testCompile "io.projectreactor:reactor-test:3.1.3.RELEASE" } publishing { diff --git a/gradle.properties b/gradle.properties index 75e1b324a..78507c9a6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,6 +14,6 @@ # limitations under the License. # -mavenversion=0.9-SNAPSHOT +mavenversion=0.10-SNAPSHOT release.scope=patch -release.version=0.9-SNAPSHOT +release.version=0.10-SNAPSHOT diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 1805d3eb1..70b2593e0 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -122,14 +121,7 @@ class RSocketClient implements RSocket { } private void handleSendProcessorError(Throwable t) { - Collection> values; - Collection values1; - synchronized (RSocketClient.this) { - values = receivers.values(); - values1 = senders.values(); - } - - for (Subscriber subscriber : values) { + for (Subscriber subscriber : receivers.values()) { try { subscriber.onError(t); } catch (Throwable e) { @@ -137,7 +129,7 @@ private void handleSendProcessorError(Throwable t) { } } - for (LimitableRequestPublisher p : values1) { + for (LimitableRequestPublisher p : senders.values()) { p.cancel(); } } @@ -146,14 +138,8 @@ private void handleSendProcessorCancel(SignalType t) { if (SignalType.ON_ERROR == t) { return; } - Collection> values; - Collection values1; - synchronized (RSocketClient.this) { - values = receivers.values(); - values1 = senders.values(); - } - for (Subscriber subscriber : values) { + for (Subscriber subscriber : receivers.values()) { try { subscriber.onError(new Throwable("closed connection")); } catch (Throwable e) { @@ -161,7 +147,7 @@ private void handleSendProcessorCancel(SignalType t) { } } - for (LimitableRequestPublisher p : values1) { + for (LimitableRequestPublisher p : senders.values()) { p.cancel(); } } @@ -255,10 +241,7 @@ public Flux handleRequestStream(final Payload payload) { int streamId = streamIdSupplier.nextStreamId(); UnicastProcessor receiver = UnicastProcessor.create(); - - synchronized (this) { - receivers.put(streamId, receiver); - } + receivers.put(streamId, receiver); AtomicBoolean first = new AtomicBoolean(false); @@ -289,7 +272,7 @@ public Flux handleRequestStream(final Payload payload) { }) .doFinally( s -> { - removeReceiver(streamId); + receivers.remove(streamId); }); })); } @@ -304,10 +287,7 @@ private Mono handleRequestResponse(final Payload payload) { payload.release(); UnicastProcessor receiver = UnicastProcessor.create(); - - synchronized (this) { - receivers.put(streamId, receiver); - } + receivers.put(streamId, receiver); sendProcessor.onNext(requestFrame); @@ -317,7 +297,7 @@ private Mono handleRequestResponse(final Payload payload) { .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId))) .doFinally( s -> { - removeReceiver(streamId); + receivers.remove(streamId); }); })); } @@ -364,10 +344,8 @@ public Flux get() { LimitableRequestPublisher.wrap(f); // Need to set this to one for first the frame wrapped.increaseRequestLimit(1); - synchronized (RSocketClient.this) { - senders.put(streamId, wrapped); - receivers.put(streamId, receiver); - } + senders.put(streamId, wrapped); + receivers.put(streamId, receiver); return wrapped; }) @@ -424,39 +402,32 @@ public Frame apply(Payload payload) { }) .doFinally( s -> { - removeReceiver(streamId); - removeSender(streamId); + receivers.remove(streamId); + senders.remove(streamId); }); } })); } private boolean contains(int streamId) { - synchronized (RSocketClient.this) { - return receivers.containsKey(streamId); - } + return receivers.containsKey(streamId); } protected void cleanup() { try { - Collection> subscribers; - Collection publishers; - synchronized (RSocketClient.this) { - subscribers = receivers.values(); - publishers = senders.values(); + for (UnicastProcessor subscriber: receivers.values()) { + cleanUpSubscriber(subscriber); + } + for (LimitableRequestPublisher p: senders.values()) { + cleanUpLimitableRequestPublisher(p); } - - subscribers.forEach(this::cleanUpSubscriber); - publishers.forEach(this::cleanUpLimitableRequestPublisher); if (null != keepAliveSendSub) { keepAliveSendSub.dispose(); } } finally { - synchronized (this) { - senders.clear(); - receivers.clear(); - } + senders.clear(); + receivers.clear(); } } @@ -513,17 +484,14 @@ private void handleStreamZero(FrameType type, Frame frame) { } private void handleFrame(int streamId, FrameType type, Frame frame) { - Subscriber receiver; - synchronized (this) { - receiver = receivers.get(streamId); - } + Subscriber receiver = receivers.get(streamId); if (receiver == null) { handleMissingResponseProcessor(streamId, type, frame); } else { switch (type) { case ERROR: receiver.onError(Exceptions.from(frame)); - removeReceiver(streamId); + receivers.remove(streamId); break; case NEXT_COMPLETE: receiver.onNext(frameDecoder.apply(frame)); @@ -531,11 +499,8 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { break; case CANCEL: { - LimitableRequestPublisher sender; - synchronized (this) { - sender = senders.remove(streamId); - removeReceiver(streamId); - } + LimitableRequestPublisher sender = senders.remove(streamId); + receivers.remove(streamId); if (sender != null) { sender.cancel(); } @@ -546,10 +511,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { break; case REQUEST_N: { - LimitableRequestPublisher sender; - synchronized (this) { - sender = senders.get(streamId); - } + LimitableRequestPublisher sender = senders.get(streamId); if (sender != null) { int n = Frame.RequestN.requestN(frame); sender.increaseRequestLimit(n); @@ -559,9 +521,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { } case COMPLETE: receiver.onComplete(); - synchronized (this) { - receivers.remove(streamId); - } + receivers.remove(streamId); break; default: throw new IllegalStateException( @@ -593,12 +553,4 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame // receiving a frame after a given stream has been cancelled/completed, // so ignore (cancellation is async so there is a race condition) } - - private synchronized void removeReceiver(int streamId) { - receivers.remove(streamId); - } - - private synchronized void removeSender(int streamId) { - senders.remove(streamId); - } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 6cb777e22..38067ae21 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -31,8 +31,6 @@ import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; -import javax.annotation.Nullable; -import java.util.Collection; import java.util.function.Consumer; import java.util.function.Function; @@ -103,14 +101,7 @@ class RSocketServer implements RSocket { } private void handleSendProcessorError(Throwable t) { - Collection values; - Collection> values1; - synchronized (RSocketServer.this) { - values = sendingSubscriptions.values(); - values1 = channelProcessors.values(); - } - - for (Subscription subscription : values) { + for (Subscription subscription : sendingSubscriptions.values()) { try { subscription.cancel(); } catch (Throwable e) { @@ -118,7 +109,7 @@ private void handleSendProcessorError(Throwable t) { } } - for (UnicastProcessor subscription : values1) { + for (UnicastProcessor subscription : channelProcessors.values()) { try { subscription.cancel(); } catch (Throwable e) { @@ -131,14 +122,8 @@ private void handleSendProcessorCancel(SignalType t) { if (SignalType.ON_ERROR == t) { return; } - Collection values; - Collection> values1; - synchronized (RSocketServer.this) { - values = sendingSubscriptions.values(); - values1 = channelProcessors.values(); - } - for (Subscription subscription : values) { + for (Subscription subscription : sendingSubscriptions.values()) { try { subscription.cancel(); } catch (Throwable e) { @@ -146,7 +131,7 @@ private void handleSendProcessorCancel(SignalType t) { } } - for (UnicastProcessor subscription : values1) { + for (UnicastProcessor subscription : channelProcessors.values()) { try { subscription.cancel(); } catch (Throwable e) { @@ -262,25 +247,25 @@ private Mono handleFrame(Frame frame) { // leases. return Mono.empty(); case NEXT: - receiver = getChannelProcessor(streamId); + receiver = channelProcessors.get(streamId); if (receiver != null) { receiver.onNext(frameDecoder.apply(frame)); } return Mono.empty(); case COMPLETE: - receiver = getChannelProcessor(streamId); + receiver = channelProcessors.get(streamId); if (receiver != null) { receiver.onComplete(); } return Mono.empty(); case ERROR: - receiver = getChannelProcessor(streamId); + receiver = channelProcessors.get(streamId); if (receiver != null) { receiver.onError(new ApplicationException(Frame.Error.message(frame))); } return Mono.empty(); case NEXT_COMPLETE: - receiver = getChannelProcessor(streamId); + receiver = channelProcessors.get(streamId); if (receiver != null) { receiver.onNext(frameDecoder.apply(frame)); receiver.onComplete(); @@ -304,15 +289,15 @@ private Mono handleFrame(Frame frame) { private Mono handleFireAndForget(int streamId, Mono result) { return result - .doOnSubscribe(subscription -> addSubscription(streamId, subscription)) + .doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription)) .doOnError(errorConsumer) - .doFinally(signalType -> removeSubscription(streamId)) + .doFinally(signalType -> sendingSubscriptions.remove(streamId)) .ignoreElement(); } private Mono handleRequestResponse(int streamId, Mono response) { return response - .doOnSubscribe(subscription -> addSubscription(streamId, subscription)) + .doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription)) .map( payload -> { int flags = FLAGS_C; @@ -329,7 +314,7 @@ private Mono handleRequestResponse(int streamId, Mono response) { .doOnNext(sendProcessor::onNext) .doFinally( signalType -> { - removeSubscription(streamId); + sendingSubscriptions.remove(streamId); }) .then(); } @@ -345,16 +330,14 @@ private Mono handleStream(int streamId, Flux response, int initia .transform( frameFlux -> { LimitableRequestPublisher frames = LimitableRequestPublisher.wrap(frameFlux); - synchronized (RSocketServer.this) { - sendingSubscriptions.put(streamId, frames); - } + sendingSubscriptions.put(streamId, frames); frames.increaseRequestLimit(initialRequestN); return frames; }) .concatWith(Mono.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE))) .onErrorResume(t -> Mono.just(Frame.Error.from(streamId, t))) .doOnNext(sendProcessor::onNext) - .doFinally(signalType -> removeSubscription(streamId)) + .doFinally(signalType -> sendingSubscriptions.remove(streamId)) .subscribe(); return Mono.empty(); @@ -362,7 +345,7 @@ private Mono handleStream(int streamId, Flux response, int initia private Mono handleChannel(int streamId, Frame firstFrame) { UnicastProcessor frames = UnicastProcessor.create(); - addChannelProcessor(streamId, frames); + channelProcessors.put(streamId, frames); Flux payloads = frames @@ -378,7 +361,7 @@ private Mono handleChannel(int streamId, Frame firstFrame) { l -> { sendProcessor.onNext(Frame.RequestN.from(streamId, l)); }) - .doFinally(signalType -> removeChannelProcessor(streamId)); + .doFinally(signalType -> channelProcessors.remove(streamId)); // not chained, as the payload should be enqueued in the Unicast processor before this method // returns @@ -401,11 +384,7 @@ private Mono handleKeepAliveFrame(Frame frame) { private Mono handleCancelFrame(int streamId) { return Mono.fromRunnable( () -> { - Subscription subscription; - synchronized (this) { - subscription = sendingSubscriptions.remove(streamId); - } - + Subscription subscription = sendingSubscriptions.remove(streamId); if (subscription != null) { subscription.cancel(); } @@ -421,35 +400,11 @@ private Mono handleError(int streamId, Throwable t) { } private Mono handleRequestN(int streamId, Frame frame) { - final Subscription subscription = getSubscription(streamId); + final Subscription subscription = sendingSubscriptions.get(streamId); if (subscription != null) { int n = Frame.RequestN.requestN(frame); subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); } return Mono.empty(); } - - private synchronized void addSubscription(int streamId, Subscription subscription) { - sendingSubscriptions.put(streamId, subscription); - } - - private synchronized @Nullable Subscription getSubscription(int streamId) { - return sendingSubscriptions.get(streamId); - } - - private synchronized void removeSubscription(int streamId) { - sendingSubscriptions.remove(streamId); - } - - private synchronized void addChannelProcessor(int streamId, UnicastProcessor processor) { - channelProcessors.put(streamId, processor); - } - - private synchronized @Nullable UnicastProcessor getChannelProcessor(int streamId) { - return channelProcessors.get(streamId); - } - - private synchronized void removeChannelProcessor(int streamId) { - channelProcessors.remove(streamId); - } } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 30d9a3d8b..f500e9f6a 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -16,10 +16,10 @@ package io.rsocket.fragmentation; -import io.netty.util.collection.IntObjectHashMap; import io.rsocket.DuplexConnection; import io.rsocket.Frame; import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.util.NonBlockingHashMapLong; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,7 +28,7 @@ public class FragmentationDuplexConnection implements DuplexConnection { private final DuplexConnection source; - private final IntObjectHashMap frameReassemblers = new IntObjectHashMap<>(); + private final NonBlockingHashMapLong frameReassemblers = new NonBlockingHashMapLong<>(); private final FrameFragmenter frameFragmenter; public FragmentationDuplexConnection(DuplexConnection source, int mtu) { @@ -108,15 +108,19 @@ public Mono onClose() { }); } - private synchronized FrameReassembler getFrameReassembler(Frame frame) { - return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame)); + private FrameReassembler getFrameReassembler(Frame frame) { + FrameReassembler value, newValue; + int streamId = frame.getStreamId(); + return ((value = frameReassemblers.get(streamId)) == null && + (value = frameReassemblers.putIfAbsent(streamId, newValue = new FrameReassembler(frame))) == null) + ? newValue : value; } - private synchronized FrameReassembler removeFrameReassembler(int streamId) { + private FrameReassembler removeFrameReassembler(int streamId) { return frameReassemblers.remove(streamId); } - private synchronized boolean frameReassemblersContain(int streamId) { + private boolean frameReassemblersContain(int streamId) { return frameReassemblers.containsKey(streamId); } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index 0f9209462..6bc645f60 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -43,7 +43,7 @@ * client. */ public class ClientServerInputMultiplexer implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger"); + private static final Logger logger = LoggerFactory.getLogger("io.rsocket.FrameLogger"); private final DuplexConnection streamZeroConnection; private final DuplexConnection serverConnection; @@ -98,6 +98,10 @@ public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plug client.onNext(group); break; } + }, + t -> { + logger.error("Error receiving frame:", t); + dispose(); }); } @@ -136,13 +140,13 @@ private static class InternalDuplexConnection implements DuplexConnection { public InternalDuplexConnection(DuplexConnection source, MonoProcessor> processor) { this.source = source; this.processor = processor; - this.debugEnabled = LOGGER.isDebugEnabled(); + this.debugEnabled = logger.isDebugEnabled(); } @Override public Mono send(Publisher frame) { if (debugEnabled) { - frame = Flux.from(frame).doOnNext(f -> LOGGER.debug("sending -> " + f.toString())); + frame = Flux.from(frame).doOnNext(f -> logger.debug("sending -> " + f.toString())); } return source.send(frame); @@ -151,7 +155,7 @@ public Mono send(Publisher frame) { @Override public Mono sendOne(Frame frame) { if (debugEnabled) { - LOGGER.debug("sending -> " + frame.toString()); + logger.debug("sending -> " + frame.toString()); } return source.sendOne(frame); @@ -162,7 +166,7 @@ public Flux receive() { return processor.flatMapMany( f -> { if (debugEnabled) { - return f.doOnNext(frame -> LOGGER.debug("receiving -> " + frame.toString())); + return f.doOnNext(frame -> logger.debug("receiving -> " + frame.toString())); } else { return f; } diff --git a/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMap.java b/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMap.java index 1a670b736..31671df18 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMap.java +++ b/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMap.java @@ -36,7 +36,7 @@ * 32-way Sun Niagara box, 8-way Intel box and a 4-way Power box. * * This class obeys the same functional specification as {@link - * Hashtable}, and includes versions of methods corresponding to + * java.util.Hashtable}, and includes versions of methods corresponding to * each method of Hashtable. However, even though all operations are * thread-safe, operations do not entail locking and there is * not any support for locking the entire table in a way that @@ -102,7 +102,7 @@ private static long rawIndex(final Object[] ary, final int idx) { static { // Field f = null; try { f = NonBlockingHashMap.class.getDeclaredField("_kvs"); } - catch( NoSuchFieldException e ) { throw new RuntimeException(e); } + catch( java.lang.NoSuchFieldException e ) { throw new RuntimeException(e); } _kvs_offset = UNSAFE.objectFieldOffset(f); } private final boolean CAS_kvs( final Object[] oldkvs, final Object[] newkvs ) { @@ -124,7 +124,7 @@ private static final int hash(final Object key) { int h = key.hashCode(); // The real hashCode call h ^= (h>>>20) ^ (h>>>12); h ^= (h>>> 7) ^ (h>>> 4); - h += h<<7; // smear low bits up high, for hashcodes that only differ by 1 + h += h<<7; // smear low bits up high, for hashcodes that only differ by 1 return h; } @@ -265,7 +265,7 @@ private static int reprobe_limit( int len ) { * appropriate size. Large numbers here when used with a small count of * elements will sacrifice space for a small amount of time gained. The * initial size will be rounded up internally to the next larger power of 2. */ - public NonBlockingHashMap(final int initial_sz ) { initialize(initial_sz); } + public NonBlockingHashMap( final int initial_sz ) { initialize(initial_sz); } private final void initialize( int initial_sz ) { if (initial_sz < 0) { throw new IllegalArgumentException("initial_sz: " + initial_sz + " (expected: >= 0)"); @@ -302,7 +302,7 @@ private final void initialize( int initial_sz ) { /** Legacy method testing if some key maps into the specified value in this * table. This method is identical in functionality to {@link * #containsValue}, and exists solely to ensure full compatibility with - * class {@link Hashtable}, which supported this method prior to + * class {@link java.util.Hashtable}, which supported this method prior to * introduction of the Java Collections framework. * @param val a value to search for * @return true if this map maps one or more keys to the specified value @@ -341,7 +341,9 @@ private final void initialize( int initial_sz ) { /** Atomically do a {@link #remove(Object)} if-and-only-if the key is mapped * to a value which is equals to the given value. * @throws NullPointerException if the specified key or value is null */ - public boolean remove ( Object key,Object val ) { return putIfMatch( key,TOMBSTONE, val ) == val; } + public boolean remove ( Object key,Object val ) { + return Objects.equals(putIfMatch( key,TOMBSTONE, val ), val); + } /** Atomically do a put(key,val) if-and-only-if the key is * mapped to some value already. @@ -354,7 +356,7 @@ private final void initialize( int initial_sz ) { * @throws NullPointerException if the specified key or value is null */ @Override public boolean replace ( TypeK key, TypeV oldValue, TypeV newValue ) { - return putIfMatch( key, newValue, oldValue ) == oldValue; + return Objects.equals(putIfMatch( key, newValue, oldValue ), oldValue); } @@ -392,7 +394,7 @@ public final TypeV putIfMatch( Object key, Object newVal, Object oldVal ) { * @param m mappings to be stored in this map */ @Override public void putAll(Map m) { - for (Entry e : m.entrySet()) + for (Map.Entry e : m.entrySet()) put(e.getKey(), e.getValue()); } @@ -495,20 +497,20 @@ public String toString() { // 'equals' v-call. private static boolean keyeq( Object K, Object key, int[] hashes, int hash, int fullhash ) { return - K==key || // Either keys match exactly OR - // hash exists and matches? hash can be zero during the install of a - // new key/value pair. - ((hashes[hash] == 0 || hashes[hash] == fullhash) && - // Do not call the users' "equals()" call with a Tombstone, as this can - // surprise poorly written "equals()" calls that throw exceptions - // instead of simply returning false. - K != TOMBSTONE && // Do not call users' equals call with a Tombstone - // Do the match the hard way - with the users' key being the loop- - // invariant "this" pointer. I could have flipped the order of - // operands (since equals is commutative), but I'm making mega-morphic - // v-calls in a re-probing loop and nailing down the 'this' argument - // gives both the JIT and the hardware a chance to prefetch the call target. - key.equals(K)); // Finally do the hard match + K==key || // Either keys match exactly OR + // hash exists and matches? hash can be zero during the install of a + // new key/value pair. + ((hashes[hash] == 0 || hashes[hash] == fullhash) && + // Do not call the users' "equals()" call with a Tombstone, as this can + // surprise poorly written "equals()" calls that throw exceptions + // instead of simply returning false. + K != TOMBSTONE && // Do not call users' equals call with a Tombstone + // Do the match the hard way - with the users' key being the loop- + // invariant "this" pointer. I could have flipped the order of + // operands (since equals is commutative), but I'm making mega-morphic + // v-calls in a re-probing loop and nailing down the 'this' argument + // gives both the JIT and the hardware a chance to prefetch the call target. + key.equals(K)); // Finally do the hard match } // --- get ----------------------------------------------------------------- @@ -528,7 +530,7 @@ public TypeV get( Object key ) { return (TypeV)V; } - private static final Object get_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { + private static final Object get_impl( final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { final int fullhash= hash (key); // throws NullPointerException if key is null final int len = len (kvs); // Count of key/value pairs, reads kvs.length final CHM chm = chm (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs @@ -585,7 +587,7 @@ public TypeK getk( TypeK key ) { return (TypeK)getk_impl(this,_kvs,key); } - private static final Object getk_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { + private static final Object getk_impl( final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { final int fullhash= hash (key); // throws NullPointerException if key is null final int len = len (kvs); // Count of key/value pairs, reads kvs.length final CHM chm = chm (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs @@ -634,7 +636,7 @@ private static final Object getk_impl(final NonBlockingHashMap topmap, final Obj // the path through copy_slot passes in an expected value of null, and // putIfMatch only returns a null if passed in an expected null. static volatile int DUMMY_VOLATILE; - private static final Object putIfMatch(final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final Object putval, final Object expVal ) { + private static final Object putIfMatch( final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final Object putval, final Object expVal ) { assert putval != null; assert !(putval instanceof Prime); assert !(expVal instanceof Prime); @@ -710,72 +712,87 @@ private static final Object putIfMatch(final NonBlockingHashMap topmap, final Ob idx = (idx+1)&(len-1); // Reprobe! } // End of spinning till we get a Key slot - // --- - // Found the proper Key slot, now update the matching Value slot. We - // never put a null, so Value slots monotonically move from null to - // not-null (deleted Values use Tombstone). Thus if 'V' is null we - // fail this fast cutout and fall into the check for table-full. - if( putval == V ) return V; // Fast cutout for no-change - - // See if we want to move to a new table (to avoid high average re-probe - // counts). We only check on the initial set of a Value from null to - // not-null (i.e., once per key-insert). Of course we got a 'free' check - // of newkvs once per key-compare (not really free, but paid-for by the - // time we get here). - if( newkvs == null && // New table-copy already spotted? - // Once per fresh key-insert check the hard way - ((V == null && chm.tableFull(reprobe_cnt,len)) || - // Or we found a Prime, but the JMM allowed reordering such that we - // did not spot the new table (very rare race here: the writing - // thread did a CAS of _newkvs then a store of a Prime. This thread - // reads the Prime, then reads _newkvs - but the read of Prime was so - // delayed (or the read of _newkvs was so accelerated) that they - // swapped and we still read a null _newkvs. The resize call below - // will do a CAS on _newkvs forcing the read. - V instanceof Prime) ) - newkvs = chm.resize(topmap,kvs); // Force the new table copy to start - // See if we are moving to a new table. - // If so, copy our slot and retry in the new table. - if( newkvs != null ) - return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); + while ( true ) { // Spin till we insert a value + // --- + // Found the proper Key slot, now update the matching Value slot. We + // never put a null, so Value slots monotonically move from null to + // not-null (deleted Values use Tombstone). Thus if 'V' is null we + // fail this fast cutout and fall into the check for table-full. + if( putval == V ) return V; // Fast cutout for no-change + + // See if we want to move to a new table (to avoid high average re-probe + // counts). We only check on the initial set of a Value from null to + // not-null (i.e., once per key-insert). Of course we got a 'free' check + // of newkvs once per key-compare (not really free, but paid-for by the + // time we get here). + if( newkvs == null && // New table-copy already spotted? + // Once per fresh key-insert check the hard way + ((V == null && chm.tableFull(reprobe_cnt,len)) || + // Or we found a Prime, but the JMM allowed reordering such that we + // did not spot the new table (very rare race here: the writing + // thread did a CAS of _newkvs then a store of a Prime. This thread + // reads the Prime, then reads _newkvs - but the read of Prime was so + // delayed (or the read of _newkvs was so accelerated) that they + // swapped and we still read a null _newkvs. The resize call below + // will do a CAS on _newkvs forcing the read. + V instanceof Prime) ) + newkvs = chm.resize(topmap,kvs); // Force the new table copy to start + // See if we are moving to a new table. + // If so, copy our slot and retry in the new table. + if( newkvs != null ) + return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); - // --- - // We are finally prepared to update the existing table - assert !(V instanceof Prime); - - // Must match old, and we do not? Then bail out now. Note that either V - // or expVal might be TOMBSTONE. Also V can be null, if we've never - // inserted a value before. expVal can be null if we are called from - // copy_slot. - - if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all? - V != expVal && // No instant match already? - (expVal != MATCH_ANY || V == TOMBSTONE || V == null) && - !(V==null && expVal == TOMBSTONE) && // Match on null/TOMBSTONE combo - (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last - return V; // Do not update! - - // Actually change the Value in the Key,Value pair - if( CAS_val(kvs, idx, V, putval ) ) { - // CAS succeeded - we did the update! - // Both normal put's and table-copy calls putIfMatch, but table-copy - // does not (effectively) increase the number of live k/v pairs. - if( expVal != null ) { - // Adjust sizes - a striped counter - if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1); - if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1); - } - } else { // Else CAS failed + // --- + // We are finally prepared to update the existing table + assert !(V instanceof Prime); + + // Must match old, and we do not? Then bail out now. Note that either V + // or expVal might be TOMBSTONE. Also V can be null, if we've never + // inserted a value before. expVal can be null if we are called from + // copy_slot. + if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all? + V != expVal && // No instant match already? + (expVal != MATCH_ANY || V == TOMBSTONE || V == null) && + !(V==null && expVal == TOMBSTONE) && // Match on null/TOMBSTONE combo + (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last + return V; // Do not update! + + // Actually change the Value in the Key,Value pair + if( CAS_val(kvs, idx, V, putval ) ) break; + + // CAS failed + // Because we have no witness, we do not know why it failed. + // Indeed, by the time we look again the value under test might have flipped + // a thousand times and now be the expected value (despite the CAS failing). + // Check for the never-succeed condition of a Prime value and jump to any + // nested table, or else just re-run. + + // We would not need this load at all if CAS returned the value on which + // the CAS failed (AKA witness). The new CAS semantics are supported via + // VarHandle in JDK9. V = val(kvs,idx); // Get new value + // If a Prime'd value got installed, we need to re-run the put on the // new table. Otherwise we lost the CAS to another racing put. - // Simply retry from the start. if( V instanceof Prime ) return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); + + // Simply retry from the start. + // NOTE: need the fence, since otherwise 'val(kvs,idx)' load could be hoisted + // out of loop. + int dummy = DUMMY_VOLATILE; } - // Win or lose the CAS, we are done. If we won then we know the update - // happened as expected. If we lost, it means "we won but another thread - // immediately stomped our update with no chance of a reader reading". + + // CAS succeeded - we did the update! + // Both normal put's and table-copy calls putIfMatch, but table-copy + // does not (effectively) increase the number of live k/v pairs. + if( expVal != null ) { + // Adjust sizes - a striped counter + if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1); + if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1); + } + + // We won; we know the update happened as expected. return (V==null && expVal!=null) ? TOMBSTONE : V; } @@ -825,7 +842,7 @@ private static final class CHM { // null to set (once). volatile Object[] _newkvs; private static final AtomicReferenceFieldUpdater _newkvsUpdater = - AtomicReferenceFieldUpdater.newUpdater(CHM.class,Object[].class, "_newkvs"); + AtomicReferenceFieldUpdater.newUpdater(CHM.class,Object[].class, "_newkvs"); // Set the _next field if we can. boolean CAS_newkvs( Object[] newkvs ) { while( _newkvs == null ) @@ -848,7 +865,7 @@ boolean CAS_newkvs( Object[] newkvs ) { // un-initialized array creation (especially of ref arrays!). volatile long _resizers; // count of threads attempting an initial resize private static final AtomicLongFieldUpdater _resizerUpdater = - AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers"); + AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers"); // --- // Simple constructor @@ -868,11 +885,11 @@ boolean CAS_newkvs( Object[] newkvs ) { // slots.estimate_sum >= max_reprobe_cnt >= reprobe_limit(len) private final boolean tableFull( int reprobe_cnt, int len ) { return - // Do the cheap check first: we allow some number of reprobes always - reprobe_cnt >= REPROBE_LIMIT && - (reprobe_cnt >= reprobe_limit(len) || - // More expensive check: see if the table is > 1/2 full. - _slots.estimate_get() >= (len>>1)); + // Do the cheap check first: we allow some number of reprobes always + reprobe_cnt >= REPROBE_LIMIT && + (reprobe_cnt >= reprobe_limit(len) || + // More expensive check: see if the table is > 1/2 full. + _slots.estimate_get() >= (len>>1)); } // --- resize ------------------------------------------------------------ @@ -881,7 +898,7 @@ private final boolean tableFull( int reprobe_cnt, int len ) { // Since this routine has a fast cutout for copy-already-started, callers // MUST 'help_copy' lest we have a path which forever runs through // 'resize' only to discover a copy-in-progress which never progresses. - private final Object[] resize(NonBlockingHashMap topmap, Object[] kvs) { + private final Object[] resize( NonBlockingHashMap topmap, Object[] kvs) { assert chm(kvs) == this; // Check for resize already in progress, probably triggered by another thread @@ -993,20 +1010,20 @@ private final Object[] resize(NonBlockingHashMap topmap, Object[] kvs) { // somewhere completes the count. volatile long _copyIdx = 0; static private final AtomicLongFieldUpdater _copyIdxUpdater = - AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx"); + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx"); // Work-done reporting. Used to efficiently signal when we can move to // the new table. From 0 to len(oldkvs) refers to copying from the old // table to the new. volatile long _copyDone= 0; static private final AtomicLongFieldUpdater _copyDoneUpdater = - AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone"); + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone"); // --- help_copy_impl ---------------------------------------------------- // Help along an existing resize operation. We hope its the top-level // copy (it was when we started) but this CHM might have been promoted out // of the top position. - private final void help_copy_impl(NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all ) { + private final void help_copy_impl( NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all ) { assert chm(oldkvs) == this; Object[] newkvs = _newkvs; assert newkvs != null; // Already checked by caller @@ -1069,7 +1086,7 @@ private final void help_copy_impl(NonBlockingHashMap topmap, Object[] oldkvs, bo // before any Prime appears. So the caller needs to read the _newkvs // field to retry his operation in the new table, but probably has not // read it yet. - private final Object[] copy_slot_and_check(NonBlockingHashMap topmap, Object[] oldkvs, int idx, Object should_help ) { + private final Object[] copy_slot_and_check( NonBlockingHashMap topmap, Object[] oldkvs, int idx, Object should_help ) { assert chm(oldkvs) == this; Object[] newkvs = _newkvs; // VOLATILE READ // We're only here because the caller saw a Prime, which implies a @@ -1082,7 +1099,7 @@ private final Object[] copy_slot_and_check(NonBlockingHashMap topmap, Object[] o } // --- copy_check_and_promote -------------------------------------------- - private final void copy_check_and_promote(NonBlockingHashMap topmap, Object[] oldkvs, int workdone ) { + private final void copy_check_and_promote( NonBlockingHashMap topmap, Object[] oldkvs, int workdone ) { assert chm(oldkvs) == this; int oldlen = len(oldkvs); // We made a slot unusable and so did some of the needed copy work @@ -1116,7 +1133,7 @@ private final void copy_check_and_promote(NonBlockingHashMap topmap, Object[] ol // happened to the old table - so that any transition in the new table from // null to not-null must have been from a copy_slot (or other old-table // overwrite) and not from a thread directly writing in the new table. - private boolean copy_slot(NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs ) { + private boolean copy_slot( NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs ) { // Blindly set the key slot from null to TOMBSTONE, to eagerly stop // fresh put's from inserting new values in the old table when the old // table is mid-resize. We don't need to act on the results here, @@ -1306,7 +1323,7 @@ public Set keySet() { // Estimate size of array; be prepared to see more or fewer elements int sz = size(); T[] r = a.length >= sz ? a : - (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), sz); + (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), sz); // Fast efficient element walk. int j=0; for( int i=0; i keySet() { // --- entrySet ------------------------------------------------------------ // Warning: Each call to 'next' in this iterator constructs a new SimpleImmutableEntry. - private class SnapshotE implements Iterator> { + private class SnapshotE implements Iterator> { final SnapshotV _ss; public SnapshotE() { _ss = new SnapshotV(); } public void remove() { _ss.remove(); } - public Entry next() { _ss.next(); return new SimpleImmutableEntry<>((TypeK)_ss._prevK,_ss._prevV); } + public Map.Entry next() { _ss.next(); return new SimpleImmutableEntry<>((TypeK)_ss._prevK,_ss._prevV); } public boolean hasNext() { return _ss.hasNext(); } } @@ -1357,29 +1374,29 @@ private class SnapshotE implements Iterator> { * reflect any modifications subsequent to construction. * *

Warning: the iterator associated with this Set - * requires the creation of {@link Entry} objects with each + * requires the creation of {@link java.util.Map.Entry} objects with each * iteration. The {@link NonBlockingHashMap} does not normally create or - * using {@link Entry} objects so they will be created soley + * using {@link java.util.Map.Entry} objects so they will be created soley * to support this iteration. Iterating using {@link #keySet} or {@link * #values} will be more efficient. */ @Override - public Set> entrySet() { - return new AbstractSet>() { + public Set> entrySet() { + return new AbstractSet>() { @Override public void clear ( ) { NonBlockingHashMap.this.clear( ); } @Override public int size ( ) { return NonBlockingHashMap.this.size ( ); } @Override public boolean remove( final Object o ) { if( !(o instanceof Map.Entry)) return false; - final Entry e = (Entry)o; + final Map.Entry e = (Map.Entry)o; return NonBlockingHashMap.this.remove(e.getKey(), e.getValue()); } @Override public boolean contains(final Object o) { if( !(o instanceof Map.Entry)) return false; - final Entry e = (Entry)o; + final Map.Entry e = (Map.Entry)o; TypeV v = get(e.getKey()); return v != null && v.equals(e.getValue()); } - @Override public Iterator> iterator() { return new SnapshotE(); } + @Override public Iterator> iterator() { return new SnapshotE(); } }; } diff --git a/rsocket-test/build.gradle b/rsocket-test/build.gradle index 499308547..9cc85e899 100644 --- a/rsocket-test/build.gradle +++ b/rsocket-test/build.gradle @@ -20,5 +20,5 @@ dependencies { compile "org.mockito:mockito-core:2.10.0" compile "org.hamcrest:hamcrest-library:1.3" compile "org.hdrhistogram:HdrHistogram:2.1.9" - compile "io.projectreactor:reactor-test:3.1.2.RELEASE" + compile "io.projectreactor:reactor-test:3.1.3.RELEASE" } diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle index 0fbd78623..f91adaaa6 100644 --- a/rsocket-transport-netty/build.gradle +++ b/rsocket-transport-netty/build.gradle @@ -16,11 +16,7 @@ dependencies { compile project(':rsocket-core') - compile "io.projectreactor.ipc:reactor-netty:0.7.2.RELEASE" - compile "io.netty:netty-handler:4.1.19.Final" - compile "io.netty:netty-handler-proxy:4.1.19.Final" - compile "io.netty:netty-codec-http:4.1.19.Final" - compile "io.netty:netty-transport-native-epoll:4.1.19.Final" + compile "io.projectreactor.ipc:reactor-netty:0.7.3.RELEASE" testCompile project(':rsocket-test') } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java index 22d27d17f..80902a8b6 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java @@ -20,7 +20,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; @@ -29,22 +28,11 @@ public class NettyDuplexConnection implements DuplexConnection { private final NettyInbound in; private final NettyOutbound out; private final NettyContext context; - private final MonoProcessor onClose; public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) { this.in = in; this.out = out; this.context = context; - this.onClose = MonoProcessor.create(); - - context.onClose(onClose::onComplete); - this.onClose - .doFinally( - s -> { - this.context.dispose(); - this.context.channel().close(); - }) - .subscribe(); } @Override @@ -64,16 +52,16 @@ public Flux receive() { @Override public void dispose() { - onClose.onComplete(); + context.dispose(); } @Override public boolean isDisposed() { - return onClose.isDisposed(); + return context.isDisposed(); } @Override public Mono onClose() { - return onClose; + return context.onClose(); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index e17567bec..d4adb9096 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -27,7 +27,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; @@ -43,22 +42,11 @@ public class WebsocketDuplexConnection implements DuplexConnection { private final NettyInbound in; private final NettyOutbound out; private final NettyContext context; - private final MonoProcessor onClose; public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) { this.in = in; this.out = out; this.context = context; - this.onClose = MonoProcessor.create(); - - context.onClose(onClose::onComplete); - this.onClose - .doFinally( - s -> { - this.context.dispose(); - this.context.channel().close(); - }) - .subscribe(); } @Override @@ -87,16 +75,16 @@ public Flux receive() { @Override public void dispose() { - onClose.onComplete(); + context.dispose(); } @Override public boolean isDisposed() { - return onClose.isDisposed(); + return context.isDisposed(); } @Override public Mono onClose() { - return onClose; + return context.onClose(); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java index 8a17b9b09..eb6b766df 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java @@ -21,13 +21,10 @@ import io.rsocket.transport.netty.NettyDuplexConnection; import io.rsocket.transport.netty.RSocketLengthCodec; import java.net.InetSocketAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.ipc.netty.tcp.TcpClient; public class TcpClientTransport implements ClientTransport { - private final Logger logger = LoggerFactory.getLogger(TcpClientTransport.class); private final TcpClient client; private TcpClientTransport(TcpClient client) {