From 90b893ceead1aabff34713a152376fffbf2b9e0a Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Mon, 10 Jun 2024 15:27:55 +0100
Subject: [PATCH] BFT-465: Twins tests (#117)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Add tests that run multiple Twins scenarios with varying numbers of replicas, some of which run as twins.

- [x] Add `Network::Twins` type with a partition schedule
- [x] Add `run_nodes_twins` to execute the test with arbitrary network partitions and potentially multiple ports per validator
- [x] Buffer messages that can't be delivered and deliver them some time later (unless superseded by a newer message of the same kind from source to target)
- [x] Simulate the gossiping of finalized blocks
- [x] Run test with no twins and no partitions
- [x] Run test with no twins but random partitions
- [x] Run test with twins and random partitions

### Investigation

```shell
cargo nextest run -p zksync_consensus_bft twins_network --no-capture
```

Initially the test failed when partitions were introduced. I'll try to understand if this is because the leader got isolated and an important message got lost. I would also like to understand whether eventual delivery is an absolute requirement even if all partitions are at least as large as the quorum size.

🔍 I think the test is failing because it waits for all nodes to have persisted a certain number of blocks, but if a proposal carrying the payload is missed because a partition prevented the message from being delivered, then there is no mechanism in the machinery instantiated by the test to procure the missing payloads later, and those nodes are stuck.

🔧 I implemented a message stashing mechanism in the mock network, but it still doesn't finalise blocks 👀

🔍 The problem seems to be that my unstashing mechanism only kicked in when node A tried to send a new message to node B and was no longer blocked. However, if A didn't try to send, the stashed messages to B weren't delivered, which caused longer and longer timeouts as nobody was making progress for one reason or another. Meanwhile, for example, C can already be in a new view, so if we see that, we could conclude that the A-to-B messages should be unstashed as well, even if there are no messages from A to B in that round.

🔧 I'll try testing after merging https://github.com/matter-labs/era-consensus/pull/119, which should trigger unstashing in each round.

🔍 It's still failing after adding the replica-to-replica broadcast. For example, replica A is isolated in round 1 and doesn't get a LeaderCommit; then in the next round replica B is isolated, and A gets all the missing messages from round 1 plus the new LeaderPrepare, but it doesn't respond with a ReplicaCommit because it doesn't have the block from round 1, and therefore cannot even store proposal 2. The consensus relies on the external gossiping mechanism to eventually propagate the FinalBlock to it; until then the node is stuck. I need to simulate the effect of gossiping in the test.

🔧 I implemented simulated gossip using the following mechanism: if one node is about to send or unstash a message to another node that it has a gossip relationship with, and the message contains a CommitQC, and the sender has the finalized block for the committed height, then it inserts the block directly into the target's store.
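For reference, the trigger for this simulated gossip boils down to roughly the following check, condensed from the `block_to_gossip` closure and the `output_msg_commit_qc` helper added to `run.rs` below (shown here as free functions for readability; `Port` is the `u16` listener port these tests use as a node identity):

```rust
use std::collections::HashSet;

use zksync_consensus_network::io;
use zksync_consensus_roles::validator::{self, ConsensusMsg};

/// Nodes are identified by their listener port in these tests.
type Port = u16;

/// Extract the CommitQC carried by an outgoing consensus message, if any.
fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> {
    match msg {
        io::OutputMessage::Consensus(cr) => match &cr.msg.msg {
            ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(),
            ConsensusMsg::LeaderPrepare(lp) => lp.justification.high_qc(),
            ConsensusMsg::ReplicaCommit(_) => None,
            ConsensusMsg::LeaderCommit(lc) => Some(&lc.justification),
        },
    }
}

/// Mirrors the `block_to_gossip` closure in `twins_receive_loop`: if the sender
/// gossips to `target_port` and the outgoing message carries a CommitQC, return
/// the committed block number, so the gossip loop can copy that block (and any
/// missing ancestors) from the sender's store straight into the target's store.
fn block_to_gossip(
    gossip_targets: &HashSet<Port>,
    target_port: Port,
    msg: &io::OutputMessage,
) -> Option<validator::BlockNumber> {
    if !gossip_targets.contains(&target_port) {
        return None;
    }
    output_msg_commit_qc(msg).map(|qc| qc.header().number)
}
```

This sidesteps the real gossip/fetch stack, which isn't instantiated by these tests.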
🔍 The tests now work without twins, but fail with 1 or 2 twins, albeit not on every scenario 🔧 Changed the simulated gossip to push all ancestors of a finalized block into the target blockstore, not just the one in the CommitQC that is in the latest message. This simulates the ability of the target to fetch all missing blocks. ## Why ❔ To check that the consensus does not fail as long as the number of twins does not exceed the tolerable number of Byzantine nodes. --- node/actors/bft/src/testonly/node.rs | 5 + node/actors/bft/src/testonly/run.rs | 530 ++++++++++++++++++++--- node/actors/bft/src/tests.rs | 212 ++++++++- node/actors/network/src/gossip/fetch.rs | 2 +- node/actors/network/src/gossip/mod.rs | 2 + node/actors/network/src/gossip/runner.rs | 1 + node/actors/network/src/testonly.rs | 16 +- node/libs/utils/src/pipe.rs | 18 + 8 files changed, 713 insertions(+), 73 deletions(-) diff --git a/node/actors/bft/src/testonly/node.rs b/node/actors/bft/src/testonly/node.rs index d08a52d0..7b348e15 100644 --- a/node/actors/bft/src/testonly/node.rs +++ b/node/actors/bft/src/testonly/node.rs @@ -62,6 +62,7 @@ impl Node { let mut con_recv = consensus_pipe.recv; let con_send = consensus_pipe.send; scope::run!(ctx, |ctx, s| async { + // Run the consensus actor s.spawn(async { let validator_key = self.net.validator_key.clone().unwrap(); crate::Config { @@ -75,6 +76,8 @@ impl Node { .await .context("consensus.run()") }); + // Forward input messages received from the network to the actor; + // turns output from others to input for this instance. s.spawn(async { while let Ok(network_message) = net_recv.recv(ctx).await { match network_message { @@ -85,6 +88,8 @@ impl Node { } Ok(()) }); + // Forward output messages from the actor to the network; + // turns output from this to inputs for others. // Get the next message from the channel. Our response depends on what type of replica we are. while let Ok(msg) = con_recv.recv(ctx).await { match msg { diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 9605e92d..102b28f4 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,18 +1,39 @@ use super::{Behavior, Node}; -use std::collections::HashMap; +use anyhow::bail; +use network::{io, Config}; +use rand::seq::SliceRandom; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::Instrument as _; -use zksync_concurrency::{ctx, oneshot, scope}; +use zksync_concurrency::{ + ctx::{ + self, + channel::{self, UnboundedReceiver, UnboundedSender}, + }, + oneshot, scope, +}; use zksync_consensus_network as network; use zksync_consensus_roles::validator; -use zksync_consensus_storage::testonly::new_store; +use zksync_consensus_storage::{testonly::new_store, BlockStore}; use zksync_consensus_utils::pipe; -#[derive(Clone, Copy)] +#[derive(Clone)] pub(crate) enum Network { Real, Mock, + Twins(PortSplitSchedule), } +// Identify different network identities of twins by their listener port. +// They are all expected to be on localhost, but `ListenerAddr` can't be +// directly used as a map key. +pub(crate) type Port = u16; +pub(crate) type PortPartition = HashSet; +pub(crate) type PortSplit = Vec; +pub(crate) type PortSplitSchedule = Vec; + /// Config for the test. Determines the parameters to run the test with. #[derive(Clone)] pub(crate) struct Test { @@ -22,7 +43,7 @@ pub(crate) struct Test { } impl Test { - /// Run a test with the given parameters. + /// Run a test with the given parameters and a random network setup. 
pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let rng = &mut ctx.rng(); let setup = validator::testonly::Setup::new_with_weights( @@ -30,11 +51,21 @@ impl Test { self.nodes.iter().map(|(_, w)| *w).collect(), ); let nets: Vec<_> = network::testonly::new_configs(rng, &setup, 1); + self.run_with_config(ctx, nets, &setup.genesis).await + } + + /// Run a test with the given parameters and network configuration. + pub(crate) async fn run_with_config( + &self, + ctx: &ctx::Ctx, + nets: Vec, + genesis: &validator::Genesis, + ) -> anyhow::Result<()> { let mut nodes = vec![]; let mut honest = vec![]; scope::run!(ctx, |ctx, s| async { for (i, net) in nets.into_iter().enumerate() { - let (store, runner) = new_store(ctx, &setup.genesis).await; + let (store, runner) = new_store(ctx, genesis).await; s.spawn_bg(runner.run(ctx)); if self.nodes[i].0 == Behavior::Honest { honest.push(store.clone()); @@ -46,11 +77,11 @@ impl Test { }); } assert!(!honest.is_empty()); - s.spawn_bg(run_nodes(ctx, self.network, &nodes)); + s.spawn_bg(run_nodes(ctx, &self.network, &nodes)); // Run the nodes until all honest nodes store enough finalized blocks. assert!(self.blocks_to_finalize > 0); - let first = setup.genesis.first_block; + let first = genesis.first_block; let last = first + (self.blocks_to_finalize as u64 - 1); for store in &honest { store.wait_until_queued(ctx, last).await?; @@ -59,9 +90,14 @@ impl Test { // Check that the stored blocks are consistent. for i in 0..self.blocks_to_finalize as u64 { let i = first + i; - let want = honest[0].block(ctx, i).await?; + // Only comparing the payload; the justification might be different. + let want = honest[0] + .block(ctx, i) + .await? + .expect("checked its existence") + .payload; for store in &honest[1..] { - assert_eq!(want, store.block(ctx, i).await?); + assert_eq!(want, store.block(ctx, i).await?.unwrap().payload); } } Ok(()) @@ -71,77 +107,435 @@ impl Test { } /// Run a set of nodes. -async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::Result<()> { +async fn run_nodes(ctx: &ctx::Ctx, network: &Network, specs: &[Node]) -> anyhow::Result<()> { + match network { + Network::Real => run_nodes_real(ctx, specs).await, + Network::Mock => run_nodes_mock(ctx, specs).await, + Network::Twins(splits) => run_nodes_twins(ctx, specs, splits).await, + } +} + +/// Run a set of nodes with a real network. 
+async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { - match network { - Network::Real => { - let mut nodes = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (node, runner) = network::testonly::Instance::new( - spec.net.clone(), - spec.block_store.clone(), - ); - s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); - nodes.push(node); + let mut nodes = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (node, runner) = + network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone()); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + nodes.push(node); + } + network::testonly::instant_network(ctx, nodes.iter()).await?; + for (i, node) in nodes.into_iter().enumerate() { + let spec = &specs[i]; + s.spawn( + async { + let mut node = node; + spec.run(ctx, node.pipe()).await } - network::testonly::instant_network(ctx, nodes.iter()).await?; - for (i, node) in nodes.into_iter().enumerate() { - let spec = &specs[i]; - s.spawn( - async { - let mut node = node; - spec.run(ctx, node.pipe()).await + .instrument(tracing::info_span!("node", i)), + ); + } + Ok(()) + }) + .await +} + +/// Run a set of nodes with a mock network. +async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // Actor inputs, ie. where the test can send messages to the consensus. + let mut sends = HashMap::new(); + // Actor outputs, ie. the messages the actor wants to send to the others. + let mut recvs = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (actor_pipe, dispatcher_pipe) = pipe::new(); + let key = spec.net.validator_key.as_ref().unwrap().public(); + sends.insert(key, actor_pipe.send); + recvs.push(actor_pipe.recv); + // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors + s.spawn( + async { + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await + } + .instrument(tracing::info_span!("node", i)), + ); + } + // Run mock networks by receiving the output-turned-input from all consensus + // instances and forwarding them to the others. + scope::run!(ctx, |ctx, s| async { + for recv in recvs { + s.spawn(async { + use zksync_consensus_network::io; + let mut recv = recv; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + match message.recipient { + io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), + io::Target::Broadcast => sends.values().for_each(|s| s.send(msg())), } - .instrument(tracing::info_span!("node", i)), - ); + } + Ok(()) + }); + } + anyhow::Ok(()) + }) + .await + }) + .await +} + +/// Run a set of nodes with a Twins network configuration. +async fn run_nodes_twins( + ctx: &ctx::Ctx, + specs: &[Node], + splits: &PortSplitSchedule, +) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // All known network ports of a validator, so that we can tell if any of + // those addresses are in the same partition as the sender. 
+ let mut validator_ports: HashMap<_, Vec> = HashMap::new(); + // Outbox of consensus instances, paired with their network identity, + // so we can tell which partition they are in in the given view. + let mut recvs = vec![]; + // Inbox of the consensus instances, indexed by their network identity, + // so that we can send to the one which is in the same partition as the sender. + let mut sends = HashMap::new(); + // Blockstores of nodes, indexed by port; used to simulate the effect of gossip. + let mut stores = HashMap::new(); + // Outbound gossip relationships to simulate the effects of block fetching. + let mut gossip_targets = HashMap::new(); + + for (i, spec) in specs.iter().enumerate() { + let (actor_pipe, dispatcher_pipe) = pipe::new(); + let validator_key = spec.net.validator_key.as_ref().unwrap().public(); + let port = spec.net.server_addr.port(); + + validator_ports.entry(validator_key).or_default().push(port); + + sends.insert(port, actor_pipe.send); + recvs.push((port, actor_pipe.recv)); + stores.insert(port, spec.block_store.clone()); + gossip_targets.insert( + port, + spec.net + .gossip + .static_outbound + .values() + .map(|host| { + let addr: std::net::SocketAddr = host.0.parse().expect("valid address"); + addr.port() + }) + .collect(), + ); + + // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors + s.spawn( + async { + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await } + .instrument(tracing::info_span!("node", i)), + ); + } + // Taking these references is necessary for the `scope::run!` environment lifetime rules to compile + // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. + // Potentially `ctx::NoCopy` could be used with `port`. + let validator_ports = &validator_ports; + let sends = &sends; + let stores = &stores; + let gossip_targets = &gossip_targets; + let (gossip_send, gossip_recv) = channel::unbounded(); + + // Run networks by receiving from all consensus instances: + // * identify the view they are in from the message + // * identify the partition they are in based on their network id + // * either broadcast to all other instances in the partition, or find out the network + // identity of the target validator and send to it iff they are in the same partition + // * simulate the gossiping of finalized blockss + scope::run!(ctx, |ctx, s| async move { + for (port, recv) in recvs { + let gossip_send = gossip_send.clone(); + s.spawn(async move { + twins_receive_loop( + ctx, + splits, + validator_ports, + sends, + TwinsGossipConfig { + targets: &gossip_targets[&port], + send: gossip_send, + }, + port, + recv, + ) + .await + }); + } + s.spawn(async { twins_gossip_loop(ctx, stores, gossip_recv).await }); + anyhow::Ok(()) + }) + .await + }) + .await +} + +/// Receive input messages from the consensus actor and send them to the others +/// according to the partition schedule of the port associated with this instance. +/// +/// We have to simulate the gossip layer which isn't instantiated by these tests. +/// If we don't, then if a replica misses a LeaderPrepare message it won't ever get the payload +/// and won't be able to finalize the block, and won't participate further in the consensus. 
+async fn twins_receive_loop( + ctx: &ctx::Ctx, + splits: &PortSplitSchedule, + validator_ports: &HashMap>, + sends: &HashMap>, + gossip: TwinsGossipConfig<'_>, + port: Port, + mut recv: UnboundedReceiver, +) -> anyhow::Result<()> { + let rng = &mut ctx.rng(); + let start = std::time::Instant::now(); // Just to give an idea of how long time passes between rounds. + + // Finalized block number iff this node can gossip to the target and the message contains a QC. + let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { + if !gossip.targets.contains(&target_port) { + return None; + } + output_msg_commit_qc(msg).map(|qc| qc.header().number) + }; + + // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. + // The spec says that the network is expected to deliver messages eventually, potentially out of order, + // caveated by the fact that the actual implementation only keeps retrying the last message.. + // A separate issue is the definition of "later", without actually adding timing assumptions: + // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas + // don't move on from a view until they reach quorum, then "later" could be defined by so many + // messages in a view that it indicates a timeout loop. NB the consensus might not actually have + // an actual timeout loop and might rely on the network trying to re-broadcast forever. + // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that + // can move on to the next view, in which a new partition configuration will allow them to broadcast + // to previously isolated peers. + // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer + // partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff + // out of the box, because replicas only communicate with their leader, so if for example B missed + // a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because + // they can't commit the earlier block until they get a new message from A. However since + // https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted, + // so we shouldn't have to wait long for A to unstash its messages to B. + // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages + // and unstash them as soon as someone moves on to a new view. + let mut stashes: HashMap> = HashMap::new(); + + // Either stash a message, or unstash all messages and send them to the target along with the new one. + let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let stash = stashes.entry(target_port).or_default(); + let view = output_msg_view_number(&msg); + let kind = output_msg_label(&msg); + + // Remove any previously stashed message of the same kind, because the network will only + // try to send the last one of each, not all pending messages. + stash.retain(|stashed| output_msg_label(stashed) != kind); + + if !can_send { + tracing::info!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); + stash.push(msg); + return; + } + + let s = &sends[&target_port]; + + // Send after taking note of potentially gossipable blocks. 
+ let send = |msg| { + if let Some(number) = block_to_gossip(target_port, &msg) { + gossip.send.send(TwinsGossipMessage { + from: port, + to: target_port, + number, + }); } - Network::Mock => { - let mut sends = HashMap::new(); - let mut recvs = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, pipe) = pipe::new(); - let key = spec.net.validator_key.as_ref().unwrap().public(); - sends.insert(key, actor_pipe.send); - recvs.push(actor_pipe.recv); - s.spawn( - async { - let mut pipe = pipe; - spec.run(ctx, &mut pipe).await + s.send(msg); + }; + + // Messages can be delivered in arbitrary order. + stash.shuffle(rng); + + for unstashed in stash.drain(0..) { + let view = output_msg_view_number(&unstashed); + let kind = output_msg_label(&unstashed); + tracing::info!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); + send(unstashed); + } + tracing::info!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); + send(msg); + }; + + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let view_number = message.message.msg.view().number.0 as usize; + // Here we assume that all instances start from view 0 in the tests. + // If the view is higher than what we have planned for, assume no partitions. + // Every node is guaranteed to be present in only one partition. + let partitions_opt = splits.get(view_number); + + if partitions_opt.is_none() { + bail!( + "ran out of scheduled rounds; most likely cannot finalize blocks even if we go on" + ); + } + + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + + match message.recipient { + io::Target::Broadcast => match partitions_opt { + None => { + tracing::info!("broadcasting view={view_number} from={port} target=all"); + for target_port in sends.keys() { + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + tracing::info!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs()); + for target_port in p { + send_or_stash(can_send, *target_port, msg()); } - .instrument(tracing::info_span!("node", i)), - ); + } } - scope::run!(ctx, |ctx, s| async { - for recv in recvs { - s.spawn(async { - use zksync_consensus_network::io; - let mut recv = recv; - while let Ok(io::InputMessage::Consensus(message)) = - recv.recv(ctx).await - { - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) - }; - match message.recipient { - io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), - io::Target::Broadcast => { - sends.values().for_each(|s| s.send(msg())) - } + }, + io::Target::Validator(v) => { + let target_ports = &validator_ports[&v]; + + match partitions_opt { + None => { + for target_port in target_ports { + tracing::info!( + "unicasting view={view_number} from={port} target={target_port}" + ); + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + for target_port in target_ports { + if p.contains(target_port) { + tracing::info!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs()); + send_or_stash(can_send, *target_port, msg()); } } - Ok(()) - }); + } } - anyhow::Ok(()) - }) - .await?; + } + } + } + } + Ok(()) +} + +/// Simulate the effects of gossip: if the source 
node can gossip to the target, +/// and the message being sent contains a CommitQC, and the sender has the +/// referenced finalized block in its store, then assume that the target +/// could fetch this block if they wanted, and insert it directly into their store. +/// +/// This happens concurrently with the actual message passing, so it's just an +/// approximation. It also happens out of order. This method only contains the +/// send loop, to deal with the spawning of store operations. +async fn twins_gossip_loop( + ctx: &ctx::Ctx, + stores: &HashMap>, + mut recv: UnboundedReceiver, +) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async move { + while let Ok(TwinsGossipMessage { + from, + to, + mut number, + }) = recv.recv(ctx).await + { + let local_store = &stores[&from]; + let remote_store = &stores[&to]; + let first_needed = remote_store.queued().next(); + + loop { + // Stop going back if the target already has the block. + if number < first_needed { + break; + } + // Stop if the source doesn't actually have this block to give. + let Ok(Some(block)) = local_store.block(ctx, number).await else { + break; + }; + // Perform the storing operation asynchronously because `queue_block` will + // wait for all dependencies to be inserted first. + s.spawn_bg(async move { + let _ = remote_store.queue_block(ctx, block).await; + Ok(()) + }); + // Be pessimistic and try to insert all ancestors, to minimise the chance that + // for some reason a node doesn't get a finalized block from anyone. + // Without this some scenarios with twins actually fail. + if let Some(prev) = number.prev() { + number = prev; + } else { + break; + }; } } Ok(()) }) .await } + +fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber { + match msg { + io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number, + } +} + +fn output_msg_label(msg: &io::OutputMessage) -> &str { + match msg { + io::OutputMessage::Consensus(cr) => cr.msg.msg.label(), + } +} + +fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> { + use validator::ConsensusMsg; + match msg { + io::OutputMessage::Consensus(cr) => match &cr.msg.msg { + ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(), + ConsensusMsg::LeaderPrepare(lp) => lp.justification.high_qc(), + ConsensusMsg::ReplicaCommit(_) => None, + ConsensusMsg::LeaderCommit(lc) => Some(&lc.justification), + }, + } +} + +struct TwinsGossipMessage { + from: Port, + to: Port, + number: validator::BlockNumber, +} + +struct TwinsGossipConfig<'a> { + /// Ports to which this node should gossip to. + targets: &'a HashSet, + /// Channel over which to send gossip messages. 
+ send: UnboundedSender, +} diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index eaddccb2..36d57fbd 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -1,6 +1,20 @@ -use crate::testonly::{ut_harness::UTHarness, Behavior, Network, Test}; -use zksync_concurrency::{ctx, scope, time}; -use zksync_consensus_roles::validator; +use std::collections::HashMap; + +use crate::testonly::{ + twins::{Cluster, HasKey, ScenarioGenerator, Twin}, + ut_harness::UTHarness, + Behavior, Network, PortSplitSchedule, Test, +}; +use zksync_concurrency::{ + ctx::{self, Ctx}, + scope, time, +}; +use zksync_consensus_network::testonly::new_configs_for_validators; +use zksync_consensus_roles::validator::{ + self, + testonly::{Setup, SetupSpec}, + LeaderSelectionMode, PublicKey, SecretKey, +}; async fn run_test(behavior: Behavior, network: Network) { let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); @@ -195,3 +209,195 @@ async fn non_proposing_leader() { .await .unwrap() } + +/// Run Twins scenarios without actual twins, and with so few nodes that all +/// of them are required for a quorum, which means (currently) there won't be +/// any partitions. +/// +/// This should be a simple sanity check that the network works and consensus +/// is achieved under the most favourable conditions. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_wo_twins_wo_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n<6 implies f=0 and q=n + run_twins(ctx, 5, 0, 10).await.unwrap(); +} + +/// Run Twins scenarios without actual twins, but enough replicas that partitions +/// can play a role, isolating certain nodes (potentially the leader) in some +/// rounds. +/// +/// This should be a sanity check that without Byzantine behaviour the consensus +/// is resilient to temporary network partitions. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_wo_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible. + run_twins(ctx, 6, 0, 5).await.unwrap(); +} + +/// Run Twins scenarios with random number of nodes and 1 twin. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w1_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n>=6 implies f>=1 and q=n-f + for num_replicas in 6..=10 { + // let num_honest = validator::threshold(num_replicas as u64) as usize; + // let max_faulty = num_replicas - num_honest; + // let num_twins = rng.gen_range(1..=max_faulty); + run_twins(ctx, num_replicas, 1, 5).await.unwrap(); + } +} + +/// Run Twins scenarios with higher number of nodes and 2 twins. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w2_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n>=11 implies f>=2 and q=n-f + run_twins(ctx, 11, 2, 5).await.unwrap(); +} + +/// Create network configuration for a given number of replicas and twins and run [Test]. +async fn run_twins( + ctx: &Ctx, + num_replicas: usize, + num_twins: usize, + num_scenarios: usize, +) -> anyhow::Result<()> { + zksync_concurrency::testonly::abort_on_panic(); + // Use a single timeout for all scenarios to finish. + // A single scenario with 11 replicas took 3-5 seconds. 
+ let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60)); + + #[derive(PartialEq, Debug)] + struct Replica { + id: i64, // non-zero ID + public_key: PublicKey, + secret_key: SecretKey, + } + + impl HasKey for Replica { + type Key = PublicKey; + + fn key(&self) -> &Self::Key { + &self.public_key + } + } + + impl Twin for Replica { + fn to_twin(&self) -> Self { + Self { + id: -self.id, + public_key: self.public_key.clone(), + secret_key: self.secret_key.clone(), + } + } + } + + let rng = &mut ctx.rng(); + + // The existing test machinery uses the number of finalized blocks as an exit criteria. + let blocks_to_finalize = 3; + // The test is going to disrupt the communication by partitioning nodes, + // where the leader might not be in a partition with enough replicas to + // form a quorum, therefore to allow N blocks to be finalized we need to + // go longer. + let num_rounds = blocks_to_finalize * 10; + // The paper considers 2 or 3 partitions enough. + let max_partitions = 3; + + // Every validator has equal power of 1. + const WEIGHT: u64 = 1; + let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]); + + let replicas = spec + .validator_weights + .iter() + .enumerate() + .map(|(i, (sk, _))| Replica { + id: i as i64 + 1, + public_key: sk.public(), + secret_key: sk.clone(), + }) + .collect::>(); + + let cluster = Cluster::new(replicas, num_twins); + let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); + + // Create network config for all nodes in the cluster (assigns unique network addresses). + let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1); + + let node_to_port = cluster + .nodes() + .iter() + .zip(nets.iter()) + .map(|(node, net)| (node.id, net.server_addr.port())) + .collect::>(); + + assert_eq!(node_to_port.len(), cluster.num_nodes()); + + // Every network needs a behaviour. They are all honest, just some might be duplicated. + let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; + + // Reuse the same cluster and network setup to run a few scenarios. + for i in 0..num_scenarios { + // Generate a permutation of partitions and leaders for the given number of rounds. + let scenario = scenarios.generate_one(rng); + + // Assign the leadership schedule to the consensus. + spec.leader_selection = + LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect()); + + // Generate a new setup with this leadership schedule. + let setup = Setup::from(spec.clone()); + + // Create a network with the partition schedule of the scenario. 
+ let splits: PortSplitSchedule = scenario + .rounds + .iter() + .map(|rc| { + rc.partitions + .iter() + .map(|p| p.iter().map(|r| node_to_port[&r.id]).collect()) + .collect() + }) + .collect(); + + tracing::info!( + "num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}", + cluster.num_nodes() + ); + + for (r, rc) in scenario.rounds.iter().enumerate() { + let partitions = &splits[r]; + + let leader_ports = cluster + .nodes() + .iter() + .filter(|n| n.public_key == *rc.leader) + .map(|n| node_to_port[&n.id]) + .collect::>(); + + let leader_partition_sizes = leader_ports + .iter() + .map(|lp| partitions.iter().find(|p| p.contains(lp)).unwrap().len()) + .collect::>(); + + let leader_isolated = leader_partition_sizes + .iter() + .all(|s| *s < cluster.quorum_size()); + + tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}"); + } + + Test { + network: Network::Twins(splits), + nodes: nodes.clone(), + blocks_to_finalize, + } + .run_with_config(ctx, nets.clone(), &setup.genesis) + .await? + } + + Ok(()) +} diff --git a/node/actors/network/src/gossip/fetch.rs b/node/actors/network/src/gossip/fetch.rs index 115852bd..e9ec37fc 100644 --- a/node/actors/network/src/gossip/fetch.rs +++ b/node/actors/network/src/gossip/fetch.rs @@ -64,7 +64,7 @@ impl Queue { ) -> ctx::OrCanceled { let sub = &mut self.0.subscribe(); while ctx.is_active() { - // Wait for the lowest requested block to be available. + // Wait for the lowest requested block to be available on the remote peer. // This scope is always cancelled, so we ignore the result. let mut block_number = None; let _: Result<(), _> = scope::run!(ctx, |ctx, s| async { diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 7091c643..7021ff28 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -49,6 +49,8 @@ pub(crate) struct Network { /// Output pipe of the network actor. pub(crate) sender: channel::UnboundedSender, /// Queue of block fetching requests. + /// + /// These are blocks that this node wants to request from remote peers via RPC. pub(crate) fetch_queue: fetch::Queue, /// Last viewed QC. pub(crate) last_viewed_qc: Option, diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index af727905..bc7eace0 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -204,6 +204,7 @@ impl Network { // Perform get_block calls to peer. s.spawn::<()>(async { + // Gossiped state of what range of blocks is available on the remote peer. let state = &mut push_block_store_state_server.state.subscribe(); loop { let call = get_block_client.reserve(ctx).await?; diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 5e08dbbb..897d1354 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -77,7 +77,21 @@ pub fn new_configs( setup: &validator::testonly::Setup, gossip_peers: usize, ) -> Vec { - let configs = setup.validator_keys.iter().map(|validator_key| { + new_configs_for_validators(rng, setup.validator_keys.iter(), gossip_peers) +} + +/// Construct configs for `n` validators of the consensus. +/// +/// This version allows for repeating keys used in Twins tests. 
+pub fn new_configs_for_validators<'a, I>(
+    rng: &mut impl Rng,
+    validator_keys: I,
+    gossip_peers: usize,
+) -> Vec<Config>
+where
+    I: Iterator<Item = &'a validator::SecretKey>,
+{
+    let configs = validator_keys.map(|validator_key| {
         let addr = net::tcp::testonly::reserve_listener();
         Config {
             server_addr: addr,
diff --git a/node/libs/utils/src/pipe.rs b/node/libs/utils/src/pipe.rs
index e8db302c..d1b3a0d7 100644
--- a/node/libs/utils/src/pipe.rs
+++ b/node/libs/utils/src/pipe.rs
@@ -5,12 +5,30 @@ use std::future::Future;
 use zksync_concurrency::ctx::{self, channel, Ctx};
 
 /// This is the end of the Pipe that should be held by the actor.
+///
+/// The actor can receive `In` and send back `Out` messages.
 pub type ActorPipe<In, Out> = Pipe<In, Out>;
 
 /// This is the end of the Pipe that should be held by the dispatcher.
+///
+/// The dispatcher can send `In` messages and receive `Out` back.
 pub type DispatcherPipe<In, Out> = Pipe<Out, In>;
 
 /// This is a generic Pipe end.
+///
+/// It is used to receive `In` and send `Out` messages.
+///
+/// When viewed from the perspective of [new]:
+/// * messages of type `In` are going right-to-left
+/// * messages of type `Out` are going left-to-right
+///
+/// ```text
+/// In  <- -------- <- In
+///        | Pipe |
+/// Out -> -------- -> Out
+///  ^                  ^
+///  Actor          Dispatcher
+/// ```
 #[derive(Debug)]
 pub struct Pipe<In, Out> {
     /// This is the channel that receives messages.