Skip to content

Commit

Permalink
improved UX with interior refcounting
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Jan 8, 2025
1 parent 772b456 commit a4e0afb
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 69 deletions.
2 changes: 1 addition & 1 deletion thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn main() -> anyhow::Result<()> {
conf_file.push(exp);
let mut buf = String::new();
std::fs::File::open(conf_file)?.read_to_string(&mut buf)?;
let cfg: RuntimeManagerConfig = toml::from_str(&buf)?;
let cfg: ThreadManagerConfig = toml::from_str(&buf)?;

let manager = ThreadManager::new(cfg).unwrap();
let tokio1 = manager
Expand Down
8 changes: 4 additions & 4 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ async fn axum_main(port: u16) {
}
}
}
fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
fn make_config_shared(cc: usize) -> ThreadManagerConfig {
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
worker_threads: cc,
..Default::default()
};
let tokio_cfg_2 = tokio_cfg_1.clone();
RuntimeManagerConfig {
ThreadManagerConfig {
tokio_configs: HashMap::from([
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
]),
..Default::default()
}
}
fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig {
fn make_config_dedicated(core_count: usize) -> ThreadManagerConfig {
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet {
min: 0,
Expand All @@ -72,7 +72,7 @@ fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig {
worker_threads: core_count / 2,
..Default::default()
};
RuntimeManagerConfig {
ThreadManagerConfig {
tokio_configs: HashMap::from([
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
Expand Down
142 changes: 96 additions & 46 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
anyhow::Ok,
serde::{Deserialize, Serialize},
std::collections::HashMap,
std::{collections::HashMap, ops::Deref, sync::Arc},
};

pub mod native_thread_runtime;
Expand All @@ -18,7 +18,7 @@ pub use {
pub type ConstString = Box<str>;

#[derive(Default, Debug)]
pub struct ThreadManager {
pub struct ThreadManagerInner {
pub tokio_runtimes: HashMap<ConstString, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<ConstString, ConstString>,

Expand All @@ -28,44 +28,9 @@ pub struct ThreadManager {
pub rayon_runtimes: HashMap<ConstString, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<ConstString, ConstString>,
}

#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RuntimeManagerConfig {
pub native_configs: HashMap<String, NativeConfig>,
pub native_runtime_mapping: HashMap<String, String>,

pub rayon_configs: HashMap<String, RayonConfig>,
pub rayon_runtime_mapping: HashMap<String, String>,

pub tokio_configs: HashMap<String, TokioConfig>,
pub tokio_runtime_mapping: HashMap<String, String>,

pub default_core_allocation: CoreAllocation,
}

impl ThreadManager {
pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
let name = self.native_runtime_mapping.get(name)?;
self.native_thread_runtimes.get(name)
}
pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
let name = self.rayon_runtime_mapping.get(name)?;
self.rayon_runtimes.get(name)
}
pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
let name = self.tokio_runtime_mapping.get(name)?;
self.tokio_runtimes.get(name)
}
pub fn set_process_affinity(config: &RuntimeManagerConfig) -> anyhow::Result<Vec<usize>> {
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();

crate::policy::set_thread_affinity(&chosen_cores_mask);
Ok(chosen_cores_mask)
}

impl ThreadManagerInner {
/// Populates mappings with copies of config names, overrides as appropriate
fn populate_mappings(&mut self, config: &RuntimeManagerConfig) {
fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
//TODO: this should probably be cleaned up with a macro at some point...

for name in config.native_configs.keys() {
Expand Down Expand Up @@ -95,10 +60,93 @@ impl ThreadManager {
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}
}
pub fn new(config: RuntimeManagerConfig) -> anyhow::Result<Self> {
}

/// Cheaply cloneable handle to a [`ThreadManagerInner`].
///
/// The state is held behind an [`Arc`], so cloning a `ThreadManager` clones the
/// `Arc` and every clone refers to the same inner value.
#[derive(Default, Debug, Clone)]
pub struct ThreadManager {
// Shared, reference-counted state; reachable through the `Deref` impl.
inner: Arc<ThreadManagerInner>,
}
/// Exposes the shared [`ThreadManagerInner`] so that its fields and methods can be
/// used directly on a `ThreadManager` handle.
impl Deref for ThreadManager {
    type Target = ThreadManagerInner;

    /// Borrows the inner value stored behind the `Arc`.
    fn deref(&self) -> &ThreadManagerInner {
        self.inner.as_ref()
    }
}

/// Configuration consumed by `ThreadManager::new`.
///
/// The type is (de)serializable with serde. Because of the container-level
/// `#[serde(default)]`, fields missing from the input are filled in from this
/// type's `Default` implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct ThreadManagerConfig {
/// Native thread runtime configurations, keyed by runtime name.
pub native_configs: HashMap<String, NativeConfig>,
/// Additional name -> native runtime name entries copied into the manager's mapping.
pub native_runtime_mapping: HashMap<String, String>,

/// Rayon runtime configurations, keyed by runtime name.
pub rayon_configs: HashMap<String, RayonConfig>,
/// Additional name -> rayon runtime name entries copied into the manager's mapping.
pub rayon_runtime_mapping: HashMap<String, String>,

/// Tokio runtime configurations, keyed by runtime name.
pub tokio_configs: HashMap<String, TokioConfig>,
/// Additional name -> tokio runtime name entries copied into the manager's mapping.
pub tokio_runtime_mapping: HashMap<String, String>,

/// Core allocation converted to a core mask by `ThreadManager::set_process_affinity`.
pub default_core_allocation: CoreAllocation,
}

/// Default configuration: one runtime of every kind registered under the name
/// `"default"`, empty runtime mappings, and `CoreAllocation::OsDefault`.
impl Default for ThreadManagerConfig {
    fn default() -> Self {
        // Each runtime family starts with a single entry stored under this key.
        let default_name = || String::from("default");

        Self {
            native_configs: HashMap::from([(default_name(), NativeConfig::default())]),
            native_runtime_mapping: HashMap::default(),
            rayon_configs: HashMap::from([(default_name(), RayonConfig::default())]),
            rayon_runtime_mapping: HashMap::default(),
            tokio_configs: HashMap::from([(default_name(), TokioConfig::default())]),
            tokio_runtime_mapping: HashMap::default(),
            default_core_allocation: CoreAllocation::OsDefault,
        }
    }
}

impl ThreadManager {
/// Resolves `name` to a runtime.
///
/// `name` is first translated through `mapping`. When `mapping` has no entry for
/// `name`, the entry for `"default"` is used instead. The resulting key is then
/// looked up in `runtimes`.
///
/// Returns `None` when neither `name` nor `"default"` is present in `mapping`, or
/// when the mapped key has no entry in `runtimes`. A `name` that is mapped but whose
/// runtime is missing does not fall back to `"default"`.
fn lookup<'a, T>(
    &'a self,
    name: &str,
    mapping: &HashMap<ConstString, ConstString>,
    runtimes: &'a HashMap<ConstString, T>,
) -> Option<&'a T> {
    mapping
        .get(name)
        // Only consult the "default" mapping when `name` itself is unmapped.
        .or_else(|| mapping.get("default"))
        .and_then(|runtime_key| runtimes.get(runtime_key))
}

/// Returns the native thread runtime that `name` resolves to via `lookup`, using
/// the native runtime mapping and the native runtime table.
pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
    let mapping = &self.native_runtime_mapping;
    let runtimes = &self.native_thread_runtimes;
    self.lookup(name, mapping, runtimes)
}

/// Returns the rayon runtime that `name` resolves to via `lookup`, using the rayon
/// runtime mapping and the rayon runtime table.
pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
    let mapping = &self.rayon_runtime_mapping;
    let runtimes = &self.rayon_runtimes;
    self.lookup(name, mapping, runtimes)
}

/// Returns the tokio runtime that `name` resolves to via `lookup`, using the tokio
/// runtime mapping and the tokio runtime table.
pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
    let mapping = &self.tokio_runtime_mapping;
    let runtimes = &self.tokio_runtimes;
    self.lookup(name, mapping, runtimes)
}

/// Converts `config.default_core_allocation` into a core mask, passes it to
/// `crate::policy::set_thread_affinity`, and returns the mask.
///
/// NOTE(review): despite the name, the visible call is `set_thread_affinity`;
/// presumably it applies to the calling thread -- confirm against `policy`.
pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
    let core_mask = config.default_core_allocation.as_core_mask_vector();

    crate::policy::set_thread_affinity(&core_mask);
    Ok(core_mask)
}

pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new();
Self::set_process_affinity(&config)?;
let mut manager = Self::default();
let mut manager = ThreadManagerInner::default();
manager.populate_mappings(&config);
for (name, cfg) in config.native_configs.iter() {
let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
Expand All @@ -124,14 +172,16 @@ impl ThreadManager {
.tokio_runtimes
.insert(name.clone().into_boxed_str(), tokiort);
}
Ok(manager)
Ok(Self {
inner: Arc::new(manager),
})
}
}

#[cfg(test)]
mod tests {
use {
crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManagerConfig, ThreadManager},
crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager, ThreadManagerConfig},
std::{collections::HashMap, io::Read},
};

