BFT-465: Twins gossip loop
aakoshh committed May 30, 2024
1 parent 9946567 commit 76cfc9c
Showing 2 changed files with 116 additions and 17 deletions.
131 changes: 115 additions & 16 deletions node/actors/bft/src/testonly/run.rs
@@ -1,18 +1,23 @@
use super::{Behavior, Node};
use network::{io, Config};
use rand::prelude::SliceRandom;
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::Instrument as _;
use zksync_concurrency::{
ctx::{
self,
channel::{UnboundedReceiver, UnboundedSender},
channel::{self, UnboundedReceiver, UnboundedSender},
},
oneshot, scope,
};
use zksync_consensus_network as network;
use zksync_consensus_roles::validator::{self, Genesis, PublicKey};
use zksync_consensus_storage::testonly::new_store;
use zksync_consensus_roles::validator::{
self, BlockNumber, CommitQC, ConsensusMsg, Genesis, PublicKey,
};
use zksync_consensus_storage::{testonly::new_store, BlockStore};
use zksync_consensus_utils::pipe;

#[derive(Clone)]
@@ -200,6 +205,10 @@ async fn run_nodes_twins(
// Inbox of the consensus instances, indexed by their network identity,
// so that we can send to the one which is in the same partition as the sender.
let mut sends = HashMap::new();
// Blockstores of nodes, indexed by port; used to simulate the effect of gossip.
let mut stores = HashMap::new();
// Outbound gossip relationships to simulate the effects of block fetching.
let mut gossip_targets = HashMap::new();

for (i, spec) in specs.iter().enumerate() {
let (actor_pipe, dispatcher_pipe) = pipe::new();
@@ -210,6 +219,19 @@

sends.insert(port, actor_pipe.send);
recvs.push((port, actor_pipe.recv));
stores.insert(port, spec.block_store.clone());
gossip_targets.insert(
port,
spec.net
.gossip
.static_outbound
.values()
.map(|host| {
let addr: std::net::SocketAddr = host.0.parse().expect("valid address");
addr.port()
})
.collect(),
);

// Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to:
// * send Output messages from other actors to this consensus instance
@@ -225,20 +247,36 @@
// Taking these references is necessary for the `scope::run!` environment lifetime rules to compile
// with `async move`, which in turn is necessary because otherwise the spawned task could not borrow `port`.
// Potentially `ctx::NoCopy` could be used with `port`.
let sends = &sends;
let validator_ports = &validator_ports;
let sends = &sends;
let stores = &stores;
let gossip_targets = &gossip_targets;
let (gossip_send, gossip_recv) = channel::unbounded();

// Run networks by receiving from all consensus instances:
// * identify the view they are in from the message
// * identify the partition they are in based on their network id
// * either broadcast to all other instances in the partition, or find out the network
// identity of the target validator and send to it _iff_ they are in the same partition
// * simulating the gossiping of finalized blocks
scope::run!(ctx, |ctx, s| async move {
for (port, recv) in recvs {
let gossip_send = gossip_send.clone();
s.spawn(async move {
twins_receive_loop(ctx, port, recv, sends, validator_ports, splits).await
twins_receive_loop(
ctx,
splits,
validator_ports,
sends,
&gossip_targets[&port],
gossip_send,
port,
recv,
)
.await
});
}
s.spawn(async { twins_gossip_loop(ctx, stores, gossip_recv).await });
anyhow::Ok(())
})
.await
@@ -248,17 +286,32 @@

/// Receive input messages from the consensus actor and send them to the others
/// according to the partition schedule of the port associated with this instance.
///
/// We have to simulate the gossip layer, which isn't instantiated by these tests.
/// If we don't, then a replica that misses a LeaderPrepare message will never get the payload,
/// won't be able to finalize the block, and won't participate further in the consensus.
#[allow(clippy::too_many_arguments)]
async fn twins_receive_loop(
ctx: &ctx::Ctx,
splits: &PortSplitSchedule,
validator_ports: &HashMap<PublicKey, Vec<Port>>,
sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
gossip_targets: &HashSet<Port>,
gossip_send: UnboundedSender<(Port, Port, BlockNumber)>,
port: Port,
mut recv: UnboundedReceiver<io::InputMessage>,
sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
validator_ports: &HashMap<PublicKey, Vec<Port>>,
splits: &PortSplitSchedule,
) -> anyhow::Result<()> {
let rng = &mut ctx.rng();
let start = &ctx.now();

// Finalized block number iff this node can gossip to the target and the message contains a QC.
let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| {
if !gossip_targets.contains(&target_port) {
return None;
}
output_msg_commit_qc(msg).map(|qc| qc.header().number)
};

// We need to buffer messages that cannot be delivered due to partitioning, and deliver them later.
// The spec says that the network is expected to deliver messages eventually, potentially out of order,
// caveated by the fact that the actual implementation only keeps retrying the last message.
@@ -293,24 +346,28 @@ async fn twins_receive_loop(
if can_send {
let s = &sends[&target_port];

// Send after taking note of potentially gossipable blocks.
let send = |msg| {
if let Some(bn) = block_to_gossip(target_port, &msg) {
gossip_send.send((port, target_port, bn));
}
s.send(msg);
};

// Messages can be delivered in arbitrary order.
stash.shuffle(rng);

for unstashed in stash.drain(0..) {
let view = output_msg_view_number(&unstashed);
let kind = output_msg_label(&unstashed);
eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
s.send(unstashed);
send(unstashed);
}
eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
s.send(msg);
// TODO: If the message is a LeaderPrepare then remember the block sent,
// then if the next message is a LeaderCommit with a CommitQC then
// prepare a FinalBlock and pretend that we have a gossip layer
// by calling block_store.queue_block on every other node.
send(msg);
} else {
eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
stash.push(msg)
stash.push(msg);
}
};

@@ -376,6 +433,37 @@ async fn twins_receive_loop(
Ok(())
}

/// Simulate the effects of gossip: if the source node can gossip to the target,
/// and the message being sent contains a CommitQC, and the sender has the
/// referenced finalized block in its store, then assume that the target
/// could fetch this block if they wanted, and insert it directly into their store.
///
/// This happens concurrently with the actual message passing, so it's just an
/// approximation, and it can happen out of order. This method contains only the
/// loop that consumes gossip requests; the store operations themselves are spawned as background tasks.
async fn twins_gossip_loop(
ctx: &ctx::Ctx,
stores: &HashMap<Port, Arc<BlockStore>>,
mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>,
) -> anyhow::Result<()> {
scope::run!(ctx, |ctx, s| async move {
while let Ok((from, to, number)) = recv.recv(ctx).await {
// Perform the storage operations asynchronously because `queue_block` will
// wait for all dependencies to be inserted first.
s.spawn_bg(async move {
let local_store = &stores[&from];
let remote_store = &stores[&to];
if let Ok(Some(block)) = local_store.block(ctx, number).await {
let _ = remote_store.queue_block(ctx, block).await;
}
Ok(())
});
}
Ok(())
})
.await
}

fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {
match msg {
io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0,
@@ -387,3 +475,14 @@ fn output_msg_label(msg: &io::OutputMessage) -> &str {
io::OutputMessage::Consensus(cr) => cr.msg.msg.label(),
}
}

fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> {
match msg {
io::OutputMessage::Consensus(cr) => match &cr.msg.msg {
ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(),
ConsensusMsg::LeaderPrepare(lp) => lp.justification.high_qc(),
ConsensusMsg::ReplicaCommit(_) => None,
ConsensusMsg::LeaderCommit(lc) => Some(&lc.justification),
},
}
}
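
The gossip simulation introduced above boils down to a small pattern: the receive loop reports (sender port, target port, block number) triples over a channel, and a separate loop copies the referenced block from the sender's store into the target's store. Below is a minimal, standalone sketch of that pattern using only the standard library (plain in-memory maps instead of BlockStore, std::sync::mpsc instead of the zksync_concurrency channel, and no async); every name in it (GossipRequest, run_gossip_loop, the example ports) is illustrative and not taken from the repository.

use std::collections::HashMap;
use std::sync::mpsc;

type Port = u16;
type BlockNumber = u64;

// A request for the simulated gossip layer: copy block `number` from `from`'s store into `to`'s store.
struct GossipRequest {
    from: Port,
    to: Port,
    number: BlockNumber,
}

// Counterpart of `twins_gossip_loop`: drain the channel and, if the source already
// has the block, insert it into the target's store.
fn run_gossip_loop(
    stores: &mut HashMap<Port, HashMap<BlockNumber, Vec<u8>>>,
    recv: mpsc::Receiver<GossipRequest>,
) {
    for req in recv {
        let block = stores
            .get(&req.from)
            .and_then(|store| store.get(&req.number))
            .cloned();
        if let Some(block) = block {
            stores.entry(req.to).or_default().insert(req.number, block);
        }
    }
}

fn main() {
    let (send, recv) = mpsc::channel();
    let mut stores: HashMap<Port, HashMap<BlockNumber, Vec<u8>>> = HashMap::new();
    // Node 1000 has finalized block 1; node 1001 has nothing yet.
    stores.entry(1000).or_default().insert(1, b"payload".to_vec());
    stores.entry(1001).or_default();

    // The receive loop would emit this whenever it forwards a message carrying a
    // CommitQC to one of the sender's gossip targets; here it is sent by hand.
    send.send(GossipRequest { from: 1000, to: 1001, number: 1 }).unwrap();
    drop(send);

    run_gossip_loop(&mut stores, recv);
    assert!(stores[&1001].contains_key(&1));
}

In the actual test the stores are shared Arc<BlockStore> handles and the insertions run as background tasks, because queue_block waits for all earlier blocks to be present first.
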
2 changes: 1 addition & 1 deletion node/actors/bft/src/tests.rs
@@ -234,7 +234,7 @@ async fn twins_network_wo_twins_wo_partitions() {
async fn twins_network_wo_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(5.0));
// TODO: At the moment this test doesn't work with partitions, so just try to do a single scenario to debug.
run_twins(ctx, 6, false, 1).await.unwrap();
run_twins(ctx, 6, false, 10).await.unwrap();
}

/// Create network configuration for a given number of replicas with a random number of twins and run [Test].