diff --git a/Cargo.lock b/Cargo.lock
index 2bba75f8d9c338..4b95422bb332f3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -98,7 +98,7 @@ dependencies = [
  "clap 2.33.3",
  "flate2",
  "hex",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "log",
  "serde",
  "serde_derive",
@@ -837,7 +837,7 @@ dependencies = [
  "futures-util",
  "http 0.2.12",
  "http-body 0.4.5",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "itoa",
  "matchit",
  "memchr",
@@ -3086,7 +3086,7 @@ dependencies = [
  "futures 0.3.31",
  "headers",
  "http 0.2.12",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "hyper-tls",
  "native-tls",
  "tokio",
@@ -3102,7 +3102,7 @@ checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
 dependencies = [
  "futures-util",
  "http 0.2.12",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "rustls 0.21.12",
  "tokio",
  "tokio-rustls",
@@ -3114,7 +3114,7 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
 dependencies = [
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "pin-project-lite",
  "tokio",
  "tokio-io-timeout",
@@ -3127,7 +3127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
 dependencies = [
  "bytes",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "native-tls",
  "tokio",
  "tokio-native-tls",
@@ -3571,7 +3571,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
 dependencies = [
  "futures 0.3.31",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "jsonrpc-core",
  "jsonrpc-server-utils",
  "log",
@@ -5191,7 +5191,7 @@ dependencies = [
  "h2",
  "http 0.2.12",
  "http-body 0.4.5",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "hyper-rustls",
  "hyper-tls",
  "ipnet",
@@ -9464,7 +9464,7 @@ dependencies = [
  "futures 0.3.31",
  "goauth",
  "http 0.2.12",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "hyper-proxy",
  "log",
  "openssl",
@@ -11581,7 +11581,7 @@ dependencies = [
  "h2",
  "http 0.2.12",
  "http-body 0.4.5",
- "hyper 0.14.31",
+ "hyper 0.14.32",
  "hyper-timeout",
  "percent-encoding 2.3.1",
  "pin-project",
diff --git a/thread-manager/README.md b/thread-manager/README.md
index 61f81f314f2ea6..4e206870dafdc0 100644
--- a/thread-manager/README.md
+++ b/thread-manager/README.md
@@ -8,6 +8,24 @@ and context switches that would occur if Rayon was entirely unaware it was running alongside
 tokio, and each was to spawn as many threads as there are cores.
 
 # Supported threading models
+## Affinity
+All threading models allow setting core affinity, but only on Linux.
+
+To set core affinity, add e.g.
+```toml
+core_allocation.DedicatedCoreSet = { min = 16, max = 64 }
+```
+to pin the pool to cores 16-64.
+
+## Scheduling policy and priority
+If you want, you can set the thread scheduling policy and priority. Keep in mind that this will likely require
+```bash
+sudo setcap cap_sys_nice+ep <target binary>
+```
+or root privileges to run the resulting process.
+To see which policies are supported, check [the sources](./src/policy.rs).
+If you use realtime policies, priority values from 1 (lowest) to 99 (highest) are possible.
+
 ## Tokio
 Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
 Number of worker and blocking threads is configurable, as are thread priorities for the pool.
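To make the README section above concrete, here is a minimal sketch of the same settings driven through the crate's config structs. The `agave_thread_manager` import path and the `main` scaffolding are assumptions for illustration; the type and field names (`NativeConfig`, `core_allocation`, `policy`, `native_configs`) and the call pattern all come from the tests and examples in this diff.

```rust
use {
    agave_thread_manager::{CoreAllocation, NativeConfig, ThreadManager, ThreadManagerConfig},
    std::collections::HashMap,
};

fn main() -> anyhow::Result<()> {
    let config = ThreadManagerConfig {
        native_configs: HashMap::from([(
            "worker".to_owned(),
            NativeConfig {
                // Linux only: pin this pool to a dedicated core set.
                core_allocation: CoreAllocation::DedicatedCoreSet { min: 16, max: 64 },
                // parse_policy in this diff accepts "OTHER", "BATCH" or "IDLE";
                // realtime policies would also need priority 1..99 and CAP_SYS_NICE.
                policy: "BATCH".to_owned(),
                // priority, max_threads and stack size keep their defaults.
                ..Default::default()
            },
        )]),
        ..Default::default()
    };

    let manager = ThreadManager::new(config)?;
    // The renamed accessor panics if no pool named "worker" was configured.
    let pool = manager.get_native("worker");
    pool.spawn(|| println!("hello from a managed, pinned thread"))
        .unwrap()
        .join()
        .unwrap();
    Ok(())
}
```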
diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs
index d23b5e16d49644..241866ee0c32a3 100644
--- a/thread-manager/examples/core_contention_basics.rs
+++ b/thread-manager/examples/core_contention_basics.rs
@@ -57,13 +57,9 @@ fn main() -> anyhow::Result<()> {
     let cfg: ThreadManagerConfig = toml::from_str(&buf)?;
 
     let manager = ThreadManager::new(cfg).unwrap();
-    let tokio1 = manager
-        .get_tokio("axum1")
-        .expect("Expecting runtime named axum1");
+    let tokio1 = manager.get_tokio("axum1");
     tokio1.start_metrics_sampling(Duration::from_secs(1));
-    let tokio2 = manager
-        .get_tokio("axum2")
-        .expect("Expecting runtime named axum2");
+    let tokio2 = manager.get_tokio("axum2");
     tokio2.start_metrics_sampling(Duration::from_secs(1));
 
     let wrk_cores: Vec<_> = (32..64).collect();
diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs
index 5edd213a42839e..30b8011217e940 100644
--- a/thread-manager/examples/core_contention_sweep.rs
+++ b/thread-manager/examples/core_contention_sweep.rs
@@ -109,36 +109,15 @@ fn main() -> anyhow::Result<()> {
         let (tokio1, tokio2) = match regime {
             Regime::Shared => {
                 manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
-                (
-                    manager
-                        .get_tokio("axum1")
-                        .expect("Expecting runtime named axum1"),
-                    manager
-                        .get_tokio("axum2")
-                        .expect("Expecting runtime named axum2"),
-                )
+                (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
             }
             Regime::Dedicated => {
                 manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
-                (
-                    manager
-                        .get_tokio("axum1")
-                        .expect("Expecting runtime named axum1"),
-                    manager
-                        .get_tokio("axum2")
-                        .expect("Expecting runtime named axum2"),
-                )
+                (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
             }
             Regime::Single => {
                 manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
-                (
-                    manager
-                        .get_tokio("axum1")
-                        .expect("Expecting runtime named axum1"),
-                    manager
-                        .get_tokio("axum2")
-                        .expect("Expecting runtime named axum2"),
-                )
+                (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
             }
         };
diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs
index e852f00995aed5..1f34330691cd7a 100644
--- a/thread-manager/src/lib.rs
+++ b/thread-manager/src/lib.rs
@@ -120,22 +120,44 @@ impl ThreadManager {
         }
     }
 
-    pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
+    pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
         self.lookup(
             name,
             &self.native_runtime_mapping,
             &self.native_thread_runtimes,
         )
     }
+    pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
+        if let Some(runtime) = self.try_get_native(name) {
+            runtime
+        } else {
+            panic!("Native thread pool {name} not configured!");
+        }
+    }
 
-    pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
+    pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
         self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
     }
 
-    pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
+    pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
+        if let Some(runtime) = self.try_get_rayon(name) {
+            runtime
+        } else {
+            panic!("Rayon thread pool {name} not configured!");
+        }
+    }
+
+    pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
         self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
     }
 
+    pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
+        if let Some(runtime) = self.try_get_tokio(name) {
+            runtime
+        } else {
panic!("Tokio runtime {name} not configured!"); + } + } pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result> { let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector(); @@ -214,6 +236,81 @@ mod tests { #[cfg(not(target_os = "linux"))] fn validate_affinity(_expect_cores: &[usize], _error_msg: &str) {} + /* #[test] + fn thread_priority() { + let priority_high = 10; + let priority_default = crate::policy::DEFAULT_PRIORITY; + let priority_low = 1; + let conf = ThreadManagerConfig { + native_configs: HashMap::from([ + ( + "high".to_owned(), + NativeConfig { + priority: priority_high, + ..Default::default() + }, + ), + ( + "default".to_owned(), + NativeConfig { + ..Default::default() + }, + ), + ( + "low".to_owned(), + NativeConfig { + priority: priority_low, + ..Default::default() + }, + ), + ]), + ..Default::default() + }; + + let manager = ThreadManager::new(conf).unwrap(); + let high = manager.get_native("high"); + let low = manager.get_native("low"); + let default = manager.get_native("default"); + + high.spawn(move || { + let prio = + thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap(); + assert_eq!( + prio, + thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap()) + ); + }) + .unwrap() + .join() + .unwrap(); + low.spawn(move || { + let prio = + thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap(); + assert_eq!( + prio, + thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap()) + ); + }) + .unwrap() + .join() + .unwrap(); + default + .spawn(move || { + let prio = + thread_priority::get_thread_priority(thread_priority::thread_native_id()) + .unwrap(); + assert_eq!( + prio, + thread_priority::ThreadPriority::Crossplatform( + (priority_default).try_into().unwrap() + ) + ); + }) + .unwrap() + .join() + .unwrap(); + }*/ + #[test] fn process_affinity() { let conf = ThreadManagerConfig { @@ -222,7 +319,6 @@ mod tests { NativeConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 }, max_threads: 5, - priority: 0, ..Default::default() }, )]), @@ -232,7 +328,7 @@ mod tests { }; let manager = ThreadManager::new(conf).unwrap(); - let runtime = manager.get_native("test").unwrap(); + let runtime = manager.get_native("test"); let thread1 = runtime .spawn(|| { @@ -263,7 +359,6 @@ mod tests { RayonConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 }, worker_threads: 3, - priority: 0, ..Default::default() }, )]), @@ -273,7 +368,7 @@ mod tests { }; let manager = ThreadManager::new(conf).unwrap(); - let rayon_runtime = manager.get_rayon("test").unwrap(); + let rayon_runtime = manager.get_rayon("test"); let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| { println!("Rayon thread {} reporting", ctx.index()); diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs index cb01eeff8ae3bd..d6e46a4da0daf3 100644 --- a/thread-manager/src/native_thread_runtime.rs +++ b/thread-manager/src/native_thread_runtime.rs @@ -1,5 +1,5 @@ use { - crate::policy::{apply_policy, CoreAllocation}, + crate::policy::{apply_policy, parse_policy, CoreAllocation}, anyhow::bail, log::error, serde::{Deserialize, Serialize}, @@ -18,7 +18,9 @@ use { pub struct NativeConfig { pub core_allocation: CoreAllocation, pub max_threads: usize, + /// Priority in range 1..99 pub priority: u8, + pub policy: String, pub stack_size_bytes: usize, } @@ -27,7 +29,8 @@ impl Default for NativeConfig { Self { 
             core_allocation: CoreAllocation::OsDefault,
             max_threads: 16,
-            priority: 0,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "OTHER".to_owned(),
             stack_size_bytes: 2 * 1024 * 1024,
         }
     }
@@ -131,12 +134,13 @@ impl NativeThreadRuntime {
         let core_alloc = self.config.core_allocation.clone();
         let priority = self.config.priority;
+        let policy = parse_policy(&self.config.policy);
         let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
         let jh = std::thread::Builder::new()
             .name(name)
             .stack_size(self.config.stack_size_bytes)
             .spawn(move || {
-                apply_policy(&core_alloc, priority, &chosen_cores_mask);
+                apply_policy(&core_alloc, policy, priority, &chosen_cores_mask);
                 f()
             })?;
         let rc = self.running_count.fetch_add(1, Ordering::Relaxed);
diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs
index cd975884459c1f..069fa96b44b3c6 100644
--- a/thread-manager/src/policy.rs
+++ b/thread-manager/src/policy.rs
@@ -1,11 +1,17 @@
+#[cfg(target_os = "linux")]
+use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
 use {
     serde::{Deserialize, Serialize},
     std::sync::OnceLock,
-    thread_priority::ThreadExt,
 };
 
+#[cfg(not(target_os = "linux"))]
+struct ThreadSchedulePolicy {}
+
 static CORE_COUNT: OnceLock<usize> = OnceLock::new();
 
+pub const DEFAULT_PRIORITY: u8 = 0;
+
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
 pub enum CoreAllocation {
     ///Use OS default allocation (i.e. do not alter core affinity)
@@ -50,17 +56,35 @@ pub fn set_thread_affinity(cores: &[usize]) {
 #[cfg(not(target_os = "linux"))]
 pub fn set_thread_affinity(_cores: &[usize]) {}
 
+#[cfg(target_os = "linux")]
+pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
+    match policy.to_uppercase().as_ref() {
+        "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
+        "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
+        "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
+        _ => panic!("Could not parse the policy"),
+    }
+}
+
+#[cfg(not(target_os = "linux"))]
+pub fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
+    ThreadSchedulePolicy {}
+}
+
 ///Applies policy to the calling thread
 pub fn apply_policy(
     alloc: &CoreAllocation,
+    policy: ThreadSchedulePolicy,
     priority: u8,
     chosen_cores_mask: &std::sync::Mutex<Vec<usize>>,
 ) {
-    std::thread::current()
-        .set_priority(thread_priority::ThreadPriority::Crossplatform(
-            (priority).try_into().unwrap(),
-        ))
-        .expect("Can not set thread priority!");
+    #[cfg(target_os = "linux")]
+    if let Err(e) = std::thread::current().set_priority_and_policy(
+        policy,
+        thread_priority::ThreadPriority::Crossplatform((priority).try_into().unwrap()),
+    ) {
+        panic!("Can not set thread priority, OS error {:?}", e);
+    }
 
     match alloc {
         CoreAllocation::PinnedCores { min: _, max: _ } => {
diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs
index a6d3a29962b2b7..416569d8a68066 100644
--- a/thread-manager/src/rayon_runtime.rs
+++ b/thread-manager/src/rayon_runtime.rs
@@ -1,14 +1,10 @@
 use {
-    crate::policy::{apply_policy, CoreAllocation},
+    crate::policy::{apply_policy, parse_policy, CoreAllocation},
     anyhow::Ok,
     serde::{Deserialize, Serialize},
-    solana_metrics::datapoint_info,
     std::{
         ops::Deref,
-        sync::{
-            atomic::{AtomicI64, Ordering},
-            Arc, Mutex,
-        },
+        sync::{Arc, Mutex},
     },
 };
 
@@ -16,7 +12,9 @@ use {
 #[serde(default)]
 pub struct RayonConfig {
     pub worker_threads: usize,
+    /// Priority in range 1..99
     pub priority: u8,
+    pub policy: String,
     pub stack_size_bytes: usize,
     pub core_allocation: CoreAllocation,
 }
 
@@ -26,7 +24,8 @@ impl Default for RayonConfig {
         Self {
             core_allocation: CoreAllocation::OsDefault,
             worker_threads: 16,
-            priority: 0,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "BATCH".to_owned(),
             stack_size_bytes: 2 * 1024 * 1024,
         }
     }
@@ -60,21 +59,24 @@ impl Deref for RayonRuntime {
 
 impl RayonRuntime {
     pub fn new(name: String, config: RayonConfig) -> anyhow::Result<Self> {
-        let policy = config.core_allocation.clone();
-        let chosen_cores_mask = Mutex::new(policy.as_core_mask_vector());
+        let core_allocation = config.core_allocation.clone();
+        let chosen_cores_mask = Mutex::new(core_allocation.as_core_mask_vector());
         let priority = config.priority;
-        let spawned_threads = AtomicI64::new(0);
+        let policy = parse_policy(&config.policy);
         let rayon_pool = rayon::ThreadPoolBuilder::new()
             .num_threads(config.worker_threads)
             .thread_name(move |i| format!("{}_{}", &name, i))
+            .stack_size(config.stack_size_bytes)
             .start_handler(move |_idx| {
-                let rc = spawned_threads.fetch_add(1, Ordering::Relaxed);
-                datapoint_info!("thread-manager-rayon", ("threads-spawned", rc, i64),);
-                apply_policy(&policy, priority, &chosen_cores_mask);
+                apply_policy(&core_allocation, policy, priority, &chosen_cores_mask);
             })
             .build()?;
         Ok(Self {
             inner: Arc::new(RayonRuntimeInner { rayon_pool, config }),
         })
     }
+
+    pub fn new_for_tests(name: &str) -> Self {
+        Self::new(name.to_owned(), RayonConfig::default()).unwrap()
+    }
 }
diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs
index 363d4140c43f27..6e2101c6fde2f6 100644
--- a/thread-manager/src/tokio_runtime.rs
+++ b/thread-manager/src/tokio_runtime.rs
@@ -1,5 +1,5 @@
 use {
-    crate::policy::{apply_policy, CoreAllocation},
+    crate::policy::{apply_policy, parse_policy, CoreAllocation},
     serde::{Deserialize, Serialize},
     solana_metrics::datapoint_info,
     std::{
@@ -20,7 +20,9 @@ pub struct TokioConfig {
     pub worker_threads: usize,
     ///max number of blocking threads tokio is allowed to spawn
     pub max_blocking_threads: usize,
+    /// Priority in range 1..99
     pub priority: u8,
+    pub policy: String,
     pub stack_size_bytes: usize,
     pub event_interval: u32,
     pub core_allocation: CoreAllocation,
@@ -32,7 +34,8 @@ impl Default for TokioConfig {
             core_allocation: CoreAllocation::OsDefault,
             worker_threads: 8,
             max_blocking_threads: 1,
-            priority: 0,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "OTHER".to_owned(),
             stack_size_bytes: 2 * 1024 * 1024,
             event_interval: 61,
         }
@@ -116,7 +119,12 @@ impl TokioRuntime {
             // todo - tracing
             //let tname = cur_thread.name().unwrap();
             //println!("thread {tname} id {tid} started");
-            apply_policy(&c.core_allocation, c.priority, &chosen_cores_mask);
+            apply_policy(
+                &c.core_allocation,
+                parse_policy(&c.policy),
+                c.priority,
+                &chosen_cores_mask,
+            );
         });
         Ok(TokioRuntime {
             tokio: builder.build()?,
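A short usage sketch of the accessor rename in `lib.rs` (hypothetical call sites, same assumed `agave_thread_manager` import path as in the earlier sketch): `get_*` now panics on an unknown name, while `try_get_*` keeps the old `Option`-returning lookup. The runtime names and `start_metrics_sampling` call mirror the examples in this diff.

```rust
use {agave_thread_manager::ThreadManager, std::time::Duration};

fn start_runtimes(manager: &ThreadManager) {
    // Panicking accessor: call sites that previously chained
    // .get_tokio("axum1").expect(...) now read as plain lookups.
    let tokio1 = manager.get_tokio("axum1");
    tokio1.start_metrics_sampling(Duration::from_secs(1));

    // Fallible accessor: keep the Option when a runtime may
    // legitimately be absent from the config.
    match manager.try_get_tokio("axum2") {
        Some(tokio2) => tokio2.start_metrics_sampling(Duration::from_secs(1)),
        None => eprintln!("runtime axum2 not configured, skipping"),
    }
}
```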