diff --git a/consensus/core/src/commit_syncer.rs b/consensus/core/src/commit_syncer.rs index 5110d95d002ae..33e6352a6ff86 100644 --- a/consensus/core/src/commit_syncer.rs +++ b/consensus/core/src/commit_syncer.rs @@ -373,6 +373,15 @@ impl CommitSyncer { inner: Arc>, commit_range: CommitRange, ) -> (CommitIndex, Vec, Vec) { + // Individual request base timeout. + const TIMEOUT: Duration = Duration::from_secs(10); + // Max per-request timeout will be base timeout times a multiplier. + // At the extreme, this means there will be 120s timeout to fetch max_blocks_per_fetch blocks. + const MAX_TIMEOUT_MULTIPLIER: u32 = 12; + // timeout * max number of targets should be reasonably small, so the + // system can adjust to slow network or large data sizes quickly. + const MAX_NUM_TARGETS: usize = 24; + let mut timeout_multiplier = 0; let _timer = inner .context .metrics @@ -381,7 +390,8 @@ impl CommitSyncer { .start_timer(); info!("Starting to fetch commits in {commit_range:?} ...",); loop { - let mut all_authorities = inner + // Attempt to fetch commits and blocks through min(committee size, MAX_NUM_TARGETS) peers. + let mut target_authorities = inner .context .committee .authorities() @@ -393,21 +403,42 @@ impl CommitSyncer { } }) .collect_vec(); - all_authorities.shuffle(&mut ThreadRng::default()); - for authority in all_authorities { - match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await { - Ok((commits, blocks)) => { + target_authorities.shuffle(&mut ThreadRng::default()); + target_authorities.truncate(MAX_NUM_TARGETS); + // Increase timeout multiplier for each loop until MAX_TIMEOUT_MULTIPLIER. + timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER); + let request_timeout = TIMEOUT * timeout_multiplier; + // Give enough overall timeout for fetching commits and blocks. + // - Timeout for fetching commits and commit certifying blocks. + // - Timeout for fetching blocks referenced by the commits. + // - Time spent on pipelining requests to fetch blocks. + // - Another headroom to allow fetch_once() to timeout gracefully if possible. + let fetch_timeout = request_timeout * 4; + // Try fetching from selected target authority. + for authority in target_authorities { + match tokio::time::timeout( + fetch_timeout, + Self::fetch_once( + inner.clone(), + authority, + commit_range.clone(), + request_timeout, + ), + ) + .await + { + Ok(Ok((commits, blocks))) => { info!("Finished fetching commits in {commit_range:?}",); return (commit_range.end(), commits, blocks); } - Err(e) => { + Ok(Err(e)) => { let hostname = inner .context .committee .authority(authority) .hostname .clone(); - warn!("Failed to fetch from {hostname}: {}", e); + warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e); let error: &'static str = e.into(); inner .context @@ -417,6 +448,22 @@ impl CommitSyncer { .with_label_values(&[&hostname, error]) .inc(); } + Err(_) => { + let hostname = inner + .context + .committee + .authority(authority) + .hostname + .clone(); + warn!("Timed out fetching {commit_range:?} from {authority}",); + inner + .context + .metrics + .node_metrics + .commit_sync_fetch_once_errors + .with_label_values(&[&hostname, "FetchTimeout"]) + .inc(); + } } } } @@ -429,10 +476,8 @@ impl CommitSyncer { inner: Arc>, target_authority: AuthorityIndex, commit_range: CommitRange, + timeout: Duration, ) -> ConsensusResult<(Vec, Vec)> { - const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15); - const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120); - let _timer = inner .context .metrics @@ -443,11 +488,7 @@ impl CommitSyncer { // 1. Fetch commits in the commit range from the target authority. let (serialized_commits, serialized_blocks) = inner .network_client - .fetch_commits( - target_authority, - commit_range.clone(), - FETCH_COMMITS_TIMEOUT, - ) + .fetch_commits(target_authority, commit_range.clone(), timeout) .await?; // 2. Verify the response contains blocks that can certify the last returned commit, @@ -462,15 +503,18 @@ impl CommitSyncer { // 3. Fetch blocks referenced by the commits, from the same authority. let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect(); + let num_chunks = block_refs + .len() + .div_ceil(inner.context.parameters.max_blocks_per_fetch) + as u32; let mut requests: FuturesOrdered<_> = block_refs .chunks(inner.context.parameters.max_blocks_per_fetch) .enumerate() .map(|(i, request_block_refs)| { - let i = i as u32; let inner = inner.clone(); async move { // 4. Send out pipelined fetch requests to avoid overloading the target authority. - sleep(Duration::from_millis(200) * i).await; + sleep(timeout * i as u32 / num_chunks).await; // TODO: add some retries. let serialized_blocks = inner .network_client @@ -478,7 +522,7 @@ impl CommitSyncer { target_authority, request_block_refs.to_vec(), vec![], - FETCH_BLOCKS_TIMEOUT, + timeout, ) .await?; // 5. Verify the same number of blocks are returned as requested.