Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java - Add custom command interface; and BaseCommands #798

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.commands.BaseCommands;
import glide.api.commands.Command;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
Expand All @@ -13,7 +15,7 @@
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
*/
public class RedisClient extends BaseClient {
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
Expand Down Expand Up @@ -47,4 +49,17 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param args command and arguments for the custom command call
* @return CompletableFuture with the response
*/
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package glide.api.commands;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
* Response resolver responsible for evaluating the Redis response object with a success or failure.
*/
@AllArgsConstructor
public class BaseCommandResponseResolver
implements RedisExceptionCheckedFunction<Response, Object> {

private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
*
* @return A generic Object with the Response | null if the response is empty
*/
public Object apply(Response response) throws RedisException {
// TODO: handle object if the object is small
// TODO: handle RESP2 object if configuration is set
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return respPointerResolver.apply(response.getRespPointer());
}
// if no response payload is provided, assume null
return null;
}
}
49 changes: 49 additions & 0 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package glide.api.commands;

import glide.ffi.resolvers.RedisValueResolver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import response.ResponseOuterClass.Response;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {

/**
* default Object handler from response
*
* @return BaseCommandResponseResolver to deliver the response
*/
static BaseCommandResponseResolver applyBaseCommandResponseResolver() {
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer);
}

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response has an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return BaseCommands.applyBaseCommandResponseResolver().apply(response);
}

public static List<Object> handleTransactionResponse(Response response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the interface is public to the use, this will also be public to the user. Why do we want to expose this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I can hide this from the user, like RedisValueResolver::valueFromPointer is not being exposed.
This also applies to BaseCommandResponseResolver class?

// return function to convert protobuf.Response into the response object by
// calling valueFromPointer

List<Object> transactionResponse =
(List<Object>) BaseCommands.applyBaseCommandResponseResolver().apply(response);
return transactionResponse;
}

/**
* Execute a @see{Command} by sending command via socket manager
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
}
24 changes: 24 additions & 0 deletions java/client/src/main/java/glide/api/commands/Command.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package glide.api.commands;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

/** Base Command class to send a single request to Redis. */
@Builder
@Getter
@EqualsAndHashCode
public class Command {

/** Redis command request type */
@NonNull final RequestType requestType;

/** List of Arguments for the Redis command request */
@Builder.Default final String[] arguments = new String[] {};

public enum RequestType {
/** Call a custom command with list of string arguments */
CUSTOM_COMMAND,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package glide.api.commands;

import glide.api.models.exceptions.RedisException;

@FunctionalInterface
public interface RedisExceptionCheckedFunction<R, T> {

/**
* Functional response handler that takes a protobuf Response object. <br>
* Returns a typed object on a successful Redis response. <br>
* Throws RedisException when receiving a Redis error response. <br>
*
* @param response - Redis Response
* @return T - response payload type
* @throws RedisException
*/
T apply(R response) throws RedisException;
}
118 changes: 41 additions & 77 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package glide.managers;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import glide.api.commands.Command;
import glide.api.commands.RedisExceptionCheckedFunction;
import glide.connectors.handlers.ChannelHandler;
import glide.ffi.resolvers.RedisValueResolver;
import glide.models.RequestBuilder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RequestType;
import response.ResponseOuterClass.RequestError;
import redis_request.RedisRequestOuterClass;
import response.ResponseOuterClass.Response;

/**
Expand All @@ -27,82 +19,54 @@ public class CommandManager {
private final ChannelHandler channel;

/**
* Async (non-blocking) get.<br>
* See <a href="https://redis.io/commands/get/">REDIS docs for GET</a>.
* Build a command and send.
*
* @param key The key name
* @param command
* @param responseHandler - to handle the response object
* @return A result promise of type T
*/
public CompletableFuture<String> get(String key) {
return submitNewRequest(RequestType.GetString, List.of(key));
public <T> CompletableFuture<T> submitNewCommand(
Command command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
// register callback
// create protobuf message from command
// submit async call
return channel
.write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true)
.thenApplyAsync(response -> responseHandler.apply(response));
}

/**
* Async (non-blocking) set.<br>
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>.
* Build a protobuf command/transaction request object.<br>
* Used by {@link CommandManager}.
*
* @param key The key name
* @param value The value to set
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a
* callback id.
*/
public CompletableFuture<String> set(String key, String value) {
return submitNewRequest(RequestType.SetString, List.of(key, value));
}
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest(
Command.RequestType command, String[] args) {
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs =
RedisRequestOuterClass.Command.ArgsArray.newBuilder();
for (var arg : args) {
commandArgs.addArgs(arg);
}

/**
* Build a command and submit it Netty to send.
*
* @param command Command type
* @param args Command arguments
* @return A result promise
*/
private CompletableFuture<String> submitNewRequest(RequestType command, List<String> args) {
return channel
.write(RequestBuilder.prepareRedisRequest(command, args), true)
.thenApplyAsync(this::extractValueFromGlideRsResponse);
return RedisRequestOuterClass.RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build())
.setRoute(
RedisRequestOuterClass.Routes.newBuilder()
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes)
.build());
}

/**
* Check response and extract data from it.
*
* @param response A response received from rust core lib
* @return A String from the Redis response, or Ok. Otherwise, returns null
*/
private String extractValueFromGlideRsResponse(Response response) {
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RedisException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString();
private RedisRequestOuterClass.RequestType mapRequestTypes(Command.RequestType inType) {
switch (inType) {
case CUSTOM_COMMAND:
return RedisRequestOuterClass.RequestType.CustomCommand;
}
// if no response payload is provided, assume null
return null;
throw new RuntimeException("Unsupported request type");
}
}
Loading