Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: API layer with RedisClient #737

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8312b0b
Java: API layer with RedisClient and basic commands (#46)
acarbonetto Dec 28, 2023
62836d8
Remove command calls from API
acarbonetto Dec 29, 2023
9e95ce6
Remove exception models from PR
acarbonetto Dec 29, 2023
114ed94
Update ConnectionManager to receive configuration
acarbonetto Dec 29, 2023
de3c4f3
Remove unwanted file
acarbonetto Dec 29, 2023
1a31d3b
Update JavaDocs and comments
acarbonetto Jan 2, 2024
2a9e6ff
Update comments; Add CreateClient unit tests
acarbonetto Jan 3, 2024
75d2494
Clean up RedisClient and ConnectionManager after mergre
acarbonetto Jan 3, 2024
617ab3e
More clean up after merge
acarbonetto Jan 3, 2024
1e1842c
Fix failed connection test
acarbonetto Jan 4, 2024
26dd364
Spotless fix
acarbonetto Jan 4, 2024
3ae1f07
Update CreateClient tests
acarbonetto Jan 4, 2024
098122f
Clean up tests and some minor comments
acarbonetto Jan 5, 2024
76b5cd7
Cleaning up tests and adding TODOs
acarbonetto Jan 5, 2024
8092b62
Revert lib.rs change
acarbonetto Jan 8, 2024
30a8d64
Add comment to make connectionManager static
acarbonetto Jan 8, 2024
6f1e3da
Spotless
acarbonetto Jan 10, 2024
818a704
Apply default test name replacer
acarbonetto Jan 10, 2024
8546d9f
Update comments from review comments
acarbonetto Jan 8, 2024
31bc4c5
Spotless
acarbonetto Jan 10, 2024
9b02892
Add javadoc
acarbonetto Jan 10, 2024
1d18311
Merge branch 'main' into java/integ_acarbo_refactor_api_createclient
acarbonetto Jan 10, 2024
0c7d03b
Update for comments
acarbonetto Jan 11, 2024
572fc7b
Spotless
acarbonetto Jan 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testAnnotationProcessor 'org.projectlombok:lombok:1.18.30'

// junit
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}
Expand Down
33 changes: 33 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package glide.api;

import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
protected CommandManager commandManager;
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
*
* <p>see: <a
* href="https://docs.oracle.com/javase/8/docs/api/java/lang/AutoCloseable.html#close--">AutoCloseable::close()</a>
*/
@Override
public void close() throws ExecutionException {
try {
connectionManager.closeConnection().get();
} catch (InterruptedException interruptedException) {
// AutoCloseable classes are strongly advised to avoid throwing InterruptedExceptions
// TODO: marking resources as closed:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore interrupt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is right.
see: https://github.com/orgs/Bit-Quill/projects/4/views/6?pane=issue&itemId=48063887
Ultimately, it would be best to throw a RedisException/ClosingException after interrupting on all existing responses.

// https://github.com/orgs/Bit-Quill/projects/4/views/6?pane=issue&itemId=48063887
throw new RuntimeException(interruptedException);
}
}
}
50 changes: 50 additions & 0 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package glide.api;

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
*/
public class RedisClient extends BaseClient {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all exeptions are properly propagated from that async pipeline. They are wrapped by ExectionException. Please, re-check. Maybe you can remove this TODO and/or add a test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a task to add exception handling. It would be 'nicer' if the API supported throwing RedisException, rather than an ExecutionException with a RedisException.
I'm pretty certain we can extend CompletableFutures to handle RedisException, in parallel with ExcutionExceptions and that would provide a better experience for our users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, tests are already included in the follow-up PR, that adds RedisExceptions.

return connectionManager
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
.thenApplyAsync(ignore -> new RedisClient(connectionManager, commandManager));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to run this in a separate thread?

}

protected static ChannelHandler buildChannelHandler() {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
}

protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/**
* Represents the strategy used to determine how and when to reconnect, in case of connection
* failures. The time between attempts grows exponentially, to the formula <code>rand(0 ... factor *
* (exponentBase ^ N))</code>, where <code>N</code> is the number of failed attempts.
*
* <p>Once the maximum value is reached, that will remain the time between retry attempts until a
* reconnect attempt is successful. The client will attempt to reconnect indefinitely.
*/
@Getter
@Builder
public class BackoffStrategy {
/**
* Number of retry attempts that the client should perform when disconnected from the server,
* where the time between retries increases. Once the retries have reached the maximum value, the
* time between retries will remain constant until a reconnect attempt is successful.
*/
@NonNull private final Integer numOfRetries;

/** The multiplier that will be applied to the waiting time between each retry. */
@NonNull private final Integer factor;

/** The exponent base configured for the strategy. */
@NonNull private final Integer exponentBase;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package glide.api.models.configuration;

import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Singular;
import lombok.experimental.SuperBuilder;

/**
* Configuration settings class for creating a Redis Client. Shared settings for standalone and
* cluster clients.
*/
@Getter
@SuperBuilder
public abstract class BaseClientConfiguration {
/**
* DNS Addresses and ports of known nodes in the cluster. If the server is in cluster mode the
* list can be partial, as the client will attempt to map out the cluster and find all nodes. If
* the server is in standalone mode, only nodes whose addresses were provided will be used by the
* client. For example: <code>[ {address:sample-address-0001.use1.cache.amazonaws.com, port:6379},
* {address: sample-address-0002.use2.cache.amazonaws.com, port:6379} ]</code>. If none are set, a
* default address localhost:6379 will be used.
*/
@Singular private final List<NodeAddress> addresses;

/**
* True if communication with the cluster should use Transport Level Security.
*
* <p>If the server/cluster requires TLS, not setting this will cause the connection attempt to
* fail.
*
* <p>If the server/cluster doesn't require TLS, setting this will also cause the connection
* attempt to fail.
*/
@Builder.Default private final boolean useTLS = false;

/** Represents the client's read from strategy. */
@NonNull @Builder.Default private final ReadFrom readFrom = ReadFrom.PRIMARY;

/**
* Credentials for authentication process. If none are set, the client will not authenticate
* itself with the server.
*/
private final RedisCredentials credentials;

/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/** Represents the address and port of a node in the cluster. */
@Getter
@Builder
public class NodeAddress {
public static String DEFAULT_HOST = "localhost";
public static Integer DEFAULT_PORT = 6379;

@NonNull @Builder.Default private final String host = DEFAULT_HOST;
@NonNull @Builder.Default private final Integer port = DEFAULT_PORT;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package glide.api.models.configuration;

/** Represents the client's read from strategy. */
public enum ReadFrom {
/** Always get from primary, in order to get the freshest data. */
PRIMARY,
/**
* Spread the requests between all replicas in a round-robin manner. If no replica is available,
* route the requests to the primary.
*/
PREFER_REPLICA
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package glide.api.models.configuration;

import lombok.Getter;
import lombok.experimental.SuperBuilder;

/** Represents the configuration settings for a Standalone Redis client. */
@Getter
@SuperBuilder
public class RedisClientConfiguration extends BaseClientConfiguration {
/** Strategy used to determine how and when to reconnect, in case of connection failures. */
private final BackoffStrategy reconnectStrategy;

/** Index of the logical database to connect to. */
private final Integer databaseId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package glide.api.models.configuration;

import lombok.experimental.SuperBuilder;

/**
* Represents the configuration settings for a Cluster Redis client. Notes: Currently, the
* reconnection strategy in cluster mode is not configurable, and exponential backoff with fixed
* values is used.
*/
@SuperBuilder
public class RedisClusterClientConfiguration extends BaseClientConfiguration {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

/** Represents the credentials for connecting to a Redis server. */
@Getter
@Builder
public class RedisCredentials {
/** The password that will be used for authenticating connections to the Redis servers. */
@NonNull private final String password;

/**
* The username that will be used for authenticating connections to the Redis servers. If not
* supplied, "default" will be used.
*/
private final String username;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import glide.connectors.resources.ThreadPoolAllocator;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.UnixChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

/**
* Class responsible for handling calls to/from a netty.io {@link Channel}.<br>
* Uses a {@link CallbackDispatcher} to record callbacks of every request sent.
* Class responsible for handling calls to/from a netty.io {@link Channel}. Uses a {@link
shachlanAmazon marked this conversation as resolved.
Show resolved Hide resolved
* CallbackDispatcher} to record callbacks of every request sent.
*/
public class ChannelHandler {

Expand All @@ -24,15 +28,35 @@ public class ChannelHandler {

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) {
// TODO: add the ability to pass in group and channel from user
this(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()),
Platform.getClientUdsNettyChannelType(),
new ProtobufSocketChannelInitializer(callbackDispatcher),
new DomainSocketAddress(socketPath),
callbackDispatcher);
}

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup
*
* @param eventLoopGroup - ELG to run handler on
* @param domainSocketChannelClass - socket channel class for Handler
* @param channelInitializer - UnixChannel initializer
* @param domainSocketAddress - address to connect
* @param callbackDispatcher - dispatcher to handle callbacks
*/
public ChannelHandler(
EventLoopGroup eventLoopGroup,
Class<? extends DomainSocketChannel> domainSocketChannelClass,
ChannelInitializer<UnixChannel> channelInitializer,
DomainSocketAddress domainSocketAddress,
CallbackDispatcher callbackDispatcher) {
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.group(eventLoopGroup)
.channel(domainSocketChannelClass)
.handler(channelInitializer)
.connect(domainSocketAddress)
// TODO call here .sync() if needed or remove this comment
.channel();
this.callbackDispatcher = callbackDispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ public class ReadHandler extends ChannelInboundHandlerAdapter {

/** Submit responses from glide to an instance {@link CallbackDispatcher} to handle them. */
@Override
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) {
callbackDispatcher.completeRequest((Response) msg);
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
throws RuntimeException {
if (msg instanceof Response) {
Response response = (Response) msg;
callbackDispatcher.completeRequest(response);
ctx.fireChannelRead(msg);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not fireChannelReadComplete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because that doesn't pass the message to other read handlers. That tells the read pipeline to complete, I believe (documentation on that method is somewhat lacking).

return;
}
throw new RuntimeException("Unexpected message in socket");
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}

/** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import response.ResponseOuterClass.Response;

public class RedisValueResolver {

// TODO: consider lazy loading the glide_rs library
static {
System.loadLibrary("glide_rs");
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Resolve a value received from Redis using given C-style pointer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class SocketListenerResolver {
/** Make an FFI call to Glide to open a UDS socket to connect to. */
private static native String startSocketListener() throws Exception;

// TODO: consider lazy loading the glide_rs library
static {
System.loadLibrary("glide_rs");
}
Expand Down
Loading