diff --git a/node/actors/bft/src/testonly/node.rs b/node/actors/bft/src/testonly/node.rs
index d08a52d0..7b348e15 100644
--- a/node/actors/bft/src/testonly/node.rs
+++ b/node/actors/bft/src/testonly/node.rs
@@ -62,6 +62,7 @@ impl Node {
         let mut con_recv = consensus_pipe.recv;
         let con_send = consensus_pipe.send;
         scope::run!(ctx, |ctx, s| async {
+            // Run the consensus actor.
             s.spawn(async {
                 let validator_key = self.net.validator_key.clone().unwrap();
                 crate::Config {
@@ -75,6 +76,8 @@ impl Node {
                 .await
                 .context("consensus.run()")
             });
+            // Forward input messages received from the network to the actor;
+            // this turns the output of other instances into input for this one.
             s.spawn(async {
                 while let Ok(network_message) = net_recv.recv(ctx).await {
                     match network_message {
@@ -85,6 +88,8 @@ impl Node {
                 }
                 Ok(())
             });
+            // Forward output messages from the actor to the network;
+            // this turns the output of this instance into input for the others.
             // Get the next message from the channel. Our response depends on what type of replica we are.
             while let Ok(msg) = con_recv.recv(ctx).await {
                 match msg {
diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index 9605e92d..102b28f4 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -1,18 +1,39 @@
 use super::{Behavior, Node};
-use std::collections::HashMap;
+use anyhow::bail;
+use network::{io, Config};
+use rand::seq::SliceRandom;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 use tracing::Instrument as _;
-use zksync_concurrency::{ctx, oneshot, scope};
+use zksync_concurrency::{
+    ctx::{
+        self,
+        channel::{self, UnboundedReceiver, UnboundedSender},
+    },
+    oneshot, scope,
+};
 use zksync_consensus_network as network;
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::testonly::new_store;
+use zksync_consensus_storage::{testonly::new_store, BlockStore};
 use zksync_consensus_utils::pipe;
 
-#[derive(Clone, Copy)]
+#[derive(Clone)]
 pub(crate) enum Network {
     Real,
     Mock,
+    Twins(PortSplitSchedule),
 }
 
+// Identify the different network identities of twins by their listener port.
+// They are all expected to be on localhost, but `ListenerAddr` can't be
+// used directly as a map key.
+pub(crate) type Port = u16;
+pub(crate) type PortPartition = HashSet<Port>;
+pub(crate) type PortSplit = Vec<PortPartition>;
+pub(crate) type PortSplitSchedule = Vec<PortSplit>;
+
 /// Config for the test. Determines the parameters to run the test with.
 #[derive(Clone)]
 pub(crate) struct Test {
@@ -22,7 +43,7 @@ pub(crate) struct Test {
 }
 
 impl Test {
-    /// Run a test with the given parameters.
+    /// Run a test with the given parameters and a random network setup.
     pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
        let rng = &mut ctx.rng();
        let setup = validator::testonly::Setup::new_with_weights(
@@ -30,11 +51,21 @@ impl Test {
             self.nodes.iter().map(|(_, w)| *w).collect(),
         );
         let nets: Vec<_> = network::testonly::new_configs(rng, &setup, 1);
+        self.run_with_config(ctx, nets, &setup.genesis).await
+    }
+
+    /// Run a test with the given parameters and network configuration.
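+    ///
+    /// When `self.network` is `Network::Twins(schedule)`, the schedule lists the partitions
+    /// to apply in each view, indexed by view number. A minimal sketch of such a schedule,
+    /// assuming two views and hypothetical listener ports 1000-1003 (view 0 isolates port
+    /// 1003, view 1 has no partition):
+    ///
+    /// ```ignore
+    /// let schedule: PortSplitSchedule = vec![
+    ///     vec![HashSet::from([1000, 1001, 1002]), HashSet::from([1003])],
+    ///     vec![HashSet::from([1000, 1001, 1002, 1003])],
+    /// ];
+    /// ```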
+ pub(crate) async fn run_with_config( + &self, + ctx: &ctx::Ctx, + nets: Vec, + genesis: &validator::Genesis, + ) -> anyhow::Result<()> { let mut nodes = vec![]; let mut honest = vec![]; scope::run!(ctx, |ctx, s| async { for (i, net) in nets.into_iter().enumerate() { - let (store, runner) = new_store(ctx, &setup.genesis).await; + let (store, runner) = new_store(ctx, genesis).await; s.spawn_bg(runner.run(ctx)); if self.nodes[i].0 == Behavior::Honest { honest.push(store.clone()); @@ -46,11 +77,11 @@ impl Test { }); } assert!(!honest.is_empty()); - s.spawn_bg(run_nodes(ctx, self.network, &nodes)); + s.spawn_bg(run_nodes(ctx, &self.network, &nodes)); // Run the nodes until all honest nodes store enough finalized blocks. assert!(self.blocks_to_finalize > 0); - let first = setup.genesis.first_block; + let first = genesis.first_block; let last = first + (self.blocks_to_finalize as u64 - 1); for store in &honest { store.wait_until_queued(ctx, last).await?; @@ -59,9 +90,14 @@ impl Test { // Check that the stored blocks are consistent. for i in 0..self.blocks_to_finalize as u64 { let i = first + i; - let want = honest[0].block(ctx, i).await?; + // Only comparing the payload; the justification might be different. + let want = honest[0] + .block(ctx, i) + .await? + .expect("checked its existence") + .payload; for store in &honest[1..] { - assert_eq!(want, store.block(ctx, i).await?); + assert_eq!(want, store.block(ctx, i).await?.unwrap().payload); } } Ok(()) @@ -71,77 +107,435 @@ impl Test { } /// Run a set of nodes. -async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::Result<()> { +async fn run_nodes(ctx: &ctx::Ctx, network: &Network, specs: &[Node]) -> anyhow::Result<()> { + match network { + Network::Real => run_nodes_real(ctx, specs).await, + Network::Mock => run_nodes_mock(ctx, specs).await, + Network::Twins(splits) => run_nodes_twins(ctx, specs, splits).await, + } +} + +/// Run a set of nodes with a real network. +async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { - match network { - Network::Real => { - let mut nodes = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (node, runner) = network::testonly::Instance::new( - spec.net.clone(), - spec.block_store.clone(), - ); - s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); - nodes.push(node); + let mut nodes = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (node, runner) = + network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone()); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + nodes.push(node); + } + network::testonly::instant_network(ctx, nodes.iter()).await?; + for (i, node) in nodes.into_iter().enumerate() { + let spec = &specs[i]; + s.spawn( + async { + let mut node = node; + spec.run(ctx, node.pipe()).await } - network::testonly::instant_network(ctx, nodes.iter()).await?; - for (i, node) in nodes.into_iter().enumerate() { - let spec = &specs[i]; - s.spawn( - async { - let mut node = node; - spec.run(ctx, node.pipe()).await + .instrument(tracing::info_span!("node", i)), + ); + } + Ok(()) + }) + .await +} + +/// Run a set of nodes with a mock network. +async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // Actor inputs, ie. where the test can send messages to the consensus. + let mut sends = HashMap::new(); + // Actor outputs, ie. the messages the actor wants to send to the others. 
+ let mut recvs = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (actor_pipe, dispatcher_pipe) = pipe::new(); + let key = spec.net.validator_key.as_ref().unwrap().public(); + sends.insert(key, actor_pipe.send); + recvs.push(actor_pipe.recv); + // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors + s.spawn( + async { + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await + } + .instrument(tracing::info_span!("node", i)), + ); + } + // Run mock networks by receiving the output-turned-input from all consensus + // instances and forwarding them to the others. + scope::run!(ctx, |ctx, s| async { + for recv in recvs { + s.spawn(async { + use zksync_consensus_network::io; + let mut recv = recv; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + match message.recipient { + io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), + io::Target::Broadcast => sends.values().for_each(|s| s.send(msg())), } - .instrument(tracing::info_span!("node", i)), - ); + } + Ok(()) + }); + } + anyhow::Ok(()) + }) + .await + }) + .await +} + +/// Run a set of nodes with a Twins network configuration. +async fn run_nodes_twins( + ctx: &ctx::Ctx, + specs: &[Node], + splits: &PortSplitSchedule, +) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // All known network ports of a validator, so that we can tell if any of + // those addresses are in the same partition as the sender. + let mut validator_ports: HashMap<_, Vec> = HashMap::new(); + // Outbox of consensus instances, paired with their network identity, + // so we can tell which partition they are in in the given view. + let mut recvs = vec![]; + // Inbox of the consensus instances, indexed by their network identity, + // so that we can send to the one which is in the same partition as the sender. + let mut sends = HashMap::new(); + // Blockstores of nodes, indexed by port; used to simulate the effect of gossip. + let mut stores = HashMap::new(); + // Outbound gossip relationships to simulate the effects of block fetching. 
+        let mut gossip_targets = HashMap::new();
+
+        for (i, spec) in specs.iter().enumerate() {
+            let (actor_pipe, dispatcher_pipe) = pipe::new();
+            let validator_key = spec.net.validator_key.as_ref().unwrap().public();
+            let port = spec.net.server_addr.port();
+
+            validator_ports.entry(validator_key).or_default().push(port);
+
+            sends.insert(port, actor_pipe.send);
+            recvs.push((port, actor_pipe.recv));
+            stores.insert(port, spec.block_store.clone());
+            gossip_targets.insert(
+                port,
+                spec.net
+                    .gossip
+                    .static_outbound
+                    .values()
+                    .map(|host| {
+                        let addr: std::net::SocketAddr = host.0.parse().expect("valid address");
+                        addr.port()
+                    })
+                    .collect(),
+            );
+
+            // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to:
+            // * send Output messages from other actors to this consensus instance
+            // * receive Input messages sent by this consensus to the other actors
+            s.spawn(
+                async {
+                    let mut network_pipe = dispatcher_pipe;
+                    spec.run(ctx, &mut network_pipe).await
+                }
+                .instrument(tracing::info_span!("node", i)),
+            );
+        }
+        // Taking these references is necessary for the `scope::run!` environment lifetime rules to compile
+        // with `async move`, which in turn is needed because otherwise the spawned task could not borrow `port`.
+        // Potentially `ctx::NoCopy` could be used with `port`.
+        let validator_ports = &validator_ports;
+        let sends = &sends;
+        let stores = &stores;
+        let gossip_targets = &gossip_targets;
+        let (gossip_send, gossip_recv) = channel::unbounded();
+
+        // Run networks by receiving from all consensus instances:
+        // * identify the view they are in from the message
+        // * identify the partition they are in based on their network id
+        // * either broadcast to all other instances in the partition, or find out the network
+        //   identity of the target validator and send to it iff they are in the same partition
+        // * simulate the gossiping of finalized blocks
+        scope::run!(ctx, |ctx, s| async move {
+            for (port, recv) in recvs {
+                let gossip_send = gossip_send.clone();
+                s.spawn(async move {
+                    twins_receive_loop(
+                        ctx,
+                        splits,
+                        validator_ports,
+                        sends,
+                        TwinsGossipConfig {
+                            targets: &gossip_targets[&port],
+                            send: gossip_send,
+                        },
+                        port,
+                        recv,
+                    )
+                    .await
+                });
+            }
+            s.spawn(async { twins_gossip_loop(ctx, stores, gossip_recv).await });
+            anyhow::Ok(())
+        })
+        .await
+    })
+    .await
+}
+
+/// Receive input messages from the consensus actor and send them to the others
+/// according to the partition schedule of the port associated with this instance.
+///
+/// We have to simulate the gossip layer, which isn't instantiated by these tests.
+/// If we don't, then a replica that misses a LeaderPrepare message won't ever get the payload,
+/// won't be able to finalize the block, and won't participate further in the consensus.
+async fn twins_receive_loop(
+    ctx: &ctx::Ctx,
+    splits: &PortSplitSchedule,
+    validator_ports: &HashMap<validator::PublicKey, Vec<Port>>,
+    sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
+    gossip: TwinsGossipConfig<'_>,
+    port: Port,
+    mut recv: UnboundedReceiver<io::InputMessage>,
+) -> anyhow::Result<()> {
+    let rng = &mut ctx.rng();
+    let start = std::time::Instant::now(); // Just to give an idea of how much time passes between rounds.
+
+    // Finalized block number iff this node can gossip to the target and the message contains a QC.
+ let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { + if !gossip.targets.contains(&target_port) { + return None; + } + output_msg_commit_qc(msg).map(|qc| qc.header().number) + }; + + // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. + // The spec says that the network is expected to deliver messages eventually, potentially out of order, + // caveated by the fact that the actual implementation only keeps retrying the last message.. + // A separate issue is the definition of "later", without actually adding timing assumptions: + // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas + // don't move on from a view until they reach quorum, then "later" could be defined by so many + // messages in a view that it indicates a timeout loop. NB the consensus might not actually have + // an actual timeout loop and might rely on the network trying to re-broadcast forever. + // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that + // can move on to the next view, in which a new partition configuration will allow them to broadcast + // to previously isolated peers. + // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer + // partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff + // out of the box, because replicas only communicate with their leader, so if for example B missed + // a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because + // they can't commit the earlier block until they get a new message from A. However since + // https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted, + // so we shouldn't have to wait long for A to unstash its messages to B. + // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages + // and unstash them as soon as someone moves on to a new view. + let mut stashes: HashMap> = HashMap::new(); + + // Either stash a message, or unstash all messages and send them to the target along with the new one. + let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let stash = stashes.entry(target_port).or_default(); + let view = output_msg_view_number(&msg); + let kind = output_msg_label(&msg); + + // Remove any previously stashed message of the same kind, because the network will only + // try to send the last one of each, not all pending messages. + stash.retain(|stashed| output_msg_label(stashed) != kind); + + if !can_send { + tracing::info!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); + stash.push(msg); + return; + } + + let s = &sends[&target_port]; + + // Send after taking note of potentially gossipable blocks. + let send = |msg| { + if let Some(number) = block_to_gossip(target_port, &msg) { + gossip.send.send(TwinsGossipMessage { + from: port, + to: target_port, + number, + }); } - Network::Mock => { - let mut sends = HashMap::new(); - let mut recvs = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, pipe) = pipe::new(); - let key = spec.net.validator_key.as_ref().unwrap().public(); - sends.insert(key, actor_pipe.send); - recvs.push(actor_pipe.recv); - s.spawn( - async { - let mut pipe = pipe; - spec.run(ctx, &mut pipe).await + s.send(msg); + }; + + // Messages can be delivered in arbitrary order. 
+ stash.shuffle(rng); + + for unstashed in stash.drain(0..) { + let view = output_msg_view_number(&unstashed); + let kind = output_msg_label(&unstashed); + tracing::info!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); + send(unstashed); + } + tracing::info!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); + send(msg); + }; + + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let view_number = message.message.msg.view().number.0 as usize; + // Here we assume that all instances start from view 0 in the tests. + // If the view is higher than what we have planned for, assume no partitions. + // Every node is guaranteed to be present in only one partition. + let partitions_opt = splits.get(view_number); + + if partitions_opt.is_none() { + bail!( + "ran out of scheduled rounds; most likely cannot finalize blocks even if we go on" + ); + } + + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + + match message.recipient { + io::Target::Broadcast => match partitions_opt { + None => { + tracing::info!("broadcasting view={view_number} from={port} target=all"); + for target_port in sends.keys() { + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + tracing::info!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs()); + for target_port in p { + send_or_stash(can_send, *target_port, msg()); } - .instrument(tracing::info_span!("node", i)), - ); + } } - scope::run!(ctx, |ctx, s| async { - for recv in recvs { - s.spawn(async { - use zksync_consensus_network::io; - let mut recv = recv; - while let Ok(io::InputMessage::Consensus(message)) = - recv.recv(ctx).await - { - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) - }; - match message.recipient { - io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), - io::Target::Broadcast => { - sends.values().for_each(|s| s.send(msg())) - } + }, + io::Target::Validator(v) => { + let target_ports = &validator_ports[&v]; + + match partitions_opt { + None => { + for target_port in target_ports { + tracing::info!( + "unicasting view={view_number} from={port} target={target_port}" + ); + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + for target_port in target_ports { + if p.contains(target_port) { + tracing::info!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs()); + send_or_stash(can_send, *target_port, msg()); } } - Ok(()) - }); + } } - anyhow::Ok(()) - }) - .await?; + } + } + } + } + Ok(()) +} + +/// Simulate the effects of gossip: if the source node can gossip to the target, +/// and the message being sent contains a CommitQC, and the sender has the +/// referenced finalized block in its store, then assume that the target +/// could fetch this block if they wanted, and insert it directly into their store. +/// +/// This happens concurrently with the actual message passing, so it's just an +/// approximation. It also happens out of order. This method only contains the +/// send loop, to deal with the spawning of store operations. 
+async fn twins_gossip_loop(
+    ctx: &ctx::Ctx,
+    stores: &HashMap<Port, Arc<BlockStore>>,
+    mut recv: UnboundedReceiver<TwinsGossipMessage>,
+) -> anyhow::Result<()> {
+    scope::run!(ctx, |ctx, s| async move {
+        while let Ok(TwinsGossipMessage {
+            from,
+            to,
+            mut number,
+        }) = recv.recv(ctx).await
+        {
+            let local_store = &stores[&from];
+            let remote_store = &stores[&to];
+            let first_needed = remote_store.queued().next();
+
+            loop {
+                // Stop going back if the target already has the block.
+                if number < first_needed {
+                    break;
+                }
+                // Stop if the source doesn't actually have this block to give.
+                let Ok(Some(block)) = local_store.block(ctx, number).await else {
+                    break;
+                };
+                // Perform the storing operation asynchronously because `queue_block` will
+                // wait for all dependencies to be inserted first.
+                s.spawn_bg(async move {
+                    let _ = remote_store.queue_block(ctx, block).await;
+                    Ok(())
+                });
+                // Be pessimistic and try to insert all ancestors, to minimise the chance that
+                // for some reason a node doesn't get a finalized block from anyone.
+                // Without this, some scenarios with twins actually fail.
+                if let Some(prev) = number.prev() {
+                    number = prev;
+                } else {
+                    break;
+                };
             }
         }
         Ok(())
     })
     .await
 }
+
+fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber {
+    match msg {
+        io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number,
+    }
+}
+
+fn output_msg_label(msg: &io::OutputMessage) -> &str {
+    match msg {
+        io::OutputMessage::Consensus(cr) => cr.msg.msg.label(),
+    }
+}
+
+fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> {
+    use validator::ConsensusMsg;
+    match msg {
+        io::OutputMessage::Consensus(cr) => match &cr.msg.msg {
+            ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(),
+            ConsensusMsg::LeaderPrepare(lp) => lp.justification.high_qc(),
+            ConsensusMsg::ReplicaCommit(_) => None,
+            ConsensusMsg::LeaderCommit(lc) => Some(&lc.justification),
+        },
+    }
+}
+
+struct TwinsGossipMessage {
+    from: Port,
+    to: Port,
+    number: validator::BlockNumber,
+}
+
+struct TwinsGossipConfig<'a> {
+    /// Ports to which this node should gossip.
+    targets: &'a HashSet<Port>,
+    /// Channel over which to send gossip messages.
+    send: UnboundedSender<TwinsGossipMessage>,
+}
diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs
index eaddccb2..36d57fbd 100644
--- a/node/actors/bft/src/tests.rs
+++ b/node/actors/bft/src/tests.rs
@@ -1,6 +1,20 @@
-use crate::testonly::{ut_harness::UTHarness, Behavior, Network, Test};
-use zksync_concurrency::{ctx, scope, time};
-use zksync_consensus_roles::validator;
+use std::collections::HashMap;
+
+use crate::testonly::{
+    twins::{Cluster, HasKey, ScenarioGenerator, Twin},
+    ut_harness::UTHarness,
+    Behavior, Network, PortSplitSchedule, Test,
+};
+use zksync_concurrency::{
+    ctx::{self, Ctx},
+    scope, time,
+};
+use zksync_consensus_network::testonly::new_configs_for_validators;
+use zksync_consensus_roles::validator::{
+    self,
+    testonly::{Setup, SetupSpec},
+    LeaderSelectionMode, PublicKey, SecretKey,
+};
 
 async fn run_test(behavior: Behavior, network: Network) {
     let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
@@ -195,3 +209,195 @@ async fn non_proposing_leader() {
     .await
     .unwrap()
 }
+
+/// Run Twins scenarios without actual twins, and with so few nodes that all
+/// of them are required for a quorum, which means (currently) there won't be
+/// any partitions.
+///
+/// This should be a simple sanity check that the network works and consensus
+/// is achieved under the most favourable conditions.
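+///
+/// For reference, the arithmetic behind the `n`/`f`/`q` comments in these tests appears
+/// to be: with `n` validators, `f = (n - 1) / 5` faults are tolerated and the quorum size
+/// is `q = n - f`, e.g. `n = 5 => f = 0, q = 5`, `n = 6 => f = 1, q = 5`,
+/// `n = 11 => f = 2, q = 9`.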
+#[tokio::test(flavor = "multi_thread")] +async fn twins_network_wo_twins_wo_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n<6 implies f=0 and q=n + run_twins(ctx, 5, 0, 10).await.unwrap(); +} + +/// Run Twins scenarios without actual twins, but enough replicas that partitions +/// can play a role, isolating certain nodes (potentially the leader) in some +/// rounds. +/// +/// This should be a sanity check that without Byzantine behaviour the consensus +/// is resilient to temporary network partitions. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_wo_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible. + run_twins(ctx, 6, 0, 5).await.unwrap(); +} + +/// Run Twins scenarios with random number of nodes and 1 twin. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w1_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n>=6 implies f>=1 and q=n-f + for num_replicas in 6..=10 { + // let num_honest = validator::threshold(num_replicas as u64) as usize; + // let max_faulty = num_replicas - num_honest; + // let num_twins = rng.gen_range(1..=max_faulty); + run_twins(ctx, num_replicas, 1, 5).await.unwrap(); + } +} + +/// Run Twins scenarios with higher number of nodes and 2 twins. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w2_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n>=11 implies f>=2 and q=n-f + run_twins(ctx, 11, 2, 5).await.unwrap(); +} + +/// Create network configuration for a given number of replicas and twins and run [Test]. +async fn run_twins( + ctx: &Ctx, + num_replicas: usize, + num_twins: usize, + num_scenarios: usize, +) -> anyhow::Result<()> { + zksync_concurrency::testonly::abort_on_panic(); + // Use a single timeout for all scenarios to finish. + // A single scenario with 11 replicas took 3-5 seconds. + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60)); + + #[derive(PartialEq, Debug)] + struct Replica { + id: i64, // non-zero ID + public_key: PublicKey, + secret_key: SecretKey, + } + + impl HasKey for Replica { + type Key = PublicKey; + + fn key(&self) -> &Self::Key { + &self.public_key + } + } + + impl Twin for Replica { + fn to_twin(&self) -> Self { + Self { + id: -self.id, + public_key: self.public_key.clone(), + secret_key: self.secret_key.clone(), + } + } + } + + let rng = &mut ctx.rng(); + + // The existing test machinery uses the number of finalized blocks as an exit criteria. + let blocks_to_finalize = 3; + // The test is going to disrupt the communication by partitioning nodes, + // where the leader might not be in a partition with enough replicas to + // form a quorum, therefore to allow N blocks to be finalized we need to + // go longer. + let num_rounds = blocks_to_finalize * 10; + // The paper considers 2 or 3 partitions enough. + let max_partitions = 3; + + // Every validator has equal power of 1. 
+ const WEIGHT: u64 = 1; + let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]); + + let replicas = spec + .validator_weights + .iter() + .enumerate() + .map(|(i, (sk, _))| Replica { + id: i as i64 + 1, + public_key: sk.public(), + secret_key: sk.clone(), + }) + .collect::>(); + + let cluster = Cluster::new(replicas, num_twins); + let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); + + // Create network config for all nodes in the cluster (assigns unique network addresses). + let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1); + + let node_to_port = cluster + .nodes() + .iter() + .zip(nets.iter()) + .map(|(node, net)| (node.id, net.server_addr.port())) + .collect::>(); + + assert_eq!(node_to_port.len(), cluster.num_nodes()); + + // Every network needs a behaviour. They are all honest, just some might be duplicated. + let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; + + // Reuse the same cluster and network setup to run a few scenarios. + for i in 0..num_scenarios { + // Generate a permutation of partitions and leaders for the given number of rounds. + let scenario = scenarios.generate_one(rng); + + // Assign the leadership schedule to the consensus. + spec.leader_selection = + LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect()); + + // Generate a new setup with this leadership schedule. + let setup = Setup::from(spec.clone()); + + // Create a network with the partition schedule of the scenario. + let splits: PortSplitSchedule = scenario + .rounds + .iter() + .map(|rc| { + rc.partitions + .iter() + .map(|p| p.iter().map(|r| node_to_port[&r.id]).collect()) + .collect() + }) + .collect(); + + tracing::info!( + "num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}", + cluster.num_nodes() + ); + + for (r, rc) in scenario.rounds.iter().enumerate() { + let partitions = &splits[r]; + + let leader_ports = cluster + .nodes() + .iter() + .filter(|n| n.public_key == *rc.leader) + .map(|n| node_to_port[&n.id]) + .collect::>(); + + let leader_partition_sizes = leader_ports + .iter() + .map(|lp| partitions.iter().find(|p| p.contains(lp)).unwrap().len()) + .collect::>(); + + let leader_isolated = leader_partition_sizes + .iter() + .all(|s| *s < cluster.quorum_size()); + + tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}"); + } + + Test { + network: Network::Twins(splits), + nodes: nodes.clone(), + blocks_to_finalize, + } + .run_with_config(ctx, nets.clone(), &setup.genesis) + .await? + } + + Ok(()) +} diff --git a/node/actors/network/src/gossip/fetch.rs b/node/actors/network/src/gossip/fetch.rs index 115852bd..e9ec37fc 100644 --- a/node/actors/network/src/gossip/fetch.rs +++ b/node/actors/network/src/gossip/fetch.rs @@ -64,7 +64,7 @@ impl Queue { ) -> ctx::OrCanceled { let sub = &mut self.0.subscribe(); while ctx.is_active() { - // Wait for the lowest requested block to be available. + // Wait for the lowest requested block to be available on the remote peer. // This scope is always cancelled, so we ignore the result. 
             let mut block_number = None;
             let _: Result<(), _> = scope::run!(ctx, |ctx, s| async {
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 7091c643..7021ff28 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -49,6 +49,8 @@ pub(crate) struct Network {
     /// Output pipe of the network actor.
     pub(crate) sender: channel::UnboundedSender,
     /// Queue of block fetching requests.
+    ///
+    /// These are blocks that this node wants to request from remote peers via RPC.
     pub(crate) fetch_queue: fetch::Queue,
     /// Last viewed QC.
     pub(crate) last_viewed_qc: Option,
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index af727905..bc7eace0 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -204,6 +204,7 @@ impl Network {
 
         // Perform get_block calls to peer.
         s.spawn::<()>(async {
+            // Gossiped state of what range of blocks is available on the remote peer.
             let state = &mut push_block_store_state_server.state.subscribe();
             loop {
                 let call = get_block_client.reserve(ctx).await?;
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index 5e08dbbb..897d1354 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -77,7 +77,21 @@ pub fn new_configs(
     setup: &validator::testonly::Setup,
     gossip_peers: usize,
 ) -> Vec<Config> {
-    let configs = setup.validator_keys.iter().map(|validator_key| {
+    new_configs_for_validators(rng, setup.validator_keys.iter(), gossip_peers)
+}
+
+/// Construct configs for `n` validators of the consensus.
+///
+/// This version allows for repeating keys used in Twins tests.
+pub fn new_configs_for_validators<'a, I>(
+    rng: &mut impl Rng,
+    validator_keys: I,
+    gossip_peers: usize,
+) -> Vec<Config>
+where
+    I: Iterator<Item = &'a validator::SecretKey>,
+{
+    let configs = validator_keys.map(|validator_key| {
         let addr = net::tcp::testonly::reserve_listener();
         Config {
             server_addr: addr,
diff --git a/node/libs/utils/src/pipe.rs b/node/libs/utils/src/pipe.rs
index e8db302c..d1b3a0d7 100644
--- a/node/libs/utils/src/pipe.rs
+++ b/node/libs/utils/src/pipe.rs
@@ -5,12 +5,30 @@ use std::future::Future;
 use zksync_concurrency::ctx::{self, channel, Ctx};
 
 /// This is the end of the Pipe that should be held by the actor.
+///
+/// The actor can receive `In` and send back `Out` messages.
 pub type ActorPipe<In, Out> = Pipe<In, Out>;
 
 /// This is the end of the Pipe that should be held by the dispatcher.
+///
+/// The dispatcher can send `In` messages and receive `Out` back.
 pub type DispatcherPipe<In, Out> = Pipe<Out, In>;
 
 /// This is a generic Pipe end.
+///
+/// It is used to receive `In` and send `Out` messages.
+///
+/// When viewed from the perspective of [new]:
+/// * messages of type `In` are going right-to-left
+/// * messages of type `Out` are going left-to-right
+///
+/// ```text
+///  In <- -------- <- In
+///       |  Pipe  |
+/// Out -> -------- -> Out
+///    ^                ^
+///  Actor       Dispatcher
+/// ```
 #[derive(Debug)]
 pub struct Pipe<In, Out> {
     /// This is the channel that receives messages.
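+    //
+    // Illustrative usage sketch (hedged: `Req`/`Resp` are hypothetical message types, and the
+    // `send`/`recv` field names follow the usage seen elsewhere in this patch, e.g.
+    // `actor_pipe.send` / `actor_pipe.recv` in the bft test harness):
+    //
+    //     // Assume `Req` flows dispatcher -> actor and `Resp` flows actor -> dispatcher.
+    //     let (actor_pipe, dispatcher_pipe): (ActorPipe<Req, Resp>, DispatcherPipe<Req, Resp>) =
+    //         pipe::new();
+    //     dispatcher_pipe.send.send(req);                    // dispatcher sends `In`
+    //     let req = actor_pipe.recv.recv(ctx).await?;        // actor receives `In`
+    //     actor_pipe.send.send(resp);                        // actor sends `Out`
+    //     let resp = dispatcher_pipe.recv.recv(ctx).await?;  // dispatcher receives `Out`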