From dfc0ea2d023a99e261bacb1dc157b2303ee1afdd Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 12 Dec 2023 16:44:03 -0800 Subject: [PATCH 01/17] Add java client connection layer. Signed-off-by: Yury-Fridlyand --- java/client/build.gradle | 16 ++- .../BabushkaCoreNativeDefinitions.java | 11 -- .../babushka/connectors/SocketConnection.java | 134 ++++++++++++++++++ .../connectors/handlers/ChannelBuilder.java | 26 ++++ .../connectors/handlers/ChannelHandler.java | 44 ++++++ .../connectors/handlers/ReadHandler.java | 45 ++++++ .../connectors/handlers/WriteHandler.java | 23 +++ .../BabushkaCoreNativeDefinitions.java | 25 ++++ .../babushka/managers/CallbackManager.java | 64 +++++++++ java/src/lib.rs | 6 +- 10 files changed, 378 insertions(+), 16 deletions(-) delete mode 100644 java/client/src/main/java/babushka/BabushkaCoreNativeDefinitions.java create mode 100644 java/client/src/main/java/babushka/connectors/SocketConnection.java create mode 100644 java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java create mode 100644 java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java create mode 100644 java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java create mode 100644 java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java create mode 100644 java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java create mode 100644 java/client/src/main/java/babushka/managers/CallbackManager.java diff --git a/java/client/build.gradle b/java/client/build.gradle index 4f73647f0c..be9120cf3f 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -18,15 +18,25 @@ dependencies { implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64' implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-x86_64' implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-aarch_64' + + //lombok + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + testCompileOnly 'org.projectlombok:lombok:1.18.30' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.30' + + // junit + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' } tasks.register('protobuf', Exec) { doFirst { - project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/babushka/protobuf').toString()) + project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/babushka/models/protobuf').toString()) } commandLine 'protoc', '-Iprotobuf=babushka-core/src/protobuf/', - '--java_out=java/client/src/main/java/babushka/protobuf', + '--java_out=java/client/src/main/java/babushka/models/protobuf', 'babushka-core/src/protobuf/connection_request.proto', 'babushka-core/src/protobuf/redis_request.proto', 'babushka-core/src/protobuf/response.proto' @@ -35,7 +45,7 @@ tasks.register('protobuf', Exec) { tasks.register('cleanProtobuf') { doFirst { - project.delete(Paths.get(project.projectDir.path, 'src/main/java/babushka/protobuf').toString()) + project.delete(Paths.get(project.projectDir.path, 'src/main/java/babushka/models/protobuf').toString()) } } diff --git a/java/client/src/main/java/babushka/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/babushka/BabushkaCoreNativeDefinitions.java deleted file mode 100644 index a16871b99a..0000000000 --- a/java/client/src/main/java/babushka/BabushkaCoreNativeDefinitions.java +++ /dev/null @@ -1,11 +0,0 @@ -package babushka; - -public class BabushkaCoreNativeDefinitions { - public static native String startSocketListenerExternal() throws Exception; - - public static native Object valueFromPointer(long pointer); - - static { - System.loadLibrary("javababushka"); - } -} diff --git a/java/client/src/main/java/babushka/connectors/SocketConnection.java b/java/client/src/main/java/babushka/connectors/SocketConnection.java new file mode 100644 index 0000000000..cd41e00ced --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/SocketConnection.java @@ -0,0 +1,134 @@ +package babushka.connectors; + +import babushka.connectors.handlers.ChannelBuilder; +import babushka.connectors.handlers.ChannelHandler; +import babushka.managers.CallbackManager; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.util.concurrent.DefaultThreadFactory; + +public class SocketConnection { + + /** Thread pool supplied to Netty to perform all async IO. */ + private EventLoopGroup group; + + /** The singleton instance. */ + private static SocketConnection INSTANCE = null; + + private static String socketPath; + + public static void setSocketPath(String socketPath) { + if (SocketConnection.socketPath == null) { + SocketConnection.socketPath = socketPath; + return; + } + throw new RuntimeException("socket path can only be declared once"); + } + + /** + * Creates (if not yet created) and returns the singleton instance of the {@link + * SocketConnection}. + * + * @return a {@link SocketConnection} instance. + */ + public static synchronized SocketConnection getInstance() { + if (INSTANCE == null) { + assert socketPath != null : "socket path must be defined"; + INSTANCE = new SocketConnection(); + } + return INSTANCE; + } + + // At the moment, Windows is not supported + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private static final boolean isMacOs = isKQueueAvailable(); + + // TODO support IO-Uring and NIO + /** + * Detect platform to identify which native implementation to use for UDS interaction. Currently + * supported platforms are: Linux and macOS.
+ * Subject to change in future to support more platforms and implementations. + */ + private static boolean isKQueueAvailable() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** Constructor for the single instance. */ + private SocketConnection() { + try { + int cpuCount = Runtime.getRuntime().availableProcessors(); + group = + isMacOs + ? new KQueueEventLoopGroup( + cpuCount, new DefaultThreadFactory("SocketConnection-kqueue-elg", true)) + : new EpollEventLoopGroup( + cpuCount, new DefaultThreadFactory("SocketConnection-epoll-elg", true)); + } catch (Exception e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + } + } + + /** Open a new channel for a new client. */ + public ChannelHandler openNewChannel(CallbackManager callbackManager) { + try { + Channel channel = + new Bootstrap() + .group(group) + .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) + .handler(new ChannelBuilder(callbackManager)) + .connect(new DomainSocketAddress(socketPath)) + .sync() + .channel(); + return new ChannelHandler(channel, callbackManager); + } catch (InterruptedException e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + throw new RuntimeException(e); + } + } + + /** + * Closes the UDS connection and frees corresponding resources. A consecutive call to {@link + * #getInstance()} will create a new connection with new resource pool. + */ + public void close() { + group.shutdownGracefully(); + INSTANCE = null; + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources by calling {@link #close()}. It is recommended to use a class instead of lambda to + * ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + if (INSTANCE != null) { + INSTANCE.close(); + INSTANCE = null; + } + } + } + + static { + Runtime.getRuntime() + .addShutdownHook(new Thread(new ShutdownHook(), "SocketConnection-shutdown-hook")); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java new file mode 100644 index 0000000000..4819254499 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java @@ -0,0 +1,26 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.unix.UnixChannel; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */ +@RequiredArgsConstructor +public class ChannelBuilder extends ChannelInitializer { + + private final CallbackManager callbackManager; + + @Override + public void initChannel(@NonNull UnixChannel ch) { + ch.pipeline() + // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html + .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) + .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ReadHandler(callbackManager)) + .addLast(new WriteHandler()); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java new file mode 100644 index 0000000000..06a322baad --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -0,0 +1,44 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import redis_request.RedisRequestOuterClass.RedisRequest; +import response.ResponseOuterClass.Response; + +/** + * Class responsible for manipulations with Netty's {@link Channel}.
+ * Uses a {@link CallbackManager} to record callbacks of every request sent. + */ +@RequiredArgsConstructor +public class ChannelHandler { + private final Channel channel; + private final CallbackManager callbackManager; + + /** Write a protobuf message to the socket. */ + public CompletableFuture write(RedisRequest.Builder request, boolean flush) { + var commandId = callbackManager.registerRequest(); + request.setCallbackIdx(commandId.getKey()); + + if (flush) { + channel.writeAndFlush(request.build().toByteArray()); + } else { + channel.write(request.build().toByteArray()); + } + return commandId.getValue(); + } + + /** Write a protobuf message to the socket. */ + public CompletableFuture connect(ConnectionRequest request) { + channel.writeAndFlush(request.toByteArray()); + return callbackManager.getConnectionPromise(); + } + + /** Closes the UDS connection and frees corresponding resources. */ + public void close() { + channel.close(); + callbackManager.shutdownGracefully(); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java new file mode 100644 index 0000000000..05cbbc8cfe --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java @@ -0,0 +1,45 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass; + +/** Handler for inbound traffic though UDS. Used by Netty. */ +@RequiredArgsConstructor +public class ReadHandler extends ChannelInboundHandlerAdapter { + + private final CallbackManager callbackManager; + + /** + * Handles responses from babushka core: + * + *
    + *
  1. Copy to a buffer; + *
  2. Parse protobuf packet; + *
  3. Find and resolve a corresponding future; + *
+ */ + @Override + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) + throws Exception { + var buf = (ByteBuf) msg; + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + // TODO surround parsing with try-catch, set error to future if parsing failed. + var response = ResponseOuterClass.Response.parseFrom(bytes); + callbackManager.completeRequest(response); + buf.release(); + } + + /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); + cause.printStackTrace(System.err); + super.exceptionCaught(ctx, cause); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java b/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java new file mode 100644 index 0000000000..7e36f49d9b --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java @@ -0,0 +1,23 @@ +package babushka.connectors.handlers; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +/** Handler for outbound traffic though UDS. Used by Netty. */ +public class WriteHandler extends ChannelOutboundHandlerAdapter { + /** + * Converts objects submitted to {@link Channel#write(Object)} and {@link + * Channel#writeAndFlush(Object)} to a {@link ByteBuf}. + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + var bytes = (byte[]) msg; + + super.write(ctx, Unpooled.copiedBuffer(bytes), promise); + } +} diff --git a/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java new file mode 100644 index 0000000000..6d4ec45121 --- /dev/null +++ b/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java @@ -0,0 +1,25 @@ +package babushka.ffi.resolvers; + +public class BabushkaCoreNativeDefinitions { + public static native String startSocketListenerExternal() throws Exception; + + public static native Object valueFromPointer(long pointer); + + static { + System.loadLibrary("javababushka"); + } + + /** + * Make an FFI call to obtain the socket path. + * + * @return A UDS path. + */ + public static String getSocket() { + try { + return startSocketListenerExternal(); + } catch (Exception | UnsatisfiedLinkError e) { + System.err.printf("Failed to create a UDS connection: %s%n%n", e); + throw new RuntimeException(e); + } + } +} diff --git a/java/client/src/main/java/babushka/managers/CallbackManager.java b/java/client/src/main/java/babushka/managers/CallbackManager.java new file mode 100644 index 0000000000..34d95d607c --- /dev/null +++ b/java/client/src/main/java/babushka/managers/CallbackManager.java @@ -0,0 +1,64 @@ +package babushka.managers; + +import babushka.connectors.handlers.ReadHandler; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Pair; +import response.ResponseOuterClass.Response; + +/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ +public class CallbackManager { + /** Unique request ID (callback ID). Thread-safe. */ + private final AtomicInteger requestId = new AtomicInteger(0); + + /** + * Storage of Futures to handle responses. Map key is callback id, which starts from 1.
+ * Each future is a promise for every submitted by user request. + */ + private final Map> responses = new ConcurrentHashMap<>(); + + /** + * Storage for connection request similar to {@link #responses}. Unfortunately, connection + * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for + * connection requests. + */ + @Getter private final CompletableFuture connectionPromise = new CompletableFuture<>(); + + /** + * Register a new request to be sent. Once response received, the given future completes with it. + * + * @return A pair of unique callback ID which should set into request and a client promise for + * response. + */ + public Pair> registerRequest() { + int callbackId = requestId.incrementAndGet(); + var future = new CompletableFuture(); + responses.put(callbackId, future); + return Pair.of(callbackId, future); + } + + /** + * Complete the corresponding client promise and free resources. + * + * @param response A response received + */ + public void completeRequest(Response response) { + int callbackId = response.getCallbackIdx(); + if (callbackId == 0) { + connectionPromise.completeAsync(() -> response); + } else { + responses.get(callbackId).completeAsync(() -> response); + responses.remove(callbackId); + } + } + + public void shutdownGracefully() { + connectionPromise.completeExceptionally(new InterruptedException()); + responses.forEach( + (callbackId, future) -> future.completeExceptionally(new InterruptedException())); + responses.clear(); + } +} diff --git a/java/src/lib.rs b/java/src/lib.rs index 3cd7bc5ed7..13577f0805 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -42,7 +42,9 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { } #[no_mangle] -pub extern "system" fn Java_babushka_BabushkaCoreNativeDefinitions_valueFromPointer<'local>( +pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_valueFromPointer< + 'local, +>( env: JNIEnv<'local>, _class: JClass<'local>, pointer: jlong, @@ -52,7 +54,7 @@ pub extern "system" fn Java_babushka_BabushkaCoreNativeDefinitions_valueFromPoin } #[no_mangle] -pub extern "system" fn Java_babushka_BabushkaCoreNativeDefinitions_startSocketListenerExternal< +pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_startSocketListenerExternal< 'local, >( env: JNIEnv<'local>, From ce166b1d6634dbbcdd4f6b6e812505cc6d26fabd Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 13 Dec 2023 12:35:09 -0800 Subject: [PATCH 02/17] Optimize protobuf and netty integration. Signed-off-by: Yury-Fridlyand --- .../connectors/handlers/ChannelBuilder.java | 12 +++++++--- .../connectors/handlers/ChannelHandler.java | 6 ++--- .../connectors/handlers/ReadHandler.java | 24 ++++--------------- .../connectors/handlers/WriteHandler.java | 23 ------------------ 4 files changed, 16 insertions(+), 49 deletions(-) delete mode 100644 java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java index 4819254499..157aa4f632 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java @@ -2,11 +2,15 @@ import babushka.managers.CallbackManager; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.unix.UnixChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass.Response; /** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */ @RequiredArgsConstructor @@ -18,9 +22,11 @@ public class ChannelBuilder extends ChannelInitializer { public void initChannel(@NonNull UnixChannel ch) { ch.pipeline() // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html - .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) - .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast("frameDecoder", new ProtobufVarint32FrameDecoder()) + .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast("protobufDecoder", new ProtobufDecoder(Response.getDefaultInstance())) + .addLast("protobufEncoder", new ProtobufEncoder()) .addLast(new ReadHandler(callbackManager)) - .addLast(new WriteHandler()); + .addLast(new ChannelOutboundHandlerAdapter()); } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 06a322baad..1942b3033b 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -23,16 +23,16 @@ public CompletableFuture write(RedisRequest.Builder request, boolean f request.setCallbackIdx(commandId.getKey()); if (flush) { - channel.writeAndFlush(request.build().toByteArray()); + channel.writeAndFlush(request.build()); } else { - channel.write(request.build().toByteArray()); + channel.write(request.build()); } return commandId.getValue(); } /** Write a protobuf message to the socket. */ public CompletableFuture connect(ConnectionRequest request) { - channel.writeAndFlush(request.toByteArray()); + channel.writeAndFlush(request); return callbackManager.getConnectionPromise(); } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java index 05cbbc8cfe..711119aca8 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java @@ -1,12 +1,11 @@ package babushka.connectors.handlers; import babushka.managers.CallbackManager; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import response.ResponseOuterClass; +import response.ResponseOuterClass.Response; /** Handler for inbound traffic though UDS. Used by Netty. */ @RequiredArgsConstructor @@ -14,25 +13,10 @@ public class ReadHandler extends ChannelInboundHandlerAdapter { private final CallbackManager callbackManager; - /** - * Handles responses from babushka core: - * - *
    - *
  1. Copy to a buffer; - *
  2. Parse protobuf packet; - *
  3. Find and resolve a corresponding future; - *
- */ + /** Submit responses from babushka to an instance {@link CallbackManager} to handle them. */ @Override - public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) - throws Exception { - var buf = (ByteBuf) msg; - var bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - // TODO surround parsing with try-catch, set error to future if parsing failed. - var response = ResponseOuterClass.Response.parseFrom(bytes); - callbackManager.completeRequest(response); - buf.release(); + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) { + callbackManager.completeRequest((Response) msg); } /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */ diff --git a/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java b/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java deleted file mode 100644 index 7e36f49d9b..0000000000 --- a/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java +++ /dev/null @@ -1,23 +0,0 @@ -package babushka.connectors.handlers; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; - -/** Handler for outbound traffic though UDS. Used by Netty. */ -public class WriteHandler extends ChannelOutboundHandlerAdapter { - /** - * Converts objects submitted to {@link Channel#write(Object)} and {@link - * Channel#writeAndFlush(Object)} to a {@link ByteBuf}. - */ - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - var bytes = (byte[]) msg; - - super.write(ctx, Unpooled.copiedBuffer(bytes), promise); - } -} From e429be9b5d41c93f77693f11d5395681a66329f6 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 13 Dec 2023 17:02:38 -0800 Subject: [PATCH 03/17] Rework how we open `Channel`. Signed-off-by: Yury-Fridlyand --- .../babushka/connectors/SocketConnection.java | 134 ----------------- .../connectors/handlers/ChannelHandler.java | 28 +++- ... => ProtobufSocketChannelInitializer.java} | 4 +- .../connectors/resources/Platform.java | 139 ++++++++++++++++++ 4 files changed, 165 insertions(+), 140 deletions(-) delete mode 100644 java/client/src/main/java/babushka/connectors/SocketConnection.java rename java/client/src/main/java/babushka/connectors/handlers/{ChannelBuilder.java => ProtobufSocketChannelInitializer.java} (88%) create mode 100644 java/client/src/main/java/babushka/connectors/resources/Platform.java diff --git a/java/client/src/main/java/babushka/connectors/SocketConnection.java b/java/client/src/main/java/babushka/connectors/SocketConnection.java deleted file mode 100644 index cd41e00ced..0000000000 --- a/java/client/src/main/java/babushka/connectors/SocketConnection.java +++ /dev/null @@ -1,134 +0,0 @@ -package babushka.connectors; - -import babushka.connectors.handlers.ChannelBuilder; -import babushka.connectors.handlers.ChannelHandler; -import babushka.managers.CallbackManager; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.unix.DomainSocketAddress; -import io.netty.util.concurrent.DefaultThreadFactory; - -public class SocketConnection { - - /** Thread pool supplied to Netty to perform all async IO. */ - private EventLoopGroup group; - - /** The singleton instance. */ - private static SocketConnection INSTANCE = null; - - private static String socketPath; - - public static void setSocketPath(String socketPath) { - if (SocketConnection.socketPath == null) { - SocketConnection.socketPath = socketPath; - return; - } - throw new RuntimeException("socket path can only be declared once"); - } - - /** - * Creates (if not yet created) and returns the singleton instance of the {@link - * SocketConnection}. - * - * @return a {@link SocketConnection} instance. - */ - public static synchronized SocketConnection getInstance() { - if (INSTANCE == null) { - assert socketPath != null : "socket path must be defined"; - INSTANCE = new SocketConnection(); - } - return INSTANCE; - } - - // At the moment, Windows is not supported - // Probably we should use NIO (NioEventLoopGroup) for Windows. - private static final boolean isMacOs = isKQueueAvailable(); - - // TODO support IO-Uring and NIO - /** - * Detect platform to identify which native implementation to use for UDS interaction. Currently - * supported platforms are: Linux and macOS.
- * Subject to change in future to support more platforms and implementations. - */ - private static boolean isKQueueAvailable() { - try { - Class.forName("io.netty.channel.kqueue.KQueue"); - return KQueue.isAvailable(); - } catch (ClassNotFoundException e) { - return false; - } - } - - /** Constructor for the single instance. */ - private SocketConnection() { - try { - int cpuCount = Runtime.getRuntime().availableProcessors(); - group = - isMacOs - ? new KQueueEventLoopGroup( - cpuCount, new DefaultThreadFactory("SocketConnection-kqueue-elg", true)) - : new EpollEventLoopGroup( - cpuCount, new DefaultThreadFactory("SocketConnection-epoll-elg", true)); - } catch (Exception e) { - System.err.printf( - "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); - e.printStackTrace(System.err); - } - } - - /** Open a new channel for a new client. */ - public ChannelHandler openNewChannel(CallbackManager callbackManager) { - try { - Channel channel = - new Bootstrap() - .group(group) - .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) - .handler(new ChannelBuilder(callbackManager)) - .connect(new DomainSocketAddress(socketPath)) - .sync() - .channel(); - return new ChannelHandler(channel, callbackManager); - } catch (InterruptedException e) { - System.err.printf( - "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); - e.printStackTrace(System.err); - throw new RuntimeException(e); - } - } - - /** - * Closes the UDS connection and frees corresponding resources. A consecutive call to {@link - * #getInstance()} will create a new connection with new resource pool. - */ - public void close() { - group.shutdownGracefully(); - INSTANCE = null; - } - - /** - * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing - * resources by calling {@link #close()}. It is recommended to use a class instead of lambda to - * ensure that it is called.
- * See {@link Runtime#addShutdownHook}. - */ - private static class ShutdownHook implements Runnable { - @Override - public void run() { - if (INSTANCE != null) { - INSTANCE.close(); - INSTANCE = null; - } - } - } - - static { - Runtime.getRuntime() - .addShutdownHook(new Thread(new ShutdownHook(), "SocketConnection-shutdown-hook")); - } -} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 1942b3033b..1cd11f12e8 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,10 +1,14 @@ package babushka.connectors.handlers; +import babushka.connectors.resources.Platform; import babushka.managers.CallbackManager; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.unix.DomainSocketAddress; +import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; -import lombok.RequiredArgsConstructor; +import java.util.concurrent.atomic.AtomicBoolean; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; @@ -12,11 +16,23 @@ * Class responsible for manipulations with Netty's {@link Channel}.
* Uses a {@link CallbackManager} to record callbacks of every request sent. */ -@RequiredArgsConstructor public class ChannelHandler { private final Channel channel; private final CallbackManager callbackManager; + /** Open a new channel for a new client. */ + public ChannelHandler(CallbackManager callbackManager, String socketPath) { + channel = + new Bootstrap() + .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) + .channel(Platform.getClientUdsNettyChannelType()) + .handler(new ProtobufSocketChannelInitializer(callbackManager)) + .connect(new DomainSocketAddress(socketPath)) + // TODO call here .sync() if needed or remove this comment + .channel(); + this.callbackManager = callbackManager; + } + /** Write a protobuf message to the socket. */ public CompletableFuture write(RedisRequest.Builder request, boolean flush) { var commandId = callbackManager.registerRequest(); @@ -36,9 +52,13 @@ public CompletableFuture connect(ConnectionRequest request) { return callbackManager.getConnectionPromise(); } + private final AtomicBoolean closed = new AtomicBoolean(false); + /** Closes the UDS connection and frees corresponding resources. */ public void close() { - channel.close(); - callbackManager.shutdownGracefully(); + if (closed.compareAndSet(false, true)) { + channel.close(); + callbackManager.shutdownGracefully(); + } } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java similarity index 88% rename from java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java rename to java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java index 157aa4f632..eb37b221cc 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java @@ -12,9 +12,9 @@ import lombok.RequiredArgsConstructor; import response.ResponseOuterClass.Response; -/** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */ +/** Builder for the channel used by {@link ChannelHandler}. */ @RequiredArgsConstructor -public class ChannelBuilder extends ChannelInitializer { +public class ProtobufSocketChannelInitializer extends ChannelInitializer { private final CallbackManager callbackManager; diff --git a/java/client/src/main/java/babushka/connectors/resources/Platform.java b/java/client/src/main/java/babushka/connectors/resources/Platform.java new file mode 100644 index 0000000000..b411f04f50 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/resources/Platform.java @@ -0,0 +1,139 @@ +package babushka.connectors.resources; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.UtilityClass; + +/** + * An auxiliary class purposed to detect platform (OS + JVM) {@link Capabilities} and allocate + * corresponding resources. + */ +@UtilityClass +public class Platform { + + @Getter + @AllArgsConstructor(access = AccessLevel.PRIVATE) + @ToString + public static class Capabilities { + private final boolean isKQueueAvailable; + private final boolean isEPollAvailable; + // TODO support IO-Uring and NIO + private final boolean isIOUringAvailable; + // At the moment, Windows is not supported + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private final boolean isNIOAvailable; + } + + /** Detected platform (OS + JVM) capabilities. Not supposed to be changed in runtime. */ + @Getter + private static final Capabilities capabilities = + new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false); + + /** + * Thread pools supplied to Netty to perform all async IO.
+ * Map key is supposed to be pool name + thread count as a string concat product. + */ + private static final Map groups = new ConcurrentHashMap<>(); + + /** Detect kqueue availability. */ + private static boolean isKQueueAvailable() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** Detect epoll availability. */ + private static boolean isEPollAvailable() { + try { + Class.forName("io.netty.channel.epoll.Epoll"); + return Epoll.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** + * Allocate Netty thread pool required to manage connection. A thread pool could be shared across + * multiple connections. + * + * @return A new thread pool. + */ + public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) { + int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); + if (capabilities.isKQueueAvailable()) { + var name = prefix + "-kqueue-elg"; + return getOrCreate( + name + threadCount, + () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } else if (capabilities.isEPollAvailable()) { + var name = prefix + "-epoll-elg"; + return getOrCreate( + name + threadCount, + () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } + // TODO support IO-Uring and NIO + + throw new RuntimeException("Current platform supports no known thread pool types"); + } + + /** + * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. + */ + private static EventLoopGroup getOrCreate(String name, Supplier supplier) { + if (groups.containsKey(name)) { + return groups.get(name); + } + var group = supplier.get(); + groups.put(name, group); + return group; + } + + /** + * Get a channel class required by Netty to open a client UDS channel. + * + * @return Return a class supported by the current platform. + */ + public static Class getClientUdsNettyChannelType() { + if (capabilities.isKQueueAvailable()) { + return KQueueDomainSocketChannel.class; + } + if (capabilities.isEPollAvailable()) { + return EpollDomainSocketChannel.class; + } + throw new RuntimeException("Current platform supports no known socket types"); + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + groups.values().forEach(EventLoopGroup::shutdownGracefully); + } + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); + } +} From 708293f1c949ab614ecba1cce0690c90db3a43b0 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 14 Dec 2023 12:52:45 -0800 Subject: [PATCH 04/17] More TODOs for the god of TODOs. Signed-off-by: Yury-Fridlyand --- .../main/java/babushka/connectors/handlers/ChannelHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 1cd11f12e8..80f04839c0 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -24,6 +24,7 @@ public class ChannelHandler { public ChannelHandler(CallbackManager callbackManager, String socketPath) { channel = new Bootstrap() + // TODO let user specify the thread pool or pool size as an option .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) .channel(Platform.getClientUdsNettyChannelType()) .handler(new ProtobufSocketChannelInitializer(callbackManager)) From 12dcaba84cd5a848529593884d49a7aafc86d4c9 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 14 Dec 2023 20:37:57 -0800 Subject: [PATCH 05/17] Rename `CallbackManager` to `CallbackDispatcher`. Signed-off-by: Yury-Fridlyand --- .../handlers/CallbackDispatcher.java} | 17 +++++++++-------- .../connectors/handlers/ChannelHandler.java | 17 ++++++++--------- .../ProtobufSocketChannelInitializer.java | 5 ++--- .../connectors/handlers/ReadHandler.java | 7 +++---- 4 files changed, 22 insertions(+), 24 deletions(-) rename java/client/src/main/java/babushka/{managers/CallbackManager.java => connectors/handlers/CallbackDispatcher.java} (82%) diff --git a/java/client/src/main/java/babushka/managers/CallbackManager.java b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java similarity index 82% rename from java/client/src/main/java/babushka/managers/CallbackManager.java rename to java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java index 34d95d607c..5ebaa03969 100644 --- a/java/client/src/main/java/babushka/managers/CallbackManager.java +++ b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java @@ -1,16 +1,14 @@ -package babushka.managers; +package babushka.connectors.handlers; -import babushka.connectors.handlers.ReadHandler; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; import response.ResponseOuterClass.Response; /** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ -public class CallbackManager { +public class CallbackDispatcher { /** Unique request ID (callback ID). Thread-safe. */ private final AtomicInteger requestId = new AtomicInteger(0); @@ -25,7 +23,7 @@ public class CallbackManager { * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for * connection requests. */ - @Getter private final CompletableFuture connectionPromise = new CompletableFuture<>(); + private final CompletableFuture connectionPromise = new CompletableFuture<>(); /** * Register a new request to be sent. Once response received, the given future completes with it. @@ -40,6 +38,10 @@ public Pair> registerRequest() { return Pair.of(callbackId, future); } + public CompletableFuture registerConnection() { + return connectionPromise; + } + /** * Complete the corresponding client promise and free resources. * @@ -56,9 +58,8 @@ public void completeRequest(Response response) { } public void shutdownGracefully() { - connectionPromise.completeExceptionally(new InterruptedException()); - responses.forEach( - (callbackId, future) -> future.completeExceptionally(new InterruptedException())); + connectionPromise.cancel(false); + responses.values().forEach(future -> future.cancel(false)); responses.clear(); } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 80f04839c0..adfdcbbbcc 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,7 +1,6 @@ package babushka.connectors.handlers; import babushka.connectors.resources.Platform; -import babushka.managers.CallbackManager; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -14,29 +13,29 @@ /** * Class responsible for manipulations with Netty's {@link Channel}.
- * Uses a {@link CallbackManager} to record callbacks of every request sent. + * Uses a {@link CallbackDispatcher} to record callbacks of every request sent. */ public class ChannelHandler { private final Channel channel; - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; /** Open a new channel for a new client. */ - public ChannelHandler(CallbackManager callbackManager, String socketPath) { + public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) { channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) .channel(Platform.getClientUdsNettyChannelType()) - .handler(new ProtobufSocketChannelInitializer(callbackManager)) + .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) // TODO call here .sync() if needed or remove this comment .channel(); - this.callbackManager = callbackManager; + this.callbackDispatcher = callbackDispatcher; } /** Write a protobuf message to the socket. */ public CompletableFuture write(RedisRequest.Builder request, boolean flush) { - var commandId = callbackManager.registerRequest(); + var commandId = callbackDispatcher.registerRequest(); request.setCallbackIdx(commandId.getKey()); if (flush) { @@ -50,7 +49,7 @@ public CompletableFuture write(RedisRequest.Builder request, boolean f /** Write a protobuf message to the socket. */ public CompletableFuture connect(ConnectionRequest request) { channel.writeAndFlush(request); - return callbackManager.getConnectionPromise(); + return callbackDispatcher.registerConnection(); } private final AtomicBoolean closed = new AtomicBoolean(false); @@ -59,7 +58,7 @@ public CompletableFuture connect(ConnectionRequest request) { public void close() { if (closed.compareAndSet(false, true)) { channel.close(); - callbackManager.shutdownGracefully(); + callbackDispatcher.shutdownGracefully(); } } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java index eb37b221cc..06c4c03f02 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java @@ -1,6 +1,5 @@ package babushka.connectors.handlers; -import babushka.managers.CallbackManager; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.unix.UnixChannel; @@ -16,7 +15,7 @@ @RequiredArgsConstructor public class ProtobufSocketChannelInitializer extends ChannelInitializer { - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; @Override public void initChannel(@NonNull UnixChannel ch) { @@ -26,7 +25,7 @@ public void initChannel(@NonNull UnixChannel ch) { .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()) .addLast("protobufDecoder", new ProtobufDecoder(Response.getDefaultInstance())) .addLast("protobufEncoder", new ProtobufEncoder()) - .addLast(new ReadHandler(callbackManager)) + .addLast(new ReadHandler(callbackDispatcher)) .addLast(new ChannelOutboundHandlerAdapter()); } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java index 711119aca8..63aedf001e 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java @@ -1,6 +1,5 @@ package babushka.connectors.handlers; -import babushka.managers.CallbackManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.NonNull; @@ -11,12 +10,12 @@ @RequiredArgsConstructor public class ReadHandler extends ChannelInboundHandlerAdapter { - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; - /** Submit responses from babushka to an instance {@link CallbackManager} to handle them. */ + /** Submit responses from babushka to an instance {@link CallbackDispatcher} to handle them. */ @Override public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) { - callbackManager.completeRequest((Response) msg); + callbackDispatcher.completeRequest((Response) msg); } /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */ From 68b42a583d60496d587d86623710a845bb010607 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 21 Dec 2023 10:47:40 -0800 Subject: [PATCH 06/17] Add managers layer to client (#44) * Managers Signed-off-by: Yury-Fridlyand * Refactor Signed-off-by: Yury-Fridlyand * Split `ClientState` and `Platform`. Signed-off-by: Yury-Fridlyand * Refactor Signed-off-by: Yury-Fridlyand * Address PR feedback. Signed-off-by: Yury-Fridlyand * Minor fix. Signed-off-by: Yury-Fridlyand * Typo fix. Signed-off-by: Yury-Fridlyand * Refactor Signed-off-by: Yury-Fridlyand * javadocs Signed-off-by: Yury-Fridlyand --------- Signed-off-by: Yury-Fridlyand --- java/client/build.gradle | 3 + .../handlers/CallbackDispatcher.java | 58 ++++++++----- .../connectors/handlers/ChannelHandler.java | 14 ++-- .../connectors/resources/Platform.java | 66 --------------- .../resources/ThreadPoolAllocator.java | 72 +++++++++++++++++ .../ffi/resolvers/RedisValueResolver.java | 13 +++ ...tions.java => SocketListenerResolver.java} | 8 +- .../babushka/managers/CommandManager.java | 81 +++++++++++++++++++ .../babushka/managers/ConnectionManager.java | 68 ++++++++++++++++ .../java/babushka/models/RequestBuilder.java | 60 ++++++++++++++ java/src/lib.rs | 6 +- 11 files changed, 348 insertions(+), 101 deletions(-) create mode 100644 java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java create mode 100644 java/client/src/main/java/babushka/ffi/resolvers/RedisValueResolver.java rename java/client/src/main/java/babushka/ffi/resolvers/{BabushkaCoreNativeDefinitions.java => SocketListenerResolver.java} (63%) create mode 100644 java/client/src/main/java/babushka/managers/CommandManager.java create mode 100644 java/client/src/main/java/babushka/managers/ConnectionManager.java create mode 100644 java/client/src/main/java/babushka/models/RequestBuilder.java diff --git a/java/client/build.gradle b/java/client/build.gradle index be9120cf3f..4aa2415723 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -52,16 +52,19 @@ tasks.register('cleanProtobuf') { tasks.register('buildRustRelease', Exec) { commandLine 'cargo', 'build', '--release' workingDir project.rootDir + environment 'CARGO_TERM_COLOR', 'always' } tasks.register('buildRustReleaseStrip', Exec) { commandLine 'cargo', 'build', '--release', '--strip' workingDir project.rootDir + environment 'CARGO_TERM_COLOR', 'always' } tasks.register('buildRust', Exec) { commandLine 'cargo', 'build' workingDir project.rootDir + environment 'CARGO_TERM_COLOR', 'always' } tasks.register('buildWithRust') { diff --git a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java index 5ebaa03969..17589b826e 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java @@ -1,29 +1,39 @@ package babushka.connectors.handlers; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.tuple.Pair; import response.ResponseOuterClass.Response; /** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ +@RequiredArgsConstructor public class CallbackDispatcher { - /** Unique request ID (callback ID). Thread-safe. */ - private final AtomicInteger requestId = new AtomicInteger(0); + + /** Client connection status needed to distinguish connection request. */ + private final AtomicBoolean connectionStatus; + + /** Reserved callback ID for connection request. */ + private final Integer CONNECTION_PROMISE_ID = 0; /** * Storage of Futures to handle responses. Map key is callback id, which starts from 1.
- * Each future is a promise for every submitted by user request. + * Each future is a promise for every submitted by user request.
+ * Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field.
+ * Negative java values would be shown as positive on rust side. Meanwhile, no data loss happen, + * because callback ID remains unique. */ - private final Map> responses = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> responses = + new ConcurrentHashMap<>(); /** - * Storage for connection request similar to {@link #responses}. Unfortunately, connection - * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for - * connection requests. + * Storage of freed callback IDs. It is needed to avoid occupying an ID being used and to speed up + * search for a next free ID.
*/ - private final CompletableFuture connectionPromise = new CompletableFuture<>(); + // TODO: Optimize to avoid growing up to 2e32 (16 Gb) https://github.com/aws/babushka/issues/704 + private final ConcurrentLinkedQueue freeRequestIds = new ConcurrentLinkedQueue<>(); /** * Register a new request to be sent. Once response received, the given future completes with it. @@ -32,14 +42,21 @@ public class CallbackDispatcher { * response. */ public Pair> registerRequest() { - int callbackId = requestId.incrementAndGet(); var future = new CompletableFuture(); - responses.put(callbackId, future); + Integer callbackId = connectionStatus.get() ? freeRequestIds.poll() : CONNECTION_PROMISE_ID; + synchronized (responses) { + if (callbackId == null) { + long size = responses.mappingCount(); + callbackId = (int) (size < Integer.MAX_VALUE ? size : -(size - Integer.MAX_VALUE)); + } + responses.put(callbackId, future); + } return Pair.of(callbackId, future); } public CompletableFuture registerConnection() { - return connectionPromise; + var res = registerRequest(); + return res.getValue(); } /** @@ -48,17 +65,22 @@ public CompletableFuture registerConnection() { * @param response A response received */ public void completeRequest(Response response) { - int callbackId = response.getCallbackIdx(); - if (callbackId == 0) { - connectionPromise.completeAsync(() -> response); + // A connection response doesn't contain a callback id + int callbackId = connectionStatus.get() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID; + CompletableFuture future = responses.get(callbackId); + if (future != null) { + future.completeAsync(() -> response); } else { - responses.get(callbackId).completeAsync(() -> response); + // TODO: log an error. + // probably a response was received after shutdown or `registerRequest` call was missing + } + synchronized (responses) { responses.remove(callbackId); } + freeRequestIds.add(callbackId); } public void shutdownGracefully() { - connectionPromise.cancel(false); responses.values().forEach(future -> future.cancel(false)); responses.clear(); } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index adfdcbbbcc..3a41a3e20c 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,13 +1,13 @@ package babushka.connectors.handlers; import babushka.connectors.resources.Platform; +import babushka.connectors.resources.ThreadPoolAllocator; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.unix.DomainSocketAddress; -import java.util.OptionalInt; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; @@ -24,7 +24,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option - .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) + .group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty())) .channel(Platform.getClientUdsNettyChannelType()) .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) @@ -52,13 +52,9 @@ public CompletableFuture connect(ConnectionRequest request) { return callbackDispatcher.registerConnection(); } - private final AtomicBoolean closed = new AtomicBoolean(false); - /** Closes the UDS connection and frees corresponding resources. */ public void close() { - if (closed.compareAndSet(false, true)) { - channel.close(); - callbackDispatcher.shutdownGracefully(); - } + channel.close(); + callbackDispatcher.shutdownGracefully(); } } diff --git a/java/client/src/main/java/babushka/connectors/resources/Platform.java b/java/client/src/main/java/babushka/connectors/resources/Platform.java index b411f04f50..4967a9b9f0 100644 --- a/java/client/src/main/java/babushka/connectors/resources/Platform.java +++ b/java/client/src/main/java/babushka/connectors/resources/Platform.java @@ -1,18 +1,10 @@ package babushka.connectors.resources; -import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.unix.DomainSocketChannel; -import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.Map; -import java.util.OptionalInt; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -44,12 +36,6 @@ public static class Capabilities { private static final Capabilities capabilities = new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false); - /** - * Thread pools supplied to Netty to perform all async IO.
- * Map key is supposed to be pool name + thread count as a string concat product. - */ - private static final Map groups = new ConcurrentHashMap<>(); - /** Detect kqueue availability. */ private static boolean isKQueueAvailable() { try { @@ -70,42 +56,6 @@ private static boolean isEPollAvailable() { } } - /** - * Allocate Netty thread pool required to manage connection. A thread pool could be shared across - * multiple connections. - * - * @return A new thread pool. - */ - public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) { - int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); - if (capabilities.isKQueueAvailable()) { - var name = prefix + "-kqueue-elg"; - return getOrCreate( - name + threadCount, - () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } else if (capabilities.isEPollAvailable()) { - var name = prefix + "-epoll-elg"; - return getOrCreate( - name + threadCount, - () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } - // TODO support IO-Uring and NIO - - throw new RuntimeException("Current platform supports no known thread pool types"); - } - - /** - * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. - */ - private static EventLoopGroup getOrCreate(String name, Supplier supplier) { - if (groups.containsKey(name)) { - return groups.get(name); - } - var group = supplier.get(); - groups.put(name, group); - return group; - } - /** * Get a channel class required by Netty to open a client UDS channel. * @@ -120,20 +70,4 @@ public static Class getClientUdsNettyChannelType( } throw new RuntimeException("Current platform supports no known socket types"); } - - /** - * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing - * resources. It is recommended to use a class instead of lambda to ensure that it is called.
- * See {@link Runtime#addShutdownHook}. - */ - private static class ShutdownHook implements Runnable { - @Override - public void run() { - groups.values().forEach(EventLoopGroup::shutdownGracefully); - } - } - - static { - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); - } } diff --git a/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java new file mode 100644 index 0000000000..daefdb93e5 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java @@ -0,0 +1,72 @@ +package babushka.connectors.resources; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** A class responsible to allocating and deallocating shared thread pools. */ +public class ThreadPoolAllocator { + + /** + * Thread pools supplied to Netty to perform all async IO.
+ * Map key is supposed to be pool name + thread count as a string concat product. + */ + private static final Map groups = new ConcurrentHashMap<>(); + + /** + * Allocate (create new or share existing) Netty thread pool required to manage connection. A + * thread pool could be shared across multiple connections. + * + * @return A new thread pool. + */ + public static EventLoopGroup createNettyThreadPool(String prefix, Optional threadLimit) { + int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); + if (Platform.getCapabilities().isKQueueAvailable()) { + String name = prefix + "-kqueue-elg"; + return getOrCreate( + name + threadCount, + () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } else if (Platform.getCapabilities().isEPollAvailable()) { + String name = prefix + "-epoll-elg"; + return getOrCreate( + name + threadCount, + () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } + // TODO support IO-Uring and NIO + + throw new RuntimeException("Current platform supports no known thread pool types"); + } + + /** + * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. + */ + private static EventLoopGroup getOrCreate(String name, Supplier supplier) { + if (groups.containsKey(name)) { + return groups.get(name); + } + EventLoopGroup group = supplier.get(); + groups.put(name, group); + return group; + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + groups.values().forEach(EventLoopGroup::shutdownGracefully); + } + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); + } +} diff --git a/java/client/src/main/java/babushka/ffi/resolvers/RedisValueResolver.java b/java/client/src/main/java/babushka/ffi/resolvers/RedisValueResolver.java new file mode 100644 index 0000000000..133ccef0ab --- /dev/null +++ b/java/client/src/main/java/babushka/ffi/resolvers/RedisValueResolver.java @@ -0,0 +1,13 @@ +package babushka.ffi.resolvers; + +import response.ResponseOuterClass.Response; + +public class RedisValueResolver { + /** + * Resolve a value received from Redis using given C-style pointer. + * + * @param pointer A memory pointer from {@link Response} + * @return A RESP3 value + */ + public static native Object valueFromPointer(long pointer); +} diff --git a/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/babushka/ffi/resolvers/SocketListenerResolver.java similarity index 63% rename from java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java rename to java/client/src/main/java/babushka/ffi/resolvers/SocketListenerResolver.java index 6d4ec45121..112582f9ba 100644 --- a/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java +++ b/java/client/src/main/java/babushka/ffi/resolvers/SocketListenerResolver.java @@ -1,9 +1,9 @@ package babushka.ffi.resolvers; -public class BabushkaCoreNativeDefinitions { - public static native String startSocketListenerExternal() throws Exception; +public class SocketListenerResolver { - public static native Object valueFromPointer(long pointer); + /** Make an FFI call to Babushka to open a UDS socket to connect to. */ + private static native String startSocketListener() throws Exception; static { System.loadLibrary("javababushka"); @@ -16,7 +16,7 @@ public class BabushkaCoreNativeDefinitions { */ public static String getSocket() { try { - return startSocketListenerExternal(); + return startSocketListener(); } catch (Exception | UnsatisfiedLinkError e) { System.err.printf("Failed to create a UDS connection: %s%n%n", e); throw new RuntimeException(e); diff --git a/java/client/src/main/java/babushka/managers/CommandManager.java b/java/client/src/main/java/babushka/managers/CommandManager.java new file mode 100644 index 0000000000..79a7474a3f --- /dev/null +++ b/java/client/src/main/java/babushka/managers/CommandManager.java @@ -0,0 +1,81 @@ +package babushka.managers; + +import babushka.connectors.handlers.ChannelHandler; +import babushka.ffi.resolvers.RedisValueResolver; +import babushka.models.RequestBuilder; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import redis_request.RedisRequestOuterClass.RequestType; +import response.ResponseOuterClass.Response; + +@RequiredArgsConstructor +public class CommandManager { + + /** UDS connection representation. */ + private final ChannelHandler channel; + + /** + * Async (non-blocking) get.
+ * See REDIS docs for GET. + * + * @param key The key name + */ + public CompletableFuture get(String key) { + return submitNewRequest(RequestType.GetString, List.of(key)); + } + + /** + * Async (non-blocking) set.
+ * See REDIS docs for SET. + * + * @param key The key name + * @param value The value to set + */ + public CompletableFuture set(String key, String value) { + return submitNewRequest(RequestType.SetString, List.of(key, value)); + } + + /** + * Build a command and submit it Netty to send. + * + * @param command Command type + * @param args Command arguments + * @return A result promise + */ + private CompletableFuture submitNewRequest(RequestType command, List args) { + // TODO this explicitly uses ForkJoin thread pool. May be we should use another one. + return CompletableFuture.supplyAsync( + () -> channel.write(RequestBuilder.prepareRedisRequest(command, args), true)) + // TODO: is there a better way to execute this? + .thenComposeAsync(f -> f) + .thenApplyAsync(this::extractValueFromResponse); + } + + /** + * Check response and extract data from it. + * + * @param response A response received from Babushka + * @return A String from the Redis RESP2 response, or Ok. Otherwise, returns null + */ + private String extractValueFromResponse(Response response) { + if (response.hasRequestError()) { + // TODO do we need to support different types of exceptions and distinguish them by type? + throw new RuntimeException( + String.format( + "%s: %s", + response.getRequestError().getType(), response.getRequestError().getMessage())); + } else if (response.hasClosingError()) { + CompletableFuture.runAsync(channel::close); + throw new RuntimeException("Connection closed: " + response.getClosingError()); + } else if (response.hasConstantResponse()) { + return response.getConstantResponse().toString(); + } else if (response.hasRespPointer()) { + return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString(); + } + // TODO commented out due to #710 https://github.com/aws/babushka/issues/710 + // empty response means a successful command + // throw new IllegalStateException("A malformed response received: " + response.toString()); + return "OK"; + } +} diff --git a/java/client/src/main/java/babushka/managers/ConnectionManager.java b/java/client/src/main/java/babushka/managers/ConnectionManager.java new file mode 100644 index 0000000000..4bb38c27ee --- /dev/null +++ b/java/client/src/main/java/babushka/managers/ConnectionManager.java @@ -0,0 +1,68 @@ +package babushka.managers; + +import babushka.connectors.handlers.ChannelHandler; +import babushka.ffi.resolvers.RedisValueResolver; +import babushka.models.RequestBuilder; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass.ConstantResponse; +import response.ResponseOuterClass.Response; + +@RequiredArgsConstructor +public class ConnectionManager { + + /** UDS connection representation. */ + private final ChannelHandler channel; + + /** Client connection status to update when connection established. */ + private final AtomicBoolean connectionStatus; + + /** + * Connect to Redis using a ProtoBuf connection request. + * + * @param host Server address + * @param port Server port + * @param useSsl true if communication with the server or cluster should use Transport Level + * Security + * @param clusterMode true if REDIS instance runs in the cluster mode + */ + // TODO support more parameters and/or configuration object + public CompletableFuture connectToRedis( + String host, int port, boolean useSsl, boolean clusterMode) { + ConnectionRequest request = + RequestBuilder.createConnectionRequest(host, port, useSsl, clusterMode); + return channel.connect(request).thenApplyAsync(this::checkBabushkaResponse); + } + + /** Check a response received from Babushka. */ + private boolean checkBabushkaResponse(Response response) { + // TODO do we need to check callback value? It could be -1 or 0 + if (response.hasRequestError()) { + // TODO do we need to support different types of exceptions and distinguish them by type? + throw new RuntimeException( + String.format( + "%s: %s", + response.getRequestError().getType(), response.getRequestError().getMessage())); + } else if (response.hasClosingError()) { + throw new RuntimeException("Connection closed: " + response.getClosingError()); + } else if (response.hasConstantResponse()) { + return connectionStatus.compareAndSet( + false, response.getConstantResponse() == ConstantResponse.OK); + } else if (response.hasRespPointer()) { + throw new RuntimeException( + "Unexpected response data: " + + RedisValueResolver.valueFromPointer(response.getRespPointer())); + } + // TODO commented out due to #710 https://github.com/aws/babushka/issues/710 + // empty response means a successful connection + // throw new IllegalStateException("A malformed response received: " + response.toString()); + return connectionStatus.compareAndSet(false, true); + } + + /** Close the connection and the corresponding channel. */ + public CompletableFuture closeConnection() { + return CompletableFuture.runAsync(channel::close); + } +} diff --git a/java/client/src/main/java/babushka/models/RequestBuilder.java b/java/client/src/main/java/babushka/models/RequestBuilder.java new file mode 100644 index 0000000000..2ec729e4eb --- /dev/null +++ b/java/client/src/main/java/babushka/models/RequestBuilder.java @@ -0,0 +1,60 @@ +package babushka.models; + +import babushka.connectors.handlers.CallbackDispatcher; +import babushka.managers.CommandManager; +import babushka.managers.ConnectionManager; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import connection_request.ConnectionRequestOuterClass.NodeAddress; +import connection_request.ConnectionRequestOuterClass.ReadFrom; +import connection_request.ConnectionRequestOuterClass.TlsMode; +import java.util.List; +import redis_request.RedisRequestOuterClass.Command; +import redis_request.RedisRequestOuterClass.Command.ArgsArray; +import redis_request.RedisRequestOuterClass.RedisRequest; +import redis_request.RedisRequestOuterClass.RequestType; +import redis_request.RedisRequestOuterClass.Routes; +import redis_request.RedisRequestOuterClass.SimpleRoutes; + +public class RequestBuilder { + + /** + * Build a protobuf connection request.
+ * Used by {@link ConnectionManager#connectToRedis}. + */ + // TODO support more parameters and/or configuration object + public static ConnectionRequest createConnectionRequest( + String host, int port, boolean useSsl, boolean clusterMode) { + return ConnectionRequest.newBuilder() + .addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build()) + .setTlsMode(useSsl ? TlsMode.SecureTls : TlsMode.NoTls) + .setClusterModeEnabled(clusterMode) + .setReadFrom(ReadFrom.Primary) + .setDatabaseId(0) + .build(); + } + + /** + * Build a protobuf command/transaction request draft.
+ * Used by {@link CommandManager}. + * + * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by + * adding a callback id. + */ + public static RedisRequest.Builder prepareRedisRequest(RequestType command, List args) { + var commandArgs = ArgsArray.newBuilder(); + for (var arg : args) { + commandArgs.addArgs(arg); + } + + return RedisRequest.newBuilder() + .setSingleCommand( // set command + Command.newBuilder() + .setRequestType(command) // set command name + .setArgsArray(commandArgs.build()) // set arguments + .build()) + .setRoute( // set route + Routes.newBuilder() + .setSimpleRoutes(SimpleRoutes.AllNodes) // set route type + .build()); + } +} diff --git a/java/src/lib.rs b/java/src/lib.rs index 13577f0805..8ff3b684fb 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -42,9 +42,7 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { } #[no_mangle] -pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_valueFromPointer< - 'local, ->( +pub extern "system" fn Java_babushka_ffi_resolvers_RedisValueResolver_valueFromPointer<'local>( env: JNIEnv<'local>, _class: JClass<'local>, pointer: jlong, @@ -54,7 +52,7 @@ pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions } #[no_mangle] -pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_startSocketListenerExternal< +pub extern "system" fn Java_babushka_ffi_resolvers_SocketListenerResolver_startSocketListener< 'local, >( env: JNIEnv<'local>, From 579c53cf67708a2699f29c17f656f69f22a28d2f Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 21 Dec 2023 12:18:03 -0800 Subject: [PATCH 07/17] submodule update Signed-off-by: Yury-Fridlyand --- submodules/redis-rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/redis-rs b/submodules/redis-rs index 8b3fb0a029..71ff85cb64 160000 --- a/submodules/redis-rs +++ b/submodules/redis-rs @@ -1 +1 @@ -Subproject commit 8b3fb0a0292cdda95f10cf3f771f6ad7ae7b3303 +Subproject commit 71ff85cb64a81da8470f18c9f7ba7c541d292a8e From c78169b0c2cba5cce1bb59a58e9d60cd3d29ebd8 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 21 Dec 2023 12:36:15 -0800 Subject: [PATCH 08/17] typo fix Signed-off-by: Yury-Fridlyand --- .../main/java/glide/ffi/resolvers/SocketListenerResolver.java | 2 +- java/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java b/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java index e6fb91a94c..a97ec82b22 100644 --- a/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java +++ b/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java @@ -6,7 +6,7 @@ public class SocketListenerResolver { private static native String startSocketListener() throws Exception; static { - System.loadLibrary("glide-rs"); + System.loadLibrary("glide_rs"); } /** diff --git a/java/src/lib.rs b/java/src/lib.rs index 05c0e4c78f..31372c46b5 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -52,7 +52,7 @@ pub extern "system" fn Java_glide_ffi_resolvers_RedisValueResolver_valueFromPoin } #[no_mangle] -pub extern "system" fn Java_glide_ffi_resolvers_SocketListenerResolver_startSocketListenerExternal< +pub extern "system" fn Java_glide_ffi_resolvers_SocketListenerResolver_startSocketListener< 'local, >( env: JNIEnv<'local>, From 7d1ba35f732889cf63e58d3a517418ece296d770 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 21 Dec 2023 17:53:18 -0800 Subject: [PATCH 09/17] Renaming Signed-off-by: Yury-Fridlyand --- .../java/glide/connectors/handlers/ChannelHandler.java | 7 +++++-- .../main/java/glide/connectors/handlers/ReadHandler.java | 2 +- .../java/glide/ffi/resolvers/SocketListenerResolver.java | 2 +- .../src/main/java/glide/managers/ConnectionManager.java | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 7b5b780548..1c408f1c1e 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -12,10 +12,13 @@ import response.ResponseOuterClass.Response; /** - * Class responsible for manipulations with Netty's {@link Channel}.
+ * Class responsible for handling calls to/from a netty.io {@link Channel}.
* Uses a {@link CallbackDispatcher} to record callbacks of every request sent. */ public class ChannelHandler { + + private static final String THREAD_POOL_NAME = "glide-channel"; + private final Channel channel; private final CallbackDispatcher callbackDispatcher; @@ -24,7 +27,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option - .group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty())) + .group(ThreadPoolAllocator.createNettyThreadPool(THREAD_POOL_NAME, Optional.empty())) .channel(Platform.getClientUdsNettyChannelType()) .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) diff --git a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java index 7b175559b3..dee2abc1b2 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java @@ -12,7 +12,7 @@ public class ReadHandler extends ChannelInboundHandlerAdapter { private final CallbackDispatcher callbackDispatcher; - /** Submit responses from babushka to an instance {@link CallbackDispatcher} to handle them. */ + /** Submit responses from glide to an instance {@link CallbackDispatcher} to handle them. */ @Override public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) { callbackDispatcher.completeRequest((Response) msg); diff --git a/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java b/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java index a97ec82b22..ba4ed4c327 100644 --- a/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java +++ b/java/client/src/main/java/glide/ffi/resolvers/SocketListenerResolver.java @@ -2,7 +2,7 @@ public class SocketListenerResolver { - /** Make an FFI call to Babushka to open a UDS socket to connect to. */ + /** Make an FFI call to Glide to open a UDS socket to connect to. */ private static native String startSocketListener() throws Exception; static { diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 379d92d607..5449c6205f 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -36,7 +36,7 @@ public CompletableFuture connectToRedis( return channel.connect(request).thenApplyAsync(this::checkGlideRsResponse); } - /** Check a response received from Babushka. */ + /** Check a response received from Glide. */ private boolean checkGlideRsResponse(Response response) { // TODO do we need to check callback value? It could be -1 or 0 if (response.hasRequestError()) { From 7c5fc69dbd90938f178ef1e95c9009e251c68fbf Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 21 Dec 2023 17:55:49 -0800 Subject: [PATCH 10/17] Restore javadocs. Signed-off-by: Yury-Fridlyand --- .../glide/connectors/handlers/ChannelHandler.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 1c408f1c1e..c12cc15411 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -36,7 +36,13 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) this.callbackDispatcher = callbackDispatcher; } - /** Write a protobuf message to the socket. */ + /** + * Complete a protobuf message and write it to the channel (to UDS). + * + * @param request Incomplete request, function completes it by setting callback ID + * @param flush True to flush immediately + * @return A response promise + */ public CompletableFuture write(RedisRequest.Builder request, boolean flush) { var commandId = callbackDispatcher.registerRequest(); request.setCallbackIdx(commandId.getKey()); @@ -49,7 +55,12 @@ public CompletableFuture write(RedisRequest.Builder request, boolean f return commandId.getValue(); } - /** Write a protobuf message to the socket. */ + /** + * Write a protobuf message to the channel (to UDS). + * + * @param request A connection request + * @return A connection promise + */ public CompletableFuture connect(ConnectionRequest request) { channel.writeAndFlush(request); return callbackDispatcher.registerConnection(); From cb0f6c44c30a0d860920575a3092a3efd5899125 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 22 Dec 2023 12:49:24 -0800 Subject: [PATCH 11/17] Optimize callback ID allocation. Signed-off-by: Yury-Fridlyand --- .../handlers/CallbackDispatcher.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index e37c0e8b81..70a331d4b4 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -4,6 +4,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.tuple.Pair; import response.ResponseOuterClass.Response; @@ -15,6 +16,9 @@ public class CallbackDispatcher { /** Client connection status needed to distinguish connection request. */ private final AtomicBoolean connectionStatus; + /** Unique request ID (callback ID). Thread-safe and overflow-safe. */ + private final AtomicInteger requestId = new AtomicInteger(0); + /** Reserved callback ID for connection request. */ private final Integer CONNECTION_PROMISE_ID = 0; @@ -44,13 +48,14 @@ public class CallbackDispatcher { public Pair> registerRequest() { var future = new CompletableFuture(); Integer callbackId = connectionStatus.get() ? freeRequestIds.poll() : CONNECTION_PROMISE_ID; - synchronized (responses) { - if (callbackId == null) { - long size = responses.mappingCount(); - callbackId = (int) (size < Integer.MAX_VALUE ? size : -(size - Integer.MAX_VALUE)); + if (callbackId == null) { + callbackId = requestId.incrementAndGet(); + if (callbackId.equals(CONNECTION_PROMISE_ID)) { + throw new RuntimeException( + "Can't submit a new request, too many requests are pending already"); } - responses.put(callbackId, future); } + responses.put(callbackId, future); return Pair.of(callbackId, future); } @@ -74,9 +79,7 @@ public void completeRequest(Response response) { // TODO: log an error. // probably a response was received after shutdown or `registerRequest` call was missing } - synchronized (responses) { - responses.remove(callbackId); - } + responses.remove(callbackId); freeRequestIds.add(callbackId); } From 74a6ce021335c727be3a6c13799da54651a40093 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 28 Dec 2023 11:12:45 -0800 Subject: [PATCH 12/17] Optimize code for review comments Signed-off-by: Andrew Carbonetto --- .../handlers/CallbackDispatcher.java | 33 +++++++++---------- .../java/glide/managers/CommandManager.java | 10 ++---- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index 70a331d4b4..7ed5b344b2 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -13,14 +13,15 @@ @RequiredArgsConstructor public class CallbackDispatcher { - /** Client connection status needed to distinguish connection request. */ - private final AtomicBoolean connectionStatus; + /** + * Client connection status needed to distinguish connection request. The callback dispatcher only + * submits connection requests until a successful connection response is returned and the status + * changes to true. The callback dispatcher then submits command requests. + */ + private final AtomicBoolean isConnectedStatus; /** Unique request ID (callback ID). Thread-safe and overflow-safe. */ - private final AtomicInteger requestId = new AtomicInteger(0); - - /** Reserved callback ID for connection request. */ - private final Integer CONNECTION_PROMISE_ID = 0; + private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); /** * Storage of Futures to handle responses. Map key is callback id, which starts from 1.
@@ -47,13 +48,11 @@ public class CallbackDispatcher { */ public Pair> registerRequest() { var future = new CompletableFuture(); - Integer callbackId = connectionStatus.get() ? freeRequestIds.poll() : CONNECTION_PROMISE_ID; + Integer callbackId = freeRequestIds.poll(); if (callbackId == null) { - callbackId = requestId.incrementAndGet(); - if (callbackId.equals(CONNECTION_PROMISE_ID)) { - throw new RuntimeException( - "Can't submit a new request, too many requests are pending already"); - } + // on null, we have no available request ids available + // we can add + callbackId = nextAvailableRequestId.incrementAndGet(); } responses.put(callbackId, future); return Pair.of(callbackId, future); @@ -70,17 +69,17 @@ public CompletableFuture registerConnection() { * @param response A response received */ public void completeRequest(Response response) { - // A connection response doesn't contain a callback id - int callbackId = connectionStatus.get() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID; - CompletableFuture future = responses.get(callbackId); + // Complete and return the response at callbackId + // free up the callback ID in the freeRequestIds list + int callbackId = response.getCallbackIdx(); + CompletableFuture future = responses.remove(callbackId); + freeRequestIds.add(callbackId); if (future != null) { future.completeAsync(() -> response); } else { // TODO: log an error. // probably a response was received after shutdown or `registerRequest` call was missing } - responses.remove(callbackId); - freeRequestIds.add(callbackId); } public void shutdownGracefully() { diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 017cd22c23..31f7bafafc 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -44,11 +44,8 @@ public CompletableFuture set(String key, String value) { * @return A result promise */ private CompletableFuture submitNewRequest(RequestType command, List args) { - // TODO this explicitly uses ForkJoin thread pool. May be we should use another one. - return CompletableFuture.supplyAsync( - () -> channel.write(RequestBuilder.prepareRedisRequest(command, args), true)) - // TODO: is there a better way to execute this? - .thenComposeAsync(f -> f) + return channel + .write(RequestBuilder.prepareRedisRequest(command, args), true) .thenApplyAsync(this::extractValueFromGlideRsResponse); } @@ -73,9 +70,6 @@ private String extractValueFromGlideRsResponse(Response response) { } else if (response.hasRespPointer()) { return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString(); } - // TODO commented out due to #710 https://github.com/aws/babushka/issues/710 - // empty response means a successful command - // throw new IllegalStateException("A malformed response received: " + response.toString()); return "OK"; } } From 6dcad763dca31383ff82bd3539212eb6a342953d Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 29 Dec 2023 09:15:05 -0800 Subject: [PATCH 13/17] Get before increment when getting a new callback id; add javadocs Signed-off-by: Andrew Carbonetto --- .../java/glide/connectors/handlers/CallbackDispatcher.java | 6 +++--- .../client/src/main/java/glide/managers/CommandManager.java | 6 +++++- .../src/main/java/glide/managers/ConnectionManager.java | 4 ++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index 7ed5b344b2..ec2711b03f 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -50,9 +50,9 @@ public Pair> registerRequest() { var future = new CompletableFuture(); Integer callbackId = freeRequestIds.poll(); if (callbackId == null) { - // on null, we have no available request ids available - // we can add - callbackId = nextAvailableRequestId.incrementAndGet(); + // on null, we have no available request ids available in freeRequestIds + // instead, get the next available request from counter + callbackId = nextAvailableRequestId.getAndIncrement(); } responses.put(callbackId, future); return Pair.of(callbackId, future); diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 31f7bafafc..0b4b60dfe2 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -9,6 +9,10 @@ import redis_request.RedisRequestOuterClass.RequestType; import response.ResponseOuterClass.Response; +/** + * Service responsible for submitting command requests to a socket channel handler + * and unpack responses from the same socket channel handler. + */ @RequiredArgsConstructor public class CommandManager { @@ -53,7 +57,7 @@ private CompletableFuture submitNewRequest(RequestType command, List Date: Tue, 2 Jan 2024 09:24:40 -0800 Subject: [PATCH 14/17] Update connectionManager arguments Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/managers/ConnectionManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 6456caa771..60fdccfb24 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -28,15 +28,15 @@ public class ConnectionManager { * * @param host Server address * @param port Server port - * @param useSsl true if communication with the server or cluster should use Transport Level + * @param useTls true if communication with the server or cluster should use Transport Level * Security - * @param clusterMode true if REDIS instance runs in the cluster mode + * @param clusterMode true if the client is used for connecting to a Redis Cluster */ // TODO support more parameters and/or configuration object public CompletableFuture connectToRedis( - String host, int port, boolean useSsl, boolean clusterMode) { + String host, int port, boolean useTls, boolean clusterMode) { ConnectionRequest request = - RequestBuilder.createConnectionRequest(host, port, useSsl, clusterMode); + RequestBuilder.createConnectionRequest(host, port, useTls, clusterMode); return channel.connect(request).thenApplyAsync(this::checkGlideRsResponse); } From 596462c1775c3e669319ed8f482cd4f950d68d89 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 2 Jan 2024 16:06:23 -0800 Subject: [PATCH 15/17] Remove connection status from the connection manager Signed-off-by: Andrew Carbonetto --- .../handlers/CallbackDispatcher.java | 8 ----- .../java/glide/managers/CommandManager.java | 4 +-- .../glide/managers/ConnectionManager.java | 30 +++++++------------ .../java/glide/models/RequestBuilder.java | 12 ++------ 4 files changed, 15 insertions(+), 39 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index ec2711b03f..1ded4a2c85 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -3,7 +3,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.tuple.Pair; @@ -13,13 +12,6 @@ @RequiredArgsConstructor public class CallbackDispatcher { - /** - * Client connection status needed to distinguish connection request. The callback dispatcher only - * submits connection requests until a successful connection response is returned and the status - * changes to true. The callback dispatcher then submits command requests. - */ - private final AtomicBoolean isConnectedStatus; - /** Unique request ID (callback ID). Thread-safe and overflow-safe. */ private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 0b4b60dfe2..134373a081 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -10,8 +10,8 @@ import response.ResponseOuterClass.Response; /** - * Service responsible for submitting command requests to a socket channel handler - * and unpack responses from the same socket channel handler. + * Service responsible for submitting command requests to a socket channel handler and unpack + * responses from the same socket channel handler. */ @RequiredArgsConstructor public class CommandManager { diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 60fdccfb24..7bee59c6e2 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -5,14 +5,12 @@ import glide.ffi.resolvers.RedisValueResolver; import glide.models.RequestBuilder; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.RequiredArgsConstructor; -import response.ResponseOuterClass.ConstantResponse; import response.ResponseOuterClass.Response; /** - * Service responsible for submitting connection requests to a socket channel handler - * and unpack responses from the same socket channel handler. + * Service responsible for submitting connection requests to a socket channel handler and unpack + * responses from the same socket channel handler. */ @RequiredArgsConstructor public class ConnectionManager { @@ -20,9 +18,6 @@ public class ConnectionManager { /** UDS connection representation. */ private final ChannelHandler channel; - /** Client connection status to update when connection established. */ - private final AtomicBoolean connectionStatus; - /** * Connect to Redis using a ProtoBuf connection request. * @@ -33,36 +28,31 @@ public class ConnectionManager { * @param clusterMode true if the client is used for connecting to a Redis Cluster */ // TODO support more parameters and/or configuration object - public CompletableFuture connectToRedis( + public CompletableFuture connectToRedis( String host, int port, boolean useTls, boolean clusterMode) { ConnectionRequest request = RequestBuilder.createConnectionRequest(host, port, useTls, clusterMode); - return channel.connect(request).thenApplyAsync(this::checkGlideRsResponse); + return channel.connect(request).thenApply(this::checkGlideRsResponse); } /** Check a response received from Glide. */ - private boolean checkGlideRsResponse(Response response) { - // TODO do we need to check callback value? It could be -1 or 0 + private Void checkGlideRsResponse(Response response) { if (response.hasRequestError()) { // TODO do we need to support different types of exceptions and distinguish them by type? throw new RuntimeException( String.format( "%s: %s", response.getRequestError().getType(), response.getRequestError().getMessage())); - } else if (response.hasClosingError()) { + } + if (response.hasClosingError()) { throw new RuntimeException("Connection closed: " + response.getClosingError()); - } else if (response.hasConstantResponse()) { - return connectionStatus.compareAndSet( - false, response.getConstantResponse() == ConstantResponse.OK); - } else if (response.hasRespPointer()) { + } + if (response.hasRespPointer()) { throw new RuntimeException( "Unexpected response data: " + RedisValueResolver.valueFromPointer(response.getRespPointer())); } - // TODO commented out due to #710 https://github.com/aws/babushka/issues/710 - // empty response means a successful connection - // throw new IllegalStateException("A malformed response received: " + response.toString()); - return connectionStatus.compareAndSet(false, true); + return null; } /** Close the connection and the corresponding channel. */ diff --git a/java/client/src/main/java/glide/models/RequestBuilder.java b/java/client/src/main/java/glide/models/RequestBuilder.java index 53ac918772..cd8e2dc1b6 100644 --- a/java/client/src/main/java/glide/models/RequestBuilder.java +++ b/java/client/src/main/java/glide/models/RequestBuilder.java @@ -47,14 +47,8 @@ public static RedisRequest.Builder prepareRedisRequest(RequestType command, List } return RedisRequest.newBuilder() - .setSingleCommand( // set command - Command.newBuilder() - .setRequestType(command) // set command name - .setArgsArray(commandArgs.build()) // set arguments - .build()) - .setRoute( // set route - Routes.newBuilder() - .setSimpleRoutes(SimpleRoutes.AllNodes) // set route type - .build()); + .setSingleCommand( + Command.newBuilder().setRequestType(command).setArgsArray(commandArgs.build()).build()) + .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build()); } } From 448c6e5f451005db3d1bbf4ab5701c724a06ec1a Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 2 Jan 2024 20:54:28 -0800 Subject: [PATCH 16/17] Clean up code comments Signed-off-by: Andrew Carbonetto --- .../connectors/handlers/CallbackDispatcher.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index 1ded4a2c85..8b2fb572d4 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -16,18 +16,19 @@ public class CallbackDispatcher { private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); /** - * Storage of Futures to handle responses. Map key is callback id, which starts from 1.
- * Each future is a promise for every submitted by user request.
- * Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field.
- * Negative java values would be shown as positive on rust side. Meanwhile, no data loss happen, - * because callback ID remains unique. + * Storage of Futures to handle responses. Map key is callback id, which starts from 0. The value + * is a CompletableFuture that is returned to the user and completed when the request is done. + * + *

Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field. + * Negative Java values would be shown as positive on Rust side. There is no data loss, because + * callback ID remains unique. */ private final ConcurrentHashMap> responses = new ConcurrentHashMap<>(); /** * Storage of freed callback IDs. It is needed to avoid occupying an ID being used and to speed up - * search for a next free ID.
+ * search for a next free ID. */ // TODO: Optimize to avoid growing up to 2e32 (16 Gb) https://github.com/aws/babushka/issues/704 private final ConcurrentLinkedQueue freeRequestIds = new ConcurrentLinkedQueue<>(); From 0327a842f6d2fb975882e3b45a91f409ff8f3514 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 3 Jan 2024 07:49:24 -0800 Subject: [PATCH 17/17] Add TODOs Signed-off-by: Andrew Carbonetto --- .../main/java/glide/connectors/handlers/ChannelHandler.java | 4 +++- .../java/glide/connectors/resources/ThreadPoolAllocator.java | 3 ++- java/client/src/main/java/glide/managers/CommandManager.java | 5 +++-- .../src/main/java/glide/managers/ConnectionManager.java | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index c12cc15411..bc38510105 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -24,10 +24,12 @@ public class ChannelHandler { /** Open a new channel for a new client. */ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) { + // TODO: add the ability to pass in group and channel from user channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option - .group(ThreadPoolAllocator.createNettyThreadPool(THREAD_POOL_NAME, Optional.empty())) + .group( + ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty())) .channel(Platform.getClientUdsNettyChannelType()) .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) diff --git a/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java b/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java index 064b280c94..26a30537cf 100644 --- a/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java +++ b/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java @@ -24,7 +24,8 @@ public class ThreadPoolAllocator { * * @return A new thread pool. */ - public static EventLoopGroup createNettyThreadPool(String prefix, Optional threadLimit) { + public static EventLoopGroup createOrGetNettyThreadPool( + String prefix, Optional threadLimit) { int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); if (Platform.getCapabilities().isKQueueAvailable()) { String name = prefix + "-kqueue-elg"; diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 134373a081..8b460bf2df 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -61,12 +61,13 @@ private CompletableFuture submitNewRequest(RequestType command, List connectToRedis( /** Check a response received from Glide. */ private Void checkGlideRsResponse(Response response) { if (response.hasRequestError()) { - // TODO do we need to support different types of exceptions and distinguish them by type? + // TODO unexpected when establishing a connection throw new RuntimeException( String.format( "%s: %s",