diff --git a/Cargo.lock b/Cargo.lock index 1e1b5eed668659..7075c76655b42a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,8 +270,10 @@ name = "agave-thread-manager" version = "2.2.0" dependencies = [ "affinity", + "agave-thread-manager", "anyhow", "axum 0.7.9", + "cfg-if 1.0.0", "env_logger", "hyper 0.14.32", "log", diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml index c104a215a5530d..eae781df71e243 100644 --- a/thread-manager/Cargo.toml +++ b/thread-manager/Cargo.toml @@ -13,6 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } +cfg-if = "1.0.0" log = { workspace = true } num_cpus = { workspace = true } rayon = { workspace = true } @@ -26,8 +27,12 @@ tower = "0.5.2" affinity = "0.1.2" [dev-dependencies] +agave-thread-manager = { path = ".", features = ["dev-context-only-utils"] } axum = "0.7.9" env_logger = { workspace = true } hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] } serde_json = { workspace = true } toml = { workspace = true } + +[features] +dev-context-only-utils = [] diff --git a/thread-manager/README.md b/thread-manager/README.md index f6ac21d434ea69..2a8f6158c6704a 100644 --- a/thread-manager/README.md +++ b/thread-manager/README.md @@ -58,7 +58,7 @@ one may want to spawn many rayon pools. # Examples -All examples need `wrk` HTTP behnchmarking tool for load generation. Please install it before running. +All examples need `wrk` HTTP benchmarking tool for load generation. Please install it before running. * core_contention_basics will demonstrate why core contention is bad, and how thread configs can help * core_contention_sweep will sweep across a range of core counts to show how benefits scale with core counts diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index a28026b8962b0b..ec4d1cd9e5c92d 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -55,7 +55,7 @@ fn main() -> anyhow::Result<()> { let join_handle = scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000))); - join_handle.join().expect("WRK crashed!") + join_handle.join().expect("Load generator crashed!") }); //print out the results of the bench run println!("Results are: {:?}", results); diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs index 921a58769f3d3b..f233d232b0ba08 100644 --- a/thread-manager/src/lib.rs +++ b/thread-manager/src/lib.rs @@ -15,21 +15,21 @@ pub use { rayon_runtime::{RayonConfig, RayonRuntime}, tokio_runtime::{TokioConfig, TokioRuntime}, }; -pub type ConstString = Box; pub const MAX_THREAD_NAME_CHARS: usize = 12; #[derive(Default, Debug)] pub struct ThreadManagerInner { - pub tokio_runtimes: HashMap, - pub tokio_runtime_mapping: HashMap, + pub tokio_runtimes: HashMap, + pub tokio_runtime_mapping: HashMap, - pub native_thread_runtimes: HashMap, - pub native_runtime_mapping: HashMap, + pub native_thread_runtimes: HashMap, + pub native_runtime_mapping: HashMap, - pub rayon_runtimes: HashMap, - pub rayon_runtime_mapping: HashMap, + pub rayon_runtimes: HashMap, + pub rayon_runtime_mapping: HashMap, } + impl ThreadManagerInner { /// Populates mappings with copies of config names, overrides as appropriate fn populate_mappings(&mut self, config: &ThreadManagerConfig) { @@ -37,29 +37,26 @@ impl ThreadManagerInner { for name in config.native_configs.keys() { self.native_runtime_mapping - .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + .insert(name.clone(), name.clone()); } for (k, v) in config.native_runtime_mapping.iter() { - self.native_runtime_mapping - .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + self.native_runtime_mapping.insert(k.clone(), v.clone()); } for name in config.tokio_configs.keys() { self.tokio_runtime_mapping - .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + .insert(name.clone(), name.clone()); } for (k, v) in config.tokio_runtime_mapping.iter() { - self.tokio_runtime_mapping - .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + self.tokio_runtime_mapping.insert(k.clone(), v.clone()); } for name in config.rayon_configs.keys() { self.rayon_runtime_mapping - .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + .insert(name.clone(), name.clone()); } for (k, v) in config.rayon_runtime_mapping.iter() { - self.rayon_runtime_mapping - .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + self.rayon_runtime_mapping.insert(k.clone(), v.clone()); } } } @@ -68,6 +65,7 @@ impl ThreadManagerInner { pub struct ThreadManager { inner: Arc, } + impl Deref for ThreadManager { type Target = ThreadManagerInner; @@ -110,13 +108,16 @@ impl ThreadManager { fn lookup<'a, T>( &'a self, name: &str, - mapping: &HashMap, - runtimes: &'a HashMap, + mapping: &HashMap, + runtimes: &'a HashMap, ) -> Option<&'a T> { match mapping.get(name) { Some(n) => runtimes.get(n), None => match mapping.get("default") { - Some(n) => runtimes.get(n), + Some(n) => { + log::warn!("Falling back to default runtime for {name}"); + runtimes.get(n) + } None => None, }, } @@ -133,7 +134,7 @@ impl ThreadManager { if let Some(runtime) = self.try_get_native(name) { runtime } else { - panic!("Native thread pool {name} not configured!"); + panic!("Native thread pool for {name} can not be found!"); } } @@ -145,7 +146,7 @@ impl ThreadManager { if let Some(runtime) = self.try_get_rayon(name) { runtime } else { - panic!("Rayon thread pool {name} not configured!"); + panic!("Rayon thread pool for {name} can not be found!"); } } @@ -157,44 +158,35 @@ impl ThreadManager { if let Some(runtime) = self.try_get_tokio(name) { runtime } else { - panic!("Tokio runtime {name} not configured!"); + panic!("Tokio thread pool for {name} can not be found!"); } } + pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result> { let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector(); - crate::policy::set_thread_affinity(&chosen_cores_mask); Ok(chosen_cores_mask) } pub fn new(config: ThreadManagerConfig) -> anyhow::Result { - let mut core_allocations = HashMap::>::new(); + let mut core_allocations = HashMap::>::new(); Self::set_process_affinity(&config)?; let mut manager = ThreadManagerInner::default(); manager.populate_mappings(&config); for (name, cfg) in config.native_configs.iter() { let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone()); - manager - .native_thread_runtimes - .insert(name.clone().into_boxed_str(), nrt); + manager.native_thread_runtimes.insert(name.clone(), nrt); } for (name, cfg) in config.rayon_configs.iter() { let rrt = RayonRuntime::new(name.clone(), cfg.clone())?; - manager - .rayon_runtimes - .insert(name.clone().into_boxed_str(), rrt); + manager.rayon_runtimes.insert(name.clone(), rrt); } for (name, cfg) in config.tokio_configs.iter() { let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?; - core_allocations.insert( - name.clone().into_boxed_str(), - cfg.core_allocation.as_core_mask_vector(), - ); - manager - .tokio_runtimes - .insert(name.clone().into_boxed_str(), tokiort); + core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector()); + manager.tokio_runtimes.insert(name.clone(), tokiort); } Ok(Self { inner: Arc::new(manager), diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs index 69acb3d6e9d709..9ee6d58490fec6 100644 --- a/thread-manager/src/native_thread_runtime.rs +++ b/thread-manager/src/native_thread_runtime.rs @@ -95,7 +95,7 @@ impl JoinHandle { impl Drop for JoinHandle { fn drop(&mut self) { if self.std_handle.is_some() { - warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!"); + warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your threads!"); self.join_inner().expect("Child thread panicked"); } } @@ -155,4 +155,9 @@ impl NativeThreadRuntime { running_count: self.running_count.clone(), }) } + + #[cfg(feature = "dev-context-only-utils")] + pub fn new_for_tests(name: &str) -> Self { + Self::new(name.to_owned(), NativeConfig::default()) + } } diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs index 0065987bfea133..a1c0d452ab33f0 100644 --- a/thread-manager/src/policy.rs +++ b/thread-manager/src/policy.rs @@ -1,14 +1,17 @@ -#[cfg(target_os = "linux")] -use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy}; +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")]{ + use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy}; + } + else{ + #[derive(Clone, Copy)] + pub(crate) struct ThreadSchedulePolicy {} + } +} use { serde::{Deserialize, Serialize}, std::sync::OnceLock, }; -#[cfg(not(target_os = "linux"))] -#[derive(Clone, Copy)] -pub(crate) struct ThreadSchedulePolicy {} - static CORE_COUNT: OnceLock = OnceLock::new(); pub const DEFAULT_PRIORITY: u8 = 0; @@ -34,53 +37,50 @@ impl CoreAllocation { } } } +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")]{ -#[cfg(target_os = "linux")] -pub fn set_thread_affinity(cores: &[usize]) { - assert!( - !cores.is_empty(), - "Can not call setaffinity with empty cores mask" - ); - if let Err(e) = affinity::set_thread_affinity(cores) { - let thread = std::thread::current(); - panic!( - "Can not set core affinity {:?} for thread {:?} named {:?}, error {}", - cores, - thread.id(), - thread.name(), - e - ); - } -} - -#[cfg(not(target_os = "linux"))] -pub fn set_thread_affinity(_cores: &[usize]) {} - -#[cfg(target_os = "linux")] -pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy { - match policy.to_uppercase().as_ref() { - "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch), - "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other), - "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle), - _ => panic!("Could not parse the policy"), + pub fn set_thread_affinity(cores: &[usize]) { + assert!( + !cores.is_empty(), + "Can not call setaffinity with empty cores mask" + ); + if let Err(e) = affinity::set_thread_affinity(cores) { + let thread = std::thread::current(); + panic!( + "Can not set core affinity {:?} for thread {:?} named {:?}, error {}", + cores, + thread.id(), + thread.name(), + e + ); + } + } + fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) { + if let Err(e) = std::thread::current().set_priority_and_policy( + policy, + thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")), + ) { + panic!("Can not set thread priority, OS error {:?}", e); + } + } + pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy { + match policy.to_uppercase().as_ref() { + "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch), + "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other), + "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle), + _ => panic!("Could not parse the policy"), + } + } } -} + else{ -#[cfg(not(target_os = "linux"))] -pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy { - ThreadSchedulePolicy {} -} - -#[cfg(not(target_os = "linux"))] -fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {} + pub fn set_thread_affinity(_cores: &[usize]) {} -#[cfg(target_os = "linux")] -fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) { - if let Err(e) = std::thread::current().set_priority_and_policy( - policy, - thread_priority::ThreadPriority::Crossplatform((priority).try_into().unwrap()), - ) { - panic!("Can not set thread priority, OS error {:?}", e); + pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy { + ThreadSchedulePolicy {} + } + fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {} } } diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs index 706a2ecec39382..0699c503e7cdd9 100644 --- a/thread-manager/src/rayon_runtime.rs +++ b/thread-manager/src/rayon_runtime.rs @@ -80,7 +80,9 @@ impl RayonRuntime { }) } + #[cfg(feature = "dev-context-only-utils")] pub fn new_for_tests(name: &str) -> Self { - Self::new(name.to_owned(), RayonConfig::default()).unwrap() + Self::new(name.to_owned(), RayonConfig::default()) + .expect("Failed to create rayon runtime for tests") } } diff --git a/thread-manager/src/runtime_manager.rs b/thread-manager/src/runtime_manager.rs deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs index 6495dd5f11ca1a..0c72ea9b0ebe77 100644 --- a/thread-manager/src/tokio_runtime.rs +++ b/thread-manager/src/tokio_runtime.rs @@ -136,12 +136,14 @@ impl TokioRuntime { } /// Makes test runtime with 2 threads, only for unittests + #[cfg(feature = "dev-context-only-utils")] pub fn new_for_tests() -> Self { let cfg = TokioConfig { worker_threads: 2, ..Default::default() }; - TokioRuntime::new("solNetTest".to_owned(), cfg.clone()).unwrap() + TokioRuntime::new("solNetTest".to_owned(), cfg.clone()) + .expect("Failed to create Tokio runtime for tests") } }