Skip to content

Commit

Permalink
Add cookie to RootNode request/response
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Oct 1, 2024
1 parent 57fbe20 commit 0779811
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 77 deletions.
101 changes: 72 additions & 29 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,18 @@ impl Inner {
self.vault.monitor.responses_received.increment(1);

match response {
Response::RootNode(proof, block_presence, debug) => {
persistable.push(PersistableResponse::RootNode(
Response::RootNode {
proof,
cookie,
block_presence,
debug,
} => {
persistable.push(PersistableResponse::RootNode {
proof,
cookie,
block_presence,
debug,
));
});
}
Response::InnerNodes(nodes, _, debug) => {
persistable.push(PersistableResponse::InnerNodes(nodes.into(), debug));
Expand All @@ -131,9 +137,11 @@ impl Inner {
Response::BlockOffer(block_id, debug) => {
ephemeral.push(EphemeralResponse::BlockOffer(block_id, debug));
}
Response::RootNodeError(writer_id, _) => {
Response::RootNodeError {
writer_id, cookie, ..
} => {
self.request_tracker
.failure(MessageKey::RootNode(writer_id));
.failure(MessageKey::RootNode(writer_id, cookie));
}
Response::ChildNodesError(hash, _, _) => {
self.request_tracker.failure(MessageKey::ChildNodes(hash));
Expand Down Expand Up @@ -172,8 +180,13 @@ impl Inner {

for response in batch.drain(..) {
match response {
PersistableResponse::RootNode(proof, block_presence, debug) => {
self.handle_root_node(&mut writer, proof, block_presence, debug)
PersistableResponse::RootNode {
proof,
cookie,
block_presence,
debug,
} => {
self.handle_root_node(&mut writer, proof, cookie, block_presence, debug)
.await?;
}
PersistableResponse::InnerNodes(nodes, debug) => {
Expand Down Expand Up @@ -219,6 +232,7 @@ impl Inner {
vv = ?proof.version_vector,
hash = ?proof.hash,
?block_presence,
cookie = cookie,
?debug_payload,
),
err(Debug)
Expand All @@ -227,6 +241,7 @@ impl Inner {
&self,
writer: &mut ClientWriter,
proof: UntrustedProof,
cookie: u64,
block_presence: MultiBlockPresence,
debug_payload: DebugResponse,
) -> Result<()> {
Expand All @@ -249,19 +264,21 @@ impl Inner {

tracing::debug!("Received root node - {status}");

if status.request_children() {
self.request_tracker.success(
MessageKey::RootNode(writer_id),
vec![PendingRequest {
self.request_tracker.success(
MessageKey::RootNode(writer_id, cookie),
status
.request_children()
.then_some(PendingRequest {
request: Request::ChildNodes(
hash,
ResponseDisambiguator::new(block_presence),
debug_payload.follow_up(),
),
block_presence,
}],
);
}
})
.into_iter()
.collect(),
);

Ok(())
}
Expand Down Expand Up @@ -438,12 +455,15 @@ impl Inner {
// before the block is marked as offered and only then we proceed with requesting it. This
// can take arbitrarily long (even indefinitely).
//
// By requesting the root node again immediatelly, we ensure that the missing block is
// By requesting the root node again immediately, we ensure that the missing block is
// requested as soon as possible.
fn refresh_branches(&self, branches: impl IntoIterator<Item = PublicKey>) {
for branch_id in branches {
self.request_tracker
.initial(Request::RootNode(branch_id, DebugRequest::start()));
for writer_id in branches {
self.request_tracker.initial(Request::RootNode {
writer_id,
cookie: next_root_node_cookie(),
debug: DebugRequest::start(),
});
}
}

Expand Down Expand Up @@ -495,7 +515,12 @@ enum EphemeralResponse {

/// Response whose processing requires write access to the store.
enum PersistableResponse {
RootNode(UntrustedProof, MultiBlockPresence, DebugResponse),
RootNode {
proof: UntrustedProof,
cookie: u64,
block_presence: MultiBlockPresence,
debug: DebugResponse,
},
InnerNodes(CacheHash<InnerNodes>, DebugResponse),
LeafNodes(CacheHash<LeafNodes>, DebugResponse),
Block(Block, DebugResponse),
Expand All @@ -518,6 +543,22 @@ fn block_offer_state(root_node_state: NodeState) -> Option<BlockOfferState> {
}
}

// Generate cookie for the next `RootNode` request. This value is guaranteed to be non-zero (zero is
// used for unsolicited responses).
fn next_root_node_cookie() -> u64 {
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT: AtomicU64 = AtomicU64::new(1);

loop {
let next = NEXT.fetch_add(1, Ordering::Relaxed);

if next != 0 {
break next;
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -544,17 +585,18 @@ mod tests {
// Receive invalid root node from the remote replica.
let invalid_write_keys = Keypair::random();
inner
.handle_persistable_responses(&mut vec![PersistableResponse::RootNode(
Proof::new(
.handle_persistable_responses(&mut vec![PersistableResponse::RootNode {
proof: Proof::new(
remote_id,
VersionVector::first(remote_id),
*EMPTY_INNER_HASH,
&invalid_write_keys,
)
.into(),
MultiBlockPresence::None,
DebugResponse::unsolicited(),
)])
block_presence: MultiBlockPresence::None,
cookie: 0,
debug: DebugResponse::unsolicited(),
}])
.await
.unwrap();

Expand All @@ -578,17 +620,18 @@ mod tests {
let remote_id = PublicKey::random();

inner
.handle_persistable_responses(&mut vec![PersistableResponse::RootNode(
Proof::new(
.handle_persistable_responses(&mut vec![PersistableResponse::RootNode {
proof: Proof::new(
remote_id,
VersionVector::new(),
*EMPTY_INNER_HASH,
&secrets.write_keys,
)
.into(),
MultiBlockPresence::None,
DebugResponse::unsolicited(),
)])
block_presence: MultiBlockPresence::None,
cookie: 0,
debug: DebugResponse::unsolicited(),
}])
.await
.unwrap();

Expand Down
24 changes: 21 additions & 3 deletions lib/src/network/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub(crate) enum Request {
/// Request the latest root node of the given writer.
RootNode(PublicKey, DebugRequest),
RootNode {
writer_id: PublicKey,
// This value is returned in the response without change. It can be used to distinguish
// multiple otherwise identical requests. This is useful because multiple identical
// `RootNode` requests can yield different responses.
cookie: u64,
debug: DebugRequest,
},
/// Request child nodes of the given parent node.
ChildNodes(Hash, ResponseDisambiguator, DebugRequest),
/// Request block with the given id.
Expand All @@ -38,9 +45,20 @@ pub(crate) enum Response {
/// Send the latest root node of this replica to another replica.
/// NOTE: This is both a response and notification - the server sends this as a response to
/// `Request::RootNode` but also on its own when it detects change in the repo.
RootNode(UntrustedProof, MultiBlockPresence, DebugResponse),
RootNode {
proof: UntrustedProof,
block_presence: MultiBlockPresence,
// If this is a reponse, the `cookie` value from the request. If this is a notification,
// zero.
cookie: u64,
debug: DebugResponse,
},
/// Send that a RootNode request failed
RootNodeError(PublicKey, DebugResponse),
RootNodeError {
writer_id: PublicKey,
cookie: u64,
debug: DebugResponse,
},
/// Send inner nodes.
InnerNodes(InnerNodes, ResponseDisambiguator, DebugResponse),
/// Send leaf nodes.
Expand Down
6 changes: 4 additions & 2 deletions lib/src/network/request_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ pub(super) struct PendingRequest {
/// Key identifying a request and its corresponding response.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd, Debug)]
pub(super) enum MessageKey {
RootNode(PublicKey),
RootNode(PublicKey, u64),
ChildNodes(Hash),
Block(BlockId),
}

impl<'a> From<&'a Request> for MessageKey {
fn from(request: &'a Request) -> Self {
match request {
Request::RootNode(writer_id, _) => MessageKey::RootNode(*writer_id),
Request::RootNode {
writer_id, cookie, ..
} => MessageKey::RootNode(*writer_id, *cookie),
Request::ChildNodes(hash, _, _) => MessageKey::ChildNodes(*hash),
Request::Block(block_id, _) => MessageKey::Block(*block_id),
}
Expand Down
61 changes: 39 additions & 22 deletions lib/src/network/request_tracker/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ impl Simulation {
// In case of failure, cancel the request so it can be retried without it
// triggering assertion failure.
let key = match response {
Response::RootNodeError(writer_id, _) => {
Some(MessageKey::RootNode(writer_id))
}
Response::RootNodeError {
writer_id, cookie, ..
} => Some(MessageKey::RootNode(writer_id, cookie)),
Response::ChildNodesError(hash, _, _) => {
Some(MessageKey::ChildNodes(hash))
}
Response::BlockError(block_id, _) => Some(MessageKey::Block(block_id)),
Response::RootNode(..)
Response::RootNode { .. }
| Response::InnerNodes(..)
| Response::LeafNodes(..)
| Response::Block(..)
Expand Down Expand Up @@ -189,22 +189,27 @@ impl TestClient {

fn handle_response(&mut self, response: Response, snapshot: &mut Snapshot) {
match response {
Response::RootNode(proof, block_presence, debug_payload) => {
Response::RootNode {
proof,
block_presence,
cookie,
debug,
} => {
let requests = snapshot
.insert_root(proof.hash, block_presence)
.then_some(PendingRequest {
request: Request::ChildNodes(
proof.hash,
ResponseDisambiguator::new(block_presence),
debug_payload.follow_up(),
debug.follow_up(),
),
block_presence,
})
.into_iter()
.collect();

self.tracker_client
.success(MessageKey::RootNode(proof.writer_id), requests);
.success(MessageKey::RootNode(proof.writer_id, cookie), requests);
}
Response::InnerNodes(nodes, _disambiguator, debug_payload) => {
let parent_hash = nodes.hash();
Expand Down Expand Up @@ -248,8 +253,11 @@ impl TestClient {
self.tracker_client
.success(MessageKey::Block(block_id), vec![]);
}
Response::RootNodeError(writer_id, _debug_payload) => {
self.tracker_client.failure(MessageKey::RootNode(writer_id));
Response::RootNodeError {
writer_id, cookie, ..
} => {
self.tracker_client
.failure(MessageKey::RootNode(writer_id, cookie));
}
Response::ChildNodesError(hash, _disambiguator, _debug_payload) => {
self.tracker_client.failure(MessageKey::ChildNodes(hash));
Expand Down Expand Up @@ -286,11 +294,12 @@ impl TestServer {
&write_keys,
));

let outbox = [Response::RootNode(
proof.clone(),
snapshot.root_summary().block_presence,
DebugResponse::unsolicited(),
)]
let outbox = [Response::RootNode {
proof: proof.clone(),
block_presence: snapshot.root_summary().block_presence,
cookie: 0,
debug: DebugResponse::unsolicited(),
}]
.into();

Self {
Expand All @@ -303,7 +312,11 @@ impl TestServer {

fn handle_request(&mut self, request: Request) {
match request {
Request::RootNode(writer_id, debug_payload) => {
Request::RootNode {
writer_id,
cookie,
debug,
} => {
if writer_id == self.writer_id {
let proof = Proof::new(
writer_id,
Expand All @@ -312,14 +325,18 @@ impl TestServer {
&self.write_keys,
);

self.outbox.push_back(Response::RootNode(
proof.into(),
self.snapshot.root_summary().block_presence,
debug_payload.reply(),
));
self.outbox.push_back(Response::RootNode {
proof: proof.into(),
block_presence: self.snapshot.root_summary().block_presence,
cookie,
debug: debug.reply(),
});
} else {
self.outbox
.push_back(Response::RootNodeError(writer_id, debug_payload.reply()));
self.outbox.push_back(Response::RootNodeError {
writer_id,
cookie,
debug: debug.reply(),
});
}
}
Request::ChildNodes(hash, disambiguator, debug_payload) => {
Expand Down
Loading

0 comments on commit 0779811

Please sign in to comment.