Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
⚡ Prevent session startup desync issue
Browse files Browse the repository at this point in the history
  • Loading branch information
TilBlechschmidt committed Mar 25, 2021
1 parent b81f5f0 commit e1368ff
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
34 changes: 34 additions & 0 deletions core/src/libraries/helpers/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use hyper::{body, Client, Uri};
use log::{debug, trace};
use redis::{aio::ConnectionLike, AsyncCommands, RedisResult};
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
Expand Down Expand Up @@ -51,3 +52,36 @@ pub async fn wait_for(url: &str, timeout_duration: Duration) -> Result<String, (
remaining_duration -= check_interval;
}
}

pub async fn wait_for_key<C: ConnectionLike + AsyncCommands>(
key: &str,
timeout_duration: Duration,
con: &mut C,
) -> Result<(), ()> {
let check_interval = Duration::from_millis(250);
let mut remaining_duration = timeout_duration;

debug!("Awaiting existence of redis key {}", key);

loop {
let result: RedisResult<bool> = con.exists(key).await;

if let Ok(exists) = result {
if exists {
return Ok(());
} else {
trace!("Expected redis key does not exist yet");
}
} else {
trace!("Unable to check existence of redis key! {:?}", result);
}

if remaining_duration.as_secs() == 0 {
debug!("Timeout while waiting for redis key {}", key);
return Err(());
}

sleep(check_interval).await;
remaining_duration -= check_interval;
}
}
2 changes: 1 addition & 1 deletion core/src/libraries/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod lua;

pub use backoff::Backoff;
pub use capabilities::*;
pub use healthcheck::wait_for;
pub use healthcheck::{wait_for, wait_for_key};
pub use timeout::Timeout;

/// Splits the input string into two parts at the first occurence of the separator
Expand Down
27 changes: 22 additions & 5 deletions core/src/services/manager/tasks/create_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::super::{context::SessionCreationContext, RequestError, SessionReplyValue};
use crate::libraries::helpers::{
keys, parse_browser_string, wait_for, CapabilitiesRequest, Timeout,
keys, parse_browser_string, wait_for, wait_for_key, CapabilitiesRequest, Timeout,
};
use crate::libraries::lifecycle::logging::{LogCode, SessionLogger};
use crate::libraries::metrics::MetricsEntry;
Expand Down Expand Up @@ -256,18 +256,35 @@ mod subtasks {
pub async fn await_healthcheck<C: ConnectionLike + AsyncCommands>(
con: &mut C,
session_id: &str,
) -> Result<String, RequestError> {
) -> Result<(), RequestError> {
let (host, port): (String, String) = con
.hget(keys::session::upstream(session_id), &["host", "port"])
.map_err(RequestError::RedisError)
.await?;

let url = format!("http://{}:{}/status", host, port);
let timeout = Timeout::NodeStartup.get(con).await as u64;

wait_for(&url, Duration::from_secs(timeout))
let timeout_duration = Duration::from_secs(timeout);
let healthcheck_start = Instant::now();

// Wait for the node heartbeat in redis first to save some HTTP calls
wait_for_key(
&keys::session::heartbeat::node(session_id),
timeout_duration,
con,
)
.map_err(|_| RequestError::HealthCheckTimeout)
.await?;

// Spend the remaining timeout duration HTTP polling the webdrivers status endpoint
let remaining_duration =
timeout_duration - Instant::now().duration_since(healthcheck_start);

wait_for(&url, remaining_duration)
.map_err(|_| RequestError::HealthCheckTimeout)
.await
.await?;

Ok(())
}

mod helpers {
Expand Down

0 comments on commit e1368ff

Please sign in to comment.