From d80dd7b4bf971865e1327342978a3d9278cfcc15 Mon Sep 17 00:00:00 2001 From: Aaron Siddhartha Mondal Date: Sun, 24 Nov 2024 01:39:45 +0100 Subject: [PATCH] Make stores and schedulers lists of named specs To migrate, change your config like this (similar for schedulers): ``` // Old: "stores": { "SOMESTORE": { "memory": {} } } // New: "stores": [ { "name": "SOMESTORE", "memory": {} } ] ``` For now this is not a breaking change, but a backwards compatible one. The backwards-compatibility code will be removed after the next release in a breaking change. Closes #834 --- Cargo.lock | 5 +- .../docker-compose/local-storage-cas.json | 11 +- .../docker-compose/scheduler.json | 18 ++- .../docker-compose/worker.json | 15 +- flake.nix | 1 + kubernetes/configmaps/cas.json | 11 +- kubernetes/configmaps/scheduler.json | 18 ++- kubernetes/configmaps/worker.json | 15 +- nativelink-config/BUILD.bazel | 2 + nativelink-config/Cargo.toml | 1 + nativelink-config/examples/basic_cas.json | 18 ++- .../examples/filesystem_cas.json | 22 +-- nativelink-config/examples/redis.json | 19 +-- .../s3_backend_with_local_fast_cas.json | 18 ++- nativelink-config/src/cas_server.rs | 8 +- nativelink-config/src/lib.rs | 144 ++++++++++++++++++ .../tests/backwards_compat_test.rs | 55 +++++++ src/bin/nativelink.rs | 25 ++- 18 files changed, 312 insertions(+), 94 deletions(-) create mode 100644 nativelink-config/tests/backwards_compat_test.rs diff --git a/Cargo.lock b/Cargo.lock index c9609016a..57bc9dd77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1771,6 +1771,7 @@ dependencies = [ "humantime", "pretty_assertions", "serde", + "serde_json", "serde_json5", "shellexpand", ] @@ -2817,9 +2818,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "indexmap 2.6.0", "itoa", diff --git a/deployment-examples/docker-compose/local-storage-cas.json b/deployment-examples/docker-compose/local-storage-cas.json index e7b622043..09dd656cd 100644 --- a/deployment-examples/docker-compose/local-storage-cas.json +++ b/deployment-examples/docker-compose/local-storage-cas.json @@ -4,8 +4,9 @@ // so objects are compressed, deduplicated and uses some in-memory // optimizations for certain hot paths. { - "stores": { - "CAS_MAIN_STORE": { + "stores": [ + { + "name": "CAS_MAIN_STORE", "compression": { "compression_algorithm": { "lz4": {} @@ -21,8 +22,8 @@ } } } - }, - "AC_MAIN_STORE": { + }, { + "name": "AC_MAIN_STORE", "filesystem": { "content_path": "~/.cache/nativelink/content_path-ac", "temp_path": "~/.cache/nativelink/tmp_path-ac", @@ -32,7 +33,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/deployment-examples/docker-compose/scheduler.json b/deployment-examples/docker-compose/scheduler.json index 4af051738..e971ab2d9 100644 --- a/deployment-examples/docker-compose/scheduler.json +++ b/deployment-examples/docker-compose/scheduler.json @@ -1,6 +1,7 @@ { - "stores": { - "GRPC_LOCAL_STORE": { + "stores": [ + { + "name": "GRPC_LOCAL_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -9,8 +10,8 @@ ], "store_type": "cas" } - }, - "GRPC_LOCAL_AC_STORE": { + }, { + "name": "GRPC_LOCAL_AC_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -20,9 +21,10 @@ "store_type": "ac" } } - }, - "schedulers": { - "MAIN_SCHEDULER": { + ], + "schedulers": [ + { + "name": "MAIN_SCHEDULER", "simple": { "supported_platform_properties": { "cpu_count": "minimum", @@ -31,7 +33,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/deployment-examples/docker-compose/worker.json b/deployment-examples/docker-compose/worker.json index 186722e01..7e6d36fc7 100644 --- a/deployment-examples/docker-compose/worker.json +++ b/deployment-examples/docker-compose/worker.json @@ -1,6 +1,7 @@ { - "stores": { - "GRPC_LOCAL_STORE": { + "stores": [ + { + "name": "GRPC_LOCAL_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -9,8 +10,8 @@ ], "store_type": "cas" } - }, - "GRPC_LOCAL_AC_STORE": { + }, { + "name": "GRPC_LOCAL_AC_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -19,8 +20,8 @@ ], "store_type": "ac" } - }, - "WORKER_FAST_SLOW_STORE": { + }, { + "name": "WORKER_FAST_SLOW_STORE", "fast_slow": { "fast": { "filesystem": { @@ -39,7 +40,7 @@ } } } - }, + ], "workers": [{ "local": { "worker_api_endpoint": { diff --git a/flake.nix b/flake.nix index eed093dee..26d3ab3e4 100644 --- a/flake.nix +++ b/flake.nix @@ -502,6 +502,7 @@ pkgs.cosign pkgs.kubectl pkgs.kubernetes-helm + pkgs.kubectx pkgs.cilium-cli pkgs.vale pkgs.trivy diff --git a/kubernetes/configmaps/cas.json b/kubernetes/configmaps/cas.json index e26b9c1cb..3487a7074 100644 --- a/kubernetes/configmaps/cas.json +++ b/kubernetes/configmaps/cas.json @@ -2,8 +2,9 @@ // `~/.cache/nativelink`. When this location is mounted as a PersistentVolume // it persists the cache across restarts. { - "stores": { - "CAS_MAIN_STORE": { + "stores": [ + { + "name": "CAS_MAIN_STORE", "existence_cache": { "backend": { "compression": { @@ -23,8 +24,8 @@ } } } - }, - "AC_MAIN_STORE": { + }, { + "name": "AC_MAIN_STORE", "completeness_checking": { "backend": { "filesystem": { @@ -43,7 +44,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/kubernetes/configmaps/scheduler.json b/kubernetes/configmaps/scheduler.json index e41060209..0341efb14 100644 --- a/kubernetes/configmaps/scheduler.json +++ b/kubernetes/configmaps/scheduler.json @@ -1,6 +1,7 @@ { - "stores": { - "GRPC_LOCAL_STORE": { + "stores": [ + { + "name": "GRPC_LOCAL_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -9,8 +10,8 @@ ], "store_type": "cas" } - }, - "GRPC_LOCAL_AC_STORE": { + }, { + "name": "GRPC_LOCAL_AC_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -20,9 +21,10 @@ "store_type": "ac" } } - }, - "schedulers": { - "MAIN_SCHEDULER": { + ], + "schedulers": [ + { + "name": "MAIN_SCHEDULER", // TODO(adams): use the right scheduler because reclient doesn't use the cached results? // TODO(adams): max_bytes_per_stream "simple": { @@ -46,7 +48,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/kubernetes/configmaps/worker.json b/kubernetes/configmaps/worker.json index 2a3d2911d..c5b8b1545 100644 --- a/kubernetes/configmaps/worker.json +++ b/kubernetes/configmaps/worker.json @@ -1,6 +1,7 @@ { - "stores": { - "GRPC_LOCAL_STORE": { + "stores": [ + { + "name": "GRPC_LOCAL_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -9,8 +10,8 @@ ], "store_type": "cas" } - }, - "GRPC_LOCAL_AC_STORE": { + }, { + "name": "GRPC_LOCAL_AC_STORE", // Note: This file is used to test GRPC store. "grpc": { "instance_name": "main", @@ -19,8 +20,8 @@ ], "store_type": "ac" } - }, - "WORKER_FAST_SLOW_STORE": { + }, { + "name": "WORKER_FAST_SLOW_STORE", "fast_slow": { "fast": { "filesystem": { @@ -39,7 +40,7 @@ } } } - }, + ], "workers": [{ "local": { "worker_api_endpoint": { diff --git a/nativelink-config/BUILD.bazel b/nativelink-config/BUILD.bazel index 3d36b2998..0a6f99924 100644 --- a/nativelink-config/BUILD.bazel +++ b/nativelink-config/BUILD.bazel @@ -31,6 +31,7 @@ rust_test_suite( name = "integration", timeout = "short", srcs = [ + "tests/backwards_compat_test.rs", "tests/deserialization_test.rs", ], deps = [ @@ -40,6 +41,7 @@ rust_test_suite( "@crates//:humantime", "@crates//:pretty_assertions", "@crates//:serde", + "@crates//:serde_json", "@crates//:serde_json5", ], ) diff --git a/nativelink-config/Cargo.toml b/nativelink-config/Cargo.toml index c4881854b..6a5ad22f4 100644 --- a/nativelink-config/Cargo.toml +++ b/nativelink-config/Cargo.toml @@ -12,3 +12,4 @@ shellexpand = { version = "3.1.0", default-features = false, features = ["base-0 [dev-dependencies] pretty_assertions = { version = "1.4.1", features = ["std"] } +serde_json = "1.0.133" diff --git a/nativelink-config/examples/basic_cas.json b/nativelink-config/examples/basic_cas.json index 173951deb..ff8f03449 100644 --- a/nativelink-config/examples/basic_cas.json +++ b/nativelink-config/examples/basic_cas.json @@ -1,6 +1,7 @@ { - "stores": { - "AC_MAIN_STORE": { + "stores": [ + { + "name": "AC_MAIN_STORE", "filesystem": { "content_path": "/tmp/nativelink/data-worker-test/content_path-ac", "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-ac", @@ -9,8 +10,8 @@ "max_bytes": 1000000000, } } - }, - "WORKER_FAST_SLOW_STORE": { + }, { + "name": "WORKER_FAST_SLOW_STORE", "fast_slow": { // "fast" must be a "filesystem" store because the worker uses it to make // hardlinks on disk to a directory where the jobs are running. @@ -34,9 +35,10 @@ } } } - }, - "schedulers": { - "MAIN_SCHEDULER": { + ], + "schedulers": [ + { + "name": "MAIN_SCHEDULER", "simple": { "supported_platform_properties": { "cpu_count": "minimum", @@ -61,7 +63,7 @@ } } } - }, + ], "workers": [{ "local": { "worker_api_endpoint": { diff --git a/nativelink-config/examples/filesystem_cas.json b/nativelink-config/examples/filesystem_cas.json index 6f43bf6be..067f41fc2 100644 --- a/nativelink-config/examples/filesystem_cas.json +++ b/nativelink-config/examples/filesystem_cas.json @@ -4,8 +4,9 @@ // so objects are compressed, deduplicated and uses some in-memory // optimizations for certain hot paths. { - "stores": { - "FS_CONTENT_STORE": { + "stores": [ + { + "name": "FS_CONTENT_STORE", "compression": { "compression_algorithm": { "lz4": {} @@ -21,8 +22,8 @@ } } } - }, - "CAS_MAIN_STORE": { + }, { + "name": "CAS_MAIN_STORE", "verify": { "backend": { // Because we are using a dedup store, we can bypass small objects @@ -78,8 +79,8 @@ "verify_size": true, "verify_hash": true } - }, - "AC_MAIN_STORE": { + }, { + "name": "AC_MAIN_STORE", "filesystem": { "content_path": "/tmp/nativelink/data/content_path-ac", "temp_path": "/tmp/nativelink/data/tmp_path-ac", @@ -89,9 +90,10 @@ } } } - }, - "schedulers": { - "MAIN_SCHEDULER": { + ], + "schedulers": [ + { + "name": "MAIN_SCHEDULER", "simple": { "supported_platform_properties": { "cpu_count": "minimum", @@ -112,7 +114,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/nativelink-config/examples/redis.json b/nativelink-config/examples/redis.json index 1f06fe560..a990c28e9 100644 --- a/nativelink-config/examples/redis.json +++ b/nativelink-config/examples/redis.json @@ -1,18 +1,19 @@ { - "stores": { - "CAS_FAST_SLOW_STORE": { + "stores": [ + { + "name": "CAS_FAST_SLOW_STORE", "redis_store": { "addresses": ["redis://127.0.0.1:6379/"], "mode": "cluster" } - }, - "AC_FAST_SLOW_STORE": { + }, { + "name": "AC_FAST_SLOW_STORE", "redis_store": { "addresses": ["redis://127.0.0.1:6379/"], "mode": "cluster" } - }, - "AC_MAIN_STORE": { + }, { + "name": "AC_MAIN_STORE", "completeness_checking": { "backend": { "ref_store": { @@ -25,8 +26,8 @@ } } } - }, - "CAS_MAIN_STORE": { + }, { + "name": "CAS_MAIN_STORE", "existence_cache": { "backend": { "compression": { @@ -42,7 +43,7 @@ } } } - }, + ], "servers": [ { "listener": { diff --git a/nativelink-config/examples/s3_backend_with_local_fast_cas.json b/nativelink-config/examples/s3_backend_with_local_fast_cas.json index d74dd910d..031812969 100644 --- a/nativelink-config/examples/s3_backend_with_local_fast_cas.json +++ b/nativelink-config/examples/s3_backend_with_local_fast_cas.json @@ -1,6 +1,7 @@ { - "stores": { - "CAS_MAIN_STORE": { + "stores": [ + { + "name": "CAS_MAIN_STORE", "verify": { "backend": { "dedup": { @@ -70,8 +71,8 @@ "verify_size": true, "hash_verification_function": "sha256" } - }, - "AC_MAIN_STORE": { + }, { + "name": "AC_MAIN_STORE", "fast_slow": { "fast": { "memory": { @@ -104,9 +105,10 @@ } } } - }, - "schedulers": { - "MAIN_SCHEDULER": { + ], + "schedulers": [ + { + "name": "MAIN_SCHEDULER", "simple": { "supported_platform_properties": { "cpu_count": "minimum", @@ -127,7 +129,7 @@ } } } - }, + ], "servers": [{ "listener": { "http": { diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index a258b6867..5191b1d58 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -16,14 +16,14 @@ use std::collections::HashMap; use serde::Deserialize; -use crate::schedulers::SchedulerSpec; use crate::serde_utils::{ convert_data_size_with_shellexpand, convert_duration_with_shellexpand, convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand, convert_optional_string_with_shellexpand, convert_string_with_shellexpand, convert_vec_string_with_shellexpand, }; -use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName, StoreSpec}; +use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName}; +use crate::{SchedulerConfigs, StoreConfigs}; /// Name of the scheduler. This type will be used when referencing a /// scheduler in the `CasConfig::schedulers`'s map key. @@ -725,7 +725,7 @@ pub struct GlobalConfig { pub struct CasConfig { /// List of stores available to use in this config. /// The keys can be used in other configs when needing to reference a store. - pub stores: HashMap, + pub stores: StoreConfigs, /// Worker configurations used to execute jobs. pub workers: Option>, @@ -733,7 +733,7 @@ pub struct CasConfig { /// List of schedulers available to use in this config. /// The keys can be used in other configs when needing to reference a /// scheduler. - pub schedulers: Option>, + pub schedulers: Option, /// Servers to setup for this process. pub servers: Vec, diff --git a/nativelink-config/src/lib.rs b/nativelink-config/src/lib.rs index 0607e28c5..2bcd45cf5 100644 --- a/nativelink-config/src/lib.rs +++ b/nativelink-config/src/lib.rs @@ -16,3 +16,147 @@ pub mod cas_server; pub mod schedulers; pub mod serde_utils; pub mod stores; + +use std::any::type_name; +use std::collections::HashMap; +use std::fmt; +use std::marker::PhantomData; + +use serde::de::{MapAccess, SeqAccess, Visitor}; +use serde::{Deserialize, Deserializer}; + +#[derive(Debug, Clone, Deserialize)] +pub struct NamedConfig { + pub name: String, + #[serde(flatten)] + pub spec: Spec, +} + +pub type StoreConfig = NamedConfig; +pub type SchedulerConfig = NamedConfig; + +// TODO(aaronmondal): Remove all the iterator impls and the Deserializer once we +// fully migrate to the new config schema. +pub type StoreConfigs = NamedConfigs; +pub type SchedulerConfigs = NamedConfigs; + +#[derive(Debug)] +pub struct NamedConfigs(pub Vec>); + +impl NamedConfigs { + pub fn iter(&self) -> std::slice::Iter<'_, NamedConfig> { + self.0.iter() + } +} + +impl IntoIterator for NamedConfigs { + type Item = NamedConfig; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl<'a, T> IntoIterator for &'a NamedConfigs { + type Item = &'a NamedConfig; + type IntoIter = std::slice::Iter<'a, NamedConfig>; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +} + +struct NamedConfigsVisitor { + phantom: PhantomData, +} + +impl NamedConfigsVisitor { + fn new() -> Self { + NamedConfigsVisitor { + phantom: PhantomData, + } + } +} + +impl<'de, T: Deserialize<'de>> Visitor<'de> for NamedConfigsVisitor { + type Value = NamedConfigs; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a sequence or map of named configs") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut vec = Vec::new(); + while let Some(config) = seq.next_element()? { + vec.push(config); + } + Ok(NamedConfigs(vec)) + } + + fn visit_map(self, mut access: M) -> Result + where + M: MapAccess<'de>, + { + let config_type = if type_name::().contains("StoreSpec") { + "stores" + } else if type_name::().contains("SchedulerSpec") { + "schedulers" + } else { + "stores and schedulers" + }; + eprintln!( + r#" +WARNING: Using deprecated map format for {config_type}. Please migrate to the new array format: + + // Old: + "stores": {{ + "SOMESTORE": {{ + "memory": {{}} + }} + }}, + "schedulers": {{ + "SOMESCHEDULER": {{ + "simple": {{}} + }} + }} + + // New: + "stores": [ + {{ + "name": "SOMESTORE", + "memory": {{}} + }} + ], + "schedulers": [ + {{ + "name": "SOMESCHEDULER", + "simple": {{}} + }} + ] +"# + ); + + let mut map = HashMap::new(); + while let Some((key, value)) = access.next_entry()? { + map.insert(key, value); + } + Ok(NamedConfigs( + map.into_iter() + .map(|(name, spec)| NamedConfig { name, spec }) + .collect(), + )) + } +} + +impl<'de, T: Deserialize<'de>> Deserialize<'de> for NamedConfigs { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(NamedConfigsVisitor::new()) + } +} diff --git a/nativelink-config/tests/backwards_compat_test.rs b/nativelink-config/tests/backwards_compat_test.rs new file mode 100644 index 000000000..26261e843 --- /dev/null +++ b/nativelink-config/tests/backwards_compat_test.rs @@ -0,0 +1,55 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use nativelink_config::NamedConfigs; +use serde::Deserialize; +use serde_json::json; + +#[derive(Debug, Deserialize, PartialEq)] +struct DummySpec { + memory: HashMap, +} + +#[test] +fn test_configs_deserialization() { + let expected = json!([ + { + "name": "store1", + "memory": {} + }, + { + "name": "store2", + "memory": {} + } + ]); + + let config: NamedConfigs = serde_json::from_value(expected.clone()).unwrap(); + let mut new_stores: Vec<_> = config.into_iter().collect(); + new_stores.sort_by(|a, b| a.name.cmp(&b.name)); + + let old_format = json!({ + "store1": { "memory": {} }, + "store2": { "memory": {} } + }); + + let config: NamedConfigs = serde_json::from_value(old_format).unwrap(); + let mut transformed_stores: Vec<_> = config.into_iter().collect(); + transformed_stores.sort_by(|a, b| a.name.cmp(&b.name)); + + assert_eq!(new_stores.len(), transformed_stores.len()); + assert_eq!(new_stores[0].name, transformed_stores[0].name); + assert_eq!(new_stores[1].name, transformed_stores[1].name); +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index f30c8a2df..63e2dfe3d 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -30,6 +30,7 @@ use nativelink_config::cas_server::{ CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, ServerConfig, WorkerConfig, }; use nativelink_config::stores::ConfigDigestHashFunction; +use nativelink_config::{SchedulerConfig, StoreConfig}; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::{ MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, RootMetricsComponent, @@ -183,11 +184,11 @@ async fn inner_main( { let mut health_registry_lock = health_registry_builder.lock().await; - for (name, store_cfg) in cfg.stores { + for StoreConfig { name, spec } in cfg.stores { let health_component_name = format!("stores/{name}"); let mut health_register_store = health_registry_lock.sub_builder(&health_component_name); - let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store)) + let store = store_factory(&spec, &store_manager, Some(&mut health_register_store)) .await .err_tip(|| format!("Failed to create store '{name}'"))?; store_manager.add_store(&name, store); @@ -196,17 +197,15 @@ async fn inner_main( let mut action_schedulers = HashMap::new(); let mut worker_schedulers = HashMap::new(); - if let Some(schedulers_cfg) = cfg.schedulers { - for (name, scheduler_cfg) in schedulers_cfg { - let (maybe_action_scheduler, maybe_worker_scheduler) = - scheduler_factory(&scheduler_cfg, &store_manager) - .err_tip(|| format!("Failed to create scheduler '{name}'"))?; - if let Some(action_scheduler) = maybe_action_scheduler { - action_schedulers.insert(name.clone(), action_scheduler.clone()); - } - if let Some(worker_scheduler) = maybe_worker_scheduler { - worker_schedulers.insert(name.clone(), worker_scheduler.clone()); - } + for SchedulerConfig { name, spec } in cfg.schedulers.iter().flatten() { + let (maybe_action_scheduler, maybe_worker_scheduler) = + scheduler_factory(spec, &store_manager) + .err_tip(|| format!("Failed to create scheduler '{name}'"))?; + if let Some(action_scheduler) = maybe_action_scheduler { + action_schedulers.insert(name.clone(), action_scheduler.clone()); + } + if let Some(worker_scheduler) = maybe_worker_scheduler { + worker_schedulers.insert(name.clone(), worker_scheduler.clone()); } }