Skip to content

Commit

Permalink
Make request timeout configurable at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Oct 3, 2024
1 parent bd12d4c commit 960bc0f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
11 changes: 11 additions & 0 deletions lib/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub use self::{
runtime_id::{PublicRuntimeId, SecretRuntimeId},
stats::Stats,
};
use constants::REQUEST_TIMEOUT;
pub use net::stun::NatBehavior;
use request_tracker::RequestTracker;

Expand Down Expand Up @@ -356,6 +357,8 @@ impl Network {
pex.set_enabled(pex_enabled);

let request_tracker = RequestTracker::new();
request_tracker.set_timeout(REQUEST_TIMEOUT);

// TODO: This should be global, not per repo
let response_limiter = Arc::new(Semaphore::new(MAX_UNCHOKED_COUNT));
let stats_tracker = StatsTracker::default();
Expand Down Expand Up @@ -400,6 +403,14 @@ impl Network {

shutdown_peers(peers).await;
}

/// Change the sync protocol request timeout. Useful mostly for testing and benchmarking as the
/// default value should be sufficient for most use cases.
pub fn set_request_timeout(&self, timeout: Duration) {
for (_, holder) in &self.inner.registry.lock().unwrap().repos {
holder.request_tracker.set_timeout(timeout);
}
}
}

pub struct Registration {
Expand Down
26 changes: 21 additions & 5 deletions lib/src/network/request_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod simulation;
mod tests;

use self::graph::{Graph, Key as GraphKey};
use super::{constants::REQUEST_TIMEOUT, message::Request};
use super::message::Request;
use crate::{
collections::HashMap,
crypto::{sign::PublicKey, Hash},
Expand All @@ -16,13 +16,16 @@ use std::{
collections::{hash_map::Entry, VecDeque},
fmt, iter, mem,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use tokio::{select, sync::mpsc, task};
use tokio_stream::StreamExt;
use tokio_util::time::{delay_queue, DelayQueue};
use tracing::{instrument, Instrument, Span};
use xxhash_rust::xxh3::Xxh3Default;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

/// Keeps track of in-flight requests. Falls back on another peer in case the request failed (due to
/// error response, timeout or disconnection). Evenly distributes the requests between the peers
/// and ensures every request is only sent to one peer at a time.
Expand All @@ -32,13 +35,16 @@ pub(super) struct RequestTracker {
}

impl RequestTracker {
// TODO: Make request timeout configurable
pub fn new() -> Self {
let (this, worker) = build();
task::spawn(worker.run().instrument(Span::current()));
this
}

pub fn set_timeout(&self, timeout: Duration) {
self.command_tx.send(Command::SetTimeout { timeout }).ok();
}

pub fn new_client(
&self,
) -> (
Expand Down Expand Up @@ -252,6 +258,7 @@ struct Worker {
clients: HashMap<ClientId, ClientState>,
requests: Graph<RequestState>,
timer: DelayQueue<(ClientId, MessageKey)>,
timeout: Duration,
}

impl Worker {
Expand All @@ -261,6 +268,7 @@ impl Worker {
clients: HashMap::default(),
requests: Graph::new(),
timer: DelayQueue::new(),
timeout: DEFAULT_TIMEOUT,
}
}

Expand Down Expand Up @@ -306,6 +314,11 @@ impl Worker {
Command::RemoveClient { client_id } => {
self.remove_client(client_id);
}
Command::SetTimeout { timeout } => {
// Note: for simplicity, the new timeout is be applied to future requests only,
// not the ones that've been already scheduled.
self.timeout = timeout;
}
Command::HandleInitial { client_id, request } => {
self.handle_initial(client_id, request);
}
Expand Down Expand Up @@ -482,7 +495,7 @@ impl Worker {

let sender_timer_key = self
.timer
.insert((sender_client_id, request_key), REQUEST_TIMEOUT);
.insert((sender_client_id, request_key), self.timeout);
sender_client_state
.request_tx
.send(node.request().clone())
Expand Down Expand Up @@ -585,7 +598,7 @@ impl Worker {
*node.value_mut() = match initial_state {
InitialRequestState::InFlight => {
let timer_key =
self.timer.insert((client_id, request_key), REQUEST_TIMEOUT);
self.timer.insert((client_id, request_key), self.timeout);
client_state.request_tx.send(node.request().clone()).ok();

RequestState::InFlight {
Expand Down Expand Up @@ -669,7 +682,7 @@ impl Worker {
let sender_client_id = next_client_id;
let sender_timer_key = self
.timer
.insert((next_client_id, request_key), REQUEST_TIMEOUT);
.insert((next_client_id, request_key), self.timeout);
let waiters = mem::take(waiters);

*state = RequestState::InFlight {
Expand Down Expand Up @@ -734,6 +747,9 @@ enum Command {
RemoveClient {
client_id: ClientId,
},
SetTimeout {
timeout: Duration,
},
HandleInitial {
client_id: ClientId,
request: CandidateRequest,
Expand Down
2 changes: 1 addition & 1 deletion lib/src/network/request_tracker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn timeout() {
);

// Wait until the request timeout passes
time::timeout(REQUEST_TIMEOUT + Duration::from_millis(1), &mut work)
time::timeout(DEFAULT_TIMEOUT + Duration::from_millis(1), &mut work)
.await
.ok();

Expand Down
7 changes: 7 additions & 0 deletions lib/tests/malice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#[macro_use]
mod common;

use std::time::Duration;

use common::{actor, Env, Proto, DEFAULT_REPO};
use ouisync::{AccessMode, Error, Repository, StoreError};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -45,6 +47,11 @@ fn block_nonce_tamper() {
env.actor("bob", async move {
let (network, repo, _reg) = actor::setup().await;

// Bob first sends the block requests to Mallory but never receives the correct responses.
// Those requests first need to timeout before Bob retries them to Alice. By default that
// would make this test take too long. Decrease the timeout to make it faster.
network.set_request_timeout(Duration::from_secs(5));

let (alice_id, alice_expected_vv) = mallory_to_bob_rx.recv().await.unwrap();

// Connect to Mallory and wait until index is synced (blocks should be rejected).
Expand Down

0 comments on commit 960bc0f

Please sign in to comment.