-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java: Add managers layer to client #717
Changes from 15 commits
dfc0ea2
ce166b1
e429be9
708293f
12dcaba
68b42a5
fbd0366
579c53c
c78169b
7d1ba35
7c5fc69
cb0f6c4
74a6ce0
6dcad76
4cd6642
596462c
448c6e5
0327a84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,44 @@ | ||
package glide.connectors.handlers; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import lombok.RequiredArgsConstructor; | ||
import org.apache.commons.lang3.tuple.Pair; | ||
import response.ResponseOuterClass.Response; | ||
|
||
/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ | ||
@RequiredArgsConstructor | ||
public class CallbackDispatcher { | ||
/** Unique request ID (callback ID). Thread-safe. */ | ||
private final AtomicInteger requestId = new AtomicInteger(0); | ||
|
||
/** | ||
* Client connection status needed to distinguish connection request. The callback dispatcher only | ||
* submits connection requests until a successful connection response is returned and the status | ||
* changes to true. The callback dispatcher then submits command requests. | ||
*/ | ||
private final AtomicBoolean isConnectedStatus; | ||
|
||
/** Unique request ID (callback ID). Thread-safe and overflow-safe. */ | ||
private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); | ||
|
||
/** | ||
* Storage of Futures to handle responses. Map key is callback id, which starts from 1.<br> | ||
* Each future is a promise for every submitted by user request. | ||
* Each future is a promise for every submitted by user request.<br> | ||
shachlanAmazon marked this conversation as resolved.
Show resolved
Hide resolved
acarbonetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field.<br> | ||
* Negative java values would be shown as positive on rust side. Meanwhile, no data loss happen, | ||
acarbonetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* because callback ID remains unique. | ||
*/ | ||
private final Map<Integer, CompletableFuture<Response>> responses = new ConcurrentHashMap<>(); | ||
private final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses = | ||
new ConcurrentHashMap<>(); | ||
|
||
/** | ||
* Storage for connection request similar to {@link #responses}. Unfortunately, connection | ||
* requests can't be stored in the same storage, because callback ID = 0 is hardcoded for | ||
* connection requests. | ||
* Storage of freed callback IDs. It is needed to avoid occupying an ID being used and to speed up | ||
* search for a next free ID.<br> | ||
*/ | ||
private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>(); | ||
// TODO: Optimize to avoid growing up to 2e32 (16 Gb) https://github.com/aws/babushka/issues/704 | ||
private final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>(); | ||
|
||
/** | ||
* Register a new request to be sent. Once response received, the given future completes with it. | ||
|
@@ -32,14 +47,20 @@ public class CallbackDispatcher { | |
* response. | ||
*/ | ||
public Pair<Integer, CompletableFuture<Response>> registerRequest() { | ||
int callbackId = requestId.incrementAndGet(); | ||
var future = new CompletableFuture<Response>(); | ||
Integer callbackId = freeRequestIds.poll(); | ||
if (callbackId == null) { | ||
// on null, we have no available request ids available in freeRequestIds | ||
// instead, get the next available request from counter | ||
callbackId = nextAvailableRequestId.getAndIncrement(); | ||
} | ||
responses.put(callbackId, future); | ||
return Pair.of(callbackId, future); | ||
} | ||
|
||
public CompletableFuture<Response> registerConnection() { | ||
return connectionPromise; | ||
var res = registerRequest(); | ||
return res.getValue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so you created a future, you saved it in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Connection requests are always defined with key callbackIdx of 0. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why build in this assumption? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed this in-person, and it doesn't look like our connection requests accept a idx so we have to assume that the callback idx will be 0. |
||
} | ||
|
||
/** | ||
|
@@ -48,17 +69,20 @@ public CompletableFuture<Response> registerConnection() { | |
* @param response A response received | ||
*/ | ||
public void completeRequest(Response response) { | ||
// Complete and return the response at callbackId | ||
// free up the callback ID in the freeRequestIds list | ||
int callbackId = response.getCallbackIdx(); | ||
if (callbackId == 0) { | ||
connectionPromise.completeAsync(() -> response); | ||
CompletableFuture<Response> future = responses.remove(callbackId); | ||
freeRequestIds.add(callbackId); | ||
if (future != null) { | ||
future.completeAsync(() -> response); | ||
} else { | ||
responses.get(callbackId).completeAsync(() -> response); | ||
responses.remove(callbackId); | ||
// TODO: log an error. | ||
// probably a response was received after shutdown or `registerRequest` call was missing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should throw an error in this case and close the client There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This really should only ever happen if the client is shutting down. In extreme cases, there are no threads available to call compete... and this operation is cancelled. |
||
} | ||
} | ||
|
||
public void shutdownGracefully() { | ||
connectionPromise.cancel(false); | ||
responses.values().forEach(future -> future.cancel(false)); | ||
responses.clear(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package glide.connectors.resources; | ||
|
||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.epoll.EpollEventLoopGroup; | ||
import io.netty.channel.kqueue.KQueueEventLoopGroup; | ||
import io.netty.util.concurrent.DefaultThreadFactory; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.function.Supplier; | ||
|
||
/** A class responsible to allocating and deallocating shared thread pools. */ | ||
public class ThreadPoolAllocator { | ||
shachlanAmazon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br> | ||
* Map key is supposed to be pool name + thread count as a string concat product. | ||
*/ | ||
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in which scenario will we need more than a ELG? will the user be able to register ELGs with different names? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. We will accept user-provided ELG in the configurations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO added in 0327a84 |
||
|
||
/** | ||
* Allocate (create new or share existing) Netty thread pool required to manage connection. A | ||
* thread pool could be shared across multiple connections. | ||
* | ||
* @return A new thread pool. | ||
*/ | ||
public static EventLoopGroup createNettyThreadPool(String prefix, Optional<Integer> threadLimit) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: this doesn't necessarily create a thread pool - should be getOrCreateNettyThreadPool There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated in 0327a84 |
||
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); | ||
if (Platform.getCapabilities().isKQueueAvailable()) { | ||
String name = prefix + "-kqueue-elg"; | ||
return getOrCreate( | ||
name + threadCount, | ||
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); | ||
} else if (Platform.getCapabilities().isEPollAvailable()) { | ||
String name = prefix + "-epoll-elg"; | ||
return getOrCreate( | ||
name + threadCount, | ||
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); | ||
} | ||
// TODO support IO-Uring and NIO | ||
|
||
throw new RuntimeException("Current platform supports no known thread pool types"); | ||
} | ||
|
||
/** | ||
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. | ||
*/ | ||
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) { | ||
if (groups.containsKey(name)) { | ||
return groups.get(name); | ||
} | ||
EventLoopGroup group = supplier.get(); | ||
groups.put(name, group); | ||
return group; | ||
} | ||
|
||
/** | ||
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing | ||
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br> | ||
* See {@link Runtime#addShutdownHook}. | ||
*/ | ||
private static class ShutdownHook implements Runnable { | ||
@Override | ||
public void run() { | ||
groups.values().forEach(EventLoopGroup::shutdownGracefully); | ||
} | ||
} | ||
|
||
static { | ||
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook")); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package glide.ffi.resolvers; | ||
|
||
import response.ResponseOuterClass.Response; | ||
|
||
public class RedisValueResolver { | ||
/** | ||
* Resolve a value received from Redis using given C-style pointer. | ||
* | ||
* @param pointer A memory pointer from {@link Response} | ||
* @return A RESP3 value | ||
*/ | ||
public static native Object valueFromPointer(long pointer); | ||
shachlanAmazon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this - isConnectedStatus is no longer needed.