diff --git a/Cargo.lock b/Cargo.lock index 7c33681a993e58..b70a22fb5addc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,6 +251,7 @@ dependencies = [ "affinity", "anyhow", "axum 0.7.9", + "env_logger", "log", "num_cpus", "rayon", @@ -461,9 +462,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.1" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anyhow" diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml index 265eb3aafd97fb..0c760371ddfe0e 100644 --- a/thread-manager/Cargo.toml +++ b/thread-manager/Cargo.toml @@ -26,5 +26,6 @@ affinity = "0.1.2" [dev-dependencies] axum = "0.7.9" +env_logger = { workspace = true } serde_json = { workspace = true } toml = { workspace = true } diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index 219712df060a68..ea481a707893b8 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -1,9 +1,13 @@ -use std::{ - future::IntoFuture, - io::{Read, Write}, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, +use { + agave_thread_manager::*, + log::{debug, info}, + std::{ + future::IntoFuture, + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, + }, }; async fn axum_main(port: u16) { @@ -31,47 +35,50 @@ async fn axum_main(port: u16) { match timeout { Ok(v) => v.unwrap(), Err(_) => { - println!("Terminating server on port {port}"); + info!("Terminating server on port {port}"); } } } -use agave_thread_manager::*; fn main() -> anyhow::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let experiments = [ - "examples/core_contention_dedicated_set.json", - "examples/core_contention_contending_set.json", + "examples/core_contention_dedicated_set.toml", + "examples/core_contention_contending_set.toml", ]; for exp in experiments { - println!("==================="); - println!("Running {exp}"); - let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - conffile.push(exp); + info!("==================="); + info!("Running {exp}"); + let mut conf_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + conf_file.push(exp); let mut buf = String::new(); - std::fs::File::open(conffile)?.read_to_string(&mut buf)?; + std::fs::File::open(conf_file)?.read_to_string(&mut buf)?; let cfg: RuntimeManagerConfig = toml::from_str(&buf)?; - //println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?); - let rtm = ThreadManager::new(cfg).unwrap(); - let tok1 = rtm + let manager = ThreadManager::new(cfg).unwrap(); + let tokio1 = manager .get_tokio("axum1") .expect("Expecting runtime named axum1"); - let tok2 = rtm + tokio1.start_metrics_sampling(Duration::from_secs(1)); + let tokio2 = manager .get_tokio("axum2") .expect("Expecting runtime named axum2"); + tokio2.start_metrics_sampling(Duration::from_secs(1)); let wrk_cores: Vec<_> = (32..64).collect(); - let results = std::thread::scope(|s| { - s.spawn(|| { - tok1.tokio.block_on(axum_main(8888)); + let results = std::thread::scope(|scope| { + scope.spawn(|| { + tokio1.tokio.block_on(axum_main(8888)); }); - s.spawn(|| { - tok2.tokio.block_on(axum_main(8889)); + scope.spawn(|| { + tokio2.tokio.block_on(axum_main(8889)); }); - let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); - jh.join().expect("WRK crashed!") + let join_handle = + scope.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); + join_handle.join().expect("WRK crashed!") }); + //print out the results of the bench run println!("Results are: {:?}", results); } Ok(()) @@ -112,7 +119,7 @@ fn run_wrk( let mut all_latencies = vec![]; let mut all_rps = vec![]; for (out, port) in outs.zip(ports.iter()) { - println!("========================="); + debug!("========================="); std::io::stdout().write_all(&out.stderr)?; let res = str::from_utf8(&out.stdout)?; let mut res = res.lines().last().unwrap().split(' '); @@ -122,7 +129,7 @@ fn run_wrk( let requests: usize = res.next().unwrap().parse()?; let rps = requests as f32 / 10.0; - println!("WRK results for port {port}: {latency:?} {rps}"); + debug!("WRK results for port {port}: {latency:?} {rps}"); all_latencies.push(Duration::from_micros(latency_us)); all_rps.push(rps); } diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs index 51ba4c08e714bd..e466b3bae05086 100644 --- a/thread-manager/examples/core_contention_sweep.rs +++ b/thread-manager/examples/core_contention_sweep.rs @@ -1,15 +1,18 @@ -use std::{ - collections::HashMap, - future::IntoFuture, - io::Write, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, +use { + agave_thread_manager::*, + log::{debug, info}, + std::{ + collections::HashMap, + future::IntoFuture, + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, + }, }; async fn axum_main(port: u16) { use axum::{routing::get, Router}; - // basic handler that responds with a static string async fn root() -> &'static str { tokio::time::sleep(Duration::from_millis(1)).await; @@ -24,6 +27,7 @@ async fn axum_main(port: u16) { tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) .await .unwrap(); + info!("Server on port {port} ready"); let timeout = tokio::time::timeout( Duration::from_secs(11), axum::serve(listener, app).into_future(), @@ -32,11 +36,10 @@ async fn axum_main(port: u16) { match timeout { Ok(v) => v.unwrap(), Err(_) => { - println!("Terminating server on port {port}"); + info!("Terminating server on port {port}"); } } } -use agave_thread_manager::*; fn make_config_shared(cc: usize) -> RuntimeManagerConfig { let tokio_cfg_1 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc }, @@ -46,41 +49,33 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig { let tokio_cfg_2 = tokio_cfg_1.clone(); RuntimeManagerConfig { tokio_configs: HashMap::from([ - ("tokio1".into(), tokio_cfg_1), - ("tokio2".into(), tokio_cfg_2), - ]), - tokio_runtime_mapping: HashMap::from([ - ("axum1".into(), "tokio1".into()), - ("axum2".into(), "tokio2".into()), + ("axum1".into(), tokio_cfg_1), + ("axum2".into(), tokio_cfg_2), ]), ..Default::default() } } -fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig { +fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig { let tokio_cfg_1 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, - max: cc / 2, + max: core_count / 2, }, - worker_threads: cc / 2, + worker_threads: core_count / 2, ..Default::default() }; let tokio_cfg_2 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { - min: cc / 2, - max: cc, + min: core_count / 2, + max: core_count, }, - worker_threads: cc / 2, + worker_threads: core_count / 2, ..Default::default() }; RuntimeManagerConfig { tokio_configs: HashMap::from([ - ("tokio1".into(), tokio_cfg_1), - ("tokio2".into(), tokio_cfg_2), - ]), - tokio_runtime_mapping: HashMap::from([ - ("axum1".into(), "tokio1".into()), - ("axum2".into(), "tokio2".into()), + ("axum1".into(), tokio_cfg_1), + ("axum2".into(), tokio_cfg_2), ]), ..Default::default() } @@ -93,7 +88,7 @@ enum Regime { Single, } impl Regime { - const VALUES: [Self; 3] = [Self::Shared, Self::Dedicated, Self::Single]; + const VALUES: [Self; 3] = [Self::Dedicated, Self::Shared, Self::Single]; } #[derive(Debug, Default, serde::Serialize)] @@ -103,72 +98,84 @@ struct Results { } fn main() -> anyhow::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let mut all_results: HashMap = HashMap::new(); for regime in Regime::VALUES { - let mut res = Results::default(); - for core_cnt in [2, 4, 8, 16] { - let rtm; - println!("==================="); - println!("Running {core_cnt} cores under {regime:?}"); - let (tok1, tok2) = match regime { + let mut results = Results::default(); + for core_count in [2, 4, 8, 16] { + let manager; + info!("==================="); + info!("Running {core_count} cores under {regime:?}"); + let (tokio1, tokio2) = match regime { Regime::Shared => { - rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap(); + manager = ThreadManager::new(make_config_shared(core_count)).unwrap(); ( - rtm.get_tokio("axum1") + manager + .get_tokio("axum1") .expect("Expecting runtime named axum1"), - rtm.get_tokio("axum2") + manager + .get_tokio("axum2") .expect("Expecting runtime named axum2"), ) } Regime::Dedicated => { - rtm = ThreadManager::new(make_config_dedicated(core_cnt)).unwrap(); + manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap(); ( - rtm.get_tokio("axum1") + manager + .get_tokio("axum1") .expect("Expecting runtime named axum1"), - rtm.get_tokio("axum2") + manager + .get_tokio("axum2") .expect("Expecting runtime named axum2"), ) } Regime::Single => { - rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap(); + manager = ThreadManager::new(make_config_shared(core_count)).unwrap(); ( - rtm.get_tokio("axum1") + manager + .get_tokio("axum1") .expect("Expecting runtime named axum1"), - rtm.get_tokio("axum2") + manager + .get_tokio("axum2") .expect("Expecting runtime named axum2"), ) } }; let wrk_cores: Vec<_> = (32..64).collect(); - let results = std::thread::scope(|s| { + let measurement = std::thread::scope(|s| { s.spawn(|| { - tok1.tokio.spawn(axum_main(8888)); + tokio1.start_metrics_sampling(Duration::from_secs(1)); + tokio1.tokio.block_on(axum_main(8888)); }); let jh = match regime { Regime::Single => s.spawn(|| { - run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 1000).unwrap() + run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 3000).unwrap() }), _ => { s.spawn(|| { - tok2.tokio.spawn(axum_main(8889)); + tokio2.start_metrics_sampling(Duration::from_secs(1)); + tokio2.tokio.block_on(axum_main(8889)); }); s.spawn(|| { - run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap() + run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 3000).unwrap() }) } }; jh.join().expect("WRK crashed!") }); - println!("Results are: {:?}", results); - res.latencies_s.push( - results.0.iter().map(|a| a.as_secs_f32()).sum::() / results.0.len() as f32, + info!("Results are: {:?}", measurement); + results.latencies_s.push( + measurement.0.iter().map(|a| a.as_secs_f32()).sum::() + / measurement.0.len() as f32, ); - res.rps.push(results.1.iter().sum()); + results.rps.push(measurement.1.iter().sum()); } - all_results.insert(format!("{regime:?}"), res); + all_results.insert(format!("{regime:?}"), results); std::thread::sleep(Duration::from_secs(3)); } + + //print the resulting measurements so they can be e.g. plotted with matplotlib println!("{}", serde_json::to_string_pretty(&all_results)?); Ok(()) @@ -180,6 +187,9 @@ fn run_wrk( threads: usize, connections: usize, ) -> anyhow::Result<(Vec, Vec)> { + //Sleep a bit to let axum start + std::thread::sleep(Duration::from_millis(500)); + let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); script.push("examples/report.lua"); let cpus: Vec = cpus.iter().map(|c| c.to_string()).collect(); @@ -209,7 +219,7 @@ fn run_wrk( let mut all_latencies = vec![]; let mut all_rps = vec![]; for (out, port) in outs.zip(ports.iter()) { - println!("========================="); + debug!("========================="); std::io::stdout().write_all(&out.stderr)?; let res = str::from_utf8(&out.stdout)?; let mut res = res.lines().last().unwrap().split(' '); @@ -219,7 +229,7 @@ fn run_wrk( let requests: usize = res.next().unwrap().parse()?; let rps = requests as f32 / 10.0; - println!("WRK results for port {port}: {latency:?} {rps}"); + debug!("WRK results for port {port}: {latency:?} {rps}"); all_latencies.push(Duration::from_micros(latency_us)); all_rps.push(rps); } diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs index 828745d80372cd..cd975884459c1f 100644 --- a/thread-manager/src/policy.rs +++ b/thread-manager/src/policy.rs @@ -1,8 +1,11 @@ use { serde::{Deserialize, Serialize}, + std::sync::OnceLock, thread_priority::ThreadExt, }; +static CORE_COUNT: OnceLock = OnceLock::new(); + #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub enum CoreAllocation { ///Use OS default allocation (i.e. do not alter core affinity) @@ -17,17 +20,31 @@ pub enum CoreAllocation { impl CoreAllocation { /// Converts into a vector of core IDs. OsDefault is converted to empty vector. pub fn as_core_mask_vector(&self) -> Vec { + let core_count = CORE_COUNT.get_or_init(num_cpus::get); match *self { CoreAllocation::PinnedCores { min, max } => (min..max).collect(), CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(), - CoreAllocation::OsDefault => vec![], + CoreAllocation::OsDefault => Vec::from_iter(0..*core_count), } } } #[cfg(target_os = "linux")] pub fn set_thread_affinity(cores: &[usize]) { - affinity::set_thread_affinity(cores).expect("Can not set thread affinity for runtime worker"); + assert!( + !cores.is_empty(), + "Can not call setaffinity with empty cores mask" + ); + if let Err(e) = affinity::set_thread_affinity(cores) { + let thread = std::thread::current(); + let msg = format!( + "Can not set core affinity {:?} for thread {:?} named {:?}, error {e}", + cores, + thread.id(), + thread.name() + ); + panic!("{}", msg); + } } #[cfg(not(target_os = "linux"))] diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs index 3e0682b8c46bd3..b8563f9ae11348 100644 --- a/thread-manager/src/tokio_runtime.rs +++ b/thread-manager/src/tokio_runtime.rs @@ -2,9 +2,12 @@ use { crate::policy::{apply_policy, CoreAllocation}, serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, - std::sync::{ - atomic::{AtomicI64, AtomicUsize, Ordering}, - Arc, Mutex, + std::{ + sync::{ + atomic::{AtomicI64, AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, }, thread_priority::ThreadExt, }; @@ -35,35 +38,6 @@ impl Default for TokioConfig { } } -#[derive(Debug)] -pub struct ThreadCounters { - pub namespace: &'static str, - pub parked_threads_cnt: AtomicI64, - pub active_threads_cnt: AtomicI64, -} - -impl ThreadCounters { - pub fn on_park(&self) { - let parked = self.parked_threads_cnt.fetch_add(1, Ordering::Relaxed); - let active = self.active_threads_cnt.fetch_sub(1, Ordering::Relaxed); - datapoint_info!( - self.namespace, - ("threads_parked", parked, i64), - ("threads_active", active, i64), - ); - } - - pub fn on_unpark(&self) { - let parked = self.parked_threads_cnt.fetch_sub(1, Ordering::Relaxed); - let active = self.active_threads_cnt.fetch_add(1, Ordering::Relaxed); - datapoint_info!( - self.namespace, - ("threads_parked", parked, i64), - ("threads_active", active, i64), - ); - } -} - #[derive(Debug)] pub struct TokioRuntime { pub tokio: tokio::runtime::Runtime, @@ -72,6 +46,12 @@ pub struct TokioRuntime { } impl TokioRuntime { + /// Starts the metrics sampling task on the runtime to monitor how many workers are busy doing useful things. + pub fn start_metrics_sampling(&self, period: Duration) { + let counters = self.counters.clone(); + self.tokio.spawn(metrics_sampler(counters, period)); + } + pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result { let num_workers = if cfg.worker_threads == 0 { num_cpus::get() @@ -94,7 +74,9 @@ impl TokioRuntime { let counters = Arc::new(ThreadCounters { namespace: format!("thread-manager-tokio-{}", &base_name).leak(), // no workaround, metrics crate will only consume 'static str parked_threads_cnt: AtomicI64::new(0), - active_threads_cnt: AtomicI64::new(0), + active_threads_cnt: AtomicI64::new( + (num_workers.wrapping_add(cfg.max_blocking_threads)) as i64, + ), }); let counters_clone1 = counters.clone(); let counters_clone2 = counters.clone(); @@ -134,3 +116,37 @@ impl TokioRuntime { }) } } + +///Internal counters to keep track of worker pool utilization +#[derive(Debug)] +pub struct ThreadCounters { + pub namespace: &'static str, + pub parked_threads_cnt: AtomicI64, + pub active_threads_cnt: AtomicI64, +} + +impl ThreadCounters { + pub fn on_park(&self) { + self.parked_threads_cnt.fetch_add(1, Ordering::Relaxed); + self.active_threads_cnt.fetch_sub(1, Ordering::Relaxed); + } + + pub fn on_unpark(&self) { + self.parked_threads_cnt.fetch_sub(1, Ordering::Relaxed); + self.active_threads_cnt.fetch_add(1, Ordering::Relaxed); + } +} + +async fn metrics_sampler(counters: Arc, period: Duration) { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + let parked = counters.parked_threads_cnt.load(Ordering::Relaxed); + let active = counters.active_threads_cnt.load(Ordering::Relaxed); + datapoint_info!( + counters.namespace, + ("threads_parked", parked, i64), + ("threads_active", active, i64), + ); + } +}