Expand All @@ -151,7 +201,7 @@ mod tests {
.unwrap()
.read_to_string(&mut buf)
.unwrap();
let cfg: RuntimeManagerConfig = toml::from_str(&buf).unwrap();
let cfg: ThreadManagerConfig = toml::from_str(&buf).unwrap();
println!("{:?}", cfg);
}
}
Expand All @@ -166,7 +216,7 @@ mod tests {

#[test]
fn process_affinity() {
let conf = RuntimeManagerConfig {
let conf = ThreadManagerConfig {
native_configs: HashMap::from([(
"pool1".to_owned(),
NativeConfig {
Expand Down Expand Up @@ -207,7 +257,7 @@ mod tests {

#[test]
fn rayon_affinity() {
let conf = RuntimeManagerConfig {
let conf = ThreadManagerConfig {
rayon_configs: HashMap::from([(
"test".to_owned(),
RayonConfig {
Expand Down
51 changes: 40 additions & 11 deletions thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use {
log::error,
serde::{Deserialize, Serialize},
solana_metrics::datapoint_info,
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
},
};

Expand All @@ -23,21 +26,34 @@ impl Default for NativeConfig {
fn default() -> Self {
Self {
core_allocation: CoreAllocation::OsDefault,
max_threads: 10,
max_threads: 16,
priority: 0,
stack_size_bytes: 2 * 1024 * 1024,
}
}
}

#[derive(Debug)]
pub struct NativeThreadRuntime {
pub struct NativeThreadRuntimeInner {
pub id_count: AtomicUsize,
pub running_count: Arc<AtomicUsize>,
pub config: NativeConfig,
pub name: String,
}

/// Cheaply cloneable handle to a [`NativeThreadRuntimeInner`].
///
/// The inner value (thread id counter, running-thread counter, config and name)
/// is held behind an [`Arc`], so every clone of the handle refers to the same state.
#[derive(Debug, Clone)]
pub struct NativeThreadRuntime {
// Shared, reference-counted state; reachable through the `Deref` impl.
inner: Arc<NativeThreadRuntimeInner>,
}

/// Exposes the shared [`NativeThreadRuntimeInner`] so that its fields can be read
/// directly on a `NativeThreadRuntime` handle.
impl Deref for NativeThreadRuntime {
    type Target = NativeThreadRuntimeInner;

    /// Borrows the inner value stored behind the `Arc`.
    fn deref(&self) -> &NativeThreadRuntimeInner {
        self.inner.as_ref()
    }
}

pub struct JoinHandle<T> {
std_handle: Option<std::thread::JoinHandle<T>>,
running_count: Arc<AtomicUsize>,
Expand Down Expand Up @@ -82,13 +98,27 @@ impl<T> Drop for JoinHandle<T> {
impl NativeThreadRuntime {
pub fn new(name: String, cfg: NativeConfig) -> Self {
Self {
id_count: AtomicUsize::new(0),
running_count: Arc::new(AtomicUsize::new(0)),
config: cfg,
name,
inner: Arc::new(NativeThreadRuntimeInner {
id_count: AtomicUsize::new(0),
running_count: Arc::new(AtomicUsize::new(0)),
config: cfg,
name,
}),
}
}

/// Runs `f` through `spawn_named` with an automatically generated thread name.
///
/// The name is `<runtime name>-<n>`, where `n` is the value of the runtime's
/// `id_count` counter before this call incremented it. The result of
/// `spawn_named` is returned unchanged.
pub fn spawn<F, T>(&self, f: F) -> anyhow::Result<JoinHandle<T>>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    // The counter is bumped before delegating, even if `spawn_named` later fails.
    let thread_index = self.id_count.fetch_add(1, Ordering::Relaxed);
    self.spawn_named(format!("{}-{}", &self.name, thread_index), f)
}

pub fn spawn_named<F, T>(&self, name: String, f: F) -> anyhow::Result<JoinHandle<T>>
where
F: FnOnce() -> T,
F: Send + 'static,
Expand All @@ -102,9 +132,8 @@ impl NativeThreadRuntime {
let core_alloc = self.config.core_allocation.clone();
let priority = self.config.priority;
let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
let n = self.id_count.fetch_add(1, Ordering::Relaxed);
let jh = std::thread::Builder::new()
.name(format!("{}-{}", &self.name, n))
.name(name)
.stack_size(self.config.stack_size_bytes)
.spawn(move || {
apply_policy(&core_alloc, priority, &chosen_cores_mask);
Expand Down
Loading

0 comments on commit a4e0afb

Please sign in to comment.