>);
diff --git a/crates/primitives/utils/src/service.rs b/crates/primitives/utils/src/service.rs
index e8d5b1da0..ddba4eb93 100644
--- a/crates/primitives/utils/src/service.rs
+++ b/crates/primitives/utils/src/service.rs
@@ -1,127 +1,646 @@
-//! Service trait and combinators.
+//! Madara Services Architecture
+//!
+//! Madara follows a [microservice](microservices) architecture to simplify the
+//! composability and parallelism of its services. That is to say services can
+//! be started in different orders, at different points in the program's
+//! execution, stopped and even restarted. The advantage in parallelism arises
+//! from the fact that each service runs as its own non-blocking asynchronous
+//! task which allows for high throughput. Inter-service communication is done
+//! via [tokio::sync] or more often through direct database reads and writes.
+//!
+//! ---
+//!
+//! # The [Service] trait
+//!
+//! This is the backbone of Madara services and serves as a common interface to
+//! all. The [Service] trait specifies how a service must start as well as how
+//! to _identify_ it. For reasons of atomicity, services are currently
+//! identified by a single [std::sync::atomic::AtomicU64]. More about this later.
+//!
+//! Services are started with [Service::start] using [ServiceRunner::service_loop].
+//! [ServiceRunner::service_loop] is a function which takes in a future: this
+//! future represents the main loop of your service, and should run until your
+//! service completes or is cancelled.
+//!
+//! It is part of the contract of the [Service] trait that calls to
+//! [ServiceRunner::service_loop] should not complete until the service has
+//! _finished_ execution (this should be evident by the name) as this is used
+//! to mark a service as complete and therefore ready to restart. Services where
+//! [ServiceRunner::service_loop] completes _before_ the service has finished
+//! execution will be automatically marked for shutdown as a safety mechanism.
+//! This is done as a safeguard to avoid an invalid state where it would be
+//! impossible for the node to shutdown.
+//!
+//! > **Note**
+//! > It is assumed that services can and might be restarted. You have the
+//! > responsibility to ensure this is possible. This means you should make sure
+//! > not to use the like of [std::mem::take] or similar on your service inside
+//! > [Service::start]. In general, make sure your service still contains all
+//! > the necessary information it needs to restart. This might mean certain
+//! > attributes need to be stored as a [std::sync::Arc] and cloned so that the
+//! > future in [ServiceRunner::service_loop] can safely take ownership of them.
+//!
+//! ## An incorrect implementation of the [Service] trait
+//!
+//! ```rust
+//! # use mp_utils::service::Service;
+//! # use mp_utils::service::ServiceId;
+//! # use mp_utils::service::PowerOfTwo;
+//! # use mp_utils::service::ServiceRunner;
+//! # use mp_utils::service::MadaraServiceId;
+//!
+//! pub struct MyService;
+//!
+//! #[async_trait::async_trait]
+//! impl Service for MyService {
+//! async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
+//! runner.service_loop(move |ctx| async {
+//! tokio::task::spawn(async {
+//! tokio::time::sleep(std::time::Duration::MAX).await;
+//! });
+//!
+//! // This is incorrect, as the future passed to service_loop will
+//! // resolve before the task spawned above completes, meaning
+//! // Madara will incorrectly mark this service as ready to restart.
+//! // In a more complex scenario, this means we might enter an
+//! // invalid state!
+//! anyhow::Ok(())
+//! });
+//!
+//! anyhow::Ok(())
+//! }
+//! }
+//!
+//! impl ServiceId for MyService {
+//! fn svc_id(&self) -> PowerOfTwo {
+//! MadaraServiceId::Monitor.svc_id()
+//! }
+//! }
+//! ```
+//!
+//! ## A correct implementation of the [Service] trait
+//!
+//! ```rust
+//! # use mp_utils::service::Service;
+//! # use mp_utils::service::ServiceId;
+//! # use mp_utils::service::PowerOfTwo;
+//! # use mp_utils::service::ServiceRunner;
+//! # use mp_utils::service::MadaraServiceId;
+//!
+//! pub struct MyService;
+//!
+//! #[async_trait::async_trait]
+//! impl Service for MyService {
+//! async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
+//! runner.service_loop(move |mut ctx| async move {
+//! ctx.run_until_cancelled(tokio::time::sleep(std::time::Duration::MAX)).await;
+//!
+//! // This is correct, as the future passed to service_loop will
+//! // only resolve once the task above completes, so Madara can
+//! // correctly mark this service as ready to restart.
+//! anyhow::Ok(())
+//! });
+//!
+//! anyhow::Ok(())
+//! }
+//! }
+//!
+//! impl ServiceId for MyService {
+//! fn svc_id(&self) -> PowerOfTwo {
+//! MadaraServiceId::Monitor.svc_id()
+//! }
+//! }
+//! ```
+//!
+//! Or if you really need to spawn a background task:
+//!
+//! ```rust
+//! # use mp_utils::service::Service;
+//! # use mp_utils::service::ServiceId;
+//! # use mp_utils::service::PowerOfTwo;
+//! # use mp_utils::service::ServiceRunner;
+//! # use mp_utils::service::MadaraServiceId;
+//!
+//! pub struct MyService;
+//!
+//! #[async_trait::async_trait]
+//! impl Service for MyService {
+//! async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
+//! runner.service_loop(move |mut ctx| async move {
+//! let mut ctx1 = ctx.clone();
+//! tokio::task::spawn(async move {
+//! ctx1.run_until_cancelled(tokio::time::sleep(std::time::Duration::MAX)).await;
+//! });
+//!
+//! ctx.cancelled().await;
+//!
+//! // This is correct, as even though we are spawning a background
+//! // task we have implemented a cancellation mechanism with ctx
+//! // and are waiting for that cancellation in service_loop.
+//! anyhow::Ok(())
+//! });
+//!
+//! anyhow::Ok(())
+//! }
+//! }
+//!
+//! impl ServiceId for MyService {
+//! fn svc_id(&self) -> PowerOfTwo {
+//! MadaraServiceId::Monitor.svc_id()
+//! }
+//! }
+//! ```
+//!
+//! This sort of problem generally arises in cases similar to the above, where
+//! the service's role is to spawn another background task. This can happen
+//! when the service needs to start a server for example. Either avoid spawning
+//! a detached task or use mechanisms such as [ServiceContext::cancelled] to
+//! await for the service's completion.
+//!
+//! Note that service shutdown is designed to be manual. We still implement a
+//! [SERVICE_GRACE_PERIOD] which is the maximum duration a service is allowed
+//! to take to shutdown, after which it is forcefully cancelled. This should not
+//! happen in practice and only serves to avoid cases where someone would forget
+//! to implement a cancellation check. More on this in the next section.
+//!
+//! ---
+//!
+//! # Cancellation status and inter-process requests
+//!
+//! Services are passed a [ServiceContext] as part of [ServiceRunner::service_loop]
+//! to be used during their execution to check for and request cancellation.
+//! Services can also start child services with [ServiceContext::child] to
+//! create a hierarchy of services.
+//!
+//! ## Cancellation checks
+//!
+//! The main advantage of [ServiceContext] is that it allows you to gracefully
+//! handle the shutdown of your services by checking for cancellation at logical
+//! points in the execution, such as every iteration of a service's main loop.
+//! You can use the following methods to check for cancellation, each with their
+//! own caveats.
+//!
+//! - [ServiceContext::is_cancelled]: synchronous, useful in non-blocking
+//! scenarios.
+//! - [ServiceContext::cancelled]: a future which resolves upon service
+//! cancellation. Useful to wait on a service or alongside [tokio::select].
+//!
+//! > **Warning**
+//! > It is your responsibility to check for cancellation inside of your
+//! > service. If you do not, or your service takes longer than
+//! > [SERVICE_GRACE_PERIOD] to shutdown, then your service will be forcefully
+//! > cancelled.
+//!
+//! ## Cancellation requests
+//!
+//! Any service with access to a [ServiceContext] can request the cancellation
+//! of _any other service, at any point during execution_. This can be used for
+//! error handling for example, by having a single service shut itself down
+//! without affecting other services, or for administrative and testing purposes
+//! by having a node operator toggle services on and off from a remote endpoint.
+//!
+//! You can use the following methods to request for the cancellation of a
+//! service:
+//!
+//! - [ServiceContext::cancel_global]: cancels all services.
+//! - [ServiceContext::cancel_local]: cancels this service and all its children.
+//! - [ServiceContext::service_remove]: cancel a specific service.
+//!
+//! ## Start requests
+//!
+//! You can _request_ for a service to be restarted by calling
+//! [ServiceContext::service_add]. This is not guaranteed to work, and will fail
+//! if the service is already running or if it has not been registered to
+//! [the set of global services](#service-orchestration) at the start of the
+//! program.
+//!
+//! ## Atomic status checks
+//!
+//! All service updates and checks are performed atomically with the use of
+//! [tokio_util::sync::CancellationToken] and [MadaraServiceMask], which is a
+//! [std::sync::atomic::AtomicU64] bitmask with strong [std::sync::atomic::Ordering::SeqCst]
+//! cross-thread ordering of operations. Services are represented as a unique
+//! [PowerOfTwo] which is provided through the [ServiceId] trait.
+//!
+//! > **Note**
+//! > The use of [std::sync::atomic::AtomicU64] limits the number of possible
+//! > services to 64. This might be increased in the future if there is a
+//! > demonstrable need for more services, but right now this limit seems
+//! > high enough.
+//!
+//! ---
+//!
+//! # Service orchestration
+//!
+//! Services are orchestrated by a [ServiceMonitor], which is responsible for
+//! registering services, marking them as active or inactive as well as starting
+//! and restarting them upon request. [ServiceMonitor] also handles the
+//! cancellation of all services upon receiving a `SIGINT` or `SIGTERM`.
+//!
+//! > **Important**
+//! > Services cannot be started or restarted if they have not been registered
+//! > with [ServiceMonitor::with].
+//!
+//! Services are run to completion until no service remains, at which point the
+//! node will automatically shutdown.
+//!
+//! [microservices]: https://en.wikipedia.org/wiki/Microservices
use anyhow::Context;
-use std::{fmt::Display, panic, sync::Arc};
+use futures::Future;
+use serde::{Deserialize, Serialize};
+use std::{
+ fmt::{Debug, Display},
+ panic,
+ sync::Arc,
+ time::Duration,
+};
use tokio::task::JoinSet;
-#[repr(u8)]
-#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
-pub enum MadaraService {
+/// Maximum potential number of services that a [ServiceRunner] can run at once
+pub const SERVICE_COUNT_MAX: usize = 64;
+
+/// Maximum duration a service is allowed to take to shutdown, after which it
+/// will be forcefully cancelled
+pub const SERVICE_GRACE_PERIOD: Duration = Duration::from_secs(10);
+
+macro_rules! power_of_two {
+ ( $($pow:literal),* ) => {
+ paste::paste! {
+ #[repr(u64)]
+ #[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
+ pub enum PowerOfTwo {
+ #[default]
+ ZERO = 0,
+ $(
+ [<P $pow>] = 1u64 << $pow,
+ )*
+ }
+
+ impl PowerOfTwo {
+ /// Converts a [PowerOfTwo] into a unique index which can be
+ /// used in an array
+ pub fn index(&self) -> usize {
+ match self {
+ Self::ZERO => 0,
+ $(
+ Self::[<P $pow>] => $pow,
+ )*
+ }
+ }
+ }
+
+ impl ServiceId for PowerOfTwo {
+ fn svc_id(&self) -> PowerOfTwo {
+ *self
+ }
+ }
+
+ impl Display for PowerOfTwo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", *self as u64)
+ }
+ }
+
+ impl TryFrom<u8> for PowerOfTwo {
+ type Error = anyhow::Error;
+
+ fn try_from(pow: u8) -> anyhow::Result<Self> {
+ TryFrom::<u64>::try_from(pow as u64)
+ }
+ }
+
+ impl TryFrom<u16> for PowerOfTwo {
+ type Error = anyhow::Error;
+
+ fn try_from(pow: u16) -> anyhow::Result<Self> {
+ TryFrom::<u64>::try_from(pow as u64)
+ }
+ }
+
+ impl TryFrom<u32> for PowerOfTwo {
+ type Error = anyhow::Error;
+
+ fn try_from(pow: u32) -> anyhow::Result<Self> {
+ TryFrom::<u64>::try_from(pow as u64)
+ }
+ }
+
+ impl TryFrom<u64> for PowerOfTwo
+ {
+ type Error = anyhow::Error;
+
+ fn try_from(pow: u64) -> anyhow::Result<Self> {
+ $(
+ const [<P $pow>]: u64 = 1 << $pow;
+ )*
+
+ let pow: u64 = pow.into();
+ match pow {
+ 0 => anyhow::Ok(Self::ZERO),
+ $(
+ [<P $pow>] => anyhow::Ok(Self::[<P $pow>]),
+ )*
+ _ => anyhow::bail!("Not a power of two: {pow}"),
+ }
+ }
+ }
+ }
+ };
+}
+
+power_of_two!(
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
+ 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
+ 60, 61, 62, 63
+);
+
+/// The core [Service]s available in Madara.
+///
+/// Note that [PowerOfTwo::ZERO] represents [MadaraServiceId::Monitor] as
+/// [ServiceMonitor] is always running and therefore is the genesis state of all
+/// other services.
+#[derive(Clone, Copy, PartialEq, Eq, Default, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum MadaraServiceId {
#[default]
- None = 0,
- Database = 1,
- L1Sync = 2,
- L2Sync = 4,
- BlockProduction = 8,
- Rpc = 16,
- RpcAdmin = 32,
- Gateway = 64,
- Telemetry = 128,
-}
-
-impl Display for MadaraService {
+ #[serde(skip)]
+ Monitor,
+ #[serde(skip)]
+ Database,
+ L1Sync,
+ L2Sync,
+ BlockProduction,
+ #[serde(rename = "rpc")]
+ RpcUser,
+ #[serde(skip)]
+ RpcAdmin,
+ Gateway,
+ Telemetry,
+}
+
+impl ServiceId for MadaraServiceId {
+ #[inline(always)]
+ fn svc_id(&self) -> PowerOfTwo {
+ match self {
+ MadaraServiceId::Monitor => PowerOfTwo::ZERO,
+ MadaraServiceId::Database => PowerOfTwo::P0,
+ MadaraServiceId::L1Sync => PowerOfTwo::P1,
+ MadaraServiceId::L2Sync => PowerOfTwo::P2,
+ MadaraServiceId::BlockProduction => PowerOfTwo::P3,
+ MadaraServiceId::RpcUser => PowerOfTwo::P4,
+ MadaraServiceId::RpcAdmin => PowerOfTwo::P5,
+ MadaraServiceId::Gateway => PowerOfTwo::P6,
+ MadaraServiceId::Telemetry => PowerOfTwo::P7,
+ }
+ }
+}
+
+impl Display for MadaraServiceId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
- MadaraService::None => "none",
- MadaraService::Database => "database",
- MadaraService::L1Sync => "l1 sync",
- MadaraService::L2Sync => "l2 sync",
- MadaraService::BlockProduction => "block production",
- MadaraService::Rpc => "rpc",
- MadaraService::RpcAdmin => "rpc admin",
- MadaraService::Gateway => "gateway",
- MadaraService::Telemetry => "telemetry",
+ Self::Monitor => "monitor",
+ Self::Database => "database",
+ Self::L1Sync => "l1 sync",
+ Self::L2Sync => "l2 sync",
+ Self::BlockProduction => "block production",
+ Self::RpcUser => "rpc user",
+ Self::RpcAdmin => "rpc admin",
+ Self::Gateway => "gateway",
+ Self::Telemetry => "telemetry",
}
)
}
}
+impl std::ops::BitOr for MadaraServiceId {
+ type Output = u64;
+
+ fn bitor(self, rhs: Self) -> Self::Output {
+ self.svc_id() as u64 | rhs.svc_id() as u64
+ }
+}
+
+impl std::ops::BitAnd for MadaraServiceId {
+ type Output = u64;
+
+ fn bitand(self, rhs: Self) -> Self::Output {
+ self.svc_id() as u64 & rhs.svc_id() as u64
+ }
+}
+
+impl From<PowerOfTwo> for MadaraServiceId {
+ fn from(value: PowerOfTwo) -> Self {
+ match value {
+ PowerOfTwo::ZERO => Self::Monitor,
+ PowerOfTwo::P0 => Self::Database,
+ PowerOfTwo::P1 => Self::L1Sync,
+ PowerOfTwo::P2 => Self::L2Sync,
+ PowerOfTwo::P3 => Self::BlockProduction,
+ PowerOfTwo::P4 => Self::RpcUser,
+ PowerOfTwo::P5 => Self::RpcAdmin,
+ PowerOfTwo::P6 => Self::Gateway,
+ _ => Self::Telemetry,
+ }
+ }
+}
+
+// A boolean status enum, for clarity's sake
+#[derive(PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
+pub enum MadaraServiceStatus {
+ On,
+ #[default]
+ Off,
+}
+
+impl Display for MadaraServiceStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ Self::On => "on",
+ Self::Off => "off",
+ }
+ )
+ }
+}
+
+impl std::ops::BitOr for MadaraServiceStatus {
+ type Output = Self;
+
+ fn bitor(self, rhs: Self) -> Self::Output {
+ if self.is_on() || rhs.is_on() {
+ MadaraServiceStatus::On
+ } else {
+ MadaraServiceStatus::Off
+ }
+ }
+}
+
+impl std::ops::BitOr for &MadaraServiceStatus {
+ type Output = Self;
+
+ fn bitor(self, rhs: Self) -> Self::Output {
+ if self.is_on() || rhs.is_on() {
+ &MadaraServiceStatus::On
+ } else {
+ &MadaraServiceStatus::Off
+ }
+ }
+}
+
+impl std::ops::BitAnd for MadaraServiceStatus {
+ type Output = Self;
+
+ fn bitand(self, rhs: Self) -> Self::Output {
+ if self.is_on() && rhs.is_on() {
+ MadaraServiceStatus::On
+ } else {
+ MadaraServiceStatus::Off
+ }
+ }
+}
+
+impl std::ops::BitAnd for &MadaraServiceStatus {
+ type Output = Self;
+
+ fn bitand(self, rhs: Self) -> Self::Output {
+ if self.is_on() && rhs.is_on() {
+ &MadaraServiceStatus::On
+ } else {
+ &MadaraServiceStatus::Off
+ }
+ }
+}
+
+impl std::ops::BitOrAssign for MadaraServiceStatus {
+ fn bitor_assign(&mut self, rhs: Self) {
+ *self = if self.is_on() || rhs.is_on() { MadaraServiceStatus::On } else { MadaraServiceStatus::Off }
+ }
+}
+
+impl std::ops::BitAndAssign for MadaraServiceStatus {
+ fn bitand_assign(&mut self, rhs: Self) {
+ *self = if self.is_on() && rhs.is_on() { MadaraServiceStatus::On } else { MadaraServiceStatus::Off }
+ }
+}
+
+impl From<bool> for MadaraServiceStatus {
+ fn from(value: bool) -> Self {
+ match value {
+ true => Self::On,
+ false => Self::Off,
+ }
+ }
+}
+
+impl MadaraServiceStatus {
+ #[inline(always)]
+ pub fn is_on(&self) -> bool {
+ self == &MadaraServiceStatus::On
+ }
+
+ #[inline(always)]
+ pub fn is_off(&self) -> bool {
+ self == &MadaraServiceStatus::Off
+ }
+}
+
+/// An atomic bitmask of each [MadaraServiceId]'s status with strong
+/// [std::sync::atomic::Ordering::SeqCst] cross-thread ordering of operations.
#[repr(transparent)]
#[derive(Default)]
-pub struct MadaraServiceMask(std::sync::atomic::AtomicU8);
+pub struct MadaraServiceMask(std::sync::atomic::AtomicU64);
impl MadaraServiceMask {
#[cfg(feature = "testing")]
pub fn new_for_testing() -> Self {
- Self(std::sync::atomic::AtomicU8::new(u8::MAX))
+ Self(std::sync::atomic::AtomicU64::new(u64::MAX))
}
#[inline(always)]
- pub fn is_active(&self, cap: u8) -> bool {
- self.0.load(std::sync::atomic::Ordering::SeqCst) & cap > 0
+ pub fn status(&self, svc: impl ServiceId) -> MadaraServiceStatus {
+ (self.value() & svc.svc_id() as u64 > 0).into()
}
#[inline(always)]
- pub fn activate(&self, cap: MadaraService) -> bool {
- let prev = self.0.fetch_or(cap as u8, std::sync::atomic::Ordering::SeqCst);
- prev & cap as u8 > 0
+ pub fn is_active_some(&self) -> bool {
+ self.value() > 0
}
#[inline(always)]
- pub fn deactivate(&self, cap: MadaraService) -> bool {
- let cap = cap as u8;
- let prev = self.0.fetch_and(!cap, std::sync::atomic::Ordering::SeqCst);
- prev & cap > 0
+ pub fn activate(&self, svc: impl ServiceId) -> MadaraServiceStatus {
+ let prev = self.0.fetch_or(svc.svc_id() as u64, std::sync::atomic::Ordering::SeqCst);
+ (prev & svc.svc_id() as u64 > 0).into()
}
-}
-#[repr(u8)]
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
-pub enum MadaraState {
- #[default]
- Starting,
- Warp,
- Running,
- Shutdown,
-}
+ #[inline(always)]
+ pub fn deactivate(&self, svc: impl ServiceId) -> MadaraServiceStatus {
+ let svc = svc.svc_id() as u64;
+ let prev = self.0.fetch_and(!svc, std::sync::atomic::Ordering::SeqCst);
+ (prev & svc > 0).into()
+ }
-impl From<u8> for MadaraState {
- fn from(value: u8) -> Self {
- match value {
- 0 => Self::Starting,
- 1 => Self::Warp,
- 2 => Self::Running,
- _ => Self::Shutdown,
+ fn active_set(&self) -> Vec<MadaraServiceId> {
+ let mut i = MadaraServiceId::Telemetry.svc_id() as u64;
+ let state = self.value();
+ let mut set = Vec::with_capacity(SERVICE_COUNT_MAX);
+
+ while i > 0 {
+ let mask = state & i;
+
+ if mask > 0 {
+ let pow = PowerOfTwo::try_from(mask).expect("mask is a power of 2");
+ set.push(MadaraServiceId::from(pow));
+ }
+
+ i >>= 1;
}
+
+ set
+ }
+
+ fn value(&self) -> u64 {
+ self.0.load(std::sync::atomic::Ordering::SeqCst)
}
}
-/// Atomic state and cancellation context associated to a Service.
+/// Atomic state and cancellation context associated to a [Service].
///
/// # Scope
///
-/// You can create a hierarchy of services by calling `ServiceContext::branch_local`.
+/// You can create a hierarchy of services by calling [ServiceContext::child].
/// Services are said to be in the same _local scope_ if they inherit the same
-/// `token_local` cancellation token. You can think of services being local
-/// if they can cancel each other without affecting the rest of the app (this
-/// is not exact but it serves as a good mental model).
+/// `token_local` [tokio_util::sync::CancellationToken]. You can think of
+/// services being local if they can cancel each other without affecting the
+/// rest of the app.
///
-/// All services which descend from the same context are also said to be in the
-/// same _global scope_, that is to say any service in this scope can cancel
-/// _all_ other services in the same scope (including child services) at any
-/// time. This is true of services in the same [ServiceGroup] for example.
+/// All services which are derived from the same [ServiceContext] are said to
+/// be in the same _global scope_, that is to say any service in this scope can
+/// cancel _all_ other services in the same scope (including child services) at
+/// any time. This is true of services in the same [ServiceMonitor] for example.
///
-/// # Services
+/// # Service hierarchy
///
/// - A services is said to be a _child service_ if it uses a context created
-/// with `ServiceContext::branch_local`
+/// with [ServiceContext::child]
///
/// - A service is said to be a _parent service_ if it uses a context which was
/// used to create child services.
///
/// > A parent services can always cancel all of its child services, but a child
/// > service cannot cancel its parent service.
-#[cfg_attr(not(feature = "testing"), derive(Default))]
pub struct ServiceContext {
token_global: tokio_util::sync::CancellationToken,
 token_local: Option<tokio_util::sync::CancellationToken>,
 services: Arc<MadaraServiceMask>,
- services_notify: Arc,
- state: Arc,
- id: MadaraService,
+ service_update_sender: Arc<tokio::sync::broadcast::Sender<ServiceTransport>>,
+ service_update_receiver: Option<tokio::sync::broadcast::Receiver<ServiceTransport>>,
+ id: PowerOfTwo,
}
impl Clone for ServiceContext {
@@ -130,142 +649,271 @@ impl Clone for ServiceContext {
token_global: self.token_global.clone(),
token_local: self.token_local.clone(),
services: Arc::clone(&self.services),
- services_notify: Arc::clone(&self.services_notify),
- state: Arc::clone(&self.state),
+ service_update_sender: Arc::clone(&self.service_update_sender),
+ service_update_receiver: None,
id: self.id,
}
}
}
-impl ServiceContext {
- pub fn new() -> Self {
+impl Default for ServiceContext {
+ fn default() -> Self {
Self {
token_global: tokio_util::sync::CancellationToken::new(),
token_local: None,
services: Arc::new(MadaraServiceMask::default()),
- services_notify: Arc::new(tokio::sync::Notify::new()),
- state: Arc::new(std::sync::atomic::AtomicU8::new(MadaraState::default() as u8)),
- id: MadaraService::default(),
+ service_update_sender: Arc::new(tokio::sync::broadcast::channel(SERVICE_COUNT_MAX).0),
+ service_update_receiver: None,
+ id: MadaraServiceId::Monitor.svc_id(),
}
}
+}
+
+impl ServiceContext {
+ /// Creates a new [Default] [ServiceContext]
+ pub fn new() -> Self {
+ Self::default()
+ }
#[cfg(feature = "testing")]
pub fn new_for_testing() -> Self {
- Self {
- token_global: tokio_util::sync::CancellationToken::new(),
- token_local: None,
- services: Arc::new(MadaraServiceMask::new_for_testing()),
- services_notify: Arc::new(tokio::sync::Notify::new()),
- state: Arc::new(std::sync::atomic::AtomicU8::new(MadaraState::default() as u8)),
- id: MadaraService::default(),
- }
+ Self { services: Arc::new(MadaraServiceMask::new_for_testing()), ..Default::default() }
+ }
+
+ /// Creates a new [Default] [ServiceContext] with the state of its services
+ /// set to the specified value.
+ pub fn new_with_services(services: Arc<MadaraServiceMask>) -> Self {
+ Self { services, ..Default::default() }
}
/// Stops all services under the same global context scope.
pub fn cancel_global(&self) {
+ tracing::info!("🔌 Gracefully shutting down node");
+
self.token_global.cancel();
}
/// Stops all services under the same local context scope.
///
- /// A local context is created by calling `branch_local` and allows you to
- /// reduce the scope of cancellation only to those services which will use
- /// the new context.
+ /// A local context is created by calling [ServiceContext::child] and allows
+ /// you to reduce the scope of cancellation only to those services which
+ /// will use the new context.
pub fn cancel_local(&self) {
self.token_local.as_ref().unwrap_or(&self.token_global).cancel();
}
/// A future which completes when the service associated to this
- /// [ServiceContext] is canceled.
+ /// [ServiceContext] is cancelled.
///
- /// This happens after calling [ServiceContext::cancel_local] or
- /// [ServiceContext::cancel_global].
+ /// This allows for more manual implementation of cancellation logic than
+ /// [ServiceContext::run_until_cancelled], and should only be used in cases
+ /// where using `run_until_cancelled` is not possible or would be less
+ /// clear.
///
- /// Use this to race against other futures in a [tokio::select] for example.
+ /// A service is cancelled after calling [ServiceContext::cancel_local],
+ /// [ServiceContext::cancel_global] or if it is marked for removal with
+ /// [ServiceContext::service_remove].
+ ///
+ /// Use this to race against other futures in a [tokio::select] or keep a
+ /// coroutine alive for as long as the service itself.
#[inline(always)]
- pub async fn cancelled(&self) {
- if self.state() != MadaraState::Shutdown {
- match &self.token_local {
- Some(token_local) => tokio::select! {
- _ = self.token_global.cancelled() => {},
- _ = token_local.cancelled() => {}
- },
- None => tokio::select! {
- _ = self.token_global.cancelled() => {},
- },
+ pub async fn cancelled(&mut self) {
+ if self.service_update_receiver.is_none() {
+ self.service_update_receiver = Some(self.service_update_sender.subscribe());
+ }
+
+ let mut rx = self.service_update_receiver.take().expect("Receiver was set above");
+ let token_global = &self.token_global;
+ let token_local = self.token_local.as_ref().unwrap_or(&self.token_global);
+
+ loop {
+ // We keep checking for service status updates until a token has
+ // been cancelled or this service was deactivated
+ let res = tokio::select! {
+ svc = rx.recv() => svc.ok(),
+ _ = token_global.cancelled() => break,
+ _ = token_local.cancelled() => break
+ };
+
+ if let Some(ServiceTransport { svc_id, status }) = res {
+ if svc_id == self.id && status == MadaraServiceStatus::Off {
+ return;
+ }
}
}
}
- /// Check if the service associated to this [ServiceContext] was canceled.
+ /// Checks if the service associated to this [ServiceContext] was cancelled.
+ ///
+ /// This happens after calling [ServiceContext::cancel_local],
+ /// [ServiceContext::cancel_global] or [ServiceContext::service_remove].
///
- /// This happens after calling [ServiceContext::cancel_local] or
- /// [ServiceContext::cancel_global].
+ /// # Limitations
+ ///
+ /// This function should _not_ be used when waiting on potentially
+ /// blocking futures which can be cancelled without entering an invalid
+ /// state. The latter is important, so let's break this down.
+ ///
+ /// - _blocking future_: this is blocking at a [Service] level, not at the
+ /// node level. A blocking task in this sense in a task which prevents a
+ /// service from making progress in its execution, but not necessarily the
+ /// rest of the node. A prime example of this is when you are waiting on
+ /// a channel, and updates to that channel are sparse, or even unique.
+ ///
+ /// - _entering an invalid state_: the entire point of [ServiceContext] is
+ /// to allow services to gracefully shutdown. We do not want to be, for
+ /// example, racing each service against a global cancellation future, as
+ /// not every service might be cancellation safe (we still do this
+ /// somewhat with [SERVICE_GRACE_PERIOD] but this is a last resort and
+ /// should not execute in normal circumstances). Put differently, we do
+ /// not want to stop in the middle of a critical computation before it has
+ /// been saved to disk.
+ ///
+ /// Putting this together, [ServiceContext::is_cancelled] is only suitable
+ /// for checking cancellation alongside tasks which will not block the
+ /// running service, or in very specific circumstances where waiting on a
+ /// blocking future has higher precedence than shutting down the node.
+ ///
+ /// Examples of when to use [ServiceContext::is_cancelled]:
+ ///
+ /// - All your computation does is sleep or tick away a short period of
+ /// time.
+ /// - You are checking for cancellation inside of synchronous code.
+ ///
+ /// If this does not describe your usage, and you are waiting on a blocking
+ /// future, which is cancel-safe and which does not risk putting the node
+ /// in an invalid state if cancelled, then you should be using
+ /// [ServiceContext::cancelled] instead.
#[inline(always)]
pub fn is_cancelled(&self) -> bool {
self.token_global.is_cancelled()
|| self.token_local.as_ref().map(|t| t.is_cancelled()).unwrap_or(false)
- || !self.services.is_active(self.id as u8)
- || self.state() == MadaraState::Shutdown
+ || self.services.status(self.id) == MadaraServiceStatus::Off
+ }
+
+ /// Runs a [Future] until the [Service] associated to this [ServiceContext]
+ /// is cancelled.
+ ///
+ /// This happens after calling [ServiceContext::cancel_local],
+ /// [ServiceContext::cancel_global] or [ServiceContext::service_remove].
+ ///
+ /// # Cancellation safety
+ ///
+ /// It is important that the future you pass to this function is _cancel-
+ /// safe_ as it will be forcefully shutdown if ever the service is cancelled.
+ /// This means your future might be interrupted at _any_ point in its
+ /// execution.
+ ///
+ /// Futures can be considered as cancel-safe in the context of Madara if
+ /// their computation can be interrupted at any point without causing any
+ /// side-effects to the running node.
+ ///
+ /// # Returns
+ ///
+ /// The return value of the future wrapped in [Some], or [None] if the
+ /// service was cancelled.
+ pub async fn run_until_cancelled<T, F>(&mut self, f: F) -> Option<T>
+ where
+ T: Sized + Send + Sync,
+ F: Future