From 617ab3e3b9a85096c9d3ab4da1113f4027332e10 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 3 Jan 2024 14:33:38 -0800 Subject: [PATCH] More clean up after merge Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/RedisClient.java | 17 ++++- .../java/glide/managers/CallbackManager.java | 63 ------------------- .../glide/managers/ConnectionManager.java | 14 +++-- 3 files changed, 24 insertions(+), 70 deletions(-) delete mode 100644 java/client/src/main/java/glide/managers/CallbackManager.java diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index dedb4df659..3995cb08d2 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -17,10 +17,22 @@ */ public class RedisClient extends BaseClient { + /** + * Request an async (non-blocking) Redis client in Standalone mode to a Redis service on localhost. + * + * @return a promise to connect and return a RedisClient + */ public static CompletableFuture CreateClient() { return CreateClient(RedisClientConfiguration.builder().build()); } + /** + * Request an async (non-blocking) Redis client in Standalone mode. + * + * @param host - host address of the Redis service + * @param port - port of the Redis service + * @return a promise to connect and return a RedisClient + */ public static CompletableFuture CreateClient(String host, Integer port) { RedisClientConfiguration config = RedisClientConfiguration.builder() @@ -47,6 +59,7 @@ protected static CompletableFuture CreateClient( RedisClientConfiguration config, ConnectionManager connectionManager, CommandManager commandManager) { + // TODO: Support exception throwing, including interrupted exceptions return connectionManager .connectToRedis(config) .thenApplyAsync(ignore -> new RedisClient(connectionManager, commandManager)); @@ -58,7 +71,9 @@ protected RedisClient(ConnectionManager connectionManager, CommandManager comman /** * Closes this resource, relinquishing any underlying resources. This method is invoked - * automatically on objects managed by the try-with-resources statement. see: see: AutoCloseable::close() */ @Override diff --git a/java/client/src/main/java/glide/managers/CallbackManager.java b/java/client/src/main/java/glide/managers/CallbackManager.java deleted file mode 100644 index 5ac6af263c..0000000000 --- a/java/client/src/main/java/glide/managers/CallbackManager.java +++ /dev/null @@ -1,63 +0,0 @@ -package glide.managers; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; -import org.apache.commons.lang3.tuple.Pair; -import response.ResponseOuterClass.Response; - -/** Holder for resources required to dispatch responses and used by ReadHandler. */ -public class CallbackManager { - /** Unique request ID (callback ID). Thread-safe. */ - private final AtomicInteger requestId = new AtomicInteger(0); - - /** - * Storage of Futures to handle responses. Map key is callback id, which starts from 0. Each - * future is a promise for every submitted by user request. - */ - private final Map> responses = new ConcurrentHashMap<>(); - - /** - * Storage for connection request similar to {@link #responses}. Unfortunately, connection - * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for - * connection requests. - */ - @Getter private final CompletableFuture connectionPromise = new CompletableFuture<>(); - - /** - * Register a new request to be sent. Once response received, the given future completes with it. - * - * @return A pair of unique callback ID which should set into request and a client promise for - * response. - */ - public Pair> registerRequest() { - int callbackId = requestId.incrementAndGet(); - var future = new CompletableFuture(); - responses.put(callbackId, future); - return Pair.of(callbackId, future); - } - - /** - * Complete the corresponding client promise and free resources. - * - * @param response A response received - */ - public void completeRequest(Response response) { - int callbackId = response.getCallbackIdx(); - if (callbackId == 0) { - connectionPromise.completeAsync(() -> response); - } else { - responses.get(callbackId).completeAsync(() -> response); - responses.remove(callbackId); - } - } - - public void shutdownGracefully() { - connectionPromise.completeExceptionally(new InterruptedException()); - responses.forEach( - (callbackId, future) -> future.completeExceptionally(new InterruptedException())); - responses.clear(); - } -} diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 82644e3c66..7a884009ab 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -167,15 +167,17 @@ private Void checkGlideRsResponse(Response response) { String.format( "%s: %s", response.getRequestError().getType(), response.getRequestError().getMessage())); - } else if (response.hasClosingError()) { + } + if (response.hasClosingError()) { throw new RuntimeException("Connection closed: " + response.getClosingError()); - } else if (response.hasConstantResponse()) { - return null; - } else if (response.hasRespPointer()) { + } + if (response.hasRespPointer()) { throw new RuntimeException("Unexpected data in response"); } - // throw new IllegalStateException("A malformed response received: " + response.toString()); - return null; + if (response.hasConstantResponse()) { + return null; + } + throw new RuntimeException("Connection response expects an OK response"); } /** Close the connection and the corresponding channel. */