From 68171e341bfe8bcac66f5889d63cad3ccccca32c Mon Sep 17 00:00:00 2001 From: Nikolay Komarevskiy Date: Mon, 12 Aug 2024 11:31:19 +0300 Subject: [PATCH] chore: add penalty in latency-based routing --- .../snapshot/latency_based_routing.rs | 72 +++++++++++++------ 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs index 1ae101363..a28c8ac81 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs @@ -7,8 +7,10 @@ use crate::agent::http_transport::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, }; -// Some big value implying that node is unhealthy, should be much bigger than node's latency. -const MAX_LATENCY: Duration = Duration::from_secs(500); +// When a node is detected as unhealthy, we take the following actions: +// - Remove the node entirely from the routing process. +// - Penalize its moving average by adding a specified value to the stored latency window. This ensures that any node exhibiting intermittent outages is appropriately penalized. +const PUNISH_LATENCY: Duration = Duration::from_secs(2); const WINDOW_SIZE: usize = 15; @@ -20,6 +22,8 @@ type LatencyMovAvg = SumTreeSMA; #[derive(Clone, Debug)] struct WeightedNode { node: Node, + /// Reflects the status of the most recent health check. + is_healthy: bool, /// Moving mean of latencies measurements. latency_mov_avg: LatencyMovAvg, /// Weight of the node (invers of the average latency), used for stochastic weighted random sampling. @@ -49,14 +53,14 @@ impl LatencyRoutingSnapshot { /// Helper function to sample nodes based on their weights. /// Here weight index is selected based on the input number in range [0, 1] #[inline(always)] -fn weighted_sample(weights: &[f64], number: f64) -> Option { +fn weighted_sample(weighted_nodes: &[(f64, &Node)], number: f64) -> Option { if !(0.0..=1.0).contains(&number) { return None; } - let sum: f64 = weights.iter().sum(); + let sum: f64 = weighted_nodes.iter().map(|n| n.0).sum(); let mut weighted_number = number * sum; - for (idx, weight) in weights.iter().enumerate() { - weighted_number -= weight; + for (idx, weighted_node) in weighted_nodes.iter().enumerate() { + weighted_number -= weighted_node.0; if weighted_number <= 0.0 { return Some(idx); } @@ -70,18 +74,21 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { } fn next(&self) -> Option { - // We select a node based on it's weight, using a stochastic weighted random sampling approach. - let weights = self - .weighted_nodes - .iter() - .map(|n| n.weight) - .collect::>(); + // We select a healthy node based on its weight, using a stochastic weighted random sampling approach. + + // Preallocate array for a better efficiency. + let mut healthy_weighted_nodes = Vec::with_capacity(self.weighted_nodes.len()); + for n in &self.weighted_nodes { + if n.is_healthy { + healthy_weighted_nodes.push((n.weight, &n.node)); + } + } // Generate a random float in the range [0, 1) let mut rng = rand::thread_rng(); let rand_num = rng.gen::(); // Using this random float and an array of weights we get an index of the node. - let idx = weighted_sample(weights.as_slice(), rand_num); - idx.map(|idx| self.weighted_nodes[idx].node.clone()) + let idx = weighted_sample(&healthy_weighted_nodes.as_slice(), rand_num); + idx.map(|idx| healthy_weighted_nodes[idx].1.clone()) } fn sync_nodes(&mut self, nodes: &[Node]) -> bool { @@ -116,11 +123,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { return false; } - // If latency is None (meaning Node is unhealthy), we assign some big value - let latency = health.latency().unwrap_or(MAX_LATENCY); + // If the node is unhealthy, we penalize it's moving average. + let latency = health.latency().unwrap_or(PUNISH_LATENCY); if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) { // Node is already in the array (it is not the first update_node() call). + self.weighted_nodes[idx].is_healthy = health.is_healthy(); self.weighted_nodes[idx].latency_mov_avg.add_sample(latency); let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average(); // As nodes with smaller average latencies are preferred for routing, we use inverted values for weights. @@ -131,6 +139,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { latency_mov_avg.add_sample(latency); let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64(); self.weighted_nodes.push(WeightedNode { + is_healthy: health.is_healthy(), latency_mov_avg, node: node.clone(), weight, @@ -152,7 +161,8 @@ mod tests { node::Node, snapshot::{ latency_based_routing::{ - weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY, + weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, + PUNISH_LATENCY, }, routing_snapshot::RoutingSnapshot, }, @@ -212,6 +222,7 @@ mod tests { Duration::from_millis(1500) ); assert_eq!(weighted_node.weight, 1.0 / 1.5); + assert_eq!(snapshot.next().unwrap(), node); // Check third update let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); let is_updated = snapshot.update_node(&node, health); @@ -222,12 +233,25 @@ mod tests { Duration::from_millis(2000) ); assert_eq!(weighted_node.weight, 0.5); + assert_eq!(snapshot.next().unwrap(), node); // Check forth update with none let health = HealthCheckStatus::new(None); let is_updated = snapshot.update_node(&node, health); assert!(is_updated); let weighted_node = snapshot.weighted_nodes.first().unwrap(); - let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0); + let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 6.0) / 4.0); + assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency); + assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64()); + assert_eq!(snapshot.weighted_nodes.len(), 1); + assert_eq!(snapshot.existing_nodes.len(), 1); + // No nodes returned, as the node is unhealthy. + assert!(snapshot.next().is_none()); + // Check fifth update + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 7.0) / 5.0); assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency); assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64()); assert_eq!(snapshot.weighted_nodes.len(), 1); @@ -250,6 +274,7 @@ mod tests { ); // Add node_1 to weighted_nodes manually snapshot.weighted_nodes.push(WeightedNode { + is_healthy: true, node: node_1.clone(), latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), weight: 0.0, @@ -274,6 +299,7 @@ mod tests { assert!(snapshot.weighted_nodes.is_empty()); // Add node_2 to weighted_nodes manually snapshot.weighted_nodes.push(WeightedNode { + is_healthy: true, node: node_2.clone(), latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), weight: 0.0, @@ -289,6 +315,7 @@ mod tests { assert_eq!(snapshot.weighted_nodes[0].node, node_2); // Add node_3 to weighted_nodes manually snapshot.weighted_nodes.push(WeightedNode { + is_healthy: true, node: node_3, latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), weight: 0.0, @@ -308,11 +335,12 @@ mod tests { #[test] fn test_weighted_sample() { // Case 1: empty array - let arr: &[f64] = &[]; + let node = Node::new("ic0.com").unwrap(); + let arr = &[(0.5, &node)]; let idx = weighted_sample(arr, 0.5); assert_eq!(idx, None); // Case 2: single element in array - let arr: &[f64] = &[1.0]; + let arr = &[(1.0, &node)]; let idx = weighted_sample(arr, 0.0); assert_eq!(idx, Some(0)); let idx = weighted_sample(arr, 1.0); @@ -323,7 +351,7 @@ mod tests { let idx = weighted_sample(arr, 1.1); assert_eq!(idx, None); // Case 3: two elements in array (second element has twice the weight of the first) - let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0] + let arr = &[(1.0, &node), (2.0, &node)]; // prefixed_sum = [1.0, 3.0] let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0 assert_eq!(idx, Some(0)); let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0 @@ -338,7 +366,7 @@ mod tests { let idx = weighted_sample(arr, 1.1); assert_eq!(idx, None); // Case 4: four elements in array - let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0] + let arr = &[(1.0, &node), (2.0, &node), (1.5, &node), (2.5, &node)]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0] let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0 assert_eq!(idx, Some(0)); // probability ~0.14 let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0