Skip to content

Commit

Permalink
Add connection timeout configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Dec 22, 2024
1 parent 7bd4dcd commit 7ec079b
Show file tree
Hide file tree
Showing 25 changed files with 465 additions and 44 deletions.
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions {
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
///
/// This optional field sets the maximum duration to wait when attempting to establish
/// a connection. If `None`, the connection will use a default timeout.
pub connection_timeout: Option<Duration>,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
push_sender: None,
disconnect_notifier,
discover_az,
connection_timeout: None,
},
)
.await
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ where
push_sender,
disconnect_notifier,
discover_az,
connection_timeout: None,
};

let connections = Self::create_initial_connections(
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"FUNCTION STATS" => RouteBy::AllNodes,

b"DBSIZE"
| b"DEBUG"
| b"FLUSHALL"
| b"FLUSHDB"
| b"FT._ALIASLIST"
Expand Down Expand Up @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"COMMAND LIST"
| b"COMMAND"
| b"CONFIG GET"
| b"DEBUG"
| b"ECHO"
| b"FUNCTION LIST"
| b"LASTSAVE"
Expand Down
9 changes: 6 additions & 3 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub const DEFAULT_RETRIES: u32 = 3;
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
Expand Down Expand Up @@ -584,8 +584,9 @@ async fn create_cluster_client(
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.connection_timeout(connection_timeout)
.retries(DEFAULT_RETRIES);
let read_from_strategy = request.read_from.unwrap_or_default();
builder = builder.read_from(match read_from_strategy {
Expand Down Expand Up @@ -683,6 +684,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
"\nStandalone mode"
};
let request_timeout = format_optional_value("Request timeout", request.request_timeout);
let connection_timeout =
format_optional_value("Connection timeout", request.connection_timeout);
let database_id = format!("\ndatabase ID: {}", request.database_id);
let rfr_strategy = request
.read_from
Expand Down Expand Up @@ -739,7 +742,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
);

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
)
}

