diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index ea481a707893b8..d23b5e16d49644 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -54,7 +54,7 @@ fn main() -> anyhow::Result<()> { conf_file.push(exp); let mut buf = String::new(); std::fs::File::open(conf_file)?.read_to_string(&mut buf)?; - let cfg: RuntimeManagerConfig = toml::from_str(&buf)?; + let cfg: ThreadManagerConfig = toml::from_str(&buf)?; let manager = ThreadManager::new(cfg).unwrap(); let tokio1 = manager diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs index e466b3bae05086..5edd213a42839e 100644 --- a/thread-manager/examples/core_contention_sweep.rs +++ b/thread-manager/examples/core_contention_sweep.rs @@ -40,14 +40,14 @@ async fn axum_main(port: u16) { } } } -fn make_config_shared(cc: usize) -> RuntimeManagerConfig { +fn make_config_shared(cc: usize) -> ThreadManagerConfig { let tokio_cfg_1 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc }, worker_threads: cc, ..Default::default() }; let tokio_cfg_2 = tokio_cfg_1.clone(); - RuntimeManagerConfig { + ThreadManagerConfig { tokio_configs: HashMap::from([ ("axum1".into(), tokio_cfg_1), ("axum2".into(), tokio_cfg_2), @@ -55,7 +55,7 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig { ..Default::default() } } -fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig { +fn make_config_dedicated(core_count: usize) -> ThreadManagerConfig { let tokio_cfg_1 = TokioConfig { core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, @@ -72,7 +72,7 @@ fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig { worker_threads: core_count / 2, ..Default::default() }; - RuntimeManagerConfig { + ThreadManagerConfig { tokio_configs: HashMap::from([ ("axum1".into(), tokio_cfg_1), ("axum2".into(), tokio_cfg_2), diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs index c439432cb20bd0..e852f00995aed5 100644 --- a/thread-manager/src/lib.rs +++ b/thread-manager/src/lib.rs @@ -1,7 +1,7 @@ use { anyhow::Ok, serde::{Deserialize, Serialize}, - std::collections::HashMap, + std::{collections::HashMap, ops::Deref, sync::Arc}, }; pub mod native_thread_runtime; @@ -18,7 +18,7 @@ pub use { pub type ConstString = Box; #[derive(Default, Debug)] -pub struct ThreadManager { +pub struct ThreadManagerInner { pub tokio_runtimes: HashMap, pub tokio_runtime_mapping: HashMap, @@ -28,44 +28,9 @@ pub struct ThreadManager { pub rayon_runtimes: HashMap, pub rayon_runtime_mapping: HashMap, } - -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct RuntimeManagerConfig { - pub native_configs: HashMap, - pub native_runtime_mapping: HashMap, - - pub rayon_configs: HashMap, - pub rayon_runtime_mapping: HashMap, - - pub tokio_configs: HashMap, - pub tokio_runtime_mapping: HashMap, - - pub default_core_allocation: CoreAllocation, -} - -impl ThreadManager { - pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> { - let name = self.native_runtime_mapping.get(name)?; - self.native_thread_runtimes.get(name) - } - pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> { - let name = self.rayon_runtime_mapping.get(name)?; - self.rayon_runtimes.get(name) - } - pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> { - let name = self.tokio_runtime_mapping.get(name)?; - self.tokio_runtimes.get(name) - } - pub fn set_process_affinity(config: &RuntimeManagerConfig) -> anyhow::Result> { - let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector(); - - crate::policy::set_thread_affinity(&chosen_cores_mask); - Ok(chosen_cores_mask) - } - +impl ThreadManagerInner { /// Populates mappings with copies of config names, overrides as appropriate - fn populate_mappings(&mut self, config: &RuntimeManagerConfig) { + fn populate_mappings(&mut self, config: &ThreadManagerConfig) { //TODO: this should probably be cleaned up with a macro at some point... for name in config.native_configs.keys() { @@ -95,10 +60,93 @@ impl ThreadManager { .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); } } - pub fn new(config: RuntimeManagerConfig) -> anyhow::Result { +} + +#[derive(Default, Debug, Clone)] +pub struct ThreadManager { + inner: Arc, +} +impl Deref for ThreadManager { + type Target = ThreadManagerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct ThreadManagerConfig { + pub native_configs: HashMap, + pub native_runtime_mapping: HashMap, + + pub rayon_configs: HashMap, + pub rayon_runtime_mapping: HashMap, + + pub tokio_configs: HashMap, + pub tokio_runtime_mapping: HashMap, + + pub default_core_allocation: CoreAllocation, +} + +impl Default for ThreadManagerConfig { + fn default() -> Self { + Self { + native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]), + native_runtime_mapping: HashMap::new(), + rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]), + rayon_runtime_mapping: HashMap::new(), + tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]), + tokio_runtime_mapping: HashMap::new(), + default_core_allocation: CoreAllocation::OsDefault, + } + } +} + +impl ThreadManager { + /// Will lookup a runtime by given name. If not found, will try to lookup by name "default". If all fails, returns None. + fn lookup<'a, T>( + &'a self, + name: &str, + mapping: &HashMap, + runtimes: &'a HashMap, + ) -> Option<&'a T> { + match mapping.get(name) { + Some(n) => runtimes.get(n), + None => match mapping.get("default") { + Some(n) => runtimes.get(n), + None => None, + }, + } + } + + pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> { + self.lookup( + name, + &self.native_runtime_mapping, + &self.native_thread_runtimes, + ) + } + + pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> { + self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes) + } + + pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> { + self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes) + } + + pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result> { + let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector(); + + crate::policy::set_thread_affinity(&chosen_cores_mask); + Ok(chosen_cores_mask) + } + + pub fn new(config: ThreadManagerConfig) -> anyhow::Result { let mut core_allocations = HashMap::>::new(); Self::set_process_affinity(&config)?; - let mut manager = Self::default(); + let mut manager = ThreadManagerInner::default(); manager.populate_mappings(&config); for (name, cfg) in config.native_configs.iter() { let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone()); @@ -124,14 +172,16 @@ impl ThreadManager { .tokio_runtimes .insert(name.clone().into_boxed_str(), tokiort); } - Ok(manager) + Ok(Self { + inner: Arc::new(manager), + }) } } #[cfg(test)] mod tests { use { - crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManagerConfig, ThreadManager}, + crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager, ThreadManagerConfig}, std::{collections::HashMap, io::Read}, }; @@ -151,7 +201,7 @@ mod tests { .unwrap() .read_to_string(&mut buf) .unwrap(); - let cfg: RuntimeManagerConfig = toml::from_str(&buf).unwrap(); + let cfg: ThreadManagerConfig = toml::from_str(&buf).unwrap(); println!("{:?}", cfg); } } @@ -166,7 +216,7 @@ mod tests { #[test] fn process_affinity() { - let conf = RuntimeManagerConfig { + let conf = ThreadManagerConfig { native_configs: HashMap::from([( "pool1".to_owned(), NativeConfig { @@ -207,7 +257,7 @@ mod tests { #[test] fn rayon_affinity() { - let conf = RuntimeManagerConfig { + let conf = ThreadManagerConfig { rayon_configs: HashMap::from([( "test".to_owned(), RayonConfig { diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs index f99db65ae9fe5c..cb01eeff8ae3bd 100644 --- a/thread-manager/src/native_thread_runtime.rs +++ b/thread-manager/src/native_thread_runtime.rs @@ -4,9 +4,12 @@ use { log::error, serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, - std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Mutex, + std::{ + ops::Deref, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }, }; @@ -23,7 +26,7 @@ impl Default for NativeConfig { fn default() -> Self { Self { core_allocation: CoreAllocation::OsDefault, - max_threads: 10, + max_threads: 16, priority: 0, stack_size_bytes: 2 * 1024 * 1024, } @@ -31,13 +34,26 @@ impl Default for NativeConfig { } #[derive(Debug)] -pub struct NativeThreadRuntime { +pub struct NativeThreadRuntimeInner { pub id_count: AtomicUsize, pub running_count: Arc, pub config: NativeConfig, pub name: String, } +#[derive(Debug, Clone)] +pub struct NativeThreadRuntime { + inner: Arc, +} + +impl Deref for NativeThreadRuntime { + type Target = NativeThreadRuntimeInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + pub struct JoinHandle { std_handle: Option>, running_count: Arc, @@ -82,13 +98,27 @@ impl Drop for JoinHandle { impl NativeThreadRuntime { pub fn new(name: String, cfg: NativeConfig) -> Self { Self { - id_count: AtomicUsize::new(0), - running_count: Arc::new(AtomicUsize::new(0)), - config: cfg, - name, + inner: Arc::new(NativeThreadRuntimeInner { + id_count: AtomicUsize::new(0), + running_count: Arc::new(AtomicUsize::new(0)), + config: cfg, + name, + }), } } + pub fn spawn(&self, f: F) -> anyhow::Result> + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + let n = self.id_count.fetch_add(1, Ordering::Relaxed); + let name = format!("{}-{}", &self.name, n); + self.spawn_named(name, f) + } + + pub fn spawn_named(&self, name: String, f: F) -> anyhow::Result> where F: FnOnce() -> T, F: Send + 'static, @@ -102,9 +132,8 @@ impl NativeThreadRuntime { let core_alloc = self.config.core_allocation.clone(); let priority = self.config.priority; let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector()); - let n = self.id_count.fetch_add(1, Ordering::Relaxed); let jh = std::thread::Builder::new() - .name(format!("{}-{}", &self.name, n)) + .name(name) .stack_size(self.config.stack_size_bytes) .spawn(move || { apply_policy(&core_alloc, priority, &chosen_cores_mask); diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs index b731bd83051bcb..a6d3a29962b2b7 100644 --- a/thread-manager/src/rayon_runtime.rs +++ b/thread-manager/src/rayon_runtime.rs @@ -3,9 +3,12 @@ use { anyhow::Ok, serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, - std::sync::{ - atomic::{AtomicI64, Ordering}, - Mutex, + std::{ + ops::Deref, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, Mutex, + }, }, }; @@ -22,7 +25,7 @@ impl Default for RayonConfig { fn default() -> Self { Self { core_allocation: CoreAllocation::OsDefault, - worker_threads: 4, + worker_threads: 16, priority: 0, stack_size_bytes: 2 * 1024 * 1024, } @@ -30,10 +33,30 @@ impl Default for RayonConfig { } #[derive(Debug)] -pub struct RayonRuntime { +pub struct RayonRuntimeInner { pub rayon_pool: rayon::ThreadPool, pub config: RayonConfig, } +impl Deref for RayonRuntimeInner { + type Target = rayon::ThreadPool; + + fn deref(&self) -> &Self::Target { + &self.rayon_pool + } +} + +#[derive(Debug, Clone)] +pub struct RayonRuntime { + inner: Arc, +} + +impl Deref for RayonRuntime { + type Target = RayonRuntimeInner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} impl RayonRuntime { pub fn new(name: String, config: RayonConfig) -> anyhow::Result { @@ -50,6 +73,8 @@ impl RayonRuntime { apply_policy(&policy, priority, &chosen_cores_mask); }) .build()?; - Ok(Self { rayon_pool, config }) + Ok(Self { + inner: Arc::new(RayonRuntimeInner { rayon_pool, config }), + }) } } diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs index b8563f9ae11348..363d4140c43f27 100644 --- a/thread-manager/src/tokio_runtime.rs +++ b/thread-manager/src/tokio_runtime.rs @@ -3,6 +3,7 @@ use { serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, std::{ + ops::Deref, sync::{ atomic::{AtomicI64, AtomicUsize, Ordering}, Arc, Mutex, @@ -29,7 +30,7 @@ impl Default for TokioConfig { fn default() -> Self { Self { core_allocation: CoreAllocation::OsDefault, - worker_threads: 1, + worker_threads: 8, max_blocking_threads: 1, priority: 0, stack_size_bytes: 2 * 1024 * 1024, @@ -45,6 +46,14 @@ pub struct TokioRuntime { pub counters: Arc, } +impl Deref for TokioRuntime { + type Target = tokio::runtime::Runtime; + + fn deref(&self) -> &Self::Target { + &self.tokio + } +} + impl TokioRuntime { /// Starts the metrics sampling task on the runtime to monitor how many workers are busy doing useful things. pub fn start_metrics_sampling(&self, period: Duration) { @@ -115,6 +124,15 @@ impl TokioRuntime { counters, }) } + + /// Makes test runtime with 2 threads, only for unittests + pub fn new_for_tests() -> Self { + let cfg = TokioConfig { + worker_threads: 2, + ..Default::default() + }; + TokioRuntime::new("solNetTest".to_owned(), cfg.clone()).unwrap() + } } ///Internal counters to keep track of worker pool utilization