Skip to content

Commit

Permalink
redis-core: add az awareness to read strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Adar Ovadia <[email protected]>
  • Loading branch information
Adar Ovadia committed Nov 7, 2024
1 parent c4eb769 commit 5edc891
Show file tree
Hide file tree
Showing 17 changed files with 233 additions and 121 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ jobs:

- name: Run tests
working-directory: ./glide-core
run: cargo test --all-features --release -- --test-threads=1 # TODO remove the concurrency limit after we fix test flakyness.
# TODO remove the concurrency limit after we fix test flakyness.
run: |
# Sort versions and get the latest version between the current version and 7.0, if the latest version is 7.0, skip the test.
versions_to_comare=$(printf "%s\n" "${{ matrix.engine.version }}" "7.0" | sort -V)
lt_version=$(echo "$versions_to_comare" | tail -n1)
if [[ "7.0" == "$lt_version" ]]; then
cargo test --all-features --release -- --test-threads=1 --skip valkey-GTE-7-2
else
cargo test --all-features --release -- --test-threads=1
fi
- name: Run logger tests
working-directory: ./logger_core
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
* Node: Add `JSON.STRLEN` and `JSON.STRAPPEND` command ([#2537](https://github.com/valkey-io/valkey-glide/pull/2537))
* Node: Add `FT.SEARCH` ([#2551](https://github.com/valkey-io/valkey-glide/pull/2551))
* Python: Fix example ([#2556](https://github.com/valkey-io/valkey-glide/issues/2556))
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587]https://github.com/valkey-io/valkey-glide/pull/2587)
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587](https://github.com/valkey-io/valkey-glide/pull/2587))
* Core: Add support to AZ Affinity read strategy ([#2539](https://github.com/valkey-io/valkey-glide/pull/2539))

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ socket-layer = [
"protobuf",
"tokio-util",
]
valkey-GTE-7-2 = []
standalone_heartbeat = []

[dev-dependencies]
Expand Down
7 changes: 5 additions & 2 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ where
) -> RedisResult<Self> {
let connection = Self {
connections: RefCell::new(HashMap::new()),
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
slots: RefCell::new(SlotMap::new(
vec![],
cluster_params.read_from_replicas.clone(),
)),
auto_reconnect: RefCell::new(true),
cluster_params,
read_timeout: RefCell::new(None),
Expand Down Expand Up @@ -387,7 +390,7 @@ where
"can't parse node address",
)))?;
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(|slots_data| {
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas)
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas.clone())
}) {
Ok(new_slots) => {
result = Ok(new_slots);
Expand Down
119 changes: 51 additions & 68 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cluster_async::{ClusterParams, ConnectionFuture};
use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
Expand Down Expand Up @@ -233,10 +233,10 @@ where
}
}

fn round_robin_read_from_az_awareness_replica(
pub(crate) fn round_robin_read_from_az_awareness_replica(
&self,
slot_map_value: &SlotMapValue,
user_az: String,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
Expand Down Expand Up @@ -266,7 +266,7 @@ where
// Check if this replica’s AZ matches the user’s AZ.
if let Some(connection_details) = self.connection_details_for_address(replica.as_str())
{
if connection_details.1.az.as_deref() == Some(&user_az) {
if connection_details.1.az.as_deref() == Some(&client_az) {
// Attempt to update `latest_used_replica` with the index of this replica.
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
initial_index,
Expand All @@ -280,42 +280,33 @@ where
}
}

fn lookup_route(
&self,
route: &Route,
cluster_params: &Option<ClusterParams>,
) -> Option<ConnectionAndAddress<Connection>> {
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
}

//
match route.slot_addr() {
// Master strategy will be in use when the command is not read_only
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
// ReplicaOptional strategy will be in use when the command is read_only
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
SlotAddr::ReplicaOptional => match &self.read_from_replica_strategy {
ReadFromReplicaStrategy::AlwaysFromPrimary => {
self.connection_for_address(addrs.primary.as_str())
}
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
}
ReadFromReplicaStrategy::AZAffinity => self
.round_robin_read_from_az_awareness_replica(
slot_map_value,
cluster_params.as_ref().unwrap().client_az.clone()?,
),
ReadFromReplicaStrategy::AZAffinity(az) => {
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
}
},
// when the user strategy per command is replica_preffered
SlotAddr::ReplicaRequired => match self.read_from_replica_strategy {
ReadFromReplicaStrategy::AZAffinity => self
.round_robin_read_from_az_awareness_replica(
slot_map_value,
cluster_params.as_ref().unwrap().client_az.clone()?,
),
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
ReadFromReplicaStrategy::AZAffinity(az) => {
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
}
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand All @@ -325,17 +316,9 @@ where
&self,
route: &Route,
) -> Option<ConnectionAndAddress<Connection>> {
self.connection_for_route_with_params(route, None)
}

pub(crate) fn connection_for_route_with_params(
&self,
route: &Route,
cluster_params: Option<ClusterParams>,
) -> Option<ConnectionAndAddress<Connection>> {
self.lookup_route(route, &cluster_params).or_else(|| {
self.lookup_route(route).or_else(|| {
if route.slot_addr() != SlotAddr::Master {
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master), &cluster_params)
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
} else {
None
}
Expand Down Expand Up @@ -513,7 +496,7 @@ mod tests {
}

fn create_container_with_az_strategy(
strategy: ReadFromReplicaStrategy,
// strategy: ReadFromReplicaStrategy,
use_management_connections: bool,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
Expand Down Expand Up @@ -571,7 +554,7 @@ mod tests {
ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: strategy,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
topology_hash: 0,
}
}
Expand Down Expand Up @@ -802,28 +785,17 @@ mod tests {

#[test]
fn get_connection_for_az_affinity_route() {
let container =
create_container_with_az_strategy(ReadFromReplicaStrategy::AZAffinity, false);
let mut cluster_params = ClusterParams::default();

cluster_params.client_az = Some("use-1a".to_string());
let container = create_container_with_az_strategy(false);

// slot number is not exits
assert!(container
.connection_for_route_with_params(
&Route::new(1001, SlotAddr::ReplicaOptional),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none());

// Get the replica that holds the slot 1002
assert_eq!(
21,
container
.connection_for_route_with_params(
&Route::new(1002, SlotAddr::ReplicaOptional),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand All @@ -832,20 +804,14 @@ mod tests {
assert_eq!(
2,
container
.connection_for_route_with_params(
&Route::new(1500, SlotAddr::Master),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(1500, SlotAddr::Master))
.unwrap()
.1
);

// receive one of the replicas that holds the slot 2001 and is in the availability zone of the client ("use-1a")
assert!(one_of(
container.connection_for_route_with_params(
&Route::new(2001, SlotAddr::ReplicaRequired),
Some(cluster_params.clone())
),
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 33],
));

Expand All @@ -854,10 +820,7 @@ mod tests {
assert_eq!(
31,
container
.connection_for_route_with_params(
&Route::new(2001, SlotAddr::ReplicaOptional),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand All @@ -867,10 +830,7 @@ mod tests {
assert_eq!(
32,
container
.connection_for_route_with_params(
&Route::new(2001, SlotAddr::ReplicaOptional),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand All @@ -880,15 +840,38 @@ mod tests {
assert_eq!(
3,
container
.connection_for_route_with_params(
&Route::new(2001, SlotAddr::ReplicaOptional),
Some(cluster_params.clone())
)
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
}

#[test]
fn get_connection_for_az_affinity_route_round_robin() {
let container = create_container_with_az_strategy(false);

let mut addresses = vec![
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1,
];
addresses.sort();
assert_eq!(addresses, vec![31, 31, 33, 33]);
}

#[test]
fn get_connection_by_address() {
let container = create_container();
Expand Down
19 changes: 2 additions & 17 deletions glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{
};
use std::net::SocketAddr;

use future::ok;
use futures::prelude::*;
use futures_util::{future::BoxFuture, join};
use tracing::warn;
Expand Down Expand Up @@ -374,22 +373,8 @@ where
match info_res {
Ok(value) => {
let info_dict: Result<InfoDict, RedisError> = FromRedisValue::from_redis_value(&value);
if let Ok(info_dict) = info_dict {
if let Some(az) = info_dict.get::<String>("availability_zone") {
conn_details.az = Some(az);
Ok(())
} else {
Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to get availability_zone from info",
)))
}
} else {
Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse info command",
)))
}
conn_details.az = info_dict?.get::<String>("availability_zone");
Ok(())
}
Err(_) => {
// Handle the error case for the INFO command
Expand Down
10 changes: 4 additions & 6 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ where
conn_lock: RwLock::new(ConnectionsContainer::new(
Default::default(),
connections,
cluster_params.read_from_replicas,
cluster_params.read_from_replicas.clone(),
0,
)),
cluster_params: cluster_params.clone(),
Expand Down Expand Up @@ -1807,7 +1807,7 @@ where
*write_guard = ConnectionsContainer::new(
new_slots,
new_connections,
inner.cluster_params.read_from_replicas,
inner.cluster_params.read_from_replicas.clone(),
topology_hash,
);
Ok(())
Expand Down Expand Up @@ -2026,9 +2026,7 @@ where
)
}
InternalSingleNodeRouting::SpecificNode(route) => {
match read_guard
.connection_for_route_with_params(&route, Some(core.cluster_params.clone()))
{
match read_guard.connection_for_route(&route) {
Some((conn, address)) => ConnectionCheck::Found((conn, address)),
None => {
// No connection is found for the given route:
Expand Down Expand Up @@ -2498,7 +2496,7 @@ where
curr_retry,
inner.cluster_params.tls,
num_of_nodes_to_query,
inner.cluster_params.read_from_replicas,
inner.cluster_params.read_from_replicas.clone(),
),
failed_addresses,
)
Expand Down
Loading

0 comments on commit 5edc891

Please sign in to comment.