diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 6ac3f40bcf..bd32ad67fc 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions { /// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'. /// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property. pub discover_az: bool, + /// Connection timeout duration. + /// + /// This optional field sets the maximum duration to wait when attempting to establish + /// a connection. If `None`, the connection will use a default timeout. + pub connection_timeout: Option, } /// To enable async support you need to enable the feature: `tokio-comp` diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs index 4f9b3f0d4e..5e51bdb156 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs @@ -193,6 +193,7 @@ where push_sender: None, disconnect_notifier, discover_az, + connection_timeout: None, }, ) .await diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3726d7a674..8c5e6b0f35 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1162,6 +1162,7 @@ where push_sender, disconnect_notifier, discover_az, + connection_timeout: None, }; let connections = Self::create_initial_connections( diff --git a/glide-core/redis-rs/redis/src/cluster_routing.rs b/glide-core/redis-rs/redis/src/cluster_routing.rs index eab3bf398a..2d7cc2c403 100644 --- a/glide-core/redis-rs/redis/src/cluster_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_routing.rs @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy { | b"FUNCTION STATS" => RouteBy::AllNodes, b"DBSIZE" + | b"DEBUG" | b"FLUSHALL" | b"FLUSHDB" | b"FT._ALIASLIST" @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy { | b"COMMAND LIST" | b"COMMAND" | b"CONFIG GET" - | b"DEBUG" | b"ECHO" | b"FUNCTION LIST" | b"LASTSAVE" diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index cfe8d6dc05..bab2110e39 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -32,7 +32,7 @@ pub const DEFAULT_RETRIES: u32 = 3; pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250); pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250); pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60); -pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250); +pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250); pub const FINISHED_SCAN_CURSOR: &str = "finished"; /// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory: @@ -584,8 +584,9 @@ async fn create_cluster_client( Some(PeriodicCheck::ManualInterval(interval)) => Some(interval), None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL), }; + let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT); let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes) - .connection_timeout(INTERNAL_CONNECTION_TIMEOUT) + .connection_timeout(connection_timeout) .retries(DEFAULT_RETRIES); let read_from_strategy = request.read_from.unwrap_or_default(); builder = builder.read_from(match read_from_strategy { @@ -683,6 +684,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { "\nStandalone mode" }; let request_timeout = format_optional_value("Request timeout", request.request_timeout); + let connection_timeout = + format_optional_value("Connection timeout", request.connection_timeout); let database_id = format!("\ndatabase ID: {}", request.database_id); let rfr_strategy = request .read_from @@ -739,7 +742,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { ); format!( - "\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}", + "\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}", ) } diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index 39a4c1db62..52928bb759 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -72,7 +72,11 @@ async fn get_multiplexed_connection( connection_options: &GlideConnectionOptions, ) -> RedisResult { run_with_timeout( - Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT), + Some( + connection_options + .connection_timeout + .unwrap_or(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT), + ), client.get_multiplexed_async_connection(connection_options.clone()), ) .await @@ -114,6 +118,7 @@ async fn create_connection( retry_strategy: RetryStrategy, push_sender: Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result { let client = &connection_backend.connection_info; let connection_options = GlideConnectionOptions { @@ -122,6 +127,7 @@ async fn create_connection( TokioDisconnectNotifier::new(), )), discover_az, + connection_timeout: Some(connection_timeout), }; let action = || async { get_multiplexed_connection(client, &connection_options) @@ -207,6 +213,7 @@ impl ReconnectingConnection { tls_mode: TlsMode, push_sender: Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result { log_debug( "connection creation", @@ -219,7 +226,14 @@ impl ReconnectingConnection { connection_available_signal: ManualResetEvent::new(true), client_dropped_flagged: AtomicBool::new(false), }; - create_connection(backend, connection_retry_strategy, push_sender, discover_az).await + create_connection( + backend, + connection_retry_strategy, + push_sender, + discover_az, + connection_timeout, + ) + .await } pub(crate) fn node_address(&self) -> String { diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index c5e69fd6dd..86907668e2 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -3,6 +3,7 @@ */ use super::get_redis_connection_info; use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection}; +use super::{get_redis_connection_info, to_duration, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT}; use super::{ConnectionRequest, NodeAddress, TlsMode}; use crate::client::types::ReadFrom as ClientReadFrom; use crate::retry_strategies::RetryStrategy; @@ -16,6 +17,7 @@ use redis::{PushInfo, RedisError, RedisResult, Value}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use telemetrylib::Telemetry; use tokio::sync::mpsc; use tokio::task; @@ -131,6 +133,11 @@ impl StandaloneClient { Some(ClientReadFrom::AZAffinity(_)) ); + let connection_timeout = to_duration( + connection_request.connection_timeout, + DEFAULT_CONNECTION_ATTEMPT_TIMEOUT, + ); + let mut stream = stream::iter(connection_request.addresses.iter()) .map(|address| async { get_connection_and_replication_info( @@ -144,6 +151,7 @@ impl StandaloneClient { tls_mode.unwrap_or(TlsMode::NoTls), &push_sender, discover_az, + connection_timeout, ) .await .map_err(|err| (format!("{}:{}", address.host, address.port), err)) @@ -553,6 +561,7 @@ async fn get_connection_and_replication_info( tls_mode: TlsMode, push_sender: &Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> { let result = ReconnectingConnection::new( address, @@ -561,6 +570,7 @@ async fn get_connection_and_replication_info( tls_mode, push_sender.clone(), discover_az, + connection_timeout, ) .await; let reconnecting_connection = match result { diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs index 0c7680b3a6..f7791d0122 100644 --- a/glide-core/src/client/types.rs +++ b/glide-core/src/client/types.rs @@ -22,6 +22,7 @@ pub struct ConnectionRequest { pub addresses: Vec, pub cluster_mode_enabled: bool, pub request_timeout: Option, + pub connection_timeout: Option, pub connection_retry_strategy: Option, pub periodic_checks: Option, pub pubsub_subscriptions: Option, @@ -149,6 +150,7 @@ impl From for ConnectionRequest { .collect(); let cluster_mode_enabled = value.cluster_mode_enabled; let request_timeout = none_if_zero(value.request_timeout); + let connection_timeout = none_if_zero(value.connection_timeout); let connection_retry_strategy = value .connection_retry_strategy @@ -216,6 +218,7 @@ impl From for ConnectionRequest { addresses, cluster_mode_enabled, request_timeout, + connection_timeout, connection_retry_strategy, periodic_checks, pubsub_subscriptions, diff --git a/glide-core/src/protobuf/connection_request.proto b/glide-core/src/protobuf/connection_request.proto index 5f4db44b00..8e33b39da3 100644 --- a/glide-core/src/protobuf/connection_request.proto +++ b/glide-core/src/protobuf/connection_request.proto @@ -26,7 +26,7 @@ message AuthenticationInfo { enum ProtocolVersion { RESP3 = 0; - RESP2 = 1; + RESP2 = 1; } message PeriodicChecksManualInterval { @@ -71,6 +71,7 @@ message ConnectionRequest { PubSubSubscriptions pubsub_subscriptions = 13; uint32 inflight_requests_limit = 14; string client_az = 15; + uint32 connection_timeout = 16; } message ConnectionRetryStrategy { diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java new file mode 100644 index 0000000000..ced294fdd4 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java @@ -0,0 +1,17 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +public abstract class AdvancedBaseClientConfiguration { + + /** + * The duration in milliseconds that the client will wait for a connection to be established. If + * the connection attempt does not complete within this time frame, a connection timeout error + * will occur. If not set, a default value will be used. + */ + private final Integer connectionTimeout; +} diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java new file mode 100644 index 0000000000..1281fa692a --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java @@ -0,0 +1,11 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +@ToString +public class AdvancedGlideClientConfiguration extends AdvancedBaseClientConfiguration {} diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java new file mode 100644 index 0000000000..9d7748b271 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java @@ -0,0 +1,11 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +@ToString +public class AdvancedGlideClusterClientConfiguration extends AdvancedBaseClientConfiguration {} diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java index 83d84e7c1f..8f04c6b680 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java @@ -39,4 +39,7 @@ public class GlideClientConfiguration extends BaseClientConfiguration { /** Subscription configuration for the current client. */ private final StandaloneSubscriptionConfiguration subscriptionConfiguration; + + /** */ + private final AdvancedGlideClientConfiguration advancedConfiguration; } diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java index b1d1c7590c..937a8a4ea3 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java @@ -32,4 +32,7 @@ public class GlideClusterClientConfiguration extends BaseClientConfiguration { /** Subscription configuration for the current client. */ private final ClusterSubscriptionConfiguration subscriptionConfiguration; + + /** */ + private final AdvancedGlideClusterClientConfiguration advancedConfiguration; } diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 99b383a9ed..f520a86f86 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -8,6 +8,7 @@ import connection_request.ConnectionRequestOuterClass.PubSubChannelsOrPatterns; import connection_request.ConnectionRequestOuterClass.PubSubSubscriptions; import connection_request.ConnectionRequestOuterClass.TlsMode; +import glide.api.models.configuration.AdvancedBaseClientConfiguration; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.configuration.GlideClientConfiguration; import glide.api.models.configuration.GlideClusterClientConfiguration; @@ -69,7 +70,8 @@ private Response exceptionHandler(Throwable e) { * @param configuration Connection Request Configuration * @return ConnectionRequest protobuf message */ - private ConnectionRequest createConnectionRequest(BaseClientConfiguration configuration) { + private ConnectionRequest createConnectionRequest( + BaseClientConfiguration configuration) { // shoham if (configuration instanceof GlideClusterClientConfiguration) { return setupConnectionRequestBuilderGlideClusterClient( (GlideClusterClientConfiguration) configuration) @@ -171,6 +173,22 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClient( connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); } + if (configuration.getAdvancedConfiguration() != null) { + connectionRequestBuilder = + setupConnectionRequestBuilderAdvancedBaseConfiguration( + connectionRequestBuilder, configuration.getAdvancedConfiguration()); + } + + return connectionRequestBuilder; + } + + private ConnectionRequest.Builder setupConnectionRequestBuilderAdvancedBaseConfiguration( + ConnectionRequest.Builder connectionRequestBuilder, + AdvancedBaseClientConfiguration configuration) { + if (configuration.getConnectionTimeout() != null) { + connectionRequestBuilder.setConnectionTimeout(configuration.getConnectionTimeout()); + } + return connectionRequestBuilder; } @@ -199,6 +217,12 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClusterClien connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); } + if (configuration.getAdvancedConfiguration() != null) { + connectionRequestBuilder = + setupConnectionRequestBuilderAdvancedBaseConfiguration( + connectionRequestBuilder, configuration.getAdvancedConfiguration()); + } + return connectionRequestBuilder; } diff --git a/java/integTest/src/test/java/glide/ConnectionTests.java b/java/integTest/src/test/java/glide/ConnectionTests.java index 2aec2e4e6b..1c45043920 100644 --- a/java/integTest/src/test/java/glide/ConnectionTests.java +++ b/java/integTest/src/test/java/glide/ConnectionTests.java @@ -10,19 +10,31 @@ import static glide.api.models.configuration.RequestRoutingConfiguration.SlotType.PRIMARY; import static glide.api.models.configuration.RequestRoutingConfiguration.SlotType.REPLICA; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import glide.api.BaseClient; import glide.api.GlideClient; import glide.api.GlideClusterClient; import glide.api.models.ClusterValue; import glide.api.models.commands.InfoOptions; +import glide.api.models.configuration.AdvancedGlideClientConfiguration; +import glide.api.models.configuration.AdvancedGlideClusterClientConfiguration; +import glide.api.models.configuration.BackoffStrategy; import glide.api.models.configuration.ReadFrom; import glide.api.models.configuration.RequestRoutingConfiguration; +import glide.api.models.exceptions.ClosingException; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @Timeout(10) // seconds public class ConnectionTests { @@ -52,6 +64,35 @@ public GlideClusterClient createAzTestClient(String az) { .get(); } + @SneakyThrows + public BaseClient createConnectionTimeoutClient( + Boolean clusterMode, + int connectionTimeout, + int requestTimeout, + BackoffStrategy backoffStrategy) { + if (clusterMode) { + var advancedConfiguration = + AdvancedGlideClusterClientConfiguration.builder() + .connectionTimeout(connectionTimeout) + .build(); + return GlideClusterClient.createClient( + commonClusterClientConfig() + .advancedConfiguration(advancedConfiguration) + .requestTimeout(requestTimeout) + .build()) + .get(); + } + var advancedConfiguration = + AdvancedGlideClientConfiguration.builder().connectionTimeout(connectionTimeout).build(); + return GlideClient.createClient( + commonClientConfig() + .advancedConfiguration(advancedConfiguration) + .requestTimeout(requestTimeout) + .reconnectStrategy(backoffStrategy) + .build()) + .get(); + } + /** * Test that the client with AZ affinity strategy routes in a round-robin manner to all replicas * within the specified AZ. @@ -202,4 +243,78 @@ public void test_az_affinity_non_existing_az() { assertEquals(4, matchingEntries); azTestClient.close(); } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test_connection_timeout(boolean clusterMode) { + assumeTrue( + SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), + "DEBUG command only allowed from ver 7.0.0"); + + var backoffStrategy = + BackoffStrategy.builder().exponentBase(2).factor(100).numOfRetries(1).build(); + var client = createConnectionTimeoutClient(clusterMode, 250, 20000, backoffStrategy); + + // Run a long-running DEBUG SLEEP command to simulate a blocking operation + CompletableFuture runDebugSleep = + CompletableFuture.runAsync( + () -> { + try { + if (client instanceof GlideClusterClient) { + ((GlideClusterClient) client) + .customCommand(new String[] {"DEBUG", "sleep", "7"}, ALL_NODES) + .get(); + } else if (client instanceof GlideClient) { + ((GlideClient) client).customCommand(new String[] {"DEBUG", "sleep", "7"}).get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error during DEBUG SLEEP command", e); + } + }); + + // Test case 1: Client connection failure due to timeout + CompletableFuture failToConnect = + CompletableFuture.runAsync( + () -> { + try { + // Attempt to connect with a small timeout + Thread.sleep(1000); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> + createConnectionTimeoutClient(clusterMode, 100, 250, backoffStrategy)); + assertInstanceOf(ClosingException.class, executionException.getCause()); + assertTrue(executionException.getMessage().toLowerCase().contains("timed out")); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted", e); + } + }); + + // Test case 2: Client connection success + CompletableFuture connectToClient = + CompletableFuture.runAsync( + () -> { + try { + // Create a second client with a connection timeout of 10 seconds + Thread.sleep(1000); // Wait to ensure the debug sleep command is running + var timeoutClient = + createConnectionTimeoutClient(clusterMode, 10000, 250, backoffStrategy); + assertEquals(timeoutClient.set("key", "value").get(), "OK"); + timeoutClient.close(); + } catch (Exception e) { + throw new RuntimeException("Error during successful connection attempt", e); + } + }); + + // Run all the futures concurrently + CompletableFuture.allOf(runDebugSleep, failToConnect, connectToClient).join(); + + // Clean up the main client + if (client != null) { + client.close(); + } + } } diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index 8f6ceac47b..f2ecc3da4e 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -111,6 +111,8 @@ TTransaction, ) from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, @@ -176,6 +178,8 @@ "TGlideClient", "TTransaction", # Config + "AdvancedGlideClientConfiguration", + "AdvancedGlideClusterClientConfiguration", "GlideClientConfiguration", "GlideClusterClientConfiguration", "BackoffStrategy", diff --git a/python/python/glide/config.py b/python/python/glide/config.py index b33c037cbf..d519bbcd52 100644 --- a/python/python/glide/config.py +++ b/python/python/glide/config.py @@ -129,6 +129,29 @@ class PeriodicChecksStatus(Enum): """ +class AdvancedBaseClientConfiguration: + """ + Represents the advanced configuration settings for a base Glide client. + + Args: + connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete. + This applies both during initial client creation and any reconnections that may occur during request processing. + **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. + If the client cannot establish a connection within the specified duration, a timeout error will occur. + If not set, a default value will be used. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + self.connection_timeout = connection_timeout + + def _create_a_protobuf_conn_request( + self, request: ConnectionRequest + ) -> ConnectionRequest: + if self.connection_timeout: + request.connection_timeout = self.connection_timeout + return request + + class BaseClientConfiguration: def __init__( self, @@ -141,6 +164,7 @@ def __init__( protocol: ProtocolVersion = ProtocolVersion.RESP3, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedBaseClientConfiguration] = None, ): """ Represents the configuration settings for a Glide client. @@ -168,7 +192,8 @@ def __init__( inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. """ self.addresses = addresses self.use_tls = use_tls @@ -179,6 +204,7 @@ def __init__( self.protocol = protocol self.inflight_requests_limit = inflight_requests_limit self.client_az = client_az + self.advanced_config = advanced_config if read_from == ReadFrom.AZ_AFFINITY and not client_az: raise ValueError( @@ -218,6 +244,8 @@ def _create_a_protobuf_conn_request( request.inflight_requests_limit = self.inflight_requests_limit if self.client_az: request.client_az = self.client_az + if self.advanced_config: + self.advanced_config._create_a_protobuf_conn_request(request) return request @@ -230,6 +258,15 @@ def _get_pubsub_callback_and_context( return None, None +class AdvancedGlideClientConfiguration(AdvancedBaseClientConfiguration): + """ + Represents the advanced configuration settings for a Standalone Glide client. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + super().__init__(connection_timeout) + + class GlideClientConfiguration(BaseClientConfiguration): """ Represents the configuration settings for a Standalone Glide client. @@ -261,7 +298,9 @@ class GlideClientConfiguration(BaseClientConfiguration): inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. + advanced_config (Optional[AdvancedGlideClientConfiguration]) : Advanced configuration, see `AdvancedGlideClientConfiguration`. """ class PubSubChannelModes(IntEnum): @@ -308,6 +347,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -319,6 +359,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.reconnect_strategy = reconnect_strategy self.database_id = database_id @@ -375,6 +416,15 @@ def _get_pubsub_callback_and_context( return None, None +class AdvancedGlideClusterClientConfiguration(AdvancedBaseClientConfiguration): + """ + Represents the advanced configuration settings for a Glide Cluster client. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + super().__init__(connection_timeout) + + class GlideClusterClientConfiguration(BaseClientConfiguration): """ Represents the configuration settings for a Cluster Glide client. @@ -404,7 +454,9 @@ class GlideClusterClientConfiguration(BaseClientConfiguration): inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. + advanced_config (Optional[AdvancedGlideClusterClientConfiguration]) : Advanced configuration, see `AdvancedGlideClusterClientConfiguration`. Notes: @@ -459,6 +511,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClusterClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -470,6 +523,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.periodic_checks = periodic_checks self.pubsub_subscriptions = pubsub_subscriptions diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index 15ff15cf4e..c870913cfc 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -5,6 +5,9 @@ import pytest from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, + BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, NodeAddress, @@ -242,7 +245,8 @@ async def create_client( addresses: Optional[List[NodeAddress]] = None, client_name: Optional[str] = None, protocol: ProtocolVersion = ProtocolVersion.RESP3, - timeout: Optional[int] = 1000, + request_timeout: Optional[int] = 1000, + connection_timeout: Optional[int] = 1000, cluster_mode_pubsub: Optional[ GlideClusterClientConfiguration.PubSubSubscriptions ] = None, @@ -252,6 +256,7 @@ async def create_client( inflight_requests_limit: Optional[int] = None, read_from: ReadFrom = ReadFrom.PRIMARY, client_az: Optional[str] = None, + reconnect_strategy: Optional[BackoffStrategy] = None, ) -> Union[GlideClient, GlideClusterClient]: # Create async socket client use_tls = request.config.getoption("--tls") @@ -266,11 +271,12 @@ async def create_client( credentials=credentials, client_name=client_name, protocol=protocol, - request_timeout=timeout, + request_timeout=request_timeout, pubsub_subscriptions=cluster_mode_pubsub, inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), ) return await GlideClusterClient.create(cluster_config) else: @@ -284,11 +290,13 @@ async def create_client( database_id=database_id, client_name=client_name, protocol=protocol, - request_timeout=timeout, + request_timeout=request_timeout, pubsub_subscriptions=standalone_mode_pubsub, inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), + reconnect_strategy=reconnect_strategy, ) return await GlideClient.create(config) @@ -341,7 +349,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion): try: # Try connecting without credentials client = await create_client( - request, cluster_mode, protocol=protocol, timeout=2000 + request, cluster_mode, protocol=protocol, request_timeout=2000 ) await client.custom_command(["FLUSHALL"]) await client.close() @@ -354,7 +362,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion): request, cluster_mode, protocol=protocol, - timeout=2000, + request_timeout=2000, credentials=credentials, ) try: diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index b32aa6936d..df4afc4222 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -72,6 +72,7 @@ ) from glide.async_commands.transaction import ClusterTransaction, Transaction from glide.config import ( + BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, ProtocolVersion, @@ -128,7 +129,7 @@ async def test_register_client_name_and_version(self, glide_client: TGlideClient @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_send_and_receive_large_values(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**25 # 33mb key = "0" * length @@ -302,6 +303,94 @@ async def test_statistics(self, glide_client: TGlideClient): assert "total_clients" in stats assert len(stats) == 2 + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_connection_timeout( + self, + request, + cluster_mode: bool, + protocol: ProtocolVersion, + ): + + client = await create_client( + request, + cluster_mode, + protocol=protocol, + request_timeout=2000, + connection_timeout=2000, + ) + assert isinstance(client, (GlideClient, GlideClusterClient)) + + assert await client.set("key", "value") == "OK" + + await client.close() + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_connection_timeout_when_client_is_blocked( + self, + request, + cluster_mode: bool, + protocol: ProtocolVersion, + ): + min_version = "7.0.0" + + client = await create_client( + request, + cluster_mode, + protocol=protocol, + request_timeout=20000, # 20 seconds timeout + ) + + if await check_if_server_version_lt(client, min_version): + # Debug is only enabled after 7.0.0 + return pytest.mark.skip(reason=f"Valkey version required >= {min_version}") + + async def run_debug_sleep(): + """ + Run a long-running DEBUG SLEEP command. + """ + command = ["DEBUG", "sleep", "7"] + if isinstance(client, GlideClusterClient): + await client.custom_command(command, AllNodes()) + else: + await client.custom_command(command) + + async def fail_to_connect_to_client(): + # try to connect with a small timeout connection + await asyncio.sleep(1) + with pytest.raises(ClosingError) as e: + await create_client( + request, + cluster_mode, + protocol=protocol, + connection_timeout=100, # 100 ms + reconnect_strategy=BackoffStrategy(1, 100, 2), + ) + assert "timed out" in str(e) + + async def connect_to_client(): + # Create a second client with a connection timeout of 7 seconds + await asyncio.sleep(1) + timeout_client = await create_client( + request, + cluster_mode, + protocol=protocol, + connection_timeout=10000, # 10-second connection timeout + reconnect_strategy=BackoffStrategy(1, 100, 2), + ) + + # Ensure the second client can connect and perform a simple operation + assert await timeout_client.set("key", "value") == "OK" + await timeout_client.close() + + # Run tests + await asyncio.gather(run_debug_sleep(), fail_to_connect_to_client()) + await asyncio.gather(run_debug_sleep(), connect_to_client()) + + # Clean up the main client + await client.close() + @pytest.mark.asyncio class TestCommands: @@ -5412,7 +5501,10 @@ async def test_xread_edge_cases_and_failures( ) test_client = await create_client( - request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900 + request=request, + protocol=protocol, + cluster_mode=cluster_mode, + request_timeout=900, ) # ensure command doesn't time out even if timeout > request timeout assert ( @@ -5805,7 +5897,10 @@ async def test_xreadgroup_edge_cases_and_failures( ) test_client = await create_client( - request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900 + request=request, + protocol=protocol, + cluster_mode=cluster_mode, + request_timeout=900, ) timeout_key = f"{{testKey}}:{get_random_string(10)}" timeout_group_name = get_random_string(10) @@ -8325,11 +8420,11 @@ async def test_function_stats_running_script( # create a second client to run fcall test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) test_client2 = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) async def endless_fcall_route_call(): @@ -8454,7 +8549,7 @@ async def test_function_kill_no_write( # create a second client to run fcall test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) async def endless_fcall_route_call(): @@ -8509,7 +8604,7 @@ async def test_function_kill_write_is_unkillable( # create a second client to run fcall - and give it a long timeout test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) # call fcall to run the function loaded function @@ -10357,7 +10452,7 @@ async def test_script_binary(self, glide_client: TGlideClient): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_keys_no_args(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**13 # 8kb key = "0" * length @@ -10369,7 +10464,7 @@ async def test_script_large_keys_no_args(self, request, cluster_mode, protocol): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_args_no_keys(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**12 # 4kb arg1 = "0" * length @@ -10385,7 +10480,7 @@ async def test_script_large_args_no_keys(self, request, cluster_mode, protocol): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_keys_and_args(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**12 # 4kb key = "0" * length @@ -10469,7 +10564,7 @@ async def test_script_kill_route( # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) await script_kill_tests(glide_client, test_client, route) @@ -10485,7 +10580,7 @@ async def test_script_kill_no_route( ): # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) await script_kill_tests(glide_client, test_client) @@ -10497,12 +10592,12 @@ async def test_script_kill_unkillable( ): # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) # Create a second client to kill the script test_client2 = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) # Add test for script_kill with writing script diff --git a/python/python/tests/test_config.py b/python/python/tests/test_config.py index 3b22adb09c..2476d8ec0f 100644 --- a/python/python/tests/test_config.py +++ b/python/python/tests/test_config.py @@ -1,16 +1,22 @@ # Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, BaseClientConfiguration, + GlideClientConfiguration, GlideClusterClientConfiguration, NodeAddress, PeriodicChecksManualInterval, PeriodicChecksStatus, + ProtocolVersion, ReadFrom, ) +from glide.glide_client import GlideClient, GlideClusterClient from glide.protobuf.connection_request_pb2 import ConnectionRequest from glide.protobuf.connection_request_pb2 import ReadFrom as ProtobufReadFrom from glide.protobuf.connection_request_pb2 import TlsMode +from tests.conftest import create_client def test_default_client_config(): @@ -67,3 +73,24 @@ def test_convert_config_with_azaffinity_to_protobuf(): assert request.tls_mode is TlsMode.SecureTls assert request.read_from == ProtobufReadFrom.AZAffinity assert request.client_az == az + + +def test_connection_timeout_in_protobuf_request(): + connection_timeout = 5000 # in milliseconds + config = GlideClientConfiguration( + [NodeAddress("127.0.0.1")], + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), + ) + request = config._create_a_protobuf_conn_request() + + assert isinstance(request, ConnectionRequest) + assert request.connection_timeout == connection_timeout + + config = GlideClusterClientConfiguration( + [NodeAddress("127.0.0.1")], + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), + ) + request = config._create_a_protobuf_conn_request(cluster_mode=True) + + assert isinstance(request, ConnectionRequest) + assert request.connection_timeout == connection_timeout diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py index 6069104ed7..60baf383b2 100644 --- a/python/python/tests/test_pubsub.py +++ b/python/python/tests/test_pubsub.py @@ -66,7 +66,7 @@ async def create_two_clients_with_pubsub( cluster_mode_pubsub=cluster_mode_pubsub1, standalone_mode_pubsub=standalone_mode_pubsub1, protocol=protocol, - timeout=timeout, + request_timeout=timeout, ) try: client2 = await create_client( @@ -75,7 +75,7 @@ async def create_two_clients_with_pubsub( cluster_mode_pubsub=cluster_mode_pubsub2, standalone_mode_pubsub=standalone_mode_pubsub2, protocol=protocol, - timeout=timeout, + request_timeout=timeout, ) except Exception as e: await client1.close() diff --git a/python/python/tests/test_read_from_strategy.py b/python/python/tests/test_read_from_strategy.py index 03f3f8e9ae..cddb1e6f10 100644 --- a/python/python/tests/test_read_from_strategy.py +++ b/python/python/tests/test_read_from_strategy.py @@ -46,7 +46,7 @@ async def test_routing_with_az_affinity_strategy_to_1_replica( cluster_mode, # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, - timeout=2000, + request_timeout=2000, ) # Reset the availability zone for all nodes @@ -67,7 +67,7 @@ async def test_routing_with_az_affinity_strategy_to_1_replica( cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az=az, ) @@ -113,7 +113,7 @@ async def test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_repli cluster_mode, # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, - timeout=2000, + request_timeout=2000, ) assert await client_for_config_set.config_resetstat() == OK await client_for_config_set.custom_command( @@ -125,7 +125,7 @@ async def test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_repli cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az=az, ) azs = await client_for_testing_az.custom_command( @@ -181,7 +181,7 @@ async def test_az_affinity_non_existing_az( # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az="non-existing-az", ) assert await client_for_testing_az.config_resetstat() == OK @@ -217,5 +217,5 @@ async def test_az_affinity_requires_client_az( cluster_mode=cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, ) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index ccdb309f58..e284bb1100 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -972,7 +972,7 @@ async def test_can_return_null_on_watch_transaction_failures( @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_transaction_large_values(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**25 # 33mb key = "0" * length @@ -1033,10 +1033,7 @@ async def test_standalone_transaction(self, glide_client: GlideClient): assert result[5:13] == [2, 2, 2, [b"Bob", b"Alice"], 2, OK, None, 0] assert result[13:] == expected - @pytest.mark.filterwarnings( - action="ignore", message="The test " - ) - def test_transaction_clear(self): + async def test_transaction_clear(self): transaction = Transaction() transaction.info() transaction.select(1) diff --git a/utils/cluster_manager.py b/utils/cluster_manager.py index dc196bcd4f..a167da3464 100644 --- a/utils/cluster_manager.py +++ b/utils/cluster_manager.py @@ -341,10 +341,22 @@ def get_server_command() -> str: except Exception as e: logging.error(f"Error checking {server}: {e}") raise Exception( - "Neither valkey-server nor redis-server found in the system." + "Neither valkey-server nor redis-server found in the system.") + + def get_server_version(server_name): + result = subprocess.run( + [server_name, "--version"], capture_output=True, text=True + ) + version_output = result.stdout + version_match = re.search( + r"(?:Redis|Valkey) server v=(\d+\.\d+\.\d+)", version_output, re.IGNORECASE ) + if version_match: + return tuple(map(int, version_match.group(1).split("."))) + raise Exception("Unable to determine server version.") server_name = get_server_command() + server_version = get_server_version(server_name) logfile = f"{node_folder}/redis.log" # Define command arguments cmd_args = [ @@ -360,6 +372,8 @@ def get_server_command() -> str: "--logfile", logfile, ] + if server_version >= (7, 0, 0): + cmd_args.extend(["--enable-debug-command", "yes"]) if load_module: if len(load_module) == 0: raise ValueError(