Skip to content

Commit

Permalink
ref(system): Spawn with custom task ID
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Nov 18, 2024
1 parent 1af06f1 commit 491b7e1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 40 deletions.
103 changes: 65 additions & 38 deletions relay-system/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,82 @@ use tokio::task::JoinHandle;

use crate::statsd::SystemCounters;

/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
/// Spawns an instrumented task with an automatically generated [`TaskId`].
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
/// Returns a [`JoinHandle`].
#[macro_export]
macro_rules! spawn {
($future:expr) => {{
static _TASK_ID: ::std::sync::OnceLock<$crate::TaskId> = ::std::sync::OnceLock::new();
let task_id = _TASK_ID.get_or_init(|| (*::std::panic::Location::caller()).into());
$crate::_spawn_inner(task_id, $future)
static _FILE_LINE: ::std::sync::OnceLock<(String, String, String)> =
::std::sync::OnceLock::new();
let (id, file, line) = _FILE_LINE.get_or_init(|| {
let caller = *::std::panic::Location::caller();
let id = format!("{}:{}", caller.file(), caller.line());
(id, caller.file().to_owned(), caller.line().to_string())
});
$crate::spawn((id.as_str(), file.as_str(), line.as_str()), $future)
}};
}

#[doc(hidden)]
#[allow(clippy::disallowed_methods)]
pub fn _spawn_inner<F>(task_id: &'static TaskId, future: F) -> JoinHandle<F::Output>
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
pub fn spawn<F>(task_id: impl Into<TaskId>, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(Task::new(task_id, future))
tokio::spawn(Task::new(task_id.into(), future))
}

/// An internal id for a spawned task.
#[doc(hidden)]
pub struct TaskId {
id: String,
file: String,
line: String,
id: &'static str,
file: &'static str,
line: &'static str,
}

impl From<std::panic::Location<'_>> for TaskId {
fn from(value: std::panic::Location<'_>) -> Self {
Self {
id: format!("{}:{}", value.file(), value.line()),
file: value.file().to_owned(),
line: value.line().to_string(),
impl TaskId {
fn emit_metric(&self, metric: SystemCounters) {
let Self { id, file, line } = self;
relay_statsd::metric!(counter(metric) += 1, id = id, file = file, line = line);
}
}

impl From<&'static str> for TaskId {
fn from(id: &'static str) -> Self {
TaskId {
id,
file: "",
line: "",
}
}
}

impl From<(&'static str, &'static str, &'static str)> for TaskId {
fn from((id, file, line): (&'static str, &'static str, &'static str)) -> Self {
Self { id, file, line }
}
}

pin_project_lite::pin_project! {
/// Wraps a future and emits related task metrics.
struct Task<T> {
id: &'static TaskId,
id: TaskId,
#[pin]
inner: T,
}

impl<T> PinnedDrop for Task<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskTerminated) += 1,
id = this.id.id.as_str(),
file = this.id.file.as_str(),
line = this.id.line.as_str(),
);
this.id.emit_metric(SystemCounters::RuntimeTaskTerminated);
}
}
}

impl<T> Task<T> {
fn new(id: &'static TaskId, inner: T) -> Self {
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskCreated) += 1,
id = id.id.as_str(),
file = id.file.as_str(),
line = id.line.as_str(),
);
fn new(id: TaskId, inner: T) -> Self {
id.emit_metric(SystemCounters::RuntimeTaskCreated);
Self { id, inner }
}
}
Expand Down Expand Up @@ -107,15 +114,35 @@ mod tests {
#[cfg(not(windows))]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:110,file:relay-system/src/runtime.rs,line:110",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:110,file:relay-system/src/runtime.rs,line:110",
]
"###);
#[cfg(windows)]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:110,file:relay-system\\src\\runtime.rs,line:110",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:110,file:relay-system\\src\\runtime.rs,line:110",
]
"###);
}

#[test]
fn test_spawn_with_custom_id() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let captures = relay_statsd::with_capturing_test_client(|| {
rt.block_on(async {
let _ = crate::spawn("my-task", async {}).await;
})
});

assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:my-task,file:,line:",
"runtime.task.spawn.terminated:1|c|#id:my-task,file:,line:",
]
"###);
}
Expand Down
4 changes: 2 additions & 2 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ pub trait Service: Sized {
/// for tests.
fn start_detached(self) -> Addr<Self::Interface> {
let (addr, rx) = channel(Self::name());
spawn!(self.run(rx));
spawn(Self::name(), self.run(rx));
addr
}

Expand Down Expand Up @@ -1043,7 +1043,7 @@ impl ServiceRunner {

/// Starts a service and starts tracking its join handle, given a predefined receiver.
pub fn start_with<S: Service>(&mut self, service: S, rx: Receiver<S::Interface>) {
self.0.push(spawn!(service.run(rx)));
self.0.push(spawn(S::name(), service.run(rx)));
}

/// Awaits until all services have finished.
Expand Down

0 comments on commit 491b7e1

Please sign in to comment.