Skip to content

Commit

Permalink
cleaner api
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Jan 8, 2025
1 parent a4e0afb commit e00029a
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 72 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions thread-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ and context switches that would occur if Rayon was entirely unaware it was runni
tokio, and each was to spawn as many threads as there are cores.

# Supported threading models
## Affinity
All threading models allow setting core affinity, but only on linux

For core affinity you can set e.g.
```toml
core_allocation.DedicatedCoreSet = { min = 16, max = 64 }
```
to pin the pool to cores 16-64.

## Scheduling policy and priority
If you want you can set thread scheduling policy and priority. Keep in mind that this will likely require
```bash
sudo setcap cap_sys_nice+ep
```
or root priviledges to run the resulting process.
To see which policies are supported check (the sources)[./src/policy.rs]
If you use realtime policies, priority to values from 1 (lowest) to 99 (highest) are possible.

## Tokio
Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
Number of worker and blocking threads is configurable, as are thread priorities for the pool.
Expand Down
8 changes: 2 additions & 6 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,9 @@ fn main() -> anyhow::Result<()> {
let cfg: ThreadManagerConfig = toml::from_str(&buf)?;

let manager = ThreadManager::new(cfg).unwrap();
let tokio1 = manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1");
let tokio1 = manager.get_tokio("axum1");
tokio1.start_metrics_sampling(Duration::from_secs(1));
let tokio2 = manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2");
let tokio2 = manager.get_tokio("axum2");
tokio2.start_metrics_sampling(Duration::from_secs(1));

let wrk_cores: Vec<_> = (32..64).collect();
Expand Down
27 changes: 3 additions & 24 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,36 +109,15 @@ fn main() -> anyhow::Result<()> {
let (tokio1, tokio2) = match regime {
Regime::Shared => {
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
(
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
}
Regime::Dedicated => {
manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
(
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
}
Regime::Single => {
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
(
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
}
};

Expand Down
109 changes: 102 additions & 7 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,44 @@ impl ThreadManager {
}
}

pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
self.lookup(
name,
&self.native_runtime_mapping,
&self.native_thread_runtimes,
)
}
pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
if let Some(runtime) = self.try_get_native(name) {
runtime
} else {
panic!("Native thread pool {name} not configured!");
}
}

pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
}

pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
if let Some(runtime) = self.try_get_rayon(name) {
runtime
} else {
panic!("Rayon thread pool {name} not configured!");
}
}

pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
}

pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
if let Some(runtime) = self.try_get_tokio(name) {
runtime
} else {
panic!("Tokio runtime {name} not configured!");
}
}
pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();

Expand Down Expand Up @@ -214,6 +236,81 @@ mod tests {
#[cfg(not(target_os = "linux"))]
fn validate_affinity(_expect_cores: &[usize], _error_msg: &str) {}

/* #[test]
fn thread_priority() {
let priority_high = 10;
let priority_default = crate::policy::DEFAULT_PRIORITY;
let priority_low = 1;
let conf = ThreadManagerConfig {
native_configs: HashMap::from([
(
"high".to_owned(),
NativeConfig {
priority: priority_high,
..Default::default()
},
),
(
"default".to_owned(),
NativeConfig {
..Default::default()
},
),
(
"low".to_owned(),
NativeConfig {
priority: priority_low,
..Default::default()
},
),
]),
..Default::default()
};
let manager = ThreadManager::new(conf).unwrap();
let high = manager.get_native("high");
let low = manager.get_native("low");
let default = manager.get_native("default");
high.spawn(move || {
let prio =
thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
assert_eq!(
prio,
thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
);
})
.unwrap()
.join()
.unwrap();
low.spawn(move || {
let prio =
thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
assert_eq!(
prio,
thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
);
})
.unwrap()
.join()
.unwrap();
default
.spawn(move || {
let prio =
thread_priority::get_thread_priority(thread_priority::thread_native_id())
.unwrap();
assert_eq!(
prio,
thread_priority::ThreadPriority::Crossplatform(
(priority_default).try_into().unwrap()
)
);
})
.unwrap()
.join()
.unwrap();
}*/

#[test]
fn process_affinity() {
let conf = ThreadManagerConfig {
Expand All @@ -222,7 +319,6 @@ mod tests {
NativeConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
max_threads: 5,
priority: 0,
..Default::default()
},
)]),
Expand All @@ -232,7 +328,7 @@ mod tests {
};

let manager = ThreadManager::new(conf).unwrap();
let runtime = manager.get_native("test").unwrap();
let runtime = manager.get_native("test");

let thread1 = runtime
.spawn(|| {
Expand Down Expand Up @@ -263,7 +359,6 @@ mod tests {
RayonConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
worker_threads: 3,
priority: 0,
..Default::default()
},
)]),
Expand All @@ -273,7 +368,7 @@ mod tests {
};

let manager = ThreadManager::new(conf).unwrap();
let rayon_runtime = manager.get_rayon("test").unwrap();
let rayon_runtime = manager.get_rayon("test");

let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
println!("Rayon thread {} reporting", ctx.index());
Expand Down
10 changes: 7 additions & 3 deletions thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::policy::{apply_policy, CoreAllocation},
crate::policy::{apply_policy, parse_policy, CoreAllocation},
anyhow::bail,
log::error,
serde::{Deserialize, Serialize},
Expand All @@ -18,7 +18,9 @@ use {
pub struct NativeConfig {
pub core_allocation: CoreAllocation,
pub max_threads: usize,
/// Priority in range 1..99
pub priority: u8,
pub policy: String,
pub stack_size_bytes: usize,
}

Expand All @@ -27,7 +29,8 @@ impl Default for NativeConfig {
Self {
core_allocation: CoreAllocation::OsDefault,
max_threads: 16,
priority: 0,
priority: crate::policy::DEFAULT_PRIORITY,
policy: "OTHER".to_owned(),
stack_size_bytes: 2 * 1024 * 1024,
}
}
Expand Down Expand Up @@ -131,12 +134,13 @@ impl NativeThreadRuntime {

let core_alloc = self.config.core_allocation.clone();
let priority = self.config.priority;
let policy = parse_policy(&self.config.policy);
let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
let jh = std::thread::Builder::new()
.name(name)
.stack_size(self.config.stack_size_bytes)
.spawn(move || {
apply_policy(&core_alloc, priority, &chosen_cores_mask);
apply_policy(&core_alloc, policy, priority, &chosen_cores_mask);
f()
})?;
let rc = self.running_count.fetch_add(1, Ordering::Relaxed);
Expand Down
Loading

0 comments on commit e00029a

Please sign in to comment.