Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add managers layer to client #717

Merged
merged 18 commits into from
Jan 3, 2024

Conversation

Yury-Fridlyand
Copy link
Collaborator

@Yury-Fridlyand Yury-Fridlyand commented Dec 21, 2023

Part 2 of client implementation. See part 1 in #670

part2

This PR includes changes for:

  1. Manager layer for java client: ConnectionManager and CommandManager
  2. RequestBuilder
  3. Refactor for Platform - class was split into two: Platform and ThreadPoolAllocator
  4. Refactor on CallbackDispatcher for better callback id handling
  5. RedisValueResolver

Proposed usage in Client class:

package babushka.api;

import babushka.connectors.handlers.CallbackDispatcher;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.SocketListenerResolver;
import babushka.managers.CommandManager;
import babushka.managers.ConnectionManager;
import java.util.concurrent.atomic.AtomicBoolean;

public class Client {

  public Client() {

    AtomicBoolean connectionStatus = new AtomicBoolean(false);

    CallbackDispatcher callbackDispatcher = new CallbackDispatcher(connectionStatus);
    ChannelHandler channelHandler =
        new ChannelHandler(callbackDispatcher, SocketListenerResolver.getSocket());
    var connectionManager = new ConnectionManager(channelHandler, connectionStatus);
    var commandManager = new CommandManager(channelHandler);
  }
}

And Awaiter:

package babushka.api;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Awaiter {
  private static final long DEFAULT_TIMEOUT_MILLISECONDS = 30000;

  /** Get the future result with default timeout. */
  public static <T> T await(CompletableFuture<T> future) {
    return await(future, DEFAULT_TIMEOUT_MILLISECONDS);
  }

  /** Get the future result with given timeout in ms. */
  public static <T> T await(CompletableFuture<T> future, long timeout) {
    try {
      return future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getMessage(), e.getCause());
    } catch (InterruptedException e) {
      // TODO should shutdown the client service
      if (Thread.currentThread().isInterrupted()) {
        // restore interrupt
        Thread.interrupted();
      }
      throw new RuntimeException("The thread was interrupted", e);
    } catch (TimeoutException e) {
      throw new RuntimeException("Request timed out", e);
    } catch (CancellationException e) {
      throw new RuntimeException("Request was cancelled", e);
    }
  }
}

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
* Managers

Signed-off-by: Yury-Fridlyand <[email protected]>

* Refactor

Signed-off-by: Yury-Fridlyand <[email protected]>

* Split `ClientState` and `Platform`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Refactor

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR feedback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Refactor

Signed-off-by: Yury-Fridlyand <[email protected]>

* javadocs

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
@Yury-Fridlyand Yury-Fridlyand added the java issues and fixes related to the java client label Dec 21, 2023
@Yury-Fridlyand Yury-Fridlyand requested a review from a team as a code owner December 21, 2023 20:47
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
}

