Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add penalty for unhealthy nodes in latency-based routing #581

Draft
wants to merge 3 commits into
base: dynamic_route
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use crate::agent::http_transport::dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

// Some big value implying that node is unhealthy, should be much bigger than node's latency.
const MAX_LATENCY: Duration = Duration::from_secs(500);
// When a node is detected as unhealthy, we take the following actions:
// - Remove the node entirely from the routing.
// - Penalize its moving average by adding a specified value to the stored latency window. This ensures that any node exhibiting intermittent outages is appropriately penalized.
const PUNISH_LATENCY: Duration = Duration::from_secs(2);

const WINDOW_SIZE: usize = 15;

Expand All @@ -20,6 +22,8 @@ type LatencyMovAvg = SumTreeSMA<Duration, u32, WINDOW_SIZE>;
#[derive(Clone, Debug)]
struct WeightedNode {
node: Node,
/// Reflects the status of the most recent health check.
is_healthy: bool,
/// Moving mean of latencies measurements.
latency_mov_avg: LatencyMovAvg,
/// Weight of the node (inverse of the average latency), used for stochastic weighted random sampling.
Expand Down Expand Up @@ -49,14 +53,14 @@ impl LatencyRoutingSnapshot {
/// Helper function to sample nodes based on their weights.
/// Here weight index is selected based on the input number in range [0, 1]
#[inline(always)]
fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
fn weighted_sample(weighted_nodes: &[(f64, &Node)], number: f64) -> Option<usize> {
if !(0.0..=1.0).contains(&number) {
return None;
}
let sum: f64 = weights.iter().sum();
let sum: f64 = weighted_nodes.iter().map(|n| n.0).sum();
let mut weighted_number = number * sum;
for (idx, weight) in weights.iter().enumerate() {
weighted_number -= weight;
for (idx, weighted_node) in weighted_nodes.iter().enumerate() {
weighted_number -= weighted_node.0;
if weighted_number <= 0.0 {
return Some(idx);
}
Expand All @@ -70,18 +74,21 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
}

fn next(&self) -> Option<Node> {
// We select a node based on it's weight, using a stochastic weighted random sampling approach.
let weights = self
.weighted_nodes
.iter()
.map(|n| n.weight)
.collect::<Vec<_>>();
// We select a healthy node based on its weight, using a stochastic weighted random sampling approach.

// Preallocate array for a better efficiency.
let mut healthy_weighted_nodes = Vec::with_capacity(self.weighted_nodes.len());
for n in &self.weighted_nodes {
if n.is_healthy {
healthy_weighted_nodes.push((n.weight, &n.node));
}
}
// Generate a random float in the range [0, 1)
let mut rng = rand::thread_rng();
let rand_num = rng.gen::<f64>();
// Using this random float and an array of weights we get an index of the node.
let idx = weighted_sample(weights.as_slice(), rand_num);
idx.map(|idx| self.weighted_nodes[idx].node.clone())
let idx = weighted_sample(healthy_weighted_nodes.as_slice(), rand_num);
idx.map(|idx| healthy_weighted_nodes[idx].1.clone())
}

fn sync_nodes(&mut self, nodes: &[Node]) -> bool {
Expand Down Expand Up @@ -116,11 +123,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
return false;
}

// If latency is None (meaning Node is unhealthy), we assign some big value
let latency = health.latency().unwrap_or(MAX_LATENCY);
// If the node is unhealthy, we penalize its moving average.
let latency = health.latency().unwrap_or(PUNISH_LATENCY);

if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) {
// Node is already in the array (it is not the first update_node() call).
self.weighted_nodes[idx].is_healthy = health.is_healthy();
self.weighted_nodes[idx].latency_mov_avg.add_sample(latency);
let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average();
// As nodes with smaller average latencies are preferred for routing, we use inverted values for weights.
Expand All @@ -131,6 +139,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
latency_mov_avg.add_sample(latency);
let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64();
self.weighted_nodes.push(WeightedNode {
is_healthy: health.is_healthy(),
latency_mov_avg,
node: node.clone(),
weight,
Expand All @@ -152,7 +161,8 @@ mod tests {
node::Node,
snapshot::{
latency_based_routing::{
weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY,
weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode,
PUNISH_LATENCY,
},
routing_snapshot::RoutingSnapshot,
},
Expand Down Expand Up @@ -212,6 +222,7 @@ mod tests {
Duration::from_millis(1500)
);
assert_eq!(weighted_node.weight, 1.0 / 1.5);
assert_eq!(snapshot.next().unwrap(), node);
// Check third update
let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
let is_updated = snapshot.update_node(&node, health);
Expand All @@ -222,12 +233,25 @@ mod tests {
Duration::from_millis(2000)
);
assert_eq!(weighted_node.weight, 0.5);
assert_eq!(snapshot.next().unwrap(), node);
// Check fourth update with none
let health = HealthCheckStatus::new(None);
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0);
let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 6.0) / 4.0);
assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
assert_eq!(snapshot.weighted_nodes.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
// No nodes returned, as the node is unhealthy.
assert!(snapshot.next().is_none());
// Check fifth update
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 7.0) / 5.0);
assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
assert_eq!(snapshot.weighted_nodes.len(), 1);
Expand All @@ -250,6 +274,7 @@ mod tests {
);
// Add node_1 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_1.clone(),
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -274,6 +299,7 @@ mod tests {
assert!(snapshot.weighted_nodes.is_empty());
// Add node_2 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_2.clone(),
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -289,6 +315,7 @@ mod tests {
assert_eq!(snapshot.weighted_nodes[0].node, node_2);
// Add node_3 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_3,
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -308,11 +335,12 @@ mod tests {
#[test]
fn test_weighted_sample() {
// Case 1: empty array
let arr: &[f64] = &[];
let node = Node::new("ic0.com").unwrap();
let arr = &[(0.5, &node)];
let idx = weighted_sample(arr, 0.5);
assert_eq!(idx, None);
// Case 2: single element in array
let arr: &[f64] = &[1.0];
let arr = &[(1.0, &node)];
let idx = weighted_sample(arr, 0.0);
assert_eq!(idx, Some(0));
let idx = weighted_sample(arr, 1.0);
Expand All @@ -323,7 +351,7 @@ mod tests {
let idx = weighted_sample(arr, 1.1);
assert_eq!(idx, None);
// Case 3: two elements in array (second element has twice the weight of the first)
let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0]
let arr = &[(1.0, &node), (2.0, &node)]; // prefixed_sum = [1.0, 3.0]
let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0
assert_eq!(idx, Some(0));
let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0
Expand All @@ -338,7 +366,7 @@ mod tests {
let idx = weighted_sample(arr, 1.1);
assert_eq!(idx, None);
// Case 4: four elements in array
let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
let arr = &[(1.0, &node), (2.0, &node), (1.5, &node), (2.5, &node)]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0
assert_eq!(idx, Some(0)); // probability ~0.14
let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0
Expand Down
Loading