From 3374ed3f9f8aa021ff1a829afae4468e251f0fa5 Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Sat, 23 May 2020 18:19:04 +0300 Subject: [PATCH] sync_peer tx request selection rewrite --- ethcore/sync/src/chain/mod.rs | 42 ++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 4ce6acb9e7e..57e26e69b35 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -723,7 +723,7 @@ pub struct ChainSync { sync_start_time: Option, /// Transactions propagation statistics transactions_stats: TransactionsStats, - /// Unfetched transactions + /// Transactions whose hash has been announced, but that we have not fetched unfetched_pooled_transactions: H256FastMap, /// Enable ancient block downloading download_old_blocks: bool, @@ -1176,22 +1176,32 @@ impl ChainSync { } } - // get some peers to give us transaction pool + // get the peer to give us at least some of announced but unfetched transactions if !self.unfetched_pooled_transactions.is_empty() { - if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions { + if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions { let now = Instant::now(); let mut new_asking_pooled_transactions = s.iter().copied().collect::>(); - let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone(); - while new_asking_pooled_transactions.len() <= 256 { - for (hash, mut item) in self.unfetched_pooled_transactions.drain() { - if item.next_fetch < now { - new_asking_pooled_transactions.insert(hash); - item.tries += 1; - if item.tries < 5 { - item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2); - new_unfetched_pooled_transactions.insert(hash, item); - } + let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone(); + for (hash, mut item) in self.unfetched_pooled_transactions.drain() { + if new_asking_pooled_transactions.len() >= 256 { + // can't request any more transactions + break; + } + + // if enough time has passed since last attempt... + if item.next_fetch < now { + // ...queue this hash for requesting + new_asking_pooled_transactions.insert(hash); + item.tries += 1; + + // if we just started asking for it, queue it to be asked later on again + if item.tries < 5 { + item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2); + remaining_unfetched_pooled_transactions.insert(hash, item); + } else { + // ...otherwise we assume this transaction does not exist and remove its hash from request queue + remaining_unfetched_pooled_transactions.remove(&hash); } } } @@ -1199,10 +1209,12 @@ impl ChainSync { let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::>(); SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions); - self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions); - self.unfetched_pooled_transactions = new_unfetched_pooled_transactions; + self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions); + self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions; return; + } else { + trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id); } }