Expand Down
18 changes: 16 additions & 2 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ async fn get_multiplexed_connection(
connection_options: &GlideConnectionOptions,
) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
Some(
connection_options
.connection_timeout
.unwrap_or(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
),
client.get_multiplexed_async_connection(connection_options.clone()),
)
.await
Expand Down Expand Up @@ -114,6 +118,7 @@ async fn create_connection(
retry_strategy: RetryStrategy,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
let client = &connection_backend.connection_info;
let connection_options = GlideConnectionOptions {
Expand All @@ -122,6 +127,7 @@ async fn create_connection(
TokioDisconnectNotifier::new(),
)),
discover_az,
connection_timeout: Some(connection_timeout),
};
let action = || async {
get_multiplexed_connection(client, &connection_options)
Expand Down Expand Up @@ -207,6 +213,7 @@ impl ReconnectingConnection {
tls_mode: TlsMode,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
log_debug(
"connection creation",
Expand All @@ -219,7 +226,14 @@ impl ReconnectingConnection {
connection_available_signal: ManualResetEvent::new(true),
client_dropped_flagged: AtomicBool::new(false),
};
create_connection(backend, connection_retry_strategy, push_sender, discover_az).await
create_connection(
backend,
connection_retry_strategy,
push_sender,
discover_az,
connection_timeout,
)
.await
}

pub(crate) fn node_address(&self) -> String {
Expand Down
10 changes: 10 additions & 0 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
use super::get_redis_connection_info;
use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection};
use super::{get_redis_connection_info, to_duration, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT};
use super::{ConnectionRequest, NodeAddress, TlsMode};
use crate::client::types::ReadFrom as ClientReadFrom;
use crate::retry_strategies::RetryStrategy;
Expand All @@ -16,6 +17,7 @@ use redis::{PushInfo, RedisError, RedisResult, Value};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use telemetrylib::Telemetry;
use tokio::sync::mpsc;
use tokio::task;
Expand Down Expand Up @@ -131,6 +133,11 @@ impl StandaloneClient {
Some(ClientReadFrom::AZAffinity(_))
);

let connection_timeout = to_duration(
connection_request.connection_timeout,
DEFAULT_CONNECTION_ATTEMPT_TIMEOUT,
);

let mut stream = stream::iter(connection_request.addresses.iter())
.map(|address| async {
get_connection_and_replication_info(
Expand All @@ -144,6 +151,7 @@ impl StandaloneClient {
tls_mode.unwrap_or(TlsMode::NoTls),
&push_sender,
discover_az,
connection_timeout,
)
.await
.map_err(|err| (format!("{}:{}", address.host, address.port), err))
Expand Down Expand Up @@ -553,6 +561,7 @@ async fn get_connection_and_replication_info(
tls_mode: TlsMode,
push_sender: &Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> {
let result = ReconnectingConnection::new(
address,
Expand All @@ -561,6 +570,7 @@ async fn get_connection_and_replication_info(
tls_mode,
push_sender.clone(),
discover_az,
connection_timeout,
)
.await;
let reconnecting_connection = match result {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct ConnectionRequest {
pub addresses: Vec<NodeAddress>,
pub cluster_mode_enabled: bool,
pub request_timeout: Option<u32>,
pub connection_timeout: Option<u32>,
pub connection_retry_strategy: Option<ConnectionRetryStrategy>,
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
Expand Down Expand Up @@ -149,6 +150,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
.collect();
let cluster_mode_enabled = value.cluster_mode_enabled;
let request_timeout = none_if_zero(value.request_timeout);
let connection_timeout = none_if_zero(value.connection_timeout);
let connection_retry_strategy =
value
.connection_retry_strategy
Expand Down Expand Up @@ -216,6 +218,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
addresses,
cluster_mode_enabled,
request_timeout,
connection_timeout,
connection_retry_strategy,
periodic_checks,
pubsub_subscriptions,
Expand Down
3 changes: 2 additions & 1 deletion glide-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message AuthenticationInfo {

enum ProtocolVersion {
RESP3 = 0;
RESP2 = 1;
RESP2 = 1;
}

message PeriodicChecksManualInterval {
Expand Down Expand Up @@ -71,6 +71,7 @@ message ConnectionRequest {
PubSubSubscriptions pubsub_subscriptions = 13;
uint32 inflight_requests_limit = 14;
string client_az = 15;
uint32 connection_timeout = 16;
}

message ConnectionRetryStrategy {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import lombok.Getter;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
public abstract class AdvancedBaseClientConfiguration {

/**
* The duration in milliseconds that the client will wait for a connection to be established. If
* the connection attempt does not complete within this time frame, a connection timeout error
* will occur. If not set, a default value will be used.
*/
private final Integer connectionTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
@ToString
public class AdvancedGlideClientConfiguration extends AdvancedBaseClientConfiguration {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
@ToString
public class AdvancedGlideClusterClientConfiguration extends AdvancedBaseClientConfiguration {}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ public class GlideClientConfiguration extends BaseClientConfiguration {

/** Subscription configuration for the current client. */
private final StandaloneSubscriptionConfiguration subscriptionConfiguration;

/** */
private final AdvancedGlideClientConfiguration advancedConfiguration;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ public class GlideClusterClientConfiguration extends BaseClientConfiguration {

/** Subscription configuration for the current client. */
private final ClusterSubscriptionConfiguration subscriptionConfiguration;

/** */
private final AdvancedGlideClusterClientConfiguration advancedConfiguration;
}
26 changes: 25 additions & 1 deletion java/client/src/main/java/glide/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import connection_request.ConnectionRequestOuterClass.PubSubChannelsOrPatterns;
import connection_request.ConnectionRequestOuterClass.PubSubSubscriptions;
import connection_request.ConnectionRequestOuterClass.TlsMode;
import glide.api.models.configuration.AdvancedBaseClientConfiguration;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
Expand Down Expand Up @@ -69,7 +70,8 @@ private Response exceptionHandler(Throwable e) {
* @param configuration Connection Request Configuration
* @return ConnectionRequest protobuf message
*/
private ConnectionRequest createConnectionRequest(BaseClientConfiguration configuration) {
private ConnectionRequest createConnectionRequest(
BaseClientConfiguration configuration) { // shoham
if (configuration instanceof GlideClusterClientConfiguration) {
return setupConnectionRequestBuilderGlideClusterClient(
(GlideClusterClientConfiguration) configuration)
Expand Down Expand Up @@ -171,6 +173,22 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClient(
connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build());
}

if (configuration.getAdvancedConfiguration() != null) {
connectionRequestBuilder =
setupConnectionRequestBuilderAdvancedBaseConfiguration(
connectionRequestBuilder, configuration.getAdvancedConfiguration());
}

return connectionRequestBuilder;
}

private ConnectionRequest.Builder setupConnectionRequestBuilderAdvancedBaseConfiguration(
ConnectionRequest.Builder connectionRequestBuilder,
AdvancedBaseClientConfiguration configuration) {
if (configuration.getConnectionTimeout() != null) {
connectionRequestBuilder.setConnectionTimeout(configuration.getConnectionTimeout());
}

return connectionRequestBuilder;
}

Expand Down Expand Up @@ -199,6 +217,12 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClusterClien
connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build());
}

if (configuration.getAdvancedConfiguration() != null) {
connectionRequestBuilder =
setupConnectionRequestBuilderAdvancedBaseConfiguration(
connectionRequestBuilder, configuration.getAdvancedConfiguration());
}

return connectionRequestBuilder;
}

Expand Down
Loading

0 comments on commit 7ec079b

Please sign in to comment.