Skip to content

Commit

Permalink
redis-rs tests
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Dec 25, 2024
1 parent 9741f63 commit ca542b5
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 39 deletions.
41 changes: 41 additions & 0 deletions glide-core/redis-rs/redis/tests/support/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,47 @@ impl TestClusterContext {
}
}

pub fn new_with_cluster_client__with_timeout_builder<F>(
nodes: u16,
replicas: u16,
initializer: F,
mtls_enabled: bool,
request_timeout: Duration,
connection_timeout: Duration,
) -> TestClusterContext
where
F: FnOnce(redis::cluster::ClusterClientBuilder) -> redis::cluster::ClusterClientBuilder,
{
let cluster = RedisCluster::new(nodes, replicas);
let initial_nodes: Vec<ConnectionInfo> = cluster
.iter_servers()
.map(RedisServer::connection_info)
.collect();
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes.clone())
.use_protocol(use_protocol())
.response_timeout(request_timeout)
.connection_timeout(connection_timeout);

#[cfg(feature = "tls-rustls")]
if mtls_enabled {
if let Some(tls_file_paths) = &cluster.tls_paths {
builder = builder.certs(load_certs_from_file(tls_file_paths));
}
}

builder = initializer(builder);

let client = builder.build().unwrap();

TestClusterContext {
cluster,
client,
mtls_enabled,
nodes: initial_nodes,
protocol: use_protocol(),
}
}

pub fn connection(&self) -> redis::cluster::ClusterConnection {
self.client.get_connection(None).unwrap()
}
Expand Down
44 changes: 43 additions & 1 deletion glide-core/redis-rs/redis/tests/support/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use rand::Rng;
use redis::{
cluster_routing::{RoutingInfo, SingleNodeRoutingInfo},
cmd, RedisResult, Value,
};
use std::collections::HashMap;
use versions::Versioning;

use super::TestContext;
use super::{TestClusterContext, TestContext};

#[macro_export]
macro_rules! assert_args {
Expand Down Expand Up @@ -33,3 +38,40 @@ pub fn version_greater_or_equal(ctx: &TestContext, version: &str) -> bool {
// Compare server version with the specified version
server_version >= compared_version
}

pub async fn kill_one_node(
cluster: &TestClusterContext,
slot_distribution: Vec<(String, String, String, Vec<Vec<u16>>)>,
) -> RoutingInfo {
let mut cluster_conn = cluster.async_connection(None).await;
let distribution_clone = slot_distribution.clone();
let index_of_random_node = rand::thread_rng().gen_range(0..slot_distribution.len());
let random_node = distribution_clone.get(index_of_random_node).unwrap();
let random_node_route_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: random_node.1.clone(),
port: random_node.2.parse::<u16>().unwrap(),
});
let random_node_id = &random_node.0;
// Create connections to all nodes
for node in &distribution_clone {
if random_node_id == &node.0 {
continue;
}
let node_route = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: node.1.clone(),
port: node.2.parse::<u16>().unwrap(),
});

let mut forget_cmd = cmd("CLUSTER");
forget_cmd.arg("FORGET").arg(random_node_id);
let _: RedisResult<Value> = cluster_conn
.route_command(&forget_cmd, node_route.clone())
.await;
}
let mut shutdown_cmd = cmd("SHUTDOWN");
shutdown_cmd.arg("NOSAVE");
let _: RedisResult<Value> = cluster_conn
.route_command(&shutdown_cmd, random_node_route_info.clone())
.await;
random_node_route_info
}
45 changes: 45 additions & 0 deletions glide-core/redis-rs/redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5183,6 +5183,51 @@ mod cluster_async {
assert_eq!(request_counter.load(Ordering::Relaxed), 1);
}

#[tokio::test]
#[serial_test::serial] // test cluster scan with node fail in the middle
async fn test_connection_timeout() {
let cluster = TestClusterContext::new_with_cluster_client__with_timeout_builder(
3,
0,
|builder| builder.retries(1),
false,
Duration::from_millis(20000),
Duration::from_millis(100),
);
let mut connection = cluster.async_connection(None).await;

// Perform an initial PING to establish the connection
let ping_response: Result<String, redis::RedisError> =
redis::cmd("PING").query_async(&mut connection).await;
assert_eq!(ping_response.unwrap(), "PONG");

// Get the cluster nodes and slots distribution
let cluster_nodes = cluster.get_cluster_nodes().await;
let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);

// Simulate a node failure
let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await;

// Wait for the cluster to stabilize after node failure
let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await;
match ready {
Ok(_) => {
// Attempt another PING to verify behavior after node failure
let ping_after_failure: Result<String, redis::RedisError> =
redis::cmd("PING").query_async(&mut connection).await;

// Expect an error due to the killed node
assert!(
ping_after_failure.is_err(),
"PING succeeded unexpectedly after node failure"
);
}
Err(e) => {
panic!("Cluster did not stabilize after node failure: {:?}", e);
}
}
}

#[cfg(feature = "tls-rustls")]
mod mtls_test {
use crate::support::mtls_test::create_cluster_client_from_cluster;
Expand Down
38 changes: 0 additions & 38 deletions glide-core/redis-rs/redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,10 @@ mod support;
#[cfg(test)]
mod test_cluster_scan_async {
use crate::support::*;
use rand::Rng;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::{cmd, from_redis_value, ObjectType, RedisResult, ScanStateRC, Value};
use std::time::Duration;

async fn kill_one_node(
cluster: &TestClusterContext,
slot_distribution: Vec<(String, String, String, Vec<Vec<u16>>)>,
) -> RoutingInfo {
let mut cluster_conn = cluster.async_connection(None).await;
let distribution_clone = slot_distribution.clone();
let index_of_random_node = rand::thread_rng().gen_range(0..slot_distribution.len());
let random_node = distribution_clone.get(index_of_random_node).unwrap();
let random_node_route_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: random_node.1.clone(),
port: random_node.2.parse::<u16>().unwrap(),
});
let random_node_id = &random_node.0;
// Create connections to all nodes
for node in &distribution_clone {
if random_node_id == &node.0 {
continue;
}
let node_route = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: node.1.clone(),
port: node.2.parse::<u16>().unwrap(),
});

let mut forget_cmd = cmd("CLUSTER");
forget_cmd.arg("FORGET").arg(random_node_id);
let _: RedisResult<Value> = cluster_conn
.route_command(&forget_cmd, node_route.clone())
.await;
}
let mut shutdown_cmd = cmd("SHUTDOWN");
shutdown_cmd.arg("NOSAVE");
let _: RedisResult<Value> = cluster_conn
.route_command(&shutdown_cmd, random_node_route_info.clone())
.await;
random_node_route_info
}

#[tokio::test]
#[serial_test::serial]
async fn test_async_cluster_scan() {
Expand Down

0 comments on commit ca542b5

Please sign in to comment.