/** Check a response received from Glide. */
private boolean checkGlideRsResponse(Response response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling is very similar to extractValueFromGlideRsResponse in the CommandManager. I have combined these in the following PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do this in the next PR and not here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep it shouldn't be duplicated

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so i'm not reviewing of this function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. To reduce the size of the PR to review, we're doing this in stages. We have an API-layer PR that deals with this function properly.
And we handle errors in the API side, rather than in the backend.

Signed-off-by: Yury-Fridlyand <[email protected]>
import redis_request.RedisRequestOuterClass.Routes;
import redis_request.RedisRequestOuterClass.SimpleRoutes;

public class RequestBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is being merged with the Connection and CommandManagers by San.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is being merged with the Connection and CommandManagers by San.

How long will it take? If it'll be done soon, maybe just update this PR with the change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'll make this PR really big :(
I'll raise the PR now so you can review them side-by-side. How about that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@shachlanAmazon shachlanAmazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initial comments

}

return RedisRequest.newBuilder()
.setSingleCommand( // set command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are these comments needed? they just repeat the name of the called function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest that we postpone review of this file, as it gets merged into the ConnectionManager https://github.com/aws/babushka/pull/737/files#diff-83c7a94dda6cdf7ffac0c085bf9abbff0fd3a3baf753e5043be7f21fdf48733d

// TODO commented out due to #710 https://github.com/aws/babushka/issues/710
// empty response means a successful command
// throw new IllegalStateException("A malformed response received: " + response.toString());
return "OK";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a null response, not OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Out of scope for now. We have an incoming API with typing PR coming that properly handles the valueFromPointer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why return "OK"; -> return null; out of scope, but 🤷 just add a TODO

}
// TODO commented out due to #710 https://github.com/aws/babushka/issues/710
// empty response means a successful command
// throw new IllegalStateException("A malformed response received: " + response.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove commented out code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
We have an incoming API with typing PR coming that properly handles the valueFromPointer response.

private CompletableFuture<String> submitNewRequest(RequestType command, List<String> args) {
// TODO this explicitly uses ForkJoin thread pool. May be we should use another one.
return CompletableFuture.supplyAsync(
() -> channel.write(RequestBuilder.prepareRedisRequest(command, args), true))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if channel.write returns a completable future, why do you need CompletableFuture.supplyAsync?

}

/** Check a response received from Glide. */
private boolean checkGlideRsResponse(Response response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do this in the next PR and not here?

import redis_request.RedisRequestOuterClass.Routes;
import redis_request.RedisRequestOuterClass.SimpleRoutes;

public class RequestBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is being merged with the Connection and CommandManagers by San.

How long will it take? If it'll be done soon, maybe just update this PR with the change.

public class CallbackDispatcher {
/** Unique request ID (callback ID). Thread-safe. */

/** Client connection status needed to distinguish connection request. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment only explains where this is used, not what does the value mean.

var future = new CompletableFuture<Response>();
Integer callbackId = connectionStatus.get() ? freeRequestIds.poll() : CONNECTION_PROMISE_ID;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not following why do we need to have a reserved ID for the connection promise.
The client creation call shouldn't return to the user until the connection is set with the connection configurations. So, why can't we just assign the first callback index to the connection info? we don't need to save this index to a later stage - we won't configure the connection again in the client lifetime.

int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
connectionPromise.completeAsync(() -> response);
// A connection response doesn't contain a callback id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rust core does return it with the callback index it was sent with. If java doesn't support 0 in the callback index, you can simply start all callback indexes from 1

responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
// TODO: log an error.
// probably a response was received after shutdown or `registerRequest` call was missing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw an error in this case and close the client

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really should only ever happen if the client is shutting down. In extreme cases, there are no threads available to call compete... and this operation is cancelled.

connectionPromise.completeAsync(() -> response);
// A connection response doesn't contain a callback id
int callbackId = connectionStatus.get() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID;
CompletableFuture<Response> future = responses.get(callbackId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use remove here

* Check response and extract data from it.
*
* @param response A response received from rust core lib
* @return A String from the Redis RESP2 response, or Ok. Otherwise, returns null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using RESP3 by default, but it can be changed, lets remove the protocol from the comment

"%s: %s",
response.getRequestError().getType(), response.getRequestError().getMessage()));
} else if (response.hasClosingError()) {
CompletableFuture.runAsync(channel::close);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does channel::close closes the socket and all open futures? We need to call shutdownGracefully to close the socket and all open futures when we get a closing error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the following PR, we're handling the closeError at a higher level, which has access to close the open requests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then please add a TODO to close the channel on closeError

*
* @param host Server address
* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useSsl => useTLS

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore this. We have an incoming PR with configuration that properly names these variables as per node/python.

Copy link
Contributor

@acarbonetto acarbonetto Jan 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 4cd6642

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why merge the wrong names in this PR? This isn't something that increases the size of the PR.

* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
* Security
* @param clusterMode true if REDIS instance runs in the cluster mode
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true if the client is used for connecting to a Redis Cluster.

Copy link
Contributor

@acarbonetto acarbonetto Jan 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 4cd6642

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where's 4cd6642?

responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
// TODO: log an error.
// probably a response was received after shutdown or `registerRequest` call was missing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really should only ever happen if the client is shutting down. In extreme cases, there are no threads available to call compete... and this operation is cancelled.

"%s: %s",
response.getRequestError().getType(), response.getRequestError().getMessage()));
} else if (response.hasClosingError()) {
CompletableFuture.runAsync(channel::close);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the following PR, we're handling the closeError at a higher level, which has access to close the open requests.

}
// TODO commented out due to #710 https://github.com/aws/babushka/issues/710
// empty response means a successful command
// throw new IllegalStateException("A malformed response received: " + response.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
We have an incoming API with typing PR coming that properly handles the valueFromPointer response.

*
* @param host Server address
* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore this. We have an incoming PR with configuration that properly names these variables as per node/python.

}

/** Check a response received from Glide. */
private boolean checkGlideRsResponse(Response response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. To reduce the size of the PR to review, we're doing this in stages. We have an API-layer PR that deals with this function properly.
And we handle errors in the API side, rather than in the backend.

import redis_request.RedisRequestOuterClass.Routes;
import redis_request.RedisRequestOuterClass.SimpleRoutes;

public class RequestBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'll make this PR really big :(
I'll raise the PR now so you can review them side-by-side. How about that?

Signed-off-by: Andrew Carbonetto <[email protected]>
* submits connection requests until a successful connection response is returned and the status
* changes to true. The callback dispatcher then submits command requests.
*/
private final AtomicBoolean isConnectedStatus;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this - isConnectedStatus is no longer needed.

Copy link
Collaborator

@jonathanl-bq jonathanl-bq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few nit comments. Approved if those are fixed as well as other remaining comments.

Copy link
Contributor

@shachlanAmazon shachlanAmazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix comments / mark all future work with a TODO.

* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
* Security
* @param clusterMode true if REDIS instance runs in the cluster mode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where's 4cd6642?

*
* @param host Server address
* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why merge the wrong names in this PR? This isn't something that increases the size of the PR.

// TODO commented out due to #710 https://github.com/aws/babushka/issues/710
// empty response means a successful command
// throw new IllegalStateException("A malformed response received: " + response.toString());
return "OK";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why return "OK"; -> return null; out of scope, but 🤷 just add a TODO

responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
var res = registerRequest();
return res.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you created a future, you saved it in responses, and now you ignore the key? how will you remove this future from responses?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connection requests are always defined with key callbackIdx of 0.
When the connection completes, we call completeRequest and the response will be removed from the responses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why build in this assumption?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this in-person, and it doesn't look like our connection requests accept a idx so we have to assume that the callback idx will be 0.

* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which scenario will we need more than a ELG? will the user be able to register ELGs with different names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We will accept user-provided ELG in the configurations.

Copy link
Contributor

@acarbonetto acarbonetto Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO added in 0327a84

* @param key The key name
* @param value The value to set
*/
public CompletableFuture<String> set(String key, String value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would you like to track the missing options field? TODO, issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add a TODO here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO added in 0327a84

"%s: %s",
response.getRequestError().getType(), response.getRequestError().getMessage()));
} else if (response.hasClosingError()) {
CompletableFuture.runAsync(channel::close);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then please add a TODO to close the channel on closeError

*/
private String extractValueFromGlideRsResponse(Response response) {
if (response.hasRequestError()) {
// TODO do we need to support different types of exceptions and distinguish them by type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove the question mark :)
yes, you need to throw the same exceptions as other clients.

Copy link
Contributor

@acarbonetto acarbonetto Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO updated in 0327a84

* responses from the same socket channel handler.
*/
@RequiredArgsConstructor
public class ConnectionManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a single-use object - why is it an instantiated object and not a static function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea. Static classes are harder to test, and when we add configuration this class will grow.

That being said, if we want to really drive home the 'single use' responsibility of the object (i.e. make a connection and then forget about this) we should make this a static class with static-only methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to differ this comment to the follow-up PR.
We will make it into a static class there if necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@acarbonetto doesn't have to be in the next PR. can be in a separate PR (small is fast!)

/** Check a response received from Glide. */
private Void checkGlideRsResponse(Response response) {
if (response.hasRequestError()) {
// TODO do we need to support different types of exceptions and distinguish them by type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you should expect only closing errors.

Copy link
Contributor

@acarbonetto acarbonetto Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO updated in 0327a84

Signed-off-by: Andrew Carbonetto <[email protected]>
responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
var res = registerRequest();
return res.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this in-person, and it doesn't look like our connection requests accept a idx so we have to assume that the callback idx will be 0.

* responses from the same socket channel handler.
*/
@RequiredArgsConstructor
public class ConnectionManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to differ this comment to the follow-up PR.
We will make it into a static class there if necessary.

@acarbonetto acarbonetto merged commit 29d9bb0 into valkey-io:main Jan 3, 2024
3 checks passed
@acarbonetto acarbonetto deleted the java/integ_yuryf_client_part2 branch January 3, 2024 16:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
java issues and fixes related to the java client
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants