Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Nov 5, 2024
1 parent b254424 commit a7a336f
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 61 deletions.
78 changes: 53 additions & 25 deletions glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use crate::cluster_topology::get_slot;
use crate::cmd::{Arg, Cmd};
use crate::types::Value;
use crate::{ErrorKind, RedisResult};
use crate::{ErrorKind, RedisError, RedisResult};
use std::borrow::Cow;
use std::iter::Once;

Expand Down Expand Up @@ -96,7 +96,12 @@ pub enum MultipleNodeRoutingInfo {
AllNodes,
/// Route to all primaries in the cluster
AllMasters,
/// Instructions for how to split a multi-slot command (e.g. MGET, MSET) into sub-commands. Each tuple is the route for each subcommand, and the indices of the arguments from the original command that should be copied to the subcommand.
/// Routes the request to multiple slots.
/// This variant contains instructions for splitting a multi-slot command (e.g., MGET, MSET) into sub-commands.
/// Each tuple consists of a `Route` representing the target node for the subcommand,
/// and a vector of argument indices from the original command that should be copied to each subcommand.
/// The `MultiSlotArgPattern` specifies the pattern of the command’s arguments, indicating how they are organized
/// (e.g., only keys, key-value pairs, etc).
MultiSlot((Vec<(Route, Vec<usize>)>, MultiSlotArgPattern)),
}

Expand Down Expand Up @@ -251,8 +256,9 @@ type MultiSlotResIdxIter<'a> = std::iter::Map<
fn(&'a (Route, Vec<usize>)) -> Cow<'a, [usize]>,
>;

/// Generates an iterator that returns grouped result indices for each slot in a multi-slot command response,
/// structured according to the given argument pattern.
/// Generates an iterator that yields a vector of result indices for each slot within the final merged results array for a multi-slot command response.
/// The indices are calculated based on the `args_pattern` and the positions of the arguments for each slot-specific request in the original multi-slot request,
/// ensuring that the results are ordered according to the structure of the initial multi-slot command.
///
/// # Arguments
/// * `route_arg_indices` - A reference to a vector where each element is a tuple containing a route and
Expand All @@ -270,54 +276,71 @@ type MultiSlotResIdxIter<'a> = std::iter::Map<
/// - Therefore, the iterator generated by this function would yield grouped result indices as follows:
/// - Slot "foo" is mapped to `[0, 2]` in the final result order.
/// - Slot "foo2" is mapped to `[1]`.
fn multi_slot_sorting_order<'a>(
fn calculate_multi_slot_result_indices<'a>(
route_arg_indices: &'a [(Route, Vec<usize>)],
args_pattern: &MultiSlotArgPattern,
) -> MultiSlotResIdxIter<'a> {
) -> RedisResult<MultiSlotResIdxIter<'a>> {
let check_indices_input = |step_count: usize| {
for (_, indices) in route_arg_indices {
if indices.len() % step_count != 0 {
return Err(RedisError::from((
ErrorKind::ClientError,
"Invalid indices input detected",
format!(
"Expected argument pattern with tuples of size {step_count}, but found indices: {indices:?}"
),
)));
}
}
Ok(())
};

match args_pattern {
MultiSlotArgPattern::KeysOnly => route_arg_indices
MultiSlotArgPattern::KeysOnly => Ok(route_arg_indices
.iter()
.map(|(_, indices)| Cow::Borrowed(indices)),
.map(|(_, indices)| Cow::Borrowed(indices))),
MultiSlotArgPattern::KeysAndPath => {
// The last index corresponds to the path, skip it
route_arg_indices
Ok(route_arg_indices
.iter()
.map(|(_, indices)| Cow::Borrowed(&indices[..indices.len() - 1]))
.map(|(_, indices)| Cow::Borrowed(&indices[..indices.len() - 1])))
}
MultiSlotArgPattern::KeyPathValueTriples => {
// For each triplet (key, path, value) we receive a single response.
// For example, for argument indices: [(_, [0,1,2]), (_, [3,4,5,9,10,11]), (_, [6,7,8])]
// The resulting grouped indices would be: [0], [1, 3], [2]
route_arg_indices.iter().map(|(_, indices)| {
check_indices_input(3)?;
Ok(route_arg_indices.iter().map(|(_, indices)| {
Cow::Owned(
indices
.iter()
.step_by(3)
.map(|idx| idx / 3)
.collect::<Vec<usize>>(),
)
})
}))
}
MultiSlotArgPattern::KeyValuePairs =>
// For each pair (key, value) we receive a single response.
// For example, for argument indices: [(_, [0,1]), (_, [2,3,6,7]), (_, [4,5])]
// The resulting grouped indices would be: [0], [1, 3], [2]
{
route_arg_indices.iter().map(|(_, indices)| {
check_indices_input(2)?;
Ok(route_arg_indices.iter().map(|(_, indices)| {
Cow::Owned(
indices
.iter()
.step_by(2)
.map(|idx| idx / 2)
.collect::<Vec<usize>>(),
)
})
}))
}
}
}

/// Combines multi-slot command results from the `values` field, where each entry is expected to be an array of results,
/// into a single ordered array.
/// Merges the results of a multi-slot command from the `values` field, where each entry is expected to be an array of results.
/// The combined results are ordered according to the sequence in which they appeared in the original command.
///
/// # Arguments
///
Expand All @@ -332,8 +355,7 @@ fn multi_slot_sorting_order<'a>(
///
/// * `args_pattern` - Specifies the argument pattern (e.g., `KeysOnly`, `KeyValuePairs`, ...).
/// The pattern defines how the argument indices are grouped for each slot and determines
/// the structure of `sorting_order`, which is used to distribute results from `values` into the correct
/// positions in the final array.
/// the ordering of results from `values` as they are placed in the final combined array.
///
/// # Returns
///
Expand All @@ -343,7 +365,7 @@ pub(crate) fn combine_and_sort_array_results(
route_arg_indices: &[(Route, Vec<usize>)],
args_pattern: &MultiSlotArgPattern,
) -> RedisResult<Value> {
let sorting_order = multi_slot_sorting_order(route_arg_indices, args_pattern);
let result_indices = calculate_multi_slot_result_indices(route_arg_indices, args_pattern)?;
let mut results = Vec::new();
results.resize(
values.iter().fold(0, |acc, value| match value {
Expand All @@ -352,13 +374,19 @@ pub(crate) fn combine_and_sort_array_results(
}),
Value::Nil,
);
assert_eq!(
values.len(),
sorting_order.len(),
"values={values:?}\nsorting_order={sorting_order:?}"
);
if values.len() != result_indices.len() {
return Err(RedisError::from((
ErrorKind::ClientError,
"Mismatch in the number of multi-slot results compared to the expected result count.",
format!(
"Expected: {:?}, Found: {:?}\nvalues: {values:?}\nresult_indices: {result_indices:?}",
values.len(),
result_indices.len()
),
)));
}

for (key_indices, value) in sorting_order.into_iter().zip(values) {
for (key_indices, value) in result_indices.into_iter().zip(values) {
match value {
Value::Array(values) => {
assert_eq!(values.len(), key_indices.len());
Expand Down
72 changes: 36 additions & 36 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1395,13 +1395,13 @@ export class BaseClient {
* Removes the specified keys. A key is ignored if it does not exist.
*
* @see {@link https://valkey.io/commands/del/|valkey.io} for details.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @param keys - The keys we wanted to remove.
Expand Down Expand Up @@ -1504,13 +1504,13 @@ export class BaseClient {
/** Retrieve the values of multiple keys.
*
* @see {@link https://valkey.io/commands/mget/|valkey.io} for details.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @param keys - A list of keys to retrieve values for.
Expand All @@ -1537,17 +1537,17 @@ export class BaseClient {
/** Set multiple keys to multiple values in a single operation.
*
* @see {@link https://valkey.io/commands/mset/|valkey.io} for details.
*
* @remarks In cluster mode, if keys in `keyValueMap` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keyValueMap` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @param keysAndValues - A list of key-value pairs to set.
*
*
* @returns A simple "OK" response.
*
* @example
Expand Down Expand Up @@ -3456,13 +3456,13 @@ export class BaseClient {

/**
* Returns the number of keys in `keys` that exist in the database.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @see {@link https://valkey.io/commands/exists/|valkey.io} for details.
Expand All @@ -3486,13 +3486,13 @@ export class BaseClient {
* Removes the specified keys. A key is ignored if it does not exist.
* This command, similar to {@link del}, removes specified keys and ignores non-existent ones.
* However, this command does not block the server, while {@link https://valkey.io/commands/del|`DEL`} does.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @see {@link https://valkey.io/commands/unlink/|valkey.io} for details.
Expand Down Expand Up @@ -7120,13 +7120,13 @@ export class BaseClient {
* Updates the last access time of the specified keys.
*
* @see {@link https://valkey.io/commands/touch/|valkey.io} for more details.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity.
*
* @param keys - The keys to update the last access time of.
Expand All @@ -7150,13 +7150,13 @@ export class BaseClient {
* transaction. Executing a transaction will automatically flush all previously watched keys.
*
* @see {@link https://valkey.io/commands/watch/|valkey.io} and {@link https://valkey.io/topics/transactions/#cas|Valkey Glide Wiki} for more details.
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
*
* @remarks In cluster mode, if keys in `keys` map to different hash slots,
* the command will be split across these slots and executed separately for each.
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* This means the command is atomic only at the slot level. If one or more slot-specific
* requests fail, the entire call will return the first encountered error, even
* though some requests may have succeeded while others did not.
* If this behavior impacts your application logic, consider splitting the
* If this behavior impacts your application logic, consider splitting the
* request into sub-requests per slot to ensure atomicity. *
* @param keys - The keys to watch.
* @returns A simple `"OK"` response.
Expand Down

0 comments on commit a7a336f

Please sign in to comment.