Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add managers layer to client #717

Merged
merged 18 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ tasks.register('cleanProtobuf') {
tasks.register('buildRustRelease', Exec) {
commandLine 'cargo', 'build', '--release'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRustReleaseStrip', Exec) {
commandLine 'cargo', 'build', '--release', '--strip'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRust', Exec) {
commandLine 'cargo', 'build'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildWithRust') {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
package glide.connectors.handlers;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import response.ResponseOuterClass.Response;

/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */
@RequiredArgsConstructor
public class CallbackDispatcher {
/** Unique request ID (callback ID). Thread-safe. */

/** Client connection status needed to distinguish connection request. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment only explains where this is used, not what does the value mean.

private final AtomicBoolean connectionStatus;

/** Unique request ID (callback ID). Thread-safe and overflow-safe. */
private final AtomicInteger requestId = new AtomicInteger(0);

/** Reserved callback ID for connection request. */
private final Integer CONNECTION_PROMISE_ID = 0;

/**
* Storage of Futures to handle responses. Map key is callback id, which starts from 1.<br>
* Each future is a promise for every submitted by user request.
* Each future is a promise for every submitted by user request.<br>
shachlanAmazon marked this conversation as resolved.
Show resolved Hide resolved
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field.<br>
* Negative java values would be shown as positive on rust side. Meanwhile, no data loss happen,
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* because callback ID remains unique.
*/
private final Map<Integer, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses =
new ConcurrentHashMap<>();

/**
* Storage for connection request similar to {@link #responses}. Unfortunately, connection
* requests can't be stored in the same storage, because callback ID = 0 is hardcoded for
* connection requests.
* Storage of freed callback IDs. It is needed to avoid occupying an ID being used and to speed up
* search for a next free ID.<br>
*/
private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>();
// TODO: Optimize to avoid growing up to 2e32 (16 Gb) https://github.com/aws/babushka/issues/704
private final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>();

/**
* Register a new request to be sent. Once response received, the given future completes with it.
Expand All @@ -32,14 +46,22 @@ public class CallbackDispatcher {
* response.
*/
public Pair<Integer, CompletableFuture<Response>> registerRequest() {
int callbackId = requestId.incrementAndGet();
var future = new CompletableFuture<Response>();
Integer callbackId = connectionStatus.get() ? freeRequestIds.poll() : CONNECTION_PROMISE_ID;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not following why do we need to have a reserved ID for the connection promise.
The client creation call shouldn't return to the user until the connection is set with the connection configurations. So, why can't we just assign the first callback index to the connection info? we don't need to save this index to a later stage - we won't configure the connection again in the client lifetime.

if (callbackId == null) {
callbackId = requestId.incrementAndGet();
if (callbackId.equals(CONNECTION_PROMISE_ID)) {
throw new RuntimeException(
"Can't submit a new request, too many requests are pending already");
}
}
responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
var res = registerRequest();
return res.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you created a future, you saved it in responses, and now you ignore the key? how will you remove this future from responses?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connection requests are always defined with key callbackIdx of 0.
When the connection completes, we call completeRequest and the response will be removed from the responses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why build in this assumption?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this in-person, and it doesn't look like our connection requests accept a idx so we have to assume that the callback idx will be 0.

}

/**
Expand All @@ -48,17 +70,20 @@ public CompletableFuture<Response> registerConnection() {
* @param response A response received
*/
public void completeRequest(Response response) {
int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
connectionPromise.completeAsync(() -> response);
// A connection response doesn't contain a callback id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rust core does return it with the callback index it was sent with. If java doesn't support 0 in the callback index, you can simply start all callback indexes from 1

int callbackId = connectionStatus.get() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID;
CompletableFuture<Response> future = responses.get(callbackId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use remove here

if (future != null) {
future.completeAsync(() -> response);
} else {
responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
// TODO: log an error.
// probably a response was received after shutdown or `registerRequest` call was missing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw an error in this case and close the client

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really should only ever happen if the client is shutting down. In extreme cases, there are no threads available to call compete... and this operation is cancelled.

}
responses.remove(callbackId);
freeRequestIds.add(callbackId);
}

public void shutdownGracefully() {
connectionPromise.cancel(false);
responses.values().forEach(future -> future.cancel(false));
responses.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolAllocator;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

Expand All @@ -27,7 +27,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(Platform.createNettyThreadPool(THREAD_POOL_NAME, Optional.empty()))
.group(ThreadPoolAllocator.createNettyThreadPool(THREAD_POOL_NAME, Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
Expand Down Expand Up @@ -66,13 +66,9 @@ public CompletableFuture<Response> connect(ConnectionRequest request) {
return callbackDispatcher.registerConnection();
}

private final AtomicBoolean closed = new AtomicBoolean(false);

/** Closes the UDS connection and frees corresponding resources. */
public void close() {
if (closed.compareAndSet(false, true)) {
channel.close();
callbackDispatcher.shutdownGracefully();
}
channel.close();
callbackDispatcher.shutdownGracefully();
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package glide.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -29,7 +21,7 @@ public class Platform {
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@ToString
private static class Capabilities {
public static class Capabilities {
private final boolean isKQueueAvailable;
private final boolean isEPollAvailable;
// TODO support IO-Uring and NIO
Expand All @@ -44,12 +36,6 @@ private static class Capabilities {
private static final Capabilities capabilities =
new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false);

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();

/** Detect <em>kqueue</em> availability. */
private static boolean isKQueueAvailable() {
try {
Expand All @@ -70,42 +56,6 @@ private static boolean isEPollAvailable() {
}
}

/**
* Allocate Netty thread pool required to manage connection. A thread pool could be shared across
* multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, Optional<Integer> threadLimit) {
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (capabilities.isKQueueAvailable()) {
var name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (capabilities.isEPollAvailable()) {
var name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
EventLoopGroup group = supplier.get();
groups.put(name, group);
return group;
}

/**
* Get a channel class required by Netty to open a client UDS channel.
*
Expand All @@ -120,20 +70,4 @@ public static Class<? extends DomainSocketChannel> getClientUdsNettyChannelType(
}
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package glide.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** A class responsible to allocating and deallocating shared thread pools. */
public class ThreadPoolAllocator {
shachlanAmazon marked this conversation as resolved.
Show resolved Hide resolved

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which scenario will we need more than a ELG? will the user be able to register ELGs with different names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We will accept user-provided ELG in the configurations.

Copy link
Contributor

@acarbonetto acarbonetto Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO added in 0327a84


/**
* Allocate (create new or share existing) Netty thread pool required to manage connection. A
* thread pool could be shared across multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, Optional<Integer> threadLimit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: this doesn't necessarily create a thread pool - should be getOrCreateNettyThreadPool

Copy link
Contributor

@acarbonetto acarbonetto Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in 0327a84

int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (Platform.getCapabilities().isKQueueAvailable()) {
String name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (Platform.getCapabilities().isEPollAvailable()) {
String name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
EventLoopGroup group = supplier.get();
groups.put(name, group);
return group;
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package glide.ffi.resolvers;

import response.ResponseOuterClass.Response;

public class RedisValueResolver {
/**
* Resolve a value received from Redis using given C-style pointer.
*
* @param pointer A memory pointer from {@link Response}
* @return A RESP3 value
*/
public static native Object valueFromPointer(long pointer);
shachlanAmazon marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package glide.ffi.resolvers;

public class GlideCoreNativeDefinitions {
public static native String startSocketListenerExternal() throws Exception;
public class SocketListenerResolver {

public static native Object valueFromPointer(long pointer);
/** Make an FFI call to Glide to open a UDS socket to connect to. */
private static native String startSocketListener() throws Exception;

static {
System.loadLibrary("glide-rs");
System.loadLibrary("glide_rs");
}

/**
Expand All @@ -16,7 +16,7 @@ public class GlideCoreNativeDefinitions {
*/
public static String getSocket() {
try {
return startSocketListenerExternal();
return startSocketListener();
} catch (Exception | UnsatisfiedLinkError e) {
System.err.printf("Failed to create a UDS connection: %s%n%n", e);
throw new RuntimeException(e);
Expand Down
Loading
Loading