Skip to content

Commit

Permalink
Fix both RequestTracker tests
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Sep 26, 2024
1 parent b23dba8 commit 9822844
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 39 deletions.
54 changes: 44 additions & 10 deletions lib/src/network/request_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl RequestTracker {
}

#[cfg_attr(not(test), expect(dead_code))]
pub fn new_client(&self) -> (RequestTrackerClient, mpsc::UnboundedReceiver<Request>) {
pub fn new_client(&self) -> (RequestTrackerClient, mpsc::UnboundedReceiver<SendPermit>) {
let client_id = ClientId::next();
let (request_tx, request_rx) = mpsc::unbounded_channel();

Expand Down Expand Up @@ -108,6 +108,17 @@ impl Drop for RequestTrackerClient {
}
}

/// Permit to send the specified request. Contains also the block presence as reported by the peer
/// who sent the response that triggered this request. That is mostly useful for diagnostics and
/// testing.
#[derive(Debug)]
pub(super) struct SendPermit {
    /// The request this permit authorizes sending.
    #[cfg_attr(not(test), expect(dead_code))]
    pub request: Request,
    /// Block presence reported by the peer whose response triggered `request`.
    /// Currently only read in tests/diagnostics, hence `dead_code` is expected otherwise.
    #[cfg_attr(not(test), expect(dead_code))]
    pub block_presence: MultiBlockPresence,
}

/// Key identifying a request and its corresponding response.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd, Debug)]
pub(super) enum MessageKey {
Expand Down Expand Up @@ -214,13 +225,20 @@ impl Worker {
}

#[instrument(skip(self, request_tx))]
fn insert_client(&mut self, client_id: ClientId, request_tx: mpsc::UnboundedSender<Request>) {
fn insert_client(
&mut self,
client_id: ClientId,
request_tx: mpsc::UnboundedSender<SendPermit>,
) {
#[cfg(test)]
tracing::debug!("insert_client");

self.clients.insert(client_id, ClientState::new(request_tx));
}

#[instrument(skip(self))]
fn remove_client(&mut self, client_id: ClientId) {
#[cfg(test)]
tracing::debug!("remove_client");

let Some(client_state) = self.clients.remove(&client_id) else {
Expand All @@ -239,7 +257,9 @@ impl Worker {
request: Request,
block_presence: MultiBlockPresence,
) {
// tracing::debug!("handle_initial");
#[cfg(test)]
tracing::debug!("handle_initial");

self.insert_request(client_id, request, block_presence, None)
}

Expand All @@ -250,6 +270,7 @@ impl Worker {
request_key: MessageKey,
requests: Vec<(Request, MultiBlockPresence)>,
) {
#[cfg(test)]
tracing::debug!("handle_success");

let node_key = self
Expand Down Expand Up @@ -331,6 +352,7 @@ impl Worker {
request_key: MessageKey,
reason: FailureReason,
) {
#[cfg(test)]
tracing::debug!("handle_failure");

let Some(client_state) = self.clients.get_mut(&client_id) else {
Expand Down Expand Up @@ -388,7 +410,13 @@ impl Worker {
};

client_state.requests.insert(request_key, node_key);
client_state.request_tx.send(node.request().clone()).ok();
client_state
.request_tx
.send(SendPermit {
request: node.request().clone(),
block_presence: *node.block_presence(),
})
.ok();
}
}

Expand All @@ -406,7 +434,7 @@ impl Worker {
return;
};

let (request, state) = node.request_and_value_mut();
let (request, &block_presence, state) = node.parts_mut();

match state {
RequestState::InFlight {
Expand Down Expand Up @@ -435,8 +463,14 @@ impl Worker {
.timer
.insert((next_client_id, MessageKey::from(request)), REQUEST_TIMEOUT);

// Send the request to the new sender.
next_client_state.request_tx.send(request.clone()).ok();
// Send the permit to the new sender.
next_client_state
.request_tx
.send(SendPermit {
request: request.clone(),
block_presence,
})
.ok();

return;
} else {
Expand Down Expand Up @@ -492,7 +526,7 @@ impl ClientId {
enum Command {
InsertClient {
client_id: ClientId,
request_tx: mpsc::UnboundedSender<Request>,
request_tx: mpsc::UnboundedSender<SendPermit>,
},
RemoveClient {
client_id: ClientId,
Expand All @@ -514,12 +548,12 @@ enum Command {
}

struct ClientState {
request_tx: mpsc::UnboundedSender<Request>,
request_tx: mpsc::UnboundedSender<SendPermit>,
requests: HashMap<MessageKey, GraphKey>,
}

impl ClientState {
fn new(request_tx: mpsc::UnboundedSender<Request>) -> Self {
fn new(request_tx: mpsc::UnboundedSender<SendPermit>) -> Self {
Self {
request_tx,
requests: HashMap::default(),
Expand Down
8 changes: 6 additions & 2 deletions lib/src/network/request_tracker/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ impl<T> Node<T> {
&self.request
}

pub fn request_and_value_mut(&mut self) -> (&Request, &mut T) {
(&self.request, &mut self.value)
pub fn block_presence(&self) -> &MultiBlockPresence {
&self.block_presence
}

pub fn parts_mut(&mut self) -> (&Request, &MultiBlockPresence, &mut T) {
(&self.request, &self.block_presence, &mut self.value)
}

pub fn parents(&self) -> impl ExactSizeIterator<Item = Key> + '_ {
Expand Down
82 changes: 62 additions & 20 deletions lib/src/network/request_tracker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use rand::{
seq::SliceRandom,
CryptoRng, Rng, SeedableRng,
};
use std::collections::VecDeque;
use std::collections::{hash_map::Entry, VecDeque};

// Test syncing while peers keep joining and leaving the swarm.
//
Expand Down Expand Up @@ -109,14 +109,10 @@ async fn dynamic_swarm() {

// Test syncing with multiple peers where no peer has all the blocks but every block is present in
// at least one peer.
#[ignore = "duplicate request send checking is too strict"]
#[tokio::test]
async fn missing_blocks() {
crate::test_utils::init_log();

// let seed = rand::random();
let seed = 830380000365750606;
case(seed, 8, 2);
let seed = rand::random();
case(seed, 32, 4);

fn case(seed: u64, max_blocks: usize, max_peers: usize) {
let mut rng = StdRng::seed_from_u64(seed);
Expand Down Expand Up @@ -155,15 +151,19 @@ async fn missing_blocks() {

struct Simulation {
peers: Vec<TestPeer>,
requests: HashSet<MessageKey>,
// All requests sent by live peers. This is used to verify that every request is sent only once
// unless the peer that sent it died or the request failed. In those cases the request may be
// sent by another peer. It's also allowed to sent the same request more than once as long as
// each one has a different block presence.
requests: HashMap<MessageKey, HashSet<MultiBlockPresence>>,
snapshot: Snapshot,
}

impl Simulation {
/// Creates an empty simulation: no peers, no recorded requests, empty snapshot.
fn new() -> Self {
    Self {
        peers: Vec::new(),
        requests: HashMap::default(),
        snapshot: Snapshot::default(),
    }
}
Expand All @@ -188,16 +188,16 @@ impl Simulation {
self.peers.push(TestPeer {
client,
server,
requests: Vec::new(),
requests: HashMap::default(),
});
}

fn remove_peer<R: Rng>(&mut self, rng: &mut R) {
let index = rng.gen_range(0..self.peers.len());
let peer = self.peers.remove(index);

for key in peer.requests {
self.requests.remove(&key);
for (key, block_presence) in peer.requests {
cancel_request(&mut self.requests, key, block_presence);
}
}

Expand All @@ -220,22 +220,49 @@ impl Simulation {

match side {
Side::Client => {
if let Some(request) = peer.client.poll_request() {
if let Some(SendPermit {
request,
block_presence,
}) = peer.client.poll_request()
{
let key = MessageKey::from(&request);

assert!(
self.requests.insert(key),
"request sent more than once: {request:?}"
self.requests.entry(key).or_default().insert(block_presence),
"request sent more than once: {request:?} ({block_presence:?})"
);

peer.requests.push(key);
peer.requests.insert(key, block_presence);
peer.server.handle_request(request);

return true;
}
}
Side::Server => {
if let Some(response) = peer.server.poll_response() {
// In case of failure, cancel the request so it can be retried without it
// triggering assertion failure.
let key = match response {
Response::RootNodeError(writer_id, _) => {
Some(MessageKey::RootNode(writer_id))
}
Response::ChildNodesError(hash, _, _) => {
Some(MessageKey::ChildNodes(hash))
}
Response::BlockError(block_id, _) => Some(MessageKey::Block(block_id)),
Response::RootNode(..)
| Response::InnerNodes(..)
| Response::LeafNodes(..)
| Response::Block(..)
| Response::BlockOffer(..) => None,
};

if let Some(key) = key {
if let Some(block_presence) = peer.requests.get(&key) {
cancel_request(&mut self.requests, key, *block_presence);
}
}

peer.client.handle_response(response, &mut self.snapshot);
return true;
}
Expand All @@ -252,21 +279,36 @@ impl Simulation {
}
}

/// Removes the given `(key, block_presence)` pair from the request map, dropping
/// the map entry entirely once no block presences remain under that key.
fn cancel_request(
    requests: &mut HashMap<MessageKey, HashSet<MultiBlockPresence>>,
    key: MessageKey,
    block_presence: MultiBlockPresence,
) {
    let Entry::Occupied(mut occupied) = requests.entry(key) else {
        return;
    };

    occupied.get_mut().remove(&block_presence);

    if occupied.get().is_empty() {
        occupied.remove();
    }
}

struct TestPeer {
client: TestClient,
server: TestServer,
requests: Vec<MessageKey>,
// All requests sent by this peer.
requests: HashMap<MessageKey, MultiBlockPresence>,
}

struct TestClient {
tracker_client: RequestTrackerClient,
tracker_request_rx: mpsc::UnboundedReceiver<Request>,
tracker_request_rx: mpsc::UnboundedReceiver<SendPermit>,
}

impl TestClient {
fn new(
tracker_client: RequestTrackerClient,
tracker_request_rx: mpsc::UnboundedReceiver<Request>,
tracker_request_rx: mpsc::UnboundedReceiver<SendPermit>,
) -> Self {
Self {
tracker_client,
Expand Down Expand Up @@ -352,7 +394,7 @@ impl TestClient {
};
}

fn poll_request(&mut self) -> Option<Request> {
fn poll_request(&mut self) -> Option<SendPermit> {
self.tracker_request_rx.try_recv().ok()
}
}
Expand Down
10 changes: 6 additions & 4 deletions lib/src/protocol/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ impl Summary {
}
}

/// Checks whether the subtree at `self` is outdated compared to the subtree at `other` in terms
/// of completeness and block presence. That is, `self` is considered outdated if it's
/// incomplete (regardless of what `other` is) or if `other` has some blocks present that
/// `self` is missing.
///
/// NOTE: This function is NOT antisymmetric, that is, `is_outdated(A, B)` does not imply
/// `!is_outdated(B, A)` (and vice-versa).
pub fn is_outdated(&self, other: &Self) -> bool {
    self.state == NodeState::Incomplete
        || self.block_presence.is_outdated(&other.block_presence)
}

pub fn with_state(self, state: NodeState) -> Self {
Expand Down
12 changes: 9 additions & 3 deletions lib/src/protocol/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{MultiBlockPresence, NodeState, SingleBlockPresence, Summary};
use super::{MultiBlockPresence, NodeState, SingleBlockPresence, Summary, EMPTY_INNER_HASH};
use crate::{
crypto::{Hash, Hashable},
protocol::{Block, BlockId, InnerNode, InnerNodes, LeafNode, LeafNodes, INNER_LAYER_COUNT},
Expand Down Expand Up @@ -188,8 +188,14 @@ impl Snapshot {
}
} else {
self.root_hash = hash;
// FIXME: if hash == EMPTY_INNER_HASH we should set this to `Complete`:
self.root_summary = Summary::INCOMPLETE;
self.root_summary = if hash == *EMPTY_INNER_HASH {
Summary {
state: NodeState::Complete,
block_presence: MultiBlockPresence::None,
}
} else {
Summary::INCOMPLETE
};

self.inners.clear();
self.leaves.clear();
Expand Down

0 comments on commit 9822844

Please sign in to comment.