diff --git a/Cargo.lock b/Cargo.lock index 9d453a98..341c9e56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.5.2" @@ -133,13 +139,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.3.1", "event-listener-strategy", "pin-project-lite", ] @@ -155,6 +172,15 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "async-watch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a078faf4e27c0c6cc0efb20e5da59dcccc04968ebf2801d8e0b2195124cdcdb2" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -682,6 +708,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.3.1" @@ -699,7 +731,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" dependencies = [ - "event-listener", + "event-listener 5.3.1", "pin-project-lite", ] @@ -1014,7 +1046,11 @@ dependencies = [ name = "ic-agent" version = "0.37.1" dependencies = [ + "arc-swap", + "async-channel", "async-lock", + "async-trait", + "async-watch", "backoff", "cached", "candid", @@ -1047,9 +1083,12 @@ dependencies = [ "serde_repr", "sha2 0.10.8", "simple_asn1", + "stop-token", "thiserror", "time", "tokio", + "tracing", + "tracing-subscriber", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -1468,6 +1507,16 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1535,6 +1584,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.13.2" @@ -2190,6 +2245,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2293,6 +2357,18 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "stop-token" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af91f480ee899ab2d9f8435bfdfc14d08a5754bd9d3fef1f1a1c23336aad6c8b" +dependencies = [ + "async-channel", + "cfg-if", + "futures-core", + "pin-project-lite", +] + [[package]] name = "string_cache" version = "0.8.7" @@ -2414,6 +2490,16 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -2557,9 +2643,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.76", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -2567,6 +2665,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -2649,6 +2773,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.5" diff --git a/ic-agent/Cargo.toml b/ic-agent/Cargo.toml index 9c5462b7..d69bad2c 100644 --- a/ic-agent/Cargo.toml +++ b/ic-agent/Cargo.toml @@ -15,7 +15,11 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"] include = ["src", "Cargo.toml", "../LICENSE", "README.md"] [dependencies] +arc-swap = "1.7" +async-channel = "1.9" +async-trait = "0.1" async-lock = "3.3" +async-watch = "0.3" backoff = "0.4.0" cached = { version = "0.52", features = ["ahash"], default-features = false } candid = { workspace = true } @@ -41,8 +45,10 @@ serde_cbor = { workspace = true } serde_repr = { workspace = true } sha2 = { workspace = true } simple_asn1 = "0.6.1" +stop-token = "0.7" thiserror = { workspace = true } time = { workspace = true } +tracing = "0.1" url = "2.1.0" [dependencies.reqwest] @@ -67,6 +73,7 @@ web-sys = { version = "0.3", features = ["Window"], optional = true } [dev-dependencies] serde_json.workspace = true +tracing-subscriber = "0.3" [target.'cfg(not(target_family = "wasm"))'.dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/ic-agent/src/agent/route_provider.rs b/ic-agent/src/agent/route_provider.rs index 0db3d2a0..d958faaa 100644 --- a/ic-agent/src/agent/route_provider.rs +++ b/ic-agent/src/agent/route_provider.rs @@ -7,6 +7,8 @@ use url::Url; use crate::agent::AgentError; +pub mod dynamic_routing; + const IC0_DOMAIN: &str = "ic0.app"; const ICP0_DOMAIN: &str = "icp0.io"; const ICP_API_DOMAIN: &str = "icp-api.io"; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs similarity index 93% rename from ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs index e05f6f4d..ec6022a5 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs @@ -7,19 +7,15 @@ use std::{ use arc_swap::ArcSwap; use candid::Principal; +use futures_util::{select, FutureExt}; use reqwest::Client; +use stop_token::StopSource; use thiserror::Error; -use tokio::{ - runtime::Handle, - sync::{mpsc, watch}, - time::timeout, -}; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{error, info, warn}; use url::Url; use crate::{ - agent::http_transport::{ + agent::route_provider::{ dynamic_routing::{ health_check::{HealthCheck, HealthChecker, HealthManagerActor}, messages::FetchedNodes, @@ -28,7 +24,7 @@ use crate::{ snapshot::routing_snapshot::RoutingSnapshot, type_aliases::AtomicSwap, }, - route_provider::RouteProvider, + RouteProvider, }, AgentError, }; @@ -64,12 +60,10 @@ pub struct DynamicRouteProvider { check_period: Duration, /// Snapshot of the routing nodes. routing_snapshot: AtomicSwap, - /// Task tracker for managing the spawned tasks. - tracker: TaskTracker, /// Initial seed nodes, which are used for the initial fetching of the nodes. seeds: Vec, - /// Cancellation token for stopping the spawned tasks. - token: CancellationToken, + /// Cancellation source for stopping the spawned tasks. + stop: StopSource, } /// An error that occurred when the DynamicRouteProvider service was running. @@ -153,9 +147,8 @@ impl DynamicRouteProviderBuilder { checker: self.checker, check_period: self.check_period, routing_snapshot: self.routing_snapshot, - tracker: TaskTracker::new(), seeds: self.seeds, - token: CancellationToken::new(), + stop: StopSource::new(), }; route_provider.run().await; @@ -203,10 +196,10 @@ where pub async fn run(&self) { info!("{DYNAMIC_ROUTE_PROVIDER}: started ..."); // Communication channel between NodesFetchActor and HealthManagerActor. - let (fetch_sender, fetch_receiver) = watch::channel(None); + let (fetch_sender, fetch_receiver) = async_watch::channel(None); // Communication channel with HealthManagerActor to receive info about healthy seed nodes (used only once). - let (init_sender, mut init_receiver) = mpsc::channel(1); + let (init_sender, init_receiver) = async_channel::bounded(1); // Start the receiving part first. let health_manager_actor = HealthManagerActor::new( @@ -215,10 +208,9 @@ where Arc::clone(&self.routing_snapshot), fetch_receiver, init_sender, - self.token.clone(), + self.stop.token(), ); - self.tracker - .spawn(async move { health_manager_actor.run().await }); + crate::util::spawn(async move { health_manager_actor.run().await }); // Dispatch all seed nodes for initial health checks if let Err(err) = fetch_sender.send(Some(FetchedNodes { @@ -229,16 +221,16 @@ where // Try await for healthy seeds. let start = Instant::now(); - match timeout(TIMEOUT_AWAIT_HEALTHY_SEED, init_receiver.recv()).await { - Ok(_) => info!( - "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}", - start.elapsed() - ), - Err(_) => warn!( + select! { + _ = crate::util::sleep(TIMEOUT_AWAIT_HEALTHY_SEED).fuse() => warn!( "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}", start.elapsed() ), - }; + _ = init_receiver.recv().fuse() => info!( + "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}", + start.elapsed() + ) + } // We can close the channel now. init_receiver.close(); @@ -248,33 +240,15 @@ where self.fetch_retry_interval, fetch_sender, Arc::clone(&self.routing_snapshot), - self.token.clone(), + self.stop.token(), ); - self.tracker.spawn(async move { fetch_actor.run().await }); + crate::util::spawn(async move { fetch_actor.run().await }); info!( "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully" ); } } -// Gracefully stop the inner spawned tasks running in the background. -impl Drop for DynamicRouteProvider { - fn drop(&mut self) { - self.token.cancel(); - self.tracker.close(); - let tracker = self.tracker.clone(); - // If no runtime is available do nothing. - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - tracker.wait().await; - warn!("{DYNAMIC_ROUTE_PROVIDER}: stopped gracefully"); - }); - } else { - error!("{DYNAMIC_ROUTE_PROVIDER}: no runtime available, cannot stop the spawned tasks"); - } - } -} - #[cfg(test)] mod tests { use candid::Principal; @@ -286,8 +260,8 @@ mod tests { use tracing::Level; use tracing_subscriber::FmtSubscriber; - use crate::{ - agent::http_transport::{ + use crate::agent::{ + route_provider::{ dynamic_routing::{ dynamic_route_provider::{ DynamicRouteProviderBuilder, IC0_SEED_DOMAIN, MAINNET_ROOT_SUBNET_ID, @@ -301,8 +275,7 @@ mod tests { assert_routed_domains, route_n_times, NodeHealthCheckerMock, NodesFetcherMock, }, }, - route_provider::RouteProvider, - ReqwestTransport, + RouteProvider, }, Agent, AgentError, }; @@ -367,17 +340,14 @@ mod tests { .build() .await; let route_provider = Arc::new(route_provider) as Arc; - let transport = - ReqwestTransport::create_with_client_route(Arc::clone(&route_provider), client) - .expect("failed to create transport"); let agent = Agent::builder() - .with_transport(transport) + .with_arc_route_provider(route_provider.clone()) .build() .expect("failed to create an agent"); let subnet_id = Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(); // Assert that seed (ic0.app) is not used for routing. Henceforth, only discovered API nodes are used. assert_no_routing_via_domains( - Arc::clone(&route_provider), + route_provider.clone(), vec![IC0_SEED_DOMAIN], Duration::from_secs(40), Duration::from_secs(2), diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs similarity index 78% rename from ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs index 491f010b..512fda8a 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use futures_util::FutureExt; use http::{Method, StatusCode}; use reqwest::{Client, Request}; use std::{ @@ -6,12 +7,11 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::mpsc, time}; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use tracing::{debug, error, info, warn}; +use stop_token::{StopSource, StopToken}; +use tracing::{debug, error, info, trace, warn}; use url::Url; -use crate::agent::http_transport::dynamic_routing::{ +use crate::agent::route_provider::dynamic_routing::{ dynamic_route_provider::DynamicRouteProviderError, messages::{FetchedNodes, NodeHealthState}, node::Node, @@ -113,7 +113,7 @@ struct HealthCheckActor { /// The sender channel (listener) to send the health status. sender_channel: SenderMpsc, /// The cancellation token of the actor. - token: CancellationToken, + token: StopToken, } impl HealthCheckActor { @@ -122,7 +122,7 @@ impl HealthCheckActor { period: Duration, node: Node, sender_channel: SenderMpsc, - token: CancellationToken, + token: StopToken, ) -> Self { Self { checker, @@ -135,10 +135,9 @@ impl HealthCheckActor { /// Runs the actor. async fn run(self) { - let mut interval = time::interval(self.period); loop { - tokio::select! { - _ = interval.tick() => { + futures_util::select! { + _ = crate::util::sleep(self.period).fuse() => { let health = self.checker.check(&self.node).await.unwrap_or_default(); let message = NodeHealthState { node: self.node.clone(), @@ -147,7 +146,7 @@ impl HealthCheckActor { // Inform the listener about node's health. It can only fail if the listener was closed/dropped. self.sender_channel.send(message).await.expect("Failed to send node's health state"); } - _ = self.token.cancelled() => { + _ = self.token.clone().fuse() => { info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node); break; } @@ -178,11 +177,9 @@ pub(super) struct HealthManagerActor { /// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase). init_sender: SenderMpsc, /// The cancellation token of the actor. - token: CancellationToken, - /// The cancellation token for all the health checks. - nodes_token: CancellationToken, - /// The task tracker of the health checks, waiting for the tasks to exit (graceful termination). - nodes_tracker: TaskTracker, + token: StopToken, + /// The cancellation source for all the health checks. + nodes_stop: StopSource, /// The flag indicating if this actor is initialized with healthy nodes. is_initialized: bool, } @@ -198,9 +195,9 @@ where routing_snapshot: AtomicSwap, fetch_receiver: ReceiverWatch, init_sender: SenderMpsc, - token: CancellationToken, + token: StopToken, ) -> Self { - let (check_sender, check_receiver) = mpsc::channel(CHANNEL_BUFFER); + let (check_sender, check_receiver) = async_channel::bounded(CHANNEL_BUFFER); Self { checker, @@ -211,8 +208,7 @@ where check_receiver, init_sender, token, - nodes_token: CancellationToken::new(), - nodes_tracker: TaskTracker::new(), + nodes_stop: StopSource::new(), is_initialized: false, } } @@ -220,27 +216,30 @@ where /// Runs the actor. pub async fn run(mut self) { loop { - tokio::select! { + futures_util::select_biased! { + _ = self.token.clone().fuse() => { + self.check_receiver.close(); + trace!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped"); + break; + } // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel. - result = self.fetch_receiver.changed() => { - if let Err(err) = result { - error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}"); - self.token.cancel(); - continue; + result = self.fetch_receiver.recv().fuse() => { + match result { + Err(err) => { + error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}"); + continue; // will hit the stoptoken next + } + Ok(Some(FetchedNodes { nodes })) => { + self.handle_fetch_update(nodes).await; + } + Ok(None) => continue, } - // Get the latest value from the channel and mark it as seen. - let Some(FetchedNodes { nodes }) = self.fetch_receiver.borrow_and_update().clone() else { continue }; - self.handle_fetch_update(nodes).await; } // Receive health check messages from all running HealthCheckActor/s. - Some(msg) = self.check_receiver.recv() => { - self.handle_health_update(msg).await; - } - _ = self.token.cancelled() => { - self.stop_all_checks().await; - self.check_receiver.close(); - warn!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped"); - break; + msg = self.check_receiver.recv().fuse() => { + if let Ok(msg) = msg { + self.handle_health_update(msg).await; + } } } } @@ -272,14 +271,14 @@ where // If the snapshot has changed, store it and restart all node's health checks. if new_snapshot.sync_nodes(&nodes) { self.routing_snapshot.store(Arc::new(new_snapshot)); - self.stop_all_checks().await; - self.start_checks(nodes.to_vec()); + warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks"); + self.reset_checks(nodes.to_vec()); } } - fn start_checks(&mut self, nodes: Vec) { - // Create a single cancellation token for all started health checks. - self.nodes_token = CancellationToken::new(); + fn reset_checks(&mut self, nodes: Vec) { + // Create a cancellation source for all started health checks. + self.nodes_stop = StopSource::new(); for node in nodes { debug!("{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}"); let actor = HealthCheckActor::new( @@ -287,16 +286,9 @@ where self.period, node, self.check_sender.clone(), - self.nodes_token.clone(), + self.nodes_stop.token(), ); - self.nodes_tracker.spawn(async move { actor.run().await }); + crate::util::spawn(async move { actor.run().await }); } } - - async fn stop_all_checks(&self) { - warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks"); - self.nodes_token.cancel(); - self.nodes_tracker.close(); - self.nodes_tracker.wait().await; - } } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs b/ic-agent/src/agent/route_provider/dynamic_routing/messages.rs similarity index 85% rename from ic-agent/src/agent/http_transport/dynamic_routing/messages.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/messages.rs index 5feeae25..61876389 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/messages.rs @@ -1,4 +1,4 @@ -use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; +use crate::agent::route_provider::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; /// Represents a message with fetched nodes. #[derive(Debug, Clone)] diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs b/ic-agent/src/agent/route_provider/dynamic_routing/mod.rs similarity index 100% rename from ic-agent/src/agent/http_transport/dynamic_routing/mod.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/mod.rs diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs b/ic-agent/src/agent/route_provider/dynamic_routing/node.rs similarity index 96% rename from ic-agent/src/agent/http_transport/dynamic_routing/node.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/node.rs index 37716da3..3aa7ca7f 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/node.rs @@ -1,7 +1,7 @@ use url::Url; use crate::agent::{ - http_transport::dynamic_routing::dynamic_route_provider::DynamicRouteProviderError, + route_provider::dynamic_routing::dynamic_route_provider::DynamicRouteProviderError, ApiBoundaryNode, }; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs similarity index 82% rename from ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs index e887e668..67ff9677 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs @@ -1,23 +1,20 @@ use async_trait::async_trait; use candid::Principal; +use futures_util::FutureExt; use reqwest::Client; use std::{fmt::Debug, sync::Arc, time::Duration}; -use tokio::time::{self, sleep}; -use tokio_util::sync::CancellationToken; +use stop_token::StopToken; use tracing::{error, warn}; use url::Url; use crate::agent::{ - http_transport::{ - dynamic_routing::{ - dynamic_route_provider::DynamicRouteProviderError, - health_check::HEALTH_MANAGER_ACTOR, - messages::FetchedNodes, - node::Node, - snapshot::routing_snapshot::RoutingSnapshot, - type_aliases::{AtomicSwap, SenderWatch}, - }, - reqwest_transport::ReqwestTransport, + route_provider::dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderError, + health_check::HEALTH_MANAGER_ACTOR, + messages::FetchedNodes, + node::Node, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::{AtomicSwap, SenderWatch}, }, Agent, }; @@ -55,14 +52,9 @@ impl NodesFetcher { #[async_trait] impl Fetch for NodesFetcher { async fn fetch(&self, url: Url) -> Result, DynamicRouteProviderError> { - let transport = ReqwestTransport::create_with_client(url, self.http_client.clone()) - .map_err(|err| { - DynamicRouteProviderError::NodesFetchError(format!( - "Failed to build transport: {err}" - )) - })?; let agent = Agent::builder() - .with_transport(transport) + .with_http_client(self.http_client.clone()) + .with_url(url) .build() .map_err(|err| { DynamicRouteProviderError::NodesFetchError(format!( @@ -102,7 +94,7 @@ pub(super) struct NodesFetchActor { /// The snapshot of the routing table. routing_snapshot: AtomicSwap, /// The token to cancel/stop the actor. - token: CancellationToken, + token: StopToken, } impl NodesFetchActor @@ -116,7 +108,7 @@ where retry_interval: Duration, fetch_sender: SenderWatch, snapshot: AtomicSwap, - token: CancellationToken, + token: StopToken, ) -> Self { Self { fetcher, @@ -130,10 +122,9 @@ where /// Runs the actor. pub async fn run(self) { - let mut interval = time::interval(self.period); loop { - tokio::select! { - _ = interval.tick() => { + futures_util::select! { + _ = crate::util::sleep(self.period).fuse() => { // Retry until success: // - try to get a healthy node from the routing snapshot // - if snapshot is empty, break the cycle and wait for the next fetch cycle @@ -146,8 +137,7 @@ where if let Some(node) = snapshot.next_node() { match self.fetcher.fetch((&node).into()).await { Ok(nodes) => { - let msg = Some( - FetchedNodes {nodes}); + let msg = Some(FetchedNodes {nodes}); match self.fetch_sender.send(msg) { Ok(()) => break, // message sent successfully, exist the loop Err(err) => { @@ -165,10 +155,10 @@ where break; }; warn!("Retrying to fetch the nodes in {:?}", self.fetch_retry_interval); - sleep(self.fetch_retry_interval).await; + crate::util::sleep(self.fetch_retry_interval).await; } } - _ = self.token.cancelled() => { + _ = self.token.clone().fuse() => { warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled"); break; } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs similarity index 99% rename from ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs index 6b1ee0b0..2aa3fd90 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs @@ -5,7 +5,7 @@ use std::{ use rand::Rng; -use crate::agent::http_transport::dynamic_routing::{ +use crate::agent::route_provider::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, }; @@ -321,7 +321,7 @@ mod tests { time::Duration, }; - use crate::agent::http_transport::dynamic_routing::{ + use crate::agent::route_provider::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::{ diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/mod.rs similarity index 100% rename from ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/snapshot/mod.rs diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/round_robin_routing.rs similarity index 99% rename from ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/snapshot/round_robin_routing.rs index 2f3fd421..67ce51e0 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/round_robin_routing.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use crate::agent::http_transport::dynamic_routing::{ +use crate::agent::route_provider::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, }; @@ -114,7 +114,7 @@ mod tests { use std::time::Duration; use std::{collections::HashSet, sync::atomic::Ordering}; - use crate::agent::http_transport::dynamic_routing::{ + use crate::agent::route_provider::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::{ diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs similarity index 92% rename from ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs index 5357b271..9f9331a2 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; +use crate::agent::route_provider::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; /// A trait for interacting with the snapshot of nodes (routing table). pub trait RoutingSnapshot: Send + Sync + Clone + Debug { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs similarity index 97% rename from ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs index 60004d75..fea43caf 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs @@ -6,7 +6,7 @@ use arc_swap::ArcSwap; use async_trait::async_trait; use url::Url; -use crate::agent::http_transport::{ +use crate::agent::route_provider::{ dynamic_routing::{ dynamic_route_provider::DynamicRouteProviderError, health_check::{HealthCheck, HealthCheckStatus}, @@ -14,7 +14,7 @@ use crate::agent::http_transport::{ nodes_fetch::Fetch, type_aliases::AtomicSwap, }, - route_provider::RouteProvider, + RouteProvider, }; pub(super) fn route_n_times(n: usize, f: Arc) -> Vec { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs b/ic-agent/src/agent/route_provider/dynamic_routing/type_aliases.rs similarity index 63% rename from ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs rename to ic-agent/src/agent/route_provider/dynamic_routing/type_aliases.rs index 6be931fb..f4d76fa3 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/type_aliases.rs @@ -1,18 +1,17 @@ use arc_swap::ArcSwap; use std::sync::Arc; -use tokio::sync::{mpsc, watch}; /// A type alias for the sender end of a watch channel. -pub(super) type SenderWatch = watch::Sender>; +pub(super) type SenderWatch = async_watch::Sender>; /// A type alias for the receiver end of a watch channel. -pub(super) type ReceiverWatch = watch::Receiver>; +pub(super) type ReceiverWatch = async_watch::Receiver>; /// A type alias for the sender end of a multi-producer, single-consumer channel. -pub(super) type SenderMpsc = mpsc::Sender; +pub(super) type SenderMpsc = async_channel::Sender; /// A type alias for the receiver end of a multi-producer, single-consumer channel. -pub(super) type ReceiverMpsc = mpsc::Receiver; +pub(super) type ReceiverMpsc = async_channel::Receiver; /// A type alias for an atomic swap operation on a shared value. pub(super) type AtomicSwap = Arc>; diff --git a/ic-agent/src/util.rs b/ic-agent/src/util.rs index c33a3485..353d20f0 100644 --- a/ic-agent/src/util.rs +++ b/ic-agent/src/util.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::time::Duration; pub async fn sleep(d: Duration) { @@ -19,3 +20,13 @@ pub async fn sleep(d: Duration) { const _: () = { panic!("Using ic-agent from WASM requires enabling the `wasm-bindgen` feature") }; } + +#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))] +pub fn spawn(f: impl Future + 'static) { + wasm_bindgen_futures::spawn_local(f); +} + +#[cfg(not(all(target_family = "wasm", feature = "wasm-bindgen")))] +pub fn spawn(f: impl Future + Send + 'static) { + tokio::spawn(f); +}