From 491b7e1f670610a3fa6afa428216e899036fa1ad Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 18 Nov 2024 12:56:43 +0100 Subject: [PATCH] ref(system): Spawn with custom task ID --- relay-system/src/runtime.rs | 103 +++++++++++++++++++++++------------- relay-system/src/service.rs | 4 +- 2 files changed, 67 insertions(+), 40 deletions(-) diff --git a/relay-system/src/runtime.rs b/relay-system/src/runtime.rs index 77a2f4e96f..ddef607dcf 100644 --- a/relay-system/src/runtime.rs +++ b/relay-system/src/runtime.rs @@ -3,75 +3,82 @@ use tokio::task::JoinHandle; use crate::statsd::SystemCounters; -/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. +/// Spawns an instrumented task with an automatically generated [`TaskId`]. /// -/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. +/// Returns a [`JoinHandle`]. #[macro_export] macro_rules! spawn { ($future:expr) => {{ - static _TASK_ID: ::std::sync::OnceLock<$crate::TaskId> = ::std::sync::OnceLock::new(); - let task_id = _TASK_ID.get_or_init(|| (*::std::panic::Location::caller()).into()); - $crate::_spawn_inner(task_id, $future) + static _FILE_LINE: ::std::sync::OnceLock<(String, String, String)> = + ::std::sync::OnceLock::new(); + let (id, file, line) = _FILE_LINE.get_or_init(|| { + let caller = *::std::panic::Location::caller(); + let id = format!("{}:{}", caller.file(), caller.line()); + (id, caller.file().to_owned(), caller.line().to_string()) + }); + $crate::spawn((id.as_str(), file.as_str(), line.as_str()), $future) }}; } -#[doc(hidden)] -#[allow(clippy::disallowed_methods)] -pub fn _spawn_inner(task_id: &'static TaskId, future: F) -> JoinHandle +/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. +/// +/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. +pub fn spawn(task_id: impl Into, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { - tokio::spawn(Task::new(task_id, future)) + tokio::spawn(Task::new(task_id.into(), future)) } -/// An internal id for a spawned task. #[doc(hidden)] pub struct TaskId { - id: String, - file: String, - line: String, + id: &'static str, + file: &'static str, + line: &'static str, } -impl From> for TaskId { - fn from(value: std::panic::Location<'_>) -> Self { - Self { - id: format!("{}:{}", value.file(), value.line()), - file: value.file().to_owned(), - line: value.line().to_string(), +impl TaskId { + fn emit_metric(&self, metric: SystemCounters) { + let Self { id, file, line } = self; + relay_statsd::metric!(counter(metric) += 1, id = id, file = file, line = line); + } +} + +impl From<&'static str> for TaskId { + fn from(id: &'static str) -> Self { + TaskId { + id, + file: "", + line: "", } } } +impl From<(&'static str, &'static str, &'static str)> for TaskId { + fn from((id, file, line): (&'static str, &'static str, &'static str)) -> Self { + Self { id, file, line } + } +} + pin_project_lite::pin_project! { /// Wraps a future and emits related task metrics. struct Task { - id: &'static TaskId, + id: TaskId, #[pin] inner: T, } impl PinnedDrop for Task { fn drop(this: Pin<&mut Self>) { - let this = this.project(); - relay_statsd::metric!( - counter(SystemCounters::RuntimeTaskTerminated) += 1, - id = this.id.id.as_str(), - file = this.id.file.as_str(), - line = this.id.line.as_str(), - ); + this.id.emit_metric(SystemCounters::RuntimeTaskTerminated); } } } impl Task { - fn new(id: &'static TaskId, inner: T) -> Self { - relay_statsd::metric!( - counter(SystemCounters::RuntimeTaskCreated) += 1, - id = id.id.as_str(), - file = id.file.as_str(), - line = id.line.as_str(), - ); + fn new(id: TaskId, inner: T) -> Self { + id.emit_metric(SystemCounters::RuntimeTaskCreated); Self { id, inner } } } @@ -107,15 +114,35 @@ mod tests { #[cfg(not(windows))] assert_debug_snapshot!(captures, @r###" [ - "runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103", - "runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103", + "runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:110,file:relay-system/src/runtime.rs,line:110", + "runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:110,file:relay-system/src/runtime.rs,line:110", ] "###); #[cfg(windows)] assert_debug_snapshot!(captures, @r###" [ - "runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103", - "runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103", + "runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:110,file:relay-system\\src\\runtime.rs,line:110", + "runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:110,file:relay-system\\src\\runtime.rs,line:110", + ] + "###); + } + + #[test] + fn test_spawn_with_custom_id() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + let captures = relay_statsd::with_capturing_test_client(|| { + rt.block_on(async { + let _ = crate::spawn("my-task", async {}).await; + }) + }); + + assert_debug_snapshot!(captures, @r###" + [ + "runtime.task.spawn.created:1|c|#id:my-task,file:,line:", + "runtime.task.spawn.terminated:1|c|#id:my-task,file:,line:", ] "###); } diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index 9add08d35c..b656efc214 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -1009,7 +1009,7 @@ pub trait Service: Sized { /// for tests. fn start_detached(self) -> Addr { let (addr, rx) = channel(Self::name()); - spawn!(self.run(rx)); + spawn(Self::name(), self.run(rx)); addr } @@ -1043,7 +1043,7 @@ impl ServiceRunner { /// Starts a service and starts tracking its join handle, given a predefined receiver. pub fn start_with(&mut self, service: S, rx: Receiver) { - self.0.push(spawn!(service.run(rx))); + self.0.push(spawn(S::name(), service.run(rx))); } /// Awaits until all services have finished.