Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
🚸 Move timeout configuration to options
Browse files Browse the repository at this point in the history
  • Loading branch information
TilBlechschmidt committed May 15, 2021
1 parent d506a00 commit e7b4119
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 167 deletions.
11 changes: 9 additions & 2 deletions core/src/libraries/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
mod capabilities;
mod healthcheck;
mod timeout;

pub mod constants;
pub mod keys;
pub mod lua;

use std::{num::ParseIntError, time::Duration};

pub use capabilities::*;
pub use healthcheck::{wait_for, wait_for_key};
pub use timeout::Timeout;

/// Splits the input string into two parts at the first occurence of the separator
pub fn split_into_two(input: &str, separator: &'static str) -> Option<(String, String)> {
Expand Down Expand Up @@ -47,3 +47,10 @@ pub fn load_config(name: &str) -> String {
pub fn replace_config_variable(config: String, key: &str, value: &str) -> String {
config.replace(&format!("{{{{{}}}}}", key), &value.to_string())
}

/// Parses a Duration from a string containing seconds.
/// Useful for command line parsing
pub fn parse_seconds(src: &str) -> Result<Duration, ParseIntError> {
let seconds = src.parse::<u64>()?;
Ok(Duration::from_secs(seconds))
}
66 changes: 0 additions & 66 deletions core/src/libraries/helpers/timeout.rs

This file was deleted.

6 changes: 1 addition & 5 deletions core/src/services/api/schema/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
types::{InputDictionaryEntry, Orchestrator, Session, SessionState, Timeouts},
types::{InputDictionaryEntry, Orchestrator, Session, SessionState},
GqlContext,
};
use crate::libraries::helpers::keys;
Expand All @@ -26,10 +26,6 @@ impl Query {

#[graphql_object(context = GqlContext)]
impl Query {
fn timeouts() -> Timeouts {
Timeouts::new()
}

async fn sessions(
&self,
state: Option<SessionState>,
Expand Down
48 changes: 0 additions & 48 deletions core/src/services/api/schema/types/config.rs

This file was deleted.

2 changes: 0 additions & 2 deletions core/src/services/api/schema/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod config;
mod orchestrator;
mod session;

pub use config::*;
pub use orchestrator::*;
pub use session::*;
5 changes: 4 additions & 1 deletion core/src/services/manager/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Options;
use crate::libraries::{lifecycle::HeartBeat, resources::ResourceManagerProvider};
use crate::libraries::{metrics::MetricsProcessor, resources::DefaultResourceManager};
use opentelemetry::Context as TelemetryContext;
Expand All @@ -8,16 +9,18 @@ pub struct Context {
resource_manager: DefaultResourceManager,
pub heart_beat: HeartBeat<Self, DefaultResourceManager>,
pub metrics: MetricsProcessor<Self, DefaultResourceManager>,
pub options: Options,
}

impl Context {
pub async fn new(redis_url: String) -> Self {
pub async fn new(redis_url: String, options: Options) -> Self {
let heart_beat = HeartBeat::new();

Self {
resource_manager: DefaultResourceManager::new(redis_url),
heart_beat,
metrics: MetricsProcessor::default(),
options,
}
}
}
Expand Down
16 changes: 13 additions & 3 deletions core/src/services/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Endpoint for handling session creation
use std::time::Duration;

use super::SharedOptions;
use crate::libraries::{
helpers::constants,
helpers::{constants, parse_seconds},
tracing::{self, constants::service},
};
use crate::libraries::{
Expand All @@ -23,7 +25,7 @@ use context::Context;
use jobs::SessionHandlerJob;
pub use structures::*;

#[derive(Debug, StructOpt)]
#[derive(Debug, StructOpt, Clone)]
/// Endpoint for handling session creation
///
/// Handles scheduling and provisioning lifecycle of sessions.
Expand All @@ -39,6 +41,14 @@ pub struct Options {
/// Port on which the HTTP server will listen
#[structopt(short, long, default_value = constants::PORT_MANAGER)]
port: u16,

/// Maximum duration to wait in queue; in seconds
#[structopt(long, env, default_value = "600", parse(try_from_str = parse_seconds))]
timeout_queue: Duration,

/// Maximum duration to wait for a session to become provisioned; in seconds
#[structopt(long, env, default_value = "300", parse(try_from_str = parse_seconds))]
timeout_provisioning: Duration,
}

pub async fn run(shared_options: SharedOptions, options: Options) -> Result<()> {
Expand All @@ -51,7 +61,7 @@ pub async fn run(shared_options: SharedOptions, options: Options) -> Result<()>
let (mut heart, _) = Heart::new();

let endpoint = format!("{}:{}", options.host, options.port);
let context = Context::new(shared_options.redis).await;
let context = Context::new(shared_options.redis, options.clone()).await;
let scheduler = JobScheduler::default();

let status_job = StatusServer::new(&scheduler, shared_options.status_server);
Expand Down
32 changes: 20 additions & 12 deletions core/src/services/manager/tasks/create_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::libraries::metrics::MetricsEntry;
use crate::libraries::resources::{ResourceManager, ResourceManagerProvider};
use crate::libraries::tracing::StringPropagator;
use crate::libraries::{
helpers::{keys, parse_browser_string, CapabilitiesRequest, Timeout},
helpers::{keys, parse_browser_string, CapabilitiesRequest},
tracing::global_tracer,
};
use crate::with_redis_resource;
Expand Down Expand Up @@ -62,13 +62,21 @@ pub async fn create_session(

// Create startup routine
let startup = async {
let timeout_queue = manager.context.options.timeout_queue;
let timeout_provisioning = manager.context.options.timeout_provisioning;

// Request a slot
subtasks::request_slot(&mut con, &session_id, &manager.context.capabilities)
.with_context(telemetry_context.clone())
.await?;
subtasks::request_slot(
&mut con,
&session_id,
&manager.context.capabilities,
timeout_queue,
)
.with_context(telemetry_context.clone())
.await?;

// Await scheduling & startup
subtasks::await_scheduling(&mut con, &session_id)
subtasks::await_provisioning(&mut con, &session_id, timeout_provisioning)
.with_context(telemetry_context.clone())
.await?;

Expand Down Expand Up @@ -110,7 +118,7 @@ pub async fn create_session(
mod subtasks {
use super::*;
use crate::libraries::tracing::constants::trace;
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

pub async fn create_new_session<C: ConnectionLike + AsyncCommands>(
con: &mut C,
Expand Down Expand Up @@ -177,10 +185,10 @@ mod subtasks {
con: &mut C,
session_id: &str,
capabilities: &str,
timeout: Duration,
) -> Result<(), RequestError> {
let tracer = global_tracer();
let mut span = tracer.start("Request slot");
let queue_timeout = Timeout::Queue.get(con).await;

let mut queues: Vec<String> = helpers::match_orchestrators(con, capabilities)
.await?
Expand All @@ -202,7 +210,7 @@ mod subtasks {
span.add_event("Entering queue".to_string(), vec![]);

let response: Option<(String, String)> = con
.blpop(queues, queue_timeout)
.blpop(queues, timeout.as_secs() as usize)
.map_err(RequestError::RedisError)
.await?;

Expand Down Expand Up @@ -252,18 +260,18 @@ mod subtasks {
}
}

pub async fn await_scheduling<C: ConnectionLike + AsyncCommands>(
pub async fn await_provisioning<C: ConnectionLike + AsyncCommands>(
con: &mut C,
session_id: &str,
timeout: Duration,
) -> Result<(), RequestError> {
let tracer = global_tracer();
let mut span = tracer.start("Await scheduling");
let mut span = tracer.start("Await provisioning");

let scheduling_timeout = Timeout::Scheduling.get(con).await;
let scheduling_key = keys::session::orchestrator(session_id);

let res: Option<()> = con
.brpoplpush(&scheduling_key, &scheduling_key, scheduling_timeout)
.brpoplpush(&scheduling_key, &scheduling_key, timeout.as_secs() as usize)
.map_err(RequestError::RedisError)
.await?;

Expand Down
12 changes: 10 additions & 2 deletions core/src/services/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::SharedOptions;
use crate::{
libraries::{
helpers::constants,
helpers::{constants, parse_seconds},
net::{advertise::ServiceAdvertisorJob, discovery::ServiceDescriptor},
recording::VideoQualityPreset,
tracing::{self, constants::service},
Expand All @@ -14,7 +14,7 @@ use anyhow::Result;
use jatsl::{schedule, JobScheduler, StatusServer};
use log::{info, warn};
use opentelemetry::trace::TraceContextExt;
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};
use structopt::StructOpt;
use uuid::Uuid;

Expand Down Expand Up @@ -107,6 +107,14 @@ pub struct Options {
/// https://trac.ffmpeg.org/wiki/Encode/H.264
#[structopt(long, env, default_value = "450000")]
max_bitrate: usize,

/// Maximum idle duration after which the node self-terminates; in seconds
#[structopt(long, env, default_value = "300", parse(try_from_str = parse_seconds))]
timeout_idle: Duration,

/// Maximum duration the WebDriver may take to become responsive; in seconds
#[structopt(long, env, default_value = "30", parse(try_from_str = parse_seconds))]
timeout_driver_startup: Duration,
}

impl Options {
Expand Down
Loading

0 comments on commit e7b4119

Please sign in to comment.