diff --git a/core/src/libraries/helpers/mod.rs b/core/src/libraries/helpers/mod.rs index 19659472..0d96df73 100644 --- a/core/src/libraries/helpers/mod.rs +++ b/core/src/libraries/helpers/mod.rs @@ -4,15 +4,15 @@ mod capabilities; mod healthcheck; -mod timeout; pub mod constants; pub mod keys; pub mod lua; +use std::{num::ParseIntError, time::Duration}; + pub use capabilities::*; pub use healthcheck::{wait_for, wait_for_key}; -pub use timeout::Timeout; /// Splits the input string into two parts at the first occurence of the separator pub fn split_into_two(input: &str, separator: &'static str) -> Option<(String, String)> { @@ -47,3 +47,10 @@ pub fn load_config(name: &str) -> String { pub fn replace_config_variable(config: String, key: &str, value: &str) -> String { config.replace(&format!("{{{{{}}}}}", key), &value.to_string()) } + +/// Parses a Duration from a string containing seconds. +/// Useful for command line parsing +pub fn parse_seconds(src: &str) -> Result { + let seconds = src.parse::()?; + Ok(Duration::from_secs(seconds)) +} diff --git a/core/src/libraries/helpers/timeout.rs b/core/src/libraries/helpers/timeout.rs deleted file mode 100644 index a492d996..00000000 --- a/core/src/libraries/helpers/timeout.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Defaults and database accessors for timeout values - -use log::{info, trace}; -use redis::{aio::ConnectionLike, AsyncCommands}; -use std::fmt; - -/// Timeout value accessors in seconds -#[derive(Debug)] -pub enum Timeout { - /// How long a session creation request may wait for an orchestrator - Queue, - /// Maximum duration a session may take to be scheduled by an orchestrator - Scheduling, - /// How long a session may take to start up - /// - /// Note that this can include the time it takes for a provisioner like Kubernetes to assign a pod! - /// If this timeout is hit it might indicate a scheduling problem in your cluster. - NodeStartup, - /// How long the WebDriver executable may take to become responsive - DriverStartup, - /// Maximum idle duration of a session - SessionTermination, - /// Interval at which orphaned slots are reclaimed - SlotReclaimInterval, -} - -impl fmt::Display for Timeout { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:?}", self) - } -} - -impl Timeout { - /// Default timeout value if nothing is set in the database - fn default(&self) -> usize { - match *self { - // Manager - Timeout::Queue => 600, - Timeout::Scheduling => 60, - Timeout::NodeStartup => 120, - // Node - Timeout::DriverStartup => 30, - Timeout::SessionTermination => 900, - // Orchestrator - Timeout::SlotReclaimInterval => 300, - } - } - - /// Retrieve either a value set in the database or initializes it to the default - pub async fn get(&self, con: &mut C) -> usize { - let key = format!("{}", self).to_lowercase(); - - trace!("Reading timeout {}", key); - let timeout: Option = con.hget("timeouts", &key).await.ok(); - - match timeout { - Some(timeout) => timeout, - None => { - info!("Initializing timeout {} to default value", key); - let default = self.default(); - let _: Option<()> = con.hset("timeouts", key, default).await.ok(); - default - } - } - } -} diff --git a/core/src/services/api/schema/query.rs b/core/src/services/api/schema/query.rs index 1d982597..21ab34cf 100644 --- a/core/src/services/api/schema/query.rs +++ b/core/src/services/api/schema/query.rs @@ -1,5 +1,5 @@ use super::{ - types::{InputDictionaryEntry, Orchestrator, Session, SessionState, Timeouts}, + types::{InputDictionaryEntry, Orchestrator, Session, SessionState}, GqlContext, }; use crate::libraries::helpers::keys; @@ -26,10 +26,6 @@ impl Query { #[graphql_object(context = GqlContext)] impl Query { - fn timeouts() -> Timeouts { - Timeouts::new() - } - async fn sessions( &self, state: Option, diff --git a/core/src/services/api/schema/types/config.rs b/core/src/services/api/schema/types/config.rs deleted file mode 100644 index ff37be9b..00000000 --- a/core/src/services/api/schema/types/config.rs +++ /dev/null @@ -1,48 +0,0 @@ -use super::super::GqlContext; -use crate::libraries::helpers::Timeout; -use juniper::graphql_object; - -pub struct Timeouts; - -impl Timeouts { - pub fn new() -> Self { - Self {} - } -} - -#[graphql_object(context = GqlContext)] -impl Timeouts { - pub async fn queue(context: &GqlContext) -> i32 { - Timeout::Queue.get(&mut *context.redis.lock().await).await as i32 - } - - pub async fn scheduling(context: &GqlContext) -> i32 { - Timeout::Scheduling - .get(&mut *context.redis.lock().await) - .await as i32 - } - - pub async fn nodeStartup(context: &GqlContext) -> i32 { - Timeout::NodeStartup - .get(&mut *context.redis.lock().await) - .await as i32 - } - - pub async fn driverStartup(context: &GqlContext) -> i32 { - Timeout::DriverStartup - .get(&mut *context.redis.lock().await) - .await as i32 - } - - pub async fn sessionTermination(context: &GqlContext) -> i32 { - Timeout::SessionTermination - .get(&mut *context.redis.lock().await) - .await as i32 - } - - pub async fn slotReclaimInterval(context: &GqlContext) -> i32 { - Timeout::SlotReclaimInterval - .get(&mut *context.redis.lock().await) - .await as i32 - } -} diff --git a/core/src/services/api/schema/types/mod.rs b/core/src/services/api/schema/types/mod.rs index 8d1bc3cd..d9be41dd 100644 --- a/core/src/services/api/schema/types/mod.rs +++ b/core/src/services/api/schema/types/mod.rs @@ -1,7 +1,5 @@ -mod config; mod orchestrator; mod session; -pub use config::*; pub use orchestrator::*; pub use session::*; diff --git a/core/src/services/manager/context.rs b/core/src/services/manager/context.rs index 4a072784..bd2571e5 100644 --- a/core/src/services/manager/context.rs +++ b/core/src/services/manager/context.rs @@ -1,3 +1,4 @@ +use super::Options; use crate::libraries::{lifecycle::HeartBeat, resources::ResourceManagerProvider}; use crate::libraries::{metrics::MetricsProcessor, resources::DefaultResourceManager}; use opentelemetry::Context as TelemetryContext; @@ -8,16 +9,18 @@ pub struct Context { resource_manager: DefaultResourceManager, pub heart_beat: HeartBeat, pub metrics: MetricsProcessor, + pub options: Options, } impl Context { - pub async fn new(redis_url: String) -> Self { + pub async fn new(redis_url: String, options: Options) -> Self { let heart_beat = HeartBeat::new(); Self { resource_manager: DefaultResourceManager::new(redis_url), heart_beat, metrics: MetricsProcessor::default(), + options, } } } diff --git a/core/src/services/manager/mod.rs b/core/src/services/manager/mod.rs index 7075f7ca..7ac42635 100644 --- a/core/src/services/manager/mod.rs +++ b/core/src/services/manager/mod.rs @@ -1,8 +1,10 @@ //! Endpoint for handling session creation +use std::time::Duration; + use super::SharedOptions; use crate::libraries::{ - helpers::constants, + helpers::{constants, parse_seconds}, tracing::{self, constants::service}, }; use crate::libraries::{ @@ -23,7 +25,7 @@ use context::Context; use jobs::SessionHandlerJob; pub use structures::*; -#[derive(Debug, StructOpt)] +#[derive(Debug, StructOpt, Clone)] /// Endpoint for handling session creation /// /// Handles scheduling and provisioning lifecycle of sessions. @@ -39,6 +41,14 @@ pub struct Options { /// Port on which the HTTP server will listen #[structopt(short, long, default_value = constants::PORT_MANAGER)] port: u16, + + /// Maximum duration to wait in queue; in seconds + #[structopt(long, env, default_value = "600", parse(try_from_str = parse_seconds))] + timeout_queue: Duration, + + /// Maximum duration to wait for a session to become provisioned; in seconds + #[structopt(long, env, default_value = "300", parse(try_from_str = parse_seconds))] + timeout_provisioning: Duration, } pub async fn run(shared_options: SharedOptions, options: Options) -> Result<()> { @@ -51,7 +61,7 @@ pub async fn run(shared_options: SharedOptions, options: Options) -> Result<()> let (mut heart, _) = Heart::new(); let endpoint = format!("{}:{}", options.host, options.port); - let context = Context::new(shared_options.redis).await; + let context = Context::new(shared_options.redis, options.clone()).await; let scheduler = JobScheduler::default(); let status_job = StatusServer::new(&scheduler, shared_options.status_server); diff --git a/core/src/services/manager/tasks/create_session.rs b/core/src/services/manager/tasks/create_session.rs index 5adda1da..f0203404 100644 --- a/core/src/services/manager/tasks/create_session.rs +++ b/core/src/services/manager/tasks/create_session.rs @@ -3,7 +3,7 @@ use crate::libraries::metrics::MetricsEntry; use crate::libraries::resources::{ResourceManager, ResourceManagerProvider}; use crate::libraries::tracing::StringPropagator; use crate::libraries::{ - helpers::{keys, parse_browser_string, CapabilitiesRequest, Timeout}, + helpers::{keys, parse_browser_string, CapabilitiesRequest}, tracing::global_tracer, }; use crate::with_redis_resource; @@ -62,13 +62,21 @@ pub async fn create_session( // Create startup routine let startup = async { + let timeout_queue = manager.context.options.timeout_queue; + let timeout_provisioning = manager.context.options.timeout_provisioning; + // Request a slot - subtasks::request_slot(&mut con, &session_id, &manager.context.capabilities) - .with_context(telemetry_context.clone()) - .await?; + subtasks::request_slot( + &mut con, + &session_id, + &manager.context.capabilities, + timeout_queue, + ) + .with_context(telemetry_context.clone()) + .await?; // Await scheduling & startup - subtasks::await_scheduling(&mut con, &session_id) + subtasks::await_provisioning(&mut con, &session_id, timeout_provisioning) .with_context(telemetry_context.clone()) .await?; @@ -110,7 +118,7 @@ pub async fn create_session( mod subtasks { use super::*; use crate::libraries::tracing::constants::trace; - use std::collections::HashMap; + use std::{collections::HashMap, time::Duration}; pub async fn create_new_session( con: &mut C, @@ -177,10 +185,10 @@ mod subtasks { con: &mut C, session_id: &str, capabilities: &str, + timeout: Duration, ) -> Result<(), RequestError> { let tracer = global_tracer(); let mut span = tracer.start("Request slot"); - let queue_timeout = Timeout::Queue.get(con).await; let mut queues: Vec = helpers::match_orchestrators(con, capabilities) .await? @@ -202,7 +210,7 @@ mod subtasks { span.add_event("Entering queue".to_string(), vec![]); let response: Option<(String, String)> = con - .blpop(queues, queue_timeout) + .blpop(queues, timeout.as_secs() as usize) .map_err(RequestError::RedisError) .await?; @@ -252,18 +260,18 @@ mod subtasks { } } - pub async fn await_scheduling( + pub async fn await_provisioning( con: &mut C, session_id: &str, + timeout: Duration, ) -> Result<(), RequestError> { let tracer = global_tracer(); - let mut span = tracer.start("Await scheduling"); + let mut span = tracer.start("Await provisioning"); - let scheduling_timeout = Timeout::Scheduling.get(con).await; let scheduling_key = keys::session::orchestrator(session_id); let res: Option<()> = con - .brpoplpush(&scheduling_key, &scheduling_key, scheduling_timeout) + .brpoplpush(&scheduling_key, &scheduling_key, timeout.as_secs() as usize) .map_err(RequestError::RedisError) .await?; diff --git a/core/src/services/node/mod.rs b/core/src/services/node/mod.rs index c1cc5cbd..441512fd 100644 --- a/core/src/services/node/mod.rs +++ b/core/src/services/node/mod.rs @@ -3,7 +3,7 @@ use super::SharedOptions; use crate::{ libraries::{ - helpers::constants, + helpers::{constants, parse_seconds}, net::{advertise::ServiceAdvertisorJob, discovery::ServiceDescriptor}, recording::VideoQualityPreset, tracing::{self, constants::service}, @@ -14,7 +14,7 @@ use anyhow::Result; use jatsl::{schedule, JobScheduler, StatusServer}; use log::{info, warn}; use opentelemetry::trace::TraceContextExt; -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; use structopt::StructOpt; use uuid::Uuid; @@ -107,6 +107,14 @@ pub struct Options { /// https://trac.ffmpeg.org/wiki/Encode/H.264 #[structopt(long, env, default_value = "450000")] max_bitrate: usize, + + /// Maximum idle duration after which the node self-terminates; in seconds + #[structopt(long, env, default_value = "300", parse(try_from_str = parse_seconds))] + timeout_idle: Duration, + + /// Maximum duration the WebDriver may take to become responsive; in seconds + #[structopt(long, env, default_value = "30", parse(try_from_str = parse_seconds))] + timeout_driver_startup: Duration, } impl Options { diff --git a/core/src/services/node/tasks/driver.rs b/core/src/services/node/tasks/driver.rs index d49c8493..9c5d233e 100644 --- a/core/src/services/node/tasks/driver.rs +++ b/core/src/services/node/tasks/driver.rs @@ -1,11 +1,6 @@ use super::super::{structs::NodeError, Context}; -use crate::libraries::resources::{ResourceManager, ResourceManagerProvider}; use crate::libraries::tracing::global_tracer; -use crate::with_redis_resource; -use crate::{ - libraries::helpers::{wait_for, Timeout}, - services::node::context::StartupContext, -}; +use crate::{libraries::helpers::wait_for, services::node::context::StartupContext}; use jatsl::TaskManager; use log::{error, info}; use opentelemetry::trace::StatusCode; @@ -38,8 +33,7 @@ pub async fn start_driver(manager: TaskManager) -> Result<(), No let mut span = global_tracer() .start_with_context("Start driver", manager.context.telemetry_context.clone()); - let mut con = with_redis_resource!(manager); - let startup_timeout = Timeout::DriverStartup.get(&mut con).await; + let startup_timeout = manager.context.options.timeout_driver_startup; let driver = &manager.context.options.driver; let driver_port = manager.context.options.driver_port; let browser = &manager.context.options.browser; @@ -98,7 +92,10 @@ mod subtasks { } } - pub async fn await_driver_startup(timeout: usize, driver_port: u16) -> Result<(), NodeError> { + pub async fn await_driver_startup( + timeout: Duration, + driver_port: u16, + ) -> Result<(), NodeError> { let span = global_tracer().start("Awaiting driver startup"); info!("Awaiting driver startup"); @@ -106,7 +103,7 @@ mod subtasks { let url = format!("http://{}/status", socket_addr); let telemetry_context = TelemetryContext::current_with_span(span); - match wait_for(&url, Duration::from_secs(timeout as u64)) + match wait_for(&url, timeout) .with_context(telemetry_context.clone()) .await { diff --git a/core/src/services/node/tasks/init_service.rs b/core/src/services/node/tasks/init_service.rs index febe419f..b1ad68f9 100644 --- a/core/src/services/node/tasks/init_service.rs +++ b/core/src/services/node/tasks/init_service.rs @@ -6,15 +6,11 @@ use crate::libraries::{ tracing::global_tracer, }; use crate::with_redis_resource; -use crate::{ - libraries::helpers::{keys, Timeout}, - services::node::context::StartupContext, -}; +use crate::{libraries::helpers::keys, services::node::context::StartupContext}; use jatsl::TaskManager; use log::error; use opentelemetry::trace::{Span, Tracer}; use redis::AsyncCommands; -use std::time::Duration; pub async fn initialize_service( manager: TaskManager, @@ -26,9 +22,7 @@ pub async fn initialize_service( manager.context.telemetry_context.clone(), ); - let (heart, heart_stone) = Heart::with_lifetime(Duration::from_secs( - Timeout::SessionTermination.get(&mut con).await as u64, - )); + let (heart, heart_stone) = Heart::with_lifetime(manager.context.options.timeout_idle); if let Some(storage_directory) = &manager.context.options.storage_directory { span.add_event("Load storage information".to_string(), vec![]); diff --git a/core/src/services/orchestrator/core/context.rs b/core/src/services/orchestrator/core/context.rs index b6063fc4..5f42a961 100644 --- a/core/src/services/orchestrator/core/context.rs +++ b/core/src/services/orchestrator/core/context.rs @@ -3,8 +3,8 @@ use crate::libraries::resources::DefaultResourceManager; use crate::libraries::{helpers::keys, resources::ResourceManagerProvider}; use crate::libraries::{lifecycle::HeartBeat, net::discovery::ServiceDiscovery}; use opentelemetry::Context as TelemetryContext; -use std::ops::Deref; use std::sync::Arc; +use std::{ops::Deref, time::Duration}; #[derive(Clone)] pub struct Context { @@ -12,6 +12,7 @@ pub struct Context { pub heart_beat: HeartBeat, pub provisioner: Arc>, pub provisioner_type: ProvisionerType, + pub timeout_startup: Duration, pub id: String, } @@ -20,6 +21,7 @@ impl Context { provisioner_type: ProvisionerType, provisioner: P, redis_url: String, + timeout_startup: Duration, id: String, ) -> Self { let heart_beat = HeartBeat::new(); @@ -38,6 +40,7 @@ impl Context { provisioner: Arc::new(Box::new(provisioner)), provisioner_type, id, + timeout_startup, } } diff --git a/core/src/services/orchestrator/core/jobs/slot_reclaim.rs b/core/src/services/orchestrator/core/jobs/slot_reclaim.rs index 9ad8be83..2fa3ba11 100644 --- a/core/src/services/orchestrator/core/jobs/slot_reclaim.rs +++ b/core/src/services/orchestrator/core/jobs/slot_reclaim.rs @@ -1,5 +1,5 @@ use super::super::Context; -use crate::libraries::helpers::{lua::terminate_session, Timeout}; +use crate::libraries::helpers::lua::terminate_session; use crate::libraries::resources::{ResourceManager, ResourceManagerProvider}; use crate::with_shared_redis_resource; use anyhow::Result; @@ -21,8 +21,7 @@ impl Job for SlotReclaimJob { async fn execute(&self, manager: TaskManager) -> Result<()> { let mut con = with_shared_redis_resource!(manager); - let interval_seconds = Timeout::SlotReclaimInterval.get(&mut con).await as u64; - let mut interval = time::interval(Duration::from_secs(interval_seconds)); + let mut interval = time::interval(Duration::from_secs(300)); let orchestrator_id = manager.context.id.clone(); manager.ready().await; diff --git a/core/src/services/orchestrator/core/mod.rs b/core/src/services/orchestrator/core/mod.rs index d5c77a5e..73bdacd7 100644 --- a/core/src/services/orchestrator/core/mod.rs +++ b/core/src/services/orchestrator/core/mod.rs @@ -1,5 +1,8 @@ +use std::time::Duration; + use super::super::SharedOptions; use crate::libraries::{ + helpers::parse_seconds, lifecycle::Heart, net::discovery::ServiceDiscovery, tracing::{self, constants::service}, @@ -32,6 +35,10 @@ pub struct Options { /// Number of concurrent sessions #[structopt(long, env)] pub slot_count: usize, + + /// Maximum duration to wait for a session start up; in seconds + #[structopt(long, env, default_value = "300", parse(try_from_str = parse_seconds))] + timeout_startup: Duration, } pub async fn start( @@ -52,6 +59,7 @@ pub async fn start( provisioner_type, provisioner, shared_options.redis, + options.timeout_startup, options.id, ) .await; diff --git a/core/src/services/orchestrator/core/tasks/provision.rs b/core/src/services/orchestrator/core/tasks/provision.rs index 6d64d83c..9a2508b3 100644 --- a/core/src/services/orchestrator/core/tasks/provision.rs +++ b/core/src/services/orchestrator/core/tasks/provision.rs @@ -55,13 +55,13 @@ pub async fn provision_session(manager: TaskManager) -> Res mod subtasks { use super::*; use crate::libraries::{ - helpers::{wait_for, wait_for_key, Timeout}, + helpers::{wait_for, wait_for_key}, net::discovery::ServiceDescriptor, tracing::global_tracer, }; use futures::TryFutureExt; use opentelemetry::trace::{Span, Tracer}; - use std::time::{Duration, Instant}; + use std::time::Instant; use uuid::Uuid; pub async fn provision_session( @@ -100,8 +100,7 @@ mod subtasks { ) -> Result<()> { let tracer = global_tracer(); let mut span = tracer.start("Await session startup"); - let timeout = Timeout::NodeStartup.get(con).await as u64; - let timeout_duration = Duration::from_secs(timeout); + let timeout_duration = context.timeout_startup; let healthcheck_start = Instant::now(); // Wait for the node to send heart-beats diff --git a/distribution/kubernetes/chart/templates/configMap.yaml b/distribution/kubernetes/chart/templates/configMap.yaml index 43973dbb..59b6879f 100644 --- a/distribution/kubernetes/chart/templates/configMap.yaml +++ b/distribution/kubernetes/chart/templates/configMap.yaml @@ -103,6 +103,10 @@ data: value: "{{ .Values.recording.quality.crf }}" - name: MAX_BITRATE value: "{{ .Values.recording.quality.maxBitrate }}" + - name: TIMEOUT_IDLE + value: "{{ .Values.config.timeouts.idle }}" + - name: TIMEOUT_DRIVER_STARTUP + value: "{{ .Values.config.timeouts.webdriver }}" - name: HOST valueFrom: fieldRef: diff --git a/distribution/kubernetes/chart/templates/manager.yaml b/distribution/kubernetes/chart/templates/manager.yaml index 84451cd8..79d692c8 100644 --- a/distribution/kubernetes/chart/templates/manager.yaml +++ b/distribution/kubernetes/chart/templates/manager.yaml @@ -56,6 +56,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: TIMEOUT_QUEUE + value: "{{ .Values.config.timeouts.queue }}" + - name: TIMEOUT_PROVISIONING + value: "{{ .Values.config.timeouts.provisioning }}" - name: RUST_LOG value: {{ .Values.logLevel }} {{- if .Values.telemetry.enabled }} diff --git a/distribution/kubernetes/chart/templates/orchestrator.yaml b/distribution/kubernetes/chart/templates/orchestrator.yaml index d4b13ed0..6780a38f 100644 --- a/distribution/kubernetes/chart/templates/orchestrator.yaml +++ b/distribution/kubernetes/chart/templates/orchestrator.yaml @@ -67,6 +67,8 @@ spec: value: "/configs" - name: WEBGRID_RESOURCE_PREFIX value: "{{ include "web-grid.fullname" . }}-" + - name: TIMEOUT_STARTUP + value: "{{ .Values.config.timeouts.startup }}" - name: NAMESPACE valueFrom: fieldRef: diff --git a/distribution/kubernetes/chart/values.yaml b/distribution/kubernetes/chart/values.yaml index 08a9caa3..57c2504e 100644 --- a/distribution/kubernetes/chart/values.yaml +++ b/distribution/kubernetes/chart/values.yaml @@ -9,6 +9,17 @@ service: port: 80 config: + timeouts: + # Maximum duration to wait in queue + queue: 600 + # How long a session pod may take to become responsive + startup: 300 + # Maximum duration the WebDriver may take to become responsive + webdriver: 30 + # Maximum duration to wait for an orchestrator to provision a node + provisioning: 300 + # Maximum idle duration after which sessions self-terminate + idle: 300 orchestrator: # Number of concurrent sessions allowed *per* orchestrator maxSessions: 5