From ae67132a9c6f4f43152962082a5096c83def415d Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Fri, 24 May 2024 16:57:26 +0100 Subject: [PATCH 01/25] BFT-465: Skeleton for Twins tests --- node/actors/bft/src/testonly/run.rs | 19 +++- node/actors/bft/src/tests.rs | 131 +++++++++++++++++++++++++++- node/actors/network/src/testonly.rs | 21 ++++- 3 files changed, 162 insertions(+), 9 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 9605e92d..e56d51e1 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,9 +1,10 @@ use super::{Behavior, Node}; +use network::Config; use std::collections::HashMap; use tracing::Instrument as _; use zksync_concurrency::{ctx, oneshot, scope}; use zksync_consensus_network as network; -use zksync_consensus_roles::validator; +use zksync_consensus_roles::validator::{self, Genesis}; use zksync_consensus_storage::testonly::new_store; use zksync_consensus_utils::pipe; @@ -22,7 +23,7 @@ pub(crate) struct Test { } impl Test { - /// Run a test with the given parameters. + /// Run a test with the given parameters and a random network setup. pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let rng = &mut ctx.rng(); let setup = validator::testonly::Setup::new_with_weights( @@ -30,11 +31,21 @@ impl Test { self.nodes.iter().map(|(_, w)| *w).collect(), ); let nets: Vec<_> = network::testonly::new_configs(rng, &setup, 1); + self.run_with_config(ctx, nets, &setup.genesis).await + } + + /// Run a test with the given parameters and network configuration. + pub(crate) async fn run_with_config( + &self, + ctx: &ctx::Ctx, + nets: Vec, + genesis: &Genesis, + ) -> anyhow::Result<()> { let mut nodes = vec![]; let mut honest = vec![]; scope::run!(ctx, |ctx, s| async { for (i, net) in nets.into_iter().enumerate() { - let (store, runner) = new_store(ctx, &setup.genesis).await; + let (store, runner) = new_store(ctx, genesis).await; s.spawn_bg(runner.run(ctx)); if self.nodes[i].0 == Behavior::Honest { honest.push(store.clone()); @@ -50,7 +61,7 @@ impl Test { // Run the nodes until all honest nodes store enough finalized blocks. 
assert!(self.blocks_to_finalize > 0); - let first = setup.genesis.first_block; + let first = genesis.first_block; let last = first + (self.blocks_to_finalize as u64 - 1); for store in &honest { store.wait_until_queued(ctx, last).await?; diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index eaddccb2..e39204a8 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -1,6 +1,19 @@ -use crate::testonly::{ut_harness::UTHarness, Behavior, Network, Test}; -use zksync_concurrency::{ctx, scope, time}; -use zksync_consensus_roles::validator; +use crate::testonly::{ + twins::{Cluster, HasKey, ScenarioGenerator, Twin}, + ut_harness::UTHarness, + Behavior, Network, Test, +}; +use rand::Rng; +use zksync_concurrency::{ + ctx::{self, Ctx}, + scope, time, +}; +use zksync_consensus_network::testonly::new_configs_for_validators; +use zksync_consensus_roles::validator::{ + self, + testonly::{Setup, SetupSpec}, + LeaderSelectionMode, PublicKey, +}; async fn run_test(behavior: Behavior, network: Network) { let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); @@ -195,3 +208,115 @@ async fn non_proposing_leader() { .await .unwrap() } + +/// Run Twins scenarios without actual twins, so just random partitions and leaders, +/// to see that the basic mechanics of the network allow finalizations to happen. +#[tokio::test(flavor = "multi_thread")] +async fn honest_no_twins_network() { + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + for _ in 0..5 { + let num_replicas = rng.gen_range(1..=11); + run_twins(ctx, num_replicas, false); + } +} + +async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { + #[derive(PartialEq)] + struct Replica { + id: i64, + public_key: PublicKey, + } + + impl HasKey for Replica { + type Key = PublicKey; + + fn key(&self) -> &Self::Key { + &self.public_key + } + } + + impl Twin for Replica { + fn to_twin(&self) -> Self { + Self { + id: self.id * -1, + public_key: self.public_key.clone(), + } + } + } + + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + zksync_concurrency::testonly::abort_on_panic(); + let rng = &mut ctx.rng(); + + // The existing test machinery uses the number of finalized blocks as an exit criteria. + let blocks_to_finalize = 5; + // The test is going to disrupt the communication by partitioning nodes, + // where the leader might not be in a partition with enough replicas to + // form a quorum, therefore to allow N blocks to be finalized we need to + // go longer. + let num_rounds = blocks_to_finalize * 5; + // The paper considers 2 or 3 partitions enough. + let max_partitions = 3; + + // Everyone on the twins network is honest. + // For now assign one power each (not, say, 100 each, or varying weights). 
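A quick numeric sanity check on the sizes the code below derives via `validator::threshold` (a sketch only: the `f = (n - 1) / 5` relation is an assumption consistent with the "n<6 implies f=0 and q=n" comments added later in this series, and `validator::threshold` remains the authoritative formula):

    #[test]
    fn expected_cluster_sizes() {
        // Assumed relation: at most f faulty out of n >= 5f + 1 replicas, quorum = n - f.
        let sizes = |n: u64| {
            let f = (n - 1) / 5;
            (f, n - f)
        };
        assert_eq!(sizes(5), (0, 5)); // too small for partitions or twins
        assert_eq!(sizes(6), (1, 5));
        assert_eq!(sizes(11), (2, 9));
    }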
+ let mut nodes = vec![(Behavior::Honest, 1u64); num_replicas]; + let num_honest = validator::threshold(num_replicas as u64) as usize; + let num_faulty = num_replicas - num_honest; + let num_twins = if use_twins && num_faulty > 0 { + rng.gen_range(1..=num_faulty) + } else { + 0 + }; + + let mut spec = SetupSpec::new_with_weights(rng, nodes.iter().map(|(_, w)| *w).collect()); + + let replicas = spec + .validator_weights + .iter() + .enumerate() + .map(|(i, (sk, _))| Replica { + id: i as i64, + public_key: sk.public(), + }) + .collect::>(); + + let cluster = Cluster::new(replicas, num_twins); + let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); + + // Reuse the same cluster to run a few scenarios. + for _ in 0..10 { + // Generate a permutation of partitions and leaders for the given number of rounds. + let scenario = scenarios.generate_one(rng); + + // Assign the leadership schedule to the consensus. + spec.leader_selection = + LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect()); + + // Generate a new setup with this leadership schedule. + let setup = Setup::from(spec.clone()); + + // Create network config for honest nodes, and then extras for the twins. + let validator_keys = setup + .validator_keys + .iter() + .chain(setup.validator_keys.iter().take(num_twins)); + + let nets = new_configs_for_validators(rng, validator_keys, 1); + + // TODO: Create a network mode that supports partition schedule, + // which requires identifying the sender network (not validator) identity. + let network = todo!() + + Test { + network, + nodes, + blocks_to_finalize, + } + .run_with_config(ctx, nets, &setup.genesis) + .await + .unwrap() + } +} diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 5e08dbbb..e7d452e6 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -13,7 +13,10 @@ use std::{ sync::Arc, }; use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync}; -use zksync_consensus_roles::{node, validator}; +use zksync_consensus_roles::{ + node, + validator::{self, SecretKey}, +}; use zksync_consensus_storage::BlockStore; use zksync_consensus_utils::pipe; @@ -77,7 +80,21 @@ pub fn new_configs( setup: &validator::testonly::Setup, gossip_peers: usize, ) -> Vec { - let configs = setup.validator_keys.iter().map(|validator_key| { + new_configs_for_validators(rng, setup.validator_keys.iter(), gossip_peers) +} + +/// Construct configs for `n` validators of the consensus. +/// +/// This version allows for repeating keys used in Twins tests. 
+pub fn new_configs_for_validators<'a, I>( + rng: &mut impl Rng, + validator_keys: I, + gossip_peers: usize, +) -> Vec +where + I: Iterator, +{ + let configs = validator_keys.map(|validator_key| { let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, From 1e4357f2f109edc5e2a6474450ea218839df4ee8 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 11:55:27 +0100 Subject: [PATCH 02/25] BFT-465: Some comments to help understand the flow --- node/actors/bft/src/testonly/node.rs | 5 +++++ node/actors/bft/src/testonly/run.rs | 14 +++++++++++--- node/libs/utils/src/pipe.rs | 18 ++++++++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/node/actors/bft/src/testonly/node.rs b/node/actors/bft/src/testonly/node.rs index d08a52d0..7b348e15 100644 --- a/node/actors/bft/src/testonly/node.rs +++ b/node/actors/bft/src/testonly/node.rs @@ -62,6 +62,7 @@ impl Node { let mut con_recv = consensus_pipe.recv; let con_send = consensus_pipe.send; scope::run!(ctx, |ctx, s| async { + // Run the consensus actor s.spawn(async { let validator_key = self.net.validator_key.clone().unwrap(); crate::Config { @@ -75,6 +76,8 @@ impl Node { .await .context("consensus.run()") }); + // Forward input messages received from the network to the actor; + // turns output from others to input for this instance. s.spawn(async { while let Ok(network_message) = net_recv.recv(ctx).await { match network_message { @@ -85,6 +88,8 @@ impl Node { } Ok(()) }); + // Forward output messages from the actor to the network; + // turns output from this to inputs for others. // Get the next message from the channel. Our response depends on what type of replica we are. while let Ok(msg) = con_recv.recv(ctx).await { match msg { diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index e56d51e1..7df6a86d 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -108,21 +108,29 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow:: } } Network::Mock => { + // Actor inputs, ie. where the test can send messages to the consensus. let mut sends = HashMap::new(); + // Actor outputs, ie. the messages the actor wants to send to the others. let mut recvs = vec![]; for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, pipe) = pipe::new(); + let (actor_pipe, dispatcher_pipe) = pipe::new(); let key = spec.net.validator_key.as_ref().unwrap().public(); sends.insert(key, actor_pipe.send); recvs.push(actor_pipe.recv); + // Run consensus; the dispatcher pipe is its network connection, + // which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors s.spawn( async { - let mut pipe = pipe; - spec.run(ctx, &mut pipe).await + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await } .instrument(tracing::info_span!("node", i)), ); } + // Run mock networks by receiving the output-turned-input from all consensus + // instances and forwarding them to the others. scope::run!(ctx, |ctx, s| async { for recv in recvs { s.spawn(async { diff --git a/node/libs/utils/src/pipe.rs b/node/libs/utils/src/pipe.rs index e8db302c..d1b3a0d7 100644 --- a/node/libs/utils/src/pipe.rs +++ b/node/libs/utils/src/pipe.rs @@ -5,12 +5,30 @@ use std::future::Future; use zksync_concurrency::ctx::{self, channel, Ctx}; /// This is the end of the Pipe that should be held by the actor. 
+/// +/// The actor can receive `In` and send back `Out` messages. pub type ActorPipe = Pipe; /// This is the end of the Pipe that should be held by the dispatcher. +/// +/// The dispatcher can send `In` messages and receive `Out` back. pub type DispatcherPipe = Pipe; /// This is a generic Pipe end. +/// +/// It is used to receive `In` and send `Out` messages. +/// +/// When viewed from the perspective of [new]: +/// * messages of type `In` are going right-to-left +/// * messages of type `Out` are going left-to-right +/// +/// ```text +/// In <- -------- <- In +/// | Pipe | +/// Out -> -------- -> Out +/// ^ ^ +/// Actor Dispatcher +/// ``` #[derive(Debug)] pub struct Pipe { /// This is the channel that receives messages. From a93aa84d90b63deb4d6a30952329d941a10fe872 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 12:03:16 +0100 Subject: [PATCH 03/25] BFT-465: Split different network executions --- node/actors/bft/src/testonly/run.rs | 147 ++++++++++++++-------------- 1 file changed, 75 insertions(+), 72 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 7df6a86d..ab73e00c 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -83,84 +83,87 @@ impl Test { /// Run a set of nodes. async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::Result<()> { + match network { + Network::Real => run_nodes_real(ctx, specs).await, + Network::Mock => run_nodes_mock(ctx, specs).await, + } +} + +/// Run a set of nodes with a real network. +async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { - match network { - Network::Real => { - let mut nodes = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (node, runner) = network::testonly::Instance::new( - spec.net.clone(), - spec.block_store.clone(), - ); - s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); - nodes.push(node); + let mut nodes = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (node, runner) = + network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone()); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + nodes.push(node); + } + network::testonly::instant_network(ctx, nodes.iter()).await?; + for (i, node) in nodes.into_iter().enumerate() { + let spec = &specs[i]; + s.spawn( + async { + let mut node = node; + spec.run(ctx, node.pipe()).await } - network::testonly::instant_network(ctx, nodes.iter()).await?; - for (i, node) in nodes.into_iter().enumerate() { - let spec = &specs[i]; - s.spawn( - async { - let mut node = node; - spec.run(ctx, node.pipe()).await - } - .instrument(tracing::info_span!("node", i)), - ); + .instrument(tracing::info_span!("node", i)), + ); + } + Ok(()) + }) + .await +} + +/// Run a set of nodes with a mock network. +async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // Actor inputs, ie. where the test can send messages to the consensus. + let mut sends = HashMap::new(); + // Actor outputs, ie. the messages the actor wants to send to the others. 
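To keep the directions straight while reading the mock-network loop that follows, here is the same wiring restated next to the concrete message types (an annotation only, paraphrasing the comments in this patch rather than adding new behaviour):

    // let (actor_pipe, dispatcher_pipe) = pipe::new();
    //
    // The node under test receives `dispatcher_pipe` as its network connection,
    // while the test keeps the actor end and impersonates the network actor:
    //   * `actor_pipe.recv` yields `io::InputMessage`, i.e. what this node asked
    //     the network to deliver to its peers;
    //   * `actor_pipe.send` accepts `io::OutputMessage`, i.e. messages "arriving"
    //     at this node from the network.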
+ let mut recvs = vec![]; + for (i, spec) in specs.iter().enumerate() { + let (actor_pipe, dispatcher_pipe) = pipe::new(); + let key = spec.net.validator_key.as_ref().unwrap().public(); + sends.insert(key, actor_pipe.send); + recvs.push(actor_pipe.recv); + // Run consensus; the dispatcher pipe is its network connection, + // which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors + s.spawn( + async { + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await } - } - Network::Mock => { - // Actor inputs, ie. where the test can send messages to the consensus. - let mut sends = HashMap::new(); - // Actor outputs, ie. the messages the actor wants to send to the others. - let mut recvs = vec![]; - for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, dispatcher_pipe) = pipe::new(); - let key = spec.net.validator_key.as_ref().unwrap().public(); - sends.insert(key, actor_pipe.send); - recvs.push(actor_pipe.recv); - // Run consensus; the dispatcher pipe is its network connection, - // which means we can use the actor pipe to: - // * send Output messages from other actors to this consensus instance - // * receive Input messages sent by this consensus to the other actors - s.spawn( - async { - let mut network_pipe = dispatcher_pipe; - spec.run(ctx, &mut network_pipe).await + .instrument(tracing::info_span!("node", i)), + ); + } + // Run mock networks by receiving the output-turned-input from all consensus + // instances and forwarding them to the others. + scope::run!(ctx, |ctx, s| async { + for recv in recvs { + s.spawn(async { + use zksync_consensus_network::io; + let mut recv = recv; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + match message.recipient { + io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), + io::Target::Broadcast => sends.values().for_each(|s| s.send(msg())), } - .instrument(tracing::info_span!("node", i)), - ); - } - // Run mock networks by receiving the output-turned-input from all consensus - // instances and forwarding them to the others. 
- scope::run!(ctx, |ctx, s| async { - for recv in recvs { - s.spawn(async { - use zksync_consensus_network::io; - let mut recv = recv; - while let Ok(io::InputMessage::Consensus(message)) = - recv.recv(ctx).await - { - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) - }; - match message.recipient { - io::Target::Validator(v) => sends.get(&v).unwrap().send(msg()), - io::Target::Broadcast => { - sends.values().for_each(|s| s.send(msg())) - } - } - } - Ok(()) - }); } - anyhow::Ok(()) - }) - .await?; + Ok(()) + }); } - } - Ok(()) + anyhow::Ok(()) + }) + .await }) .await } From 43fb874bab747ba05c02fa50ebed27bb4287b6d3 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 14:04:46 +0100 Subject: [PATCH 04/25] BFT-465: Twins network skeleton --- node/actors/bft/src/testonly/run.rs | 130 +++++++++++++++++++++++++++- node/actors/bft/src/tests.rs | 4 +- 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index ab73e00c..a8eb5b3e 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,6 +1,6 @@ use super::{Behavior, Node}; use network::Config; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tracing::Instrument as _; use zksync_concurrency::{ctx, oneshot, scope}; use zksync_consensus_network as network; @@ -14,6 +14,14 @@ pub(crate) enum Network { Mock, } +// Identify different network identities of twins by their listener port. +// They are all expected to be on localhost, but `ListenerAddr` can't be +// directly used as a map key. +type Port = u16; +type PortPartition = HashSet; +type PortSplit = Vec; +type PortSplitSchedule = Vec; + /// Config for the test. Determines the parameters to run the test with. #[derive(Clone)] pub(crate) struct Test { @@ -127,8 +135,7 @@ async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { let key = spec.net.validator_key.as_ref().unwrap().public(); sends.insert(key, actor_pipe.send); recvs.push(actor_pipe.recv); - // Run consensus; the dispatcher pipe is its network connection, - // which means we can use the actor pipe to: + // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: // * send Output messages from other actors to this consensus instance // * receive Input messages sent by this consensus to the other actors s.spawn( @@ -167,3 +174,120 @@ async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { }) .await } + +/// Run a set of nodes with a Twins network configuration. +async fn run_nodes_twins( + ctx: &ctx::Ctx, + specs: &[Node], + splits: PortSplitSchedule, +) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + // All known network ports of a validator, so that we can tell if any of + // those addresses are in the same partition as the sender. + let mut validator_ports: HashMap<_, Vec> = HashMap::new(); + // Outbox of consensus instances, paired with their network identity, + // so we can tell which partition they are in in the given view. + let mut recvs = vec![]; + // Inbox of the consensus instances, indexed by their network identity, + // so that we can send to the one which is in the same partition as the sender. + let mut sends = HashMap::new(); + + // TODO: Buffer messages that aren't delivered in partitions until the end + // of the test and deliver them then. 
Need to define when it's time to deliver + // all buffered messages: if we allow all partitions to be less than required + // for a quorum, they will keep timing out in the same view; one indication is + // too many messages received in the same view, e.g. more than 2x the number of nodes. + + for (i, spec) in specs.iter().enumerate() { + let (actor_pipe, dispatcher_pipe) = pipe::new(); + let validator_key = spec.net.validator_key.as_ref().unwrap().public(); + let port = spec.net.server_addr.port(); + + validator_ports.entry(validator_key).or_default().push(port); + + sends.insert(port, actor_pipe.send); + recvs.push((port, actor_pipe.recv)); + + // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: + // * send Output messages from other actors to this consensus instance + // * receive Input messages sent by this consensus to the other actors + s.spawn( + async { + let mut network_pipe = dispatcher_pipe; + spec.run(ctx, &mut network_pipe).await + } + .instrument(tracing::info_span!("node", i)), + ); + } + // Run networks by receiving from all consensus instances: + // * identify the view they are in from the message + // * identify the partition they are in based on their network id + // * either broadcast to all other instances in the partition, or find out the network + // identity of the target validator and send to it _iff_ they are in the same partition + scope::run!(ctx, |ctx, s| async { + for (port, recv) in recvs { + s.spawn(async { + let mut recv = recv; + use zksync_consensus_network::io; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let view_number = message.message.msg.view().number; + // Here we assume that all instances start from view 0 in the tests. + // If the view is higher than what we have planned for, assume no partitions. + let partitions_opt = splits.get(view_number.0 as usize); + + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + + match message.recipient { + io::Target::Broadcast => match partitions_opt { + None => sends.values().for_each(|s| s.send(msg())), + Some(ps) => { + for p in ps { + if !p.contains(&port) { + continue; + } + for target_port in p { + sends[target_port].send(msg()); + } + } + } + }, + io::Target::Validator(v) => { + let target_ports = validator_ports.get(&v).unwrap(); + + match partitions_opt { + None => { + for target_port in target_ports.iter() { + sends[&target_port].send(msg()); + } + } + Some(ps) => { + for p in ps { + if !p.contains(&port) { + continue; + } + for target_port in target_ports.iter() { + if !p.contains(&target_port) { + continue; + } + sends[&target_port].send(msg()) + } + } + } + } + } + } + } + Ok(()) + }); + } + anyhow::Ok(()) + }) + .await + }) + .await +} diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e39204a8..59bcbb9c 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -213,6 +213,7 @@ async fn non_proposing_leader() { /// to see that the basic mechanics of the network allow finalizations to happen. #[tokio::test(flavor = "multi_thread")] async fn honest_no_twins_network() { + // TODO: Speed up the clock. let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); @@ -262,7 +263,7 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { // Everyone on the twins network is honest. // For now assign one power each (not, say, 100 each, or varying weights). 
- let mut nodes = vec![(Behavior::Honest, 1u64); num_replicas]; + let nodes = vec![(Behavior::Honest, 1u64); num_replicas]; let num_honest = validator::threshold(num_replicas as u64) as usize; let num_faulty = num_replicas - num_honest; let num_twins = if use_twins && num_faulty > 0 { @@ -304,6 +305,7 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { .iter() .chain(setup.validator_keys.iter().take(num_twins)); + // Create the network configuration, e.g. assign a unique network address to each validator. let nets = new_configs_for_validators(rng, validator_keys, 1); // TODO: Create a network mode that supports partition schedule, From a26d2cfb8bd2bc543a5f071965716fd9f4a92f4e Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 17:55:43 +0100 Subject: [PATCH 05/25] BFT-465: Make it compile with lifetimes --- node/actors/bft/src/testonly/run.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index a8eb5b3e..1c7ebffc 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -224,10 +224,12 @@ async fn run_nodes_twins( // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network // identity of the target validator and send to it _iff_ they are in the same partition - scope::run!(ctx, |ctx, s| async { - for (port, recv) in recvs { - s.spawn(async { - let mut recv = recv; + let splits = &splits; + let sends = &sends; + let validator_ports = &validator_ports; + scope::run!(ctx, |ctx, s| async move { + for (port, mut recv) in recvs { + s.spawn(async move { use zksync_consensus_network::io; while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { let view_number = message.message.msg.view().number; @@ -257,11 +259,11 @@ async fn run_nodes_twins( } }, io::Target::Validator(v) => { - let target_ports = validator_ports.get(&v).unwrap(); + let target_ports = &validator_ports[&v]; match partitions_opt { None => { - for target_port in target_ports.iter() { + for target_port in target_ports { sends[&target_port].send(msg()); } } @@ -270,7 +272,7 @@ async fn run_nodes_twins( if !p.contains(&port) { continue; } - for target_port in target_ports.iter() { + for target_port in target_ports { if !p.contains(&target_port) { continue; } From 0a413a4025df51668261858c3897842c08e3ce6a Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 17:58:27 +0100 Subject: [PATCH 06/25] BFT-465: Use break after partition is found --- node/actors/bft/src/testonly/run.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 1c7ebffc..a7274d6d 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -235,6 +235,7 @@ async fn run_nodes_twins( let view_number = message.message.msg.view().number; // Here we assume that all instances start from view 0 in the tests. // If the view is higher than what we have planned for, assume no partitions. + // Ever node is guaranteed to be present in only one partition. 
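For concreteness, this is the shape of the data the routing below consumes: a hypothetical schedule for a six-node cluster over two views (the port numbers are invented; the real ones come from the reserved listeners):

    // View 0: ports 3000-3003 form one partition, 3004-3005 the other.
    // View 1: no split, everyone can reach everyone.
    let splits: PortSplitSchedule = vec![
        vec![
            HashSet::from([3000, 3001, 3002, 3003]),
            HashSet::from([3004, 3005]),
        ],
        vec![HashSet::from([3000, 3001, 3002, 3003, 3004, 3005])],
    ];

A broadcast made in view 0 by the node listening on 3004 reaches only 3004 and 3005; at this point in the series messages crossing a partition boundary are simply not sent (later commits stash and re-deliver them instead).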
let partitions_opt = splits.get(view_number.0 as usize); let msg = || { @@ -249,11 +250,11 @@ async fn run_nodes_twins( None => sends.values().for_each(|s| s.send(msg())), Some(ps) => { for p in ps { - if !p.contains(&port) { - continue; - } - for target_port in p { - sends[target_port].send(msg()); + if p.contains(&port) { + for target_port in p { + sends[target_port].send(msg()); + } + break; } } } @@ -269,14 +270,13 @@ async fn run_nodes_twins( } Some(ps) => { for p in ps { - if !p.contains(&port) { - continue; - } - for target_port in target_ports { - if !p.contains(&target_port) { - continue; + if p.contains(&port) { + for target_port in target_ports { + if p.contains(&target_port) { + sends[&target_port].send(msg()) + } } - sends[&target_port].send(msg()) + break; } } } From 34afa95e4c14b9747714ff969b5baa670920413b Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 18:04:04 +0100 Subject: [PATCH 07/25] BFT-465: Add Network::Twins --- node/actors/bft/src/testonly/run.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index a7274d6d..a118417c 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -8,10 +8,11 @@ use zksync_consensus_roles::validator::{self, Genesis}; use zksync_consensus_storage::testonly::new_store; use zksync_consensus_utils::pipe; -#[derive(Clone, Copy)] +#[derive(Clone)] pub(crate) enum Network { Real, Mock, + Twins(PortSplitSchedule), } // Identify different network identities of twins by their listener port. @@ -65,7 +66,7 @@ impl Test { }); } assert!(!honest.is_empty()); - s.spawn_bg(run_nodes(ctx, self.network, &nodes)); + s.spawn_bg(run_nodes(ctx, &self.network, &nodes)); // Run the nodes until all honest nodes store enough finalized blocks. assert!(self.blocks_to_finalize > 0); @@ -90,10 +91,11 @@ impl Test { } /// Run a set of nodes. 
-async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::Result<()> { +async fn run_nodes(ctx: &ctx::Ctx, network: &Network, specs: &[Node]) -> anyhow::Result<()> { match network { Network::Real => run_nodes_real(ctx, specs).await, Network::Mock => run_nodes_mock(ctx, specs).await, + Network::Twins(splits) => run_nodes_twins(ctx, specs, splits).await, } } @@ -179,7 +181,7 @@ async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { async fn run_nodes_twins( ctx: &ctx::Ctx, specs: &[Node], - splits: PortSplitSchedule, + splits: &PortSplitSchedule, ) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { // All known network ports of a validator, so that we can tell if any of @@ -224,7 +226,6 @@ async fn run_nodes_twins( // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network // identity of the target validator and send to it _iff_ they are in the same partition - let splits = &splits; let sends = &sends; let validator_ports = &validator_ports; scope::run!(ctx, |ctx, s| async move { From 6360c12a2b9a49e2207e4f4208f4402025482ab3 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 28 May 2024 20:44:53 +0100 Subject: [PATCH 08/25] BFT-465: Call twins test runner --- node/actors/bft/src/testonly/run.rs | 6 +- node/actors/bft/src/tests.rs | 97 +++++++++++++++++++---------- 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index a118417c..debd3d1b 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -266,15 +266,15 @@ async fn run_nodes_twins( match partitions_opt { None => { for target_port in target_ports { - sends[&target_port].send(msg()); + sends[target_port].send(msg()); } } Some(ps) => { for p in ps { if p.contains(&port) { for target_port in target_ports { - if p.contains(&target_port) { - sends[&target_port].send(msg()) + if p.contains(target_port) { + sends[target_port].send(msg()) } } break; diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 59bcbb9c..b43d06cd 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::testonly::{ twins::{Cluster, HasKey, ScenarioGenerator, Twin}, ut_harness::UTHarness, @@ -12,7 +14,7 @@ use zksync_consensus_network::testonly::new_configs_for_validators; use zksync_consensus_roles::validator::{ self, testonly::{Setup, SetupSpec}, - LeaderSelectionMode, PublicKey, + LeaderSelectionMode, PublicKey, SecretKey, }; async fn run_test(behavior: Behavior, network: Network) { @@ -212,22 +214,38 @@ async fn non_proposing_leader() { /// Run Twins scenarios without actual twins, so just random partitions and leaders, /// to see that the basic mechanics of the network allow finalizations to happen. #[tokio::test(flavor = "multi_thread")] -async fn honest_no_twins_network() { - // TODO: Speed up the clock. - let ctx = &ctx::test_root(&ctx::RealClock); +async fn twins_network_without_twins() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); let rng = &mut ctx.rng(); for _ in 0..5 { let num_replicas = rng.gen_range(1..=11); - run_twins(ctx, num_replicas, false); + run_twins(ctx, num_replicas, false).await.unwrap(); } } -async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { +/// Run Twins scenarios with actual twins. 
+#[tokio::test(flavor = "multi_thread")] +async fn twins_network_with_twins() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + let rng = &mut ctx.rng(); + + for _ in 0..10 { + let num_replicas = rng.gen_range(1..=11); + run_twins(ctx, num_replicas, true).await.unwrap(); + } +} + +async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::Result<()> { + zksync_concurrency::testonly::abort_on_panic(); + // Give 30 seconds for all scenarios to finish. + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + #[derive(PartialEq)] struct Replica { id: i64, public_key: PublicKey, + secret_key: SecretKey, } impl HasKey for Replica { @@ -241,14 +259,13 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { impl Twin for Replica { fn to_twin(&self) -> Self { Self { - id: self.id * -1, + id: -self.id, public_key: self.public_key.clone(), + secret_key: self.secret_key.clone(), } } } - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); - zksync_concurrency::testonly::abort_on_panic(); let rng = &mut ctx.rng(); // The existing test machinery uses the number of finalized blocks as an exit criteria. @@ -261,18 +278,17 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { // The paper considers 2 or 3 partitions enough. let max_partitions = 3; - // Everyone on the twins network is honest. - // For now assign one power each (not, say, 100 each, or varying weights). - let nodes = vec![(Behavior::Honest, 1u64); num_replicas]; let num_honest = validator::threshold(num_replicas as u64) as usize; - let num_faulty = num_replicas - num_honest; - let num_twins = if use_twins && num_faulty > 0 { - rng.gen_range(1..=num_faulty) + let max_faulty = num_replicas - num_honest; + let num_twins = if use_twins && max_faulty > 0 { + rng.gen_range(1..=max_faulty) } else { 0 }; - let mut spec = SetupSpec::new_with_weights(rng, nodes.iter().map(|(_, w)| *w).collect()); + // Every validator has equal power of 1. + const WEIGHT: u64 = 1; + let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]); let replicas = spec .validator_weights @@ -281,13 +297,26 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { .map(|(i, (sk, _))| Replica { id: i as i64, public_key: sk.public(), + secret_key: sk.clone(), }) .collect::>(); let cluster = Cluster::new(replicas, num_twins); let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); - // Reuse the same cluster to run a few scenarios. + // Create network config for all nodes in the cluster (assigns unique network addresses). + let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1); + let node_to_port = cluster + .nodes() + .iter() + .zip(nets.iter()) + .map(|(node, net)| (node.id, net.server_addr.port())) + .collect::>(); + + // Every network needs a behaviour. They are all honest, just some might be duplicated. + let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; + + // Reuse the same cluster and network setup to run a few scenarios. for _ in 0..10 { // Generate a permutation of partitions and leaders for the given number of rounds. let scenario = scenarios.generate_one(rng); @@ -299,26 +328,28 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) { // Generate a new setup with this leadership schedule. let setup = Setup::from(spec.clone()); - // Create network config for honest nodes, and then extras for the twins. 
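The essence of the twin trick used above, as a hypothetical illustration (set-up elided; assume some `sk: SecretKey` with `pk = sk.public()`): a twin answers to the same validator key as its original but is wired to its own listener port, which is why the test runner keys everything by port.

    let original = Replica { id: 1, public_key: pk.clone(), secret_key: sk.clone() };
    let twin = original.to_twin();
    assert_eq!(original.key(), twin.key()); // same validator identity...
    assert_ne!(original.id, twin.id);       // ...but a distinct node (id = -1)
    // `new_configs_for_validators` then gives each of them its own `server_addr`,
    // so in `run_nodes_twins` the shared public key maps to *two* ports and only
    // the port can tell the two "machines" apart.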
- let validator_keys = setup - .validator_keys - .iter() - .chain(setup.validator_keys.iter().take(num_twins)); - - // Create the network configuration, e.g. assign a unique network address to each validator. - let nets = new_configs_for_validators(rng, validator_keys, 1); - - // TODO: Create a network mode that supports partition schedule, - // which requires identifying the sender network (not validator) identity. - let network = todo!() + // Create a network with the partition schedule of the scenario. + let network = Network::Twins( + scenario + .rounds + .into_iter() + .map(|rc| { + rc.partitions + .into_iter() + .map(|p| p.into_iter().map(|r| node_to_port[&r.id]).collect()) + .collect() + }) + .collect(), + ); Test { network, - nodes, + nodes: nodes.clone(), blocks_to_finalize, } - .run_with_config(ctx, nets, &setup.genesis) - .await - .unwrap() + .run_with_config(ctx, nets.clone(), &setup.genesis) + .await? } + + Ok(()) } From 9c06ad482d66a79a6ab5b6da5a6272a433f1531f Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 29 May 2024 11:28:30 +0100 Subject: [PATCH 09/25] BFT-465: Try with the simplest test --- node/actors/bft/src/testonly/run.rs | 6 ++-- node/actors/bft/src/tests.rs | 47 +++++++++++++++-------------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index debd3d1b..44f0d3ed 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -221,13 +221,15 @@ async fn run_nodes_twins( .instrument(tracing::info_span!("node", i)), ); } + // Taking these refeferences is necessary for the `scope::run!` environment lifetime rules to compile + // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. + let sends = &sends; + let validator_ports = &validator_ports; // Run networks by receiving from all consensus instances: // * identify the view they are in from the message // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network // identity of the target validator and send to it _iff_ they are in the same partition - let sends = &sends; - let validator_ports = &validator_ports; scope::run!(ctx, |ctx, s| async move { for (port, mut recv) in recvs { s.spawn(async move { diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index b43d06cd..e7f68fe2 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -211,35 +211,38 @@ async fn non_proposing_leader() { .unwrap() } -/// Run Twins scenarios without actual twins, so just random partitions and leaders, -/// to see that the basic mechanics of the network allow finalizations to happen. +/// Run Twins scenarios without actual twins, and with so few nodes that all +/// of them are required for a quorum, which means (currently) there won't be +/// any partitions. +/// +/// This should be a simple sanity check that the network works and consensus +/// is achieved under the most favourable conditions. #[tokio::test(flavor = "multi_thread")] -async fn twins_network_without_twins() { +async fn twins_network_without_twins_or_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); - let rng = &mut ctx.rng(); - - for _ in 0..5 { - let num_replicas = rng.gen_range(1..=11); - run_twins(ctx, num_replicas, false).await.unwrap(); - } -} - -/// Run Twins scenarios with actual twins. 
-#[tokio::test(flavor = "multi_thread")] -async fn twins_network_with_twins() { - let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); - let rng = &mut ctx.rng(); - - for _ in 0..10 { - let num_replicas = rng.gen_range(1..=11); - run_twins(ctx, num_replicas, true).await.unwrap(); - } + let num_replicas = 5; // Implies f=0 and q=5 + run_twins(ctx, num_replicas, false).await.unwrap(); } +// /// Run Twins scenarios without actual twins, but enough replicas that partitions +// /// can play a role, isolating certain nodes (potentially the leader) in some +// /// rounds. +// /// +// /// This should be a sanity check that without Byzantine behaviour the consensus +// /// is resilient to temporary network partitions. +// #[tokio::test(flavor = "multi_thread")] +// async fn twins_network_without_twins() { +// let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); +// let num_replicas = 6; // Implies f=1 and q=5 +// run_twins(ctx, num_replicas, false).await.unwrap(); +// } + +/// Create network configuration for a given number of replicas with a random number of twins and run [Test]. async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); // Give 30 seconds for all scenarios to finish. let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + const NUM_SCENARIOS: usize = 10; #[derive(PartialEq)] struct Replica { @@ -317,7 +320,7 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::R let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; // Reuse the same cluster and network setup to run a few scenarios. - for _ in 0..10 { + for _ in 0..NUM_SCENARIOS { // Generate a permutation of partitions and leaders for the given number of rounds. let scenario = scenarios.generate_one(rng); From 7a2c516ec439fafab8b8e37dcd5c7e23e5ead442 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 29 May 2024 11:51:53 +0100 Subject: [PATCH 10/25] BFT-465: Try with partitions --- node/actors/bft/src/tests.rs | 66 +++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e7f68fe2..9594fde8 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -218,31 +218,35 @@ async fn non_proposing_leader() { /// This should be a simple sanity check that the network works and consensus /// is achieved under the most favourable conditions. #[tokio::test(flavor = "multi_thread")] -async fn twins_network_without_twins_or_partitions() { +async fn twins_network_wo_twins_wo_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); - let num_replicas = 5; // Implies f=0 and q=5 - run_twins(ctx, num_replicas, false).await.unwrap(); + // n<6 implies f=0 and q=n + run_twins(ctx, 5, false, 10).await.unwrap(); } -// /// Run Twins scenarios without actual twins, but enough replicas that partitions -// /// can play a role, isolating certain nodes (potentially the leader) in some -// /// rounds. -// /// -// /// This should be a sanity check that without Byzantine behaviour the consensus -// /// is resilient to temporary network partitions. 
-// #[tokio::test(flavor = "multi_thread")] -// async fn twins_network_without_twins() { -// let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); -// let num_replicas = 6; // Implies f=1 and q=5 -// run_twins(ctx, num_replicas, false).await.unwrap(); -// } +/// Run Twins scenarios without actual twins, but enough replicas that partitions +/// can play a role, isolating certain nodes (potentially the leader) in some +/// rounds. +/// +/// This should be a sanity check that without Byzantine behaviour the consensus +/// is resilient to temporary network partitions. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_wo_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // TODO: At the moment this test doesn't work with partitions, so just try to do a single scenario to debug. + run_twins(ctx, 6, false, 1).await.unwrap(); +} /// Create network configuration for a given number of replicas with a random number of twins and run [Test]. -async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::Result<()> { +async fn run_twins( + ctx: &Ctx, + num_replicas: usize, + use_twins: bool, + num_scenarios: usize, +) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); // Give 30 seconds for all scenarios to finish. let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); - const NUM_SCENARIOS: usize = 10; #[derive(PartialEq)] struct Replica { @@ -320,7 +324,7 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::R let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; // Reuse the same cluster and network setup to run a few scenarios. - for _ in 0..NUM_SCENARIOS { + for _ in 0..num_scenarios { // Generate a permutation of partitions and leaders for the given number of rounds. let scenario = scenarios.generate_one(rng); @@ -332,21 +336,21 @@ async fn run_twins(ctx: &Ctx, num_replicas: usize, use_twins: bool) -> anyhow::R let setup = Setup::from(spec.clone()); // Create a network with the partition schedule of the scenario. - let network = Network::Twins( - scenario - .rounds - .into_iter() - .map(|rc| { - rc.partitions - .into_iter() - .map(|p| p.into_iter().map(|r| node_to_port[&r.id]).collect()) - .collect() - }) - .collect(), - ); + let splits = scenario + .rounds + .into_iter() + .map(|rc| { + rc.partitions + .into_iter() + .map(|p| p.into_iter().map(|r| node_to_port[&r.id]).collect()) + .collect() + }) + .collect(); + + eprintln!("splits = {:?}", splits); Test { - network, + network: Network::Twins(splits), nodes: nodes.clone(), blocks_to_finalize, } From 8a838e3a1cb04fa55b948671b66e504148e9e3c0 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 29 May 2024 13:07:10 +0100 Subject: [PATCH 11/25] BFT-465: Debugging messages --- node/actors/bft/src/testonly/run.rs | 24 ++++++++++++++------- node/actors/bft/src/tests.rs | 33 ++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 44f0d3ed..a993cf70 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -18,10 +18,10 @@ pub(crate) enum Network { // Identify different network identities of twins by their listener port. // They are all expected to be on localhost, but `ListenerAddr` can't be // directly used as a map key. 
-type Port = u16; -type PortPartition = HashSet; -type PortSplit = Vec; -type PortSplitSchedule = Vec; +pub(crate) type Port = u16; +pub(crate) type PortPartition = HashSet; +pub(crate) type PortSplit = Vec; +pub(crate) type PortSplitSchedule = Vec; /// Config for the test. Determines the parameters to run the test with. #[derive(Clone)] @@ -235,11 +235,11 @@ async fn run_nodes_twins( s.spawn(async move { use zksync_consensus_network::io; while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { - let view_number = message.message.msg.view().number; + let view_number = message.message.msg.view().number.0 as usize; // Here we assume that all instances start from view 0 in the tests. // If the view is higher than what we have planned for, assume no partitions. // Ever node is guaranteed to be present in only one partition. - let partitions_opt = splits.get(view_number.0 as usize); + let partitions_opt = splits.get(view_number); let msg = || { io::OutputMessage::Consensus(io::ConsensusReq { @@ -250,10 +250,14 @@ async fn run_nodes_twins( match message.recipient { io::Target::Broadcast => match partitions_opt { - None => sends.values().for_each(|s| s.send(msg())), + None => { + eprintln!("broadcasting view={view_number} from={port} targets=all"); + sends.values().for_each(|s| s.send(msg())) + }, Some(ps) => { for p in ps { if p.contains(&port) { + eprintln!("broadcasting view={view_number} from={port} targets={:?}", p); for target_port in p { sends[target_port].send(msg()); } @@ -268,15 +272,19 @@ async fn run_nodes_twins( match partitions_opt { None => { for target_port in target_ports { + eprintln!("sending view={view_number} from={port} to={target_port}"); sends[target_port].send(msg()); } } Some(ps) => { for p in ps { if p.contains(&port) { - for target_port in target_ports { + for target_port in target_ports { if p.contains(target_port) { + eprintln!("sending view={view_number} from={port} to={target_port}"); sends[target_port].send(msg()) + } else { + eprintln!("cannot send view={view_number} from={port} to={target_port}"); } } break; diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 9594fde8..41642d5d 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use crate::testonly::{ twins::{Cluster, HasKey, ScenarioGenerator, Twin}, ut_harness::UTHarness, - Behavior, Network, Test, + Behavior, Network, PortSplitSchedule, Test, }; use rand::Rng; use zksync_concurrency::{ @@ -245,8 +245,8 @@ async fn run_twins( num_scenarios: usize, ) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); - // Give 30 seconds for all scenarios to finish. - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + // Use a single timeout for all scenarios to finish. + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); #[derive(PartialEq)] struct Replica { @@ -336,18 +336,35 @@ async fn run_twins( let setup = Setup::from(spec.clone()); // Create a network with the partition schedule of the scenario. 
- let splits = scenario + let splits: PortSplitSchedule = scenario .rounds - .into_iter() + .iter() .map(|rc| { rc.partitions - .into_iter() - .map(|p| p.into_iter().map(|r| node_to_port[&r.id]).collect()) + .iter() + .map(|p| p.iter().map(|r| node_to_port[&r.id]).collect()) .collect() }) .collect(); - eprintln!("splits = {:?}", splits); + for (r, rc) in scenario.rounds.iter().enumerate() { + let leader_id = cluster + .nodes() + .iter() + .find(|n| n.public_key == *rc.leader) + .unwrap() + .id; + let leader_port = node_to_port[&leader_id]; + let partitions = &splits[r]; + let leader_partition_size = partitions + .iter() + .find(|p| p.contains(&leader_port)) + .unwrap() + .len(); + let leader_isolated = leader_partition_size < cluster.quorum_size(); + + eprintln!("round={r} partitions={partitions:?} leader={leader_port} leader_partition_size={leader_partition_size} leader_isolated={leader_isolated}"); + } Test { network: Network::Twins(splits), From 9e6fd727a3004948a0de27bcb51bdfe80416f456 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 29 May 2024 15:34:41 +0100 Subject: [PATCH 12/25] BFT-465: Stash messages and unstash when partition lifted --- node/actors/bft/src/testonly/run.rs | 102 ++++++++++++++++++++-------- node/actors/bft/src/tests.rs | 2 +- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index a993cf70..c2d6d3ff 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,5 +1,6 @@ use super::{Behavior, Node}; -use network::Config; +use network::{io, Config}; +use rand::prelude::SliceRandom; use std::collections::{HashMap, HashSet}; use tracing::Instrument as _; use zksync_concurrency::{ctx, oneshot, scope}; @@ -194,12 +195,6 @@ async fn run_nodes_twins( // so that we can send to the one which is in the same partition as the sender. let mut sends = HashMap::new(); - // TODO: Buffer messages that aren't delivered in partitions until the end - // of the test and deliver them then. Need to define when it's time to deliver - // all buffered messages: if we allow all partitions to be less than required - // for a quorum, they will keep timing out in the same view; one indication is - // too many messages received in the same view, e.g. more than 2x the number of nodes. - for (i, spec) in specs.iter().enumerate() { let (actor_pipe, dispatcher_pipe) = pipe::new(); let validator_key = spec.net.validator_key.as_ref().unwrap().public(); @@ -234,11 +229,54 @@ async fn run_nodes_twins( for (port, mut recv) in recvs { s.spawn(async move { use zksync_consensus_network::io; + let rng = &mut ctx.rng(); + + // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. + // The spec says that the network is expected to deliver messages eventually, potentially out of order, + // caveated by the fact that the actual implementation only keep retrying the last message. + // However with the actors instantiated in by this test, that would not be sufficient because + // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip, + // and will forever be unable to finalize blocks. 
+ // A separate issue is the definition of "later", without actually adding timing assumptions: + // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas + // don't move on from a view until they reach quorum, then "later" could be defined by so many + // messages in a view that it indicates a timeout loop. NB the consensus might not actually have + // an actual timeout loop and might rely on the network trying to re-broadcast forever. + // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that + // can move on to the next view, in which a new partition configuration will allow them to broadcast + // to previously isolated peers, which will indicate that the buffered messages can be sent as the + // partition has been "healed". + let mut stashes: HashMap> = HashMap::new(); + + // Either stash a message, or unstash all messages and send them to the target along with the new one. + let mut send_or_stash = + |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let stash = stashes.entry(target_port).or_default(); + let view = output_msg_view_number(&msg); + + if can_send { + let s = &sends[&target_port]; + + // Messages can be delivered in arbitrary order. + stash.shuffle(rng); + + for unstashed in stash.drain(0..) { + eprintln!("^-> unstashed view={view} unstashed-view={} from={port} to={target_port}", output_msg_view_number(&unstashed)); + s.send(unstashed); + } + eprintln!("--> sending view={view} from={port} to={target_port}"); + s.send(msg); + } else { + eprintln!("--V stashed view={view} from={port} to={target_port}"); + stash.push(msg) + } + }; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { let view_number = message.message.msg.view().number.0 as usize; // Here we assume that all instances start from view 0 in the tests. // If the view is higher than what we have planned for, assume no partitions. - // Ever node is guaranteed to be present in only one partition. + // Every node is guaranteed to be present in only one partition. 
let partitions_opt = splits.get(view_number); let msg = || { @@ -251,17 +289,17 @@ async fn run_nodes_twins( match message.recipient { io::Target::Broadcast => match partitions_opt { None => { - eprintln!("broadcasting view={view_number} from={port} targets=all"); - sends.values().for_each(|s| s.send(msg())) - }, + eprintln!("broadcasting view={view_number} from={port} target=all"); + for target_port in sends.keys() { + send_or_stash(true, *target_port, msg()); + } + } Some(ps) => { - for p in ps { - if p.contains(&port) { - eprintln!("broadcasting view={view_number} from={port} targets={:?}", p); - for target_port in p { - sends[target_port].send(msg()); - } - break; + for p in ps { + let can_send = p.contains(&port); + eprintln!("broadcasting view={view_number} from={port} target={:?} can_send={can_send}", p); + for target_port in p { + send_or_stash(can_send, *target_port, msg()); } } } @@ -272,22 +310,22 @@ async fn run_nodes_twins( match partitions_opt { None => { for target_port in target_ports { - eprintln!("sending view={view_number} from={port} to={target_port}"); - sends[target_port].send(msg()); + eprintln!("unicasting view={view_number} from={port} target={target_port}"); + send_or_stash(true, *target_port, msg()); } } Some(ps) => { for p in ps { - if p.contains(&port) { - for target_port in target_ports { - if p.contains(target_port) { - eprintln!("sending view={view_number} from={port} to={target_port}"); - sends[target_port].send(msg()) - } else { - eprintln!("cannot send view={view_number} from={port} to={target_port}"); - } + let can_send = p.contains(&port); + for target_port in target_ports { + if p.contains(target_port) { + eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send}"); + send_or_stash( + can_send, + *target_port, + msg(), + ); } - break; } } } @@ -304,3 +342,9 @@ async fn run_nodes_twins( }) .await } + +fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { + match msg { + io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0, + } +} diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 41642d5d..1db34f79 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -246,7 +246,7 @@ async fn run_twins( ) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); // Use a single timeout for all scenarios to finish. - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); #[derive(PartialEq)] struct Replica { From e558c31c0c33b40f41a0a7335d3a1c8a365a3506 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 29 May 2024 16:56:05 +0100 Subject: [PATCH 13/25] BFT-465: Comments about why the unstashing doesn't work with HotStuff --- node/actors/bft/src/testonly/run.rs | 32 ++++++++++++++++++++++------- node/actors/bft/src/tests.rs | 2 +- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index c2d6d3ff..388489c1 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -220,6 +220,7 @@ async fn run_nodes_twins( // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. 
let sends = &sends; let validator_ports = &validator_ports; + let start = &ctx.now(); // Run networks by receiving from all consensus instances: // * identify the view they are in from the message // * identify the partition they are in based on their network id @@ -244,8 +245,16 @@ async fn run_nodes_twins( // an actual timeout loop and might rely on the network trying to re-broadcast forever. // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that // can move on to the next view, in which a new partition configuration will allow them to broadcast - // to previously isolated peers, which will indicate that the buffered messages can be sent as the - // partition has been "healed". + // to previously isolated peers. + // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer + // partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff + // out of the box, because replicas only communicate with their leader, so if for example B missed + // a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because + // they can't commit the earlier block until they get a new message from A. However since + // https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted, + // so we shouldn't have to wait long for A to unstash its messages to B. + // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages + // and unstash them as soon as someone moves on to a new view. let mut stashes: HashMap> = HashMap::new(); // Either stash a message, or unstash all messages and send them to the target along with the new one. @@ -253,6 +262,7 @@ async fn run_nodes_twins( |can_send: bool, target_port: Port, msg: io::OutputMessage| { let stash = stashes.entry(target_port).or_default(); let view = output_msg_view_number(&msg); + let kind = output_msg_label(&msg); if can_send { let s = &sends[&target_port]; @@ -261,13 +271,15 @@ async fn run_nodes_twins( stash.shuffle(rng); for unstashed in stash.drain(0..) 
{ - eprintln!("^-> unstashed view={view} unstashed-view={} from={port} to={target_port}", output_msg_view_number(&unstashed)); + let view = output_msg_view_number(&unstashed); + let kind = output_msg_label(&unstashed); + eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); s.send(unstashed); } - eprintln!("--> sending view={view} from={port} to={target_port}"); + eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); s.send(msg); } else { - eprintln!("--V stashed view={view} from={port} to={target_port}"); + eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); stash.push(msg) } }; @@ -297,7 +309,7 @@ async fn run_nodes_twins( Some(ps) => { for p in ps { let can_send = p.contains(&port); - eprintln!("broadcasting view={view_number} from={port} target={:?} can_send={can_send}", p); + eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64()); for target_port in p { send_or_stash(can_send, *target_port, msg()); } @@ -319,7 +331,7 @@ async fn run_nodes_twins( let can_send = p.contains(&port); for target_port in target_ports { if p.contains(target_port) { - eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send}"); + eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64()); send_or_stash( can_send, *target_port, @@ -348,3 +360,9 @@ fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0, } } + +fn output_msg_label(msg: &io::OutputMessage) -> &str { + match msg { + io::OutputMessage::Consensus(cr) => cr.msg.msg.label() + } +} \ No newline at end of file diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 1db34f79..b2349e92 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -232,7 +232,7 @@ async fn twins_network_wo_twins_wo_partitions() { /// is resilient to temporary network partitions. #[tokio::test(flavor = "multi_thread")] async fn twins_network_wo_twins_w_partitions() { - let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + let ctx = &ctx::test_root(&ctx::AffineClock::new(5.0)); // TODO: At the moment this test doesn't work with partitions, so just try to do a single scenario to debug. run_twins(ctx, 6, false, 1).await.unwrap(); } From 8709620acf636941e48b355ae598ad4f50163467 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 10:20:25 +0100 Subject: [PATCH 14/25] BFT-465: Helpful comments in gossip code --- node/actors/network/src/gossip/fetch.rs | 2 +- node/actors/network/src/gossip/mod.rs | 2 ++ node/actors/network/src/gossip/runner.rs | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/node/actors/network/src/gossip/fetch.rs b/node/actors/network/src/gossip/fetch.rs index 115852bd..e9ec37fc 100644 --- a/node/actors/network/src/gossip/fetch.rs +++ b/node/actors/network/src/gossip/fetch.rs @@ -64,7 +64,7 @@ impl Queue { ) -> ctx::OrCanceled { let sub = &mut self.0.subscribe(); while ctx.is_active() { - // Wait for the lowest requested block to be available. + // Wait for the lowest requested block to be available on the remote peer. // This scope is always cancelled, so we ignore the result. 
let mut block_number = None; let _: Result<(), _> = scope::run!(ctx, |ctx, s| async { diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 7091c643..7021ff28 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -49,6 +49,8 @@ pub(crate) struct Network { /// Output pipe of the network actor. pub(crate) sender: channel::UnboundedSender, /// Queue of block fetching requests. + /// + /// These are blocks that this node wants to request from remote peers via RPC. pub(crate) fetch_queue: fetch::Queue, /// Last viewed QC. pub(crate) last_viewed_qc: Option, diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index af727905..bc7eace0 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -204,6 +204,7 @@ impl Network { // Perform get_block calls to peer. s.spawn::<()>(async { + // Gossiped state of what range of blocks is available on the remote peer. let state = &mut push_block_store_state_server.state.subscribe(); loop { let call = get_block_client.reserve(ctx).await?; From 994656706546c7cb92763749722098588658be50 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 10:31:53 +0100 Subject: [PATCH 15/25] BFT-465: Separate twins receive loop function --- node/actors/bft/src/testonly/run.rs | 253 +++++++++++++++------------- 1 file changed, 137 insertions(+), 116 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 388489c1..88e95116 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -3,9 +3,15 @@ use network::{io, Config}; use rand::prelude::SliceRandom; use std::collections::{HashMap, HashSet}; use tracing::Instrument as _; -use zksync_concurrency::{ctx, oneshot, scope}; +use zksync_concurrency::{ + ctx::{ + self, + channel::{UnboundedReceiver, UnboundedSender}, + }, + oneshot, scope, +}; use zksync_consensus_network as network; -use zksync_consensus_roles::validator::{self, Genesis}; +use zksync_consensus_roles::validator::{self, Genesis, PublicKey}; use zksync_consensus_storage::testonly::new_store; use zksync_consensus_utils::pipe; @@ -218,141 +224,156 @@ async fn run_nodes_twins( } // Taking these refeferences is necessary for the `scope::run!` environment lifetime rules to compile // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. + // Potentially `ctx::NoCopy` could be used with `port`. let sends = &sends; let validator_ports = &validator_ports; - let start = &ctx.now(); + // Run networks by receiving from all consensus instances: // * identify the view they are in from the message // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network // identity of the target validator and send to it _iff_ they are in the same partition scope::run!(ctx, |ctx, s| async move { - for (port, mut recv) in recvs { + for (port, recv) in recvs { s.spawn(async move { - use zksync_consensus_network::io; - let rng = &mut ctx.rng(); + twins_receive_loop(ctx, port, recv, sends, validator_ports, splits).await + }); + } + anyhow::Ok(()) + }) + .await + }) + .await +} - // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. 
- // The spec says that the network is expected to deliver messages eventually, potentially out of order, - // caveated by the fact that the actual implementation only keep retrying the last message. - // However with the actors instantiated in by this test, that would not be sufficient because - // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip, - // and will forever be unable to finalize blocks. - // A separate issue is the definition of "later", without actually adding timing assumptions: - // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas - // don't move on from a view until they reach quorum, then "later" could be defined by so many - // messages in a view that it indicates a timeout loop. NB the consensus might not actually have - // an actual timeout loop and might rely on the network trying to re-broadcast forever. - // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that - // can move on to the next view, in which a new partition configuration will allow them to broadcast - // to previously isolated peers. - // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer - // partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff - // out of the box, because replicas only communicate with their leader, so if for example B missed - // a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because - // they can't commit the earlier block until they get a new message from A. However since - // https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted, - // so we shouldn't have to wait long for A to unstash its messages to B. - // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages - // and unstash them as soon as someone moves on to a new view. - let mut stashes: HashMap> = HashMap::new(); +/// Receive input messages from the consensus actor and send them to the others +/// according to the partition schedule of the port associated with this instance. +async fn twins_receive_loop( + ctx: &ctx::Ctx, + port: Port, + mut recv: UnboundedReceiver, + sends: &HashMap>, + validator_ports: &HashMap>, + splits: &PortSplitSchedule, +) -> anyhow::Result<()> { + let rng = &mut ctx.rng(); + let start = &ctx.now(); - // Either stash a message, or unstash all messages and send them to the target along with the new one. - let mut send_or_stash = - |can_send: bool, target_port: Port, msg: io::OutputMessage| { - let stash = stashes.entry(target_port).or_default(); - let view = output_msg_view_number(&msg); - let kind = output_msg_label(&msg); + // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. + // The spec says that the network is expected to deliver messages eventually, potentially out of order, + // caveated by the fact that the actual implementation only keep retrying the last message. + // However with the actors instantiated in by this test, that would not be sufficient because + // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip, + // and will forever be unable to finalize blocks. 
+ // A separate issue is the definition of "later", without actually adding timing assumptions: + // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas + // don't move on from a view until they reach quorum, then "later" could be defined by so many + // messages in a view that it indicates a timeout loop. NB the consensus might not actually have + // an actual timeout loop and might rely on the network trying to re-broadcast forever. + // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that + // can move on to the next view, in which a new partition configuration will allow them to broadcast + // to previously isolated peers. + // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer + // partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff + // out of the box, because replicas only communicate with their leader, so if for example B missed + // a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because + // they can't commit the earlier block until they get a new message from A. However since + // https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted, + // so we shouldn't have to wait long for A to unstash its messages to B. + // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages + // and unstash them as soon as someone moves on to a new view. + let mut stashes: HashMap> = HashMap::new(); - if can_send { - let s = &sends[&target_port]; + // Either stash a message, or unstash all messages and send them to the target along with the new one. + let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let stash = stashes.entry(target_port).or_default(); + let view = output_msg_view_number(&msg); + let kind = output_msg_label(&msg); - // Messages can be delivered in arbitrary order. - stash.shuffle(rng); + if can_send { + let s = &sends[&target_port]; - for unstashed in stash.drain(0..) { - let view = output_msg_view_number(&unstashed); - let kind = output_msg_label(&unstashed); - eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); - s.send(unstashed); - } - eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); - s.send(msg); - } else { - eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); - stash.push(msg) - } - }; + // Messages can be delivered in arbitrary order. + stash.shuffle(rng); - while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { - let view_number = message.message.msg.view().number.0 as usize; - // Here we assume that all instances start from view 0 in the tests. - // If the view is higher than what we have planned for, assume no partitions. - // Every node is guaranteed to be present in only one partition. - let partitions_opt = splits.get(view_number); + for unstashed in stash.drain(0..) 
{ + let view = output_msg_view_number(&unstashed); + let kind = output_msg_label(&unstashed); + eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); + s.send(unstashed); + } + eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); + s.send(msg); + // TODO: If the message is a LeaderPrepare then remember the block sent, + // then if the next message is a LeaderCommit with a CommitQC then + // prepare a FinalBlock and pretend that we have a gossip layer by + // by calling block_store.queue_block on every other node. + } else { + eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); + stash.push(msg) + } + }; - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) - }; + while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + let view_number = message.message.msg.view().number.0 as usize; + // Here we assume that all instances start from view 0 in the tests. + // If the view is higher than what we have planned for, assume no partitions. + // Every node is guaranteed to be present in only one partition. + let partitions_opt = splits.get(view_number); - match message.recipient { - io::Target::Broadcast => match partitions_opt { - None => { - eprintln!("broadcasting view={view_number} from={port} target=all"); - for target_port in sends.keys() { - send_or_stash(true, *target_port, msg()); - } - } - Some(ps) => { - for p in ps { - let can_send = p.contains(&port); - eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64()); - for target_port in p { - send_or_stash(can_send, *target_port, msg()); - } - } - } - }, - io::Target::Validator(v) => { - let target_ports = &validator_ports[&v]; + let msg = || { + io::OutputMessage::Consensus(io::ConsensusReq { + msg: message.message.clone(), + ack: oneshot::channel().0, + }) + }; + + match message.recipient { + io::Target::Broadcast => match partitions_opt { + None => { + eprintln!("broadcasting view={view_number} from={port} target=all"); + for target_port in sends.keys() { + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64()); + for target_port in p { + send_or_stash(can_send, *target_port, msg()); + } + } + } + }, + io::Target::Validator(v) => { + let target_ports = &validator_ports[&v]; - match partitions_opt { - None => { - for target_port in target_ports { - eprintln!("unicasting view={view_number} from={port} target={target_port}"); - send_or_stash(true, *target_port, msg()); - } - } - Some(ps) => { - for p in ps { - let can_send = p.contains(&port); - for target_port in target_ports { - if p.contains(target_port) { - eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64()); - send_or_stash( - can_send, - *target_port, - msg(), - ); - } - } - } - } + match partitions_opt { + None => { + for target_port in target_ports { + eprintln!( + "unicasting view={view_number} from={port} target={target_port}" + ); + send_or_stash(true, *target_port, msg()); + } + } + Some(ps) => { + for p in ps { + let can_send = p.contains(&port); + for target_port in target_ports { + if p.contains(target_port) { + eprintln!("unicasting view={view_number} 
from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64()); + send_or_stash(can_send, *target_port, msg()); } } } } - Ok(()) - }); + } } - anyhow::Ok(()) - }) - .await - }) - .await + } + } + Ok(()) } fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { @@ -363,6 +384,6 @@ fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { fn output_msg_label(msg: &io::OutputMessage) -> &str { match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.label() + io::OutputMessage::Consensus(cr) => cr.msg.msg.label(), } -} \ No newline at end of file +} From 9d47255dc257f381b571675f1b38bda7698e346e Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 13:13:50 +0100 Subject: [PATCH 16/25] BFT-465: Twins gossip loop --- node/actors/bft/src/testonly/run.rs | 131 ++++++++++++++++++++++++---- node/actors/bft/src/tests.rs | 6 +- 2 files changed, 118 insertions(+), 19 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 88e95116..e60e025f 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,18 +1,23 @@ use super::{Behavior, Node}; use network::{io, Config}; use rand::prelude::SliceRandom; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::Instrument as _; use zksync_concurrency::{ ctx::{ self, - channel::{UnboundedReceiver, UnboundedSender}, + channel::{self, UnboundedReceiver, UnboundedSender}, }, oneshot, scope, }; use zksync_consensus_network as network; -use zksync_consensus_roles::validator::{self, Genesis, PublicKey}; -use zksync_consensus_storage::testonly::new_store; +use zksync_consensus_roles::validator::{ + self, BlockNumber, CommitQC, ConsensusMsg, Genesis, PublicKey, +}; +use zksync_consensus_storage::{testonly::new_store, BlockStore}; use zksync_consensus_utils::pipe; #[derive(Clone)] @@ -200,6 +205,10 @@ async fn run_nodes_twins( // Inbox of the consensus instances, indexed by their network identity, // so that we can send to the one which is in the same partition as the sender. let mut sends = HashMap::new(); + // Blockstores of nodes, indexed by port; used to simulate the effect of gossip. + let mut stores = HashMap::new(); + // Outbound gossip relationships to simulate the effects of block fetching. + let mut gossip_targets = HashMap::new(); for (i, spec) in specs.iter().enumerate() { let (actor_pipe, dispatcher_pipe) = pipe::new(); @@ -210,6 +219,19 @@ async fn run_nodes_twins( sends.insert(port, actor_pipe.send); recvs.push((port, actor_pipe.recv)); + stores.insert(port, spec.block_store.clone()); + gossip_targets.insert( + port, + spec.net + .gossip + .static_outbound + .values() + .map(|host| { + let addr: std::net::SocketAddr = host.0.parse().expect("valid address"); + addr.port() + }) + .collect(), + ); // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: // * send Output messages from other actors to this consensus instance @@ -225,20 +247,36 @@ async fn run_nodes_twins( // Taking these refeferences is necessary for the `scope::run!` environment lifetime rules to compile // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. // Potentially `ctx::NoCopy` could be used with `port`. 
- let sends = &sends; let validator_ports = &validator_ports; + let sends = &sends; + let stores = &stores; + let gossip_targets = &gossip_targets; + let (gossip_send, gossip_recv) = channel::unbounded(); // Run networks by receiving from all consensus instances: // * identify the view they are in from the message // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network // identity of the target validator and send to it _iff_ they are in the same partition + // * simulating the gossiping of finalized blockss scope::run!(ctx, |ctx, s| async move { for (port, recv) in recvs { + let gossip_send = gossip_send.clone(); s.spawn(async move { - twins_receive_loop(ctx, port, recv, sends, validator_ports, splits).await + twins_receive_loop( + ctx, + splits, + validator_ports, + sends, + &gossip_targets[&port], + gossip_send, + port, + recv, + ) + .await }); } + s.spawn(async { twins_gossip_loop(ctx, stores, gossip_recv).await }); anyhow::Ok(()) }) .await @@ -248,17 +286,32 @@ async fn run_nodes_twins( /// Receive input messages from the consensus actor and send them to the others /// according to the partition schedule of the port associated with this instance. +/// +/// We have to simulate the gossip layer which isn't instantiated by these tests. +/// If we don't, then if a replica misses a LeaderPrepare message it won't ever get the payload +/// and won't be able to finalize the block, and won't participate further in the consensus. +#[allow(clippy::too_many_arguments)] async fn twins_receive_loop( ctx: &ctx::Ctx, + splits: &PortSplitSchedule, + validator_ports: &HashMap>, + sends: &HashMap>, + gossip_targets: &HashSet, + gossip_send: UnboundedSender<(Port, Port, BlockNumber)>, port: Port, mut recv: UnboundedReceiver, - sends: &HashMap>, - validator_ports: &HashMap>, - splits: &PortSplitSchedule, ) -> anyhow::Result<()> { let rng = &mut ctx.rng(); let start = &ctx.now(); + // Finalized block number iff this node can gossip to the target and the message contains a QC. + let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { + if !gossip_targets.contains(&target_port) { + return None; + } + output_msg_commit_qc(msg).map(|qc| qc.header().number) + }; + // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. // The spec says that the network is expected to deliver messages eventually, potentially out of order, // caveated by the fact that the actual implementation only keep retrying the last message. @@ -293,6 +346,14 @@ async fn twins_receive_loop( if can_send { let s = &sends[&target_port]; + // Send after taking note of potentially gossipable blocks. + let send = |msg| { + if let Some(bn) = block_to_gossip(target_port, &msg) { + gossip_send.send((port, target_port, bn)); + } + s.send(msg); + }; + // Messages can be delivered in arbitrary order. 
stash.shuffle(rng); @@ -300,17 +361,13 @@ async fn twins_receive_loop( let view = output_msg_view_number(&unstashed); let kind = output_msg_label(&unstashed); eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); - s.send(unstashed); + send(unstashed); } eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); - s.send(msg); - // TODO: If the message is a LeaderPrepare then remember the block sent, - // then if the next message is a LeaderCommit with a CommitQC then - // prepare a FinalBlock and pretend that we have a gossip layer by - // by calling block_store.queue_block on every other node. + send(msg); } else { eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); - stash.push(msg) + stash.push(msg); } }; @@ -376,6 +433,37 @@ async fn twins_receive_loop( Ok(()) } +/// Simulate the effects of gossip: if the source node can gossip to the target, +/// and the message being sent contains a CommitQC, and the sender has the +/// referenced finalized block in its store, then assume that the target +/// could fetch this block if they wanted, and insert it directly into their store. +/// +/// This happens concurrently with the actual message passing, so it's just an +/// approximation. It also happens out of order. This method only contains the +/// send loop, to deal with the spawning of store operations. +async fn twins_gossip_loop( + ctx: &ctx::Ctx, + stores: &HashMap>, + mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>, +) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async move { + while let Ok((from, to, number)) = recv.recv(ctx).await { + // Perform the storage operations asynchronously because `queue_block` will + // wait for all dependencies to be inserted first. + s.spawn_bg(async move { + let local_store = &stores[&from]; + let remote_store = &stores[&to]; + if let Ok(Some(block)) = local_store.block(ctx, number).await { + let _ = remote_store.queue_block(ctx, block).await; + } + Ok(()) + }); + } + Ok(()) + }) + .await +} + fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { match msg { io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0, @@ -387,3 +475,14 @@ fn output_msg_label(msg: &io::OutputMessage) -> &str { io::OutputMessage::Consensus(cr) => cr.msg.msg.label(), } } + +fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> { + match msg { + io::OutputMessage::Consensus(cr) => match &cr.msg.msg { + ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(), + ConsensusMsg::LeaderPrepare(lp) => lp.justification.high_qc(), + ConsensusMsg::ReplicaCommit(_) => None, + ConsensusMsg::LeaderCommit(lc) => Some(&lc.justification), + }, + } +} diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index b2349e92..d02a8c7b 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -232,9 +232,9 @@ async fn twins_network_wo_twins_wo_partitions() { /// is resilient to temporary network partitions. #[tokio::test(flavor = "multi_thread")] async fn twins_network_wo_twins_w_partitions() { - let ctx = &ctx::test_root(&ctx::AffineClock::new(5.0)); - // TODO: At the moment this test doesn't work with partitions, so just try to do a single scenario to debug. - run_twins(ctx, 6, false, 1).await.unwrap(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible. 
+ run_twins(ctx, 6, false, 10).await.unwrap(); } /// Create network configuration for a given number of replicas with a random number of twins and run [Test]. From 474a305bbacc45aae8c90486dd3a0a7c07487200 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 14:04:21 +0100 Subject: [PATCH 17/25] BFT-465: Fix replica ID --- node/actors/bft/src/tests.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index d02a8c7b..3eefe335 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -237,6 +237,17 @@ async fn twins_network_wo_twins_w_partitions() { run_twins(ctx, 6, false, 10).await.unwrap(); } +/// Run Twins scenarios with random number of nodes and twins. +#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(5.0)); + // n>=6 implies f>=1 and q=n-f + // let rng = &mut ctx.rng(); + // let num_replicas = rng.gen_range(6..=11); + let num_replicas = 6; // debug with minimum number of nodes + run_twins(ctx, num_replicas, true, 1).await.unwrap(); +} + /// Create network configuration for a given number of replicas with a random number of twins and run [Test]. async fn run_twins( ctx: &Ctx, @@ -246,11 +257,11 @@ async fn run_twins( ) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); // Use a single timeout for all scenarios to finish. - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); - #[derive(PartialEq)] + #[derive(PartialEq, Debug)] struct Replica { - id: i64, + id: i64, // non-zero ID public_key: PublicKey, secret_key: SecretKey, } @@ -302,7 +313,7 @@ async fn run_twins( .iter() .enumerate() .map(|(i, (sk, _))| Replica { - id: i as i64, + id: i as i64 + 1, public_key: sk.public(), secret_key: sk.clone(), }) @@ -311,8 +322,14 @@ async fn run_twins( let cluster = Cluster::new(replicas, num_twins); let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); + eprintln!( + "num_replicas={num_replicas} num_twins={num_twins} num_nodes={}", + cluster.num_nodes() + ); + // Create network config for all nodes in the cluster (assigns unique network addresses). let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1); + let node_to_port = cluster .nodes() .iter() @@ -320,6 +337,8 @@ async fn run_twins( .map(|(node, net)| (node.id, net.server_addr.port())) .collect::>(); + assert_eq!(node_to_port.len(), cluster.num_nodes()); + // Every network needs a behaviour. They are all honest, just some might be duplicated. 
let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; From 78b81f2b4ae9f70905f37000373a3c2adc35f697 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 14:40:23 +0100 Subject: [PATCH 18/25] BFT-465: Try with the maximum cluster size with 11 nodes and 2 twins --- node/actors/bft/src/tests.rs | 39 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 3eefe335..e3b6cb0c 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -5,7 +5,6 @@ use crate::testonly::{ ut_harness::UTHarness, Behavior, Network, PortSplitSchedule, Test, }; -use rand::Rng; use zksync_concurrency::{ ctx::{self, Ctx}, scope, time, @@ -221,7 +220,7 @@ async fn non_proposing_leader() { async fn twins_network_wo_twins_wo_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n<6 implies f=0 and q=n - run_twins(ctx, 5, false, 10).await.unwrap(); + run_twins(ctx, 5, 0, 10).await.unwrap(); } /// Run Twins scenarios without actual twins, but enough replicas that partitions @@ -234,30 +233,38 @@ async fn twins_network_wo_twins_wo_partitions() { async fn twins_network_wo_twins_w_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible. - run_twins(ctx, 6, false, 10).await.unwrap(); + run_twins(ctx, 6, 0, 10).await.unwrap(); } /// Run Twins scenarios with random number of nodes and twins. #[tokio::test(flavor = "multi_thread")] async fn twins_network_w_twins_w_partitions() { - let ctx = &ctx::test_root(&ctx::AffineClock::new(5.0)); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n>=6 implies f>=1 and q=n-f - // let rng = &mut ctx.rng(); - // let num_replicas = rng.gen_range(6..=11); - let num_replicas = 6; // debug with minimum number of nodes - run_twins(ctx, num_replicas, true, 1).await.unwrap(); + // for _ in 0..5 { + // let rng = &mut ctx.rng(); + // let num_replicas = rng.gen_range(6..=11); + // let num_honest = validator::threshold(num_replicas as u64) as usize; + // let max_faulty = num_replicas - num_honest; + // let num_twins = rng.gen_range(1..=max_faulty); + // run_twins(ctx, num_replicas, num_twins, 1).await.unwrap(); + // } + + // Try the maximum + run_twins(ctx, 11, 2, 1).await.unwrap(); } -/// Create network configuration for a given number of replicas with a random number of twins and run [Test]. +/// Create network configuration for a given number of replicas and twins and run [Test]. async fn run_twins( ctx: &Ctx, num_replicas: usize, - use_twins: bool, + num_twins: usize, num_scenarios: usize, ) -> anyhow::Result<()> { zksync_concurrency::testonly::abort_on_panic(); // Use a single timeout for all scenarios to finish. - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); + // A single scenario with 11 replicas took 3-5 seconds. + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); #[derive(PartialEq, Debug)] struct Replica { @@ -287,7 +294,7 @@ async fn run_twins( let rng = &mut ctx.rng(); // The existing test machinery uses the number of finalized blocks as an exit criteria. 
- let blocks_to_finalize = 5; + let blocks_to_finalize = 3; // The test is going to disrupt the communication by partitioning nodes, // where the leader might not be in a partition with enough replicas to // form a quorum, therefore to allow N blocks to be finalized we need to @@ -296,14 +303,6 @@ async fn run_twins( // The paper considers 2 or 3 partitions enough. let max_partitions = 3; - let num_honest = validator::threshold(num_replicas as u64) as usize; - let max_faulty = num_replicas - num_honest; - let num_twins = if use_twins && max_faulty > 0 { - rng.gen_range(1..=max_faulty) - } else { - 0 - }; - // Every validator has equal power of 1. const WEIGHT: u64 = 1; let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]); From 683cc7e83312fa280bf57ee0fe65b12587ed3deb Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 15:12:00 +0100 Subject: [PATCH 19/25] BFT-465: Quit as soon as we run out of rounds --- node/actors/bft/src/testonly/run.rs | 7 +++++++ node/actors/bft/src/tests.rs | 32 ++++++++++++++++------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index e60e025f..53b32dc9 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,4 +1,5 @@ use super::{Behavior, Node}; +use anyhow::bail; use network::{io, Config}; use rand::prelude::SliceRandom; use std::{ @@ -378,6 +379,12 @@ async fn twins_receive_loop( // Every node is guaranteed to be present in only one partition. let partitions_opt = splits.get(view_number); + if partitions_opt.is_none() { + bail!( + "ran out of scheduled rounds; most likely cannot finalize blocks even if we go on" + ); + } + let msg = || { io::OutputMessage::Consensus(io::ConsensusReq { msg: message.message.clone(), diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e3b6cb0c..56577a30 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -233,24 +233,28 @@ async fn twins_network_wo_twins_wo_partitions() { async fn twins_network_wo_twins_w_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible. - run_twins(ctx, 6, 0, 10).await.unwrap(); + run_twins(ctx, 6, 0, 5).await.unwrap(); } -/// Run Twins scenarios with random number of nodes and twins. +/// Run Twins scenarios with random number of nodes and 1 twin. #[tokio::test(flavor = "multi_thread")] -async fn twins_network_w_twins_w_partitions() { +async fn twins_network_w1_twins_w_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n>=6 implies f>=1 and q=n-f - // for _ in 0..5 { - // let rng = &mut ctx.rng(); - // let num_replicas = rng.gen_range(6..=11); - // let num_honest = validator::threshold(num_replicas as u64) as usize; - // let max_faulty = num_replicas - num_honest; - // let num_twins = rng.gen_range(1..=max_faulty); - // run_twins(ctx, num_replicas, num_twins, 1).await.unwrap(); - // } - - // Try the maximum + for num_replicas in 6..=10 { + // let num_honest = validator::threshold(num_replicas as u64) as usize; + // let max_faulty = num_replicas - num_honest; + // let num_twins = rng.gen_range(1..=max_faulty); + run_twins(ctx, num_replicas, 1, 3).await.unwrap(); + } +} + +/// Run Twins scenarios with higher number of nodes and 2 twins. 
+#[tokio::test(flavor = "multi_thread")] +async fn twins_network_w2_twins_w_partitions() { + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + // n>=11 implies f>=2 and q=n-f + // TODO: This fails for now. run_twins(ctx, 11, 2, 1).await.unwrap(); } @@ -299,7 +303,7 @@ async fn run_twins( // where the leader might not be in a partition with enough replicas to // form a quorum, therefore to allow N blocks to be finalized we need to // go longer. - let num_rounds = blocks_to_finalize * 5; + let num_rounds = blocks_to_finalize * 10; // The paper considers 2 or 3 partitions enough. let max_partitions = 3; From 47b34f9148c133ef4e9e3fc4668a74d5a813b7a2 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 15:17:34 +0100 Subject: [PATCH 20/25] BFT-465: Only compare the payload for equivalence at the end --- node/actors/bft/src/testonly/run.rs | 10 ++++++++-- node/actors/bft/src/tests.rs | 12 ++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 53b32dc9..32c1b836 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -92,9 +92,15 @@ impl Test { // Check that the stored blocks are consistent. for i in 0..self.blocks_to_finalize as u64 { let i = first + i; - let want = honest[0].block(ctx, i).await?; + // Only comparing the payload; the signatories might be different, + // at least with the simulated gossip of the twins network. + let want = honest[0] + .block(ctx, i) + .await? + .expect("checked its existence") + .payload; for store in &honest[1..] { - assert_eq!(want, store.block(ctx, i).await?); + assert_eq!(want, store.block(ctx, i).await?.unwrap().payload); } } Ok(()) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 56577a30..9c77a61d 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -325,11 +325,6 @@ async fn run_twins( let cluster = Cluster::new(replicas, num_twins); let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions); - eprintln!( - "num_replicas={num_replicas} num_twins={num_twins} num_nodes={}", - cluster.num_nodes() - ); - // Create network config for all nodes in the cluster (assigns unique network addresses). let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1); @@ -346,7 +341,7 @@ async fn run_twins( let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()]; // Reuse the same cluster and network setup to run a few scenarios. - for _ in 0..num_scenarios { + for i in 0..num_scenarios { // Generate a permutation of partitions and leaders for the given number of rounds. 
let scenario = scenarios.generate_one(rng); @@ -369,6 +364,11 @@ async fn run_twins( }) .collect(); + eprintln!( + "num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}", + cluster.num_nodes() + ); + for (r, rc) in scenario.rounds.iter().enumerate() { let leader_id = cluster .nodes() From 23a10e2f79da8e8503704f57485ec24991edfc14 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 15:34:03 +0100 Subject: [PATCH 21/25] BFT-465: Allow more time to finish and run more scenarios until it fails --- node/actors/bft/src/testonly/run.rs | 6 +++--- node/actors/bft/src/tests.rs | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 32c1b836..5c84248c 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -309,7 +309,7 @@ async fn twins_receive_loop( mut recv: UnboundedReceiver, ) -> anyhow::Result<()> { let rng = &mut ctx.rng(); - let start = &ctx.now(); + let start = std::time::Instant::now(); // Just to give an idea of how long time passes between rounds. // Finalized block number iff this node can gossip to the target and the message contains a QC. let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { @@ -409,7 +409,7 @@ async fn twins_receive_loop( Some(ps) => { for p in ps { let can_send = p.contains(&port); - eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64()); + eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs()); for target_port in p { send_or_stash(can_send, *target_port, msg()); } @@ -433,7 +433,7 @@ async fn twins_receive_loop( let can_send = p.contains(&port); for target_port in target_ports { if p.contains(target_port) { - eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64()); + eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs()); send_or_stash(can_send, *target_port, msg()); } } diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 9c77a61d..87cecac7 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -245,7 +245,7 @@ async fn twins_network_w1_twins_w_partitions() { // let num_honest = validator::threshold(num_replicas as u64) as usize; // let max_faulty = num_replicas - num_honest; // let num_twins = rng.gen_range(1..=max_faulty); - run_twins(ctx, num_replicas, 1, 3).await.unwrap(); + run_twins(ctx, num_replicas, 1, 5).await.unwrap(); } } @@ -254,8 +254,7 @@ async fn twins_network_w1_twins_w_partitions() { async fn twins_network_w2_twins_w_partitions() { let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); // n>=11 implies f>=2 and q=n-f - // TODO: This fails for now. - run_twins(ctx, 11, 2, 1).await.unwrap(); + run_twins(ctx, 11, 2, 5).await.unwrap(); } /// Create network configuration for a given number of replicas and twins and run [Test]. @@ -268,7 +267,7 @@ async fn run_twins( zksync_concurrency::testonly::abort_on_panic(); // Use a single timeout for all scenarios to finish. // A single scenario with 11 replicas took 3-5 seconds. 
- let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60)); #[derive(PartialEq, Debug)] struct Replica { From c596aa2287d380f1f078e74318cdb42b96823e03 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 17:13:16 +0100 Subject: [PATCH 22/25] BFT-465: Print multiple leader ports in debug --- node/actors/bft/src/tests.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 87cecac7..02d3e04e 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -369,22 +369,25 @@ async fn run_twins( ); for (r, rc) in scenario.rounds.iter().enumerate() { - let leader_id = cluster + let partitions = &splits[r]; + + let leader_ports = cluster .nodes() .iter() - .find(|n| n.public_key == *rc.leader) - .unwrap() - .id; - let leader_port = node_to_port[&leader_id]; - let partitions = &splits[r]; - let leader_partition_size = partitions + .filter(|n| n.public_key == *rc.leader) + .map(|n| node_to_port[&n.id]) + .collect::>(); + + let leader_partition_sizes = leader_ports + .iter() + .map(|lp| partitions.iter().find(|p| p.contains(&lp)).unwrap().len()) + .collect::>(); + + let leader_isolated = leader_partition_sizes .iter() - .find(|p| p.contains(&leader_port)) - .unwrap() - .len(); - let leader_isolated = leader_partition_size < cluster.quorum_size(); + .all(|s| *s < cluster.quorum_size()); - eprintln!("round={r} partitions={partitions:?} leader={leader_port} leader_partition_size={leader_partition_size} leader_isolated={leader_isolated}"); + eprintln!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}"); } Test { From ea10b1de363e11beb8809819267a58847499ea5f Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 19:03:37 +0100 Subject: [PATCH 23/25] BFT-465: Gossip all predecessors --- node/actors/bft/src/testonly/run.rs | 38 +++++++++++++++++++++-------- node/actors/bft/src/tests.rs | 2 +- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 5c84248c..02976496 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -460,17 +460,35 @@ async fn twins_gossip_loop( mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>, ) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async move { - while let Ok((from, to, number)) = recv.recv(ctx).await { - // Perform the storage operations asynchronously because `queue_block` will - // wait for all dependencies to be inserted first. - s.spawn_bg(async move { - let local_store = &stores[&from]; - let remote_store = &stores[&to]; - if let Ok(Some(block)) = local_store.block(ctx, number).await { - let _ = remote_store.queue_block(ctx, block).await; + while let Ok((from, to, mut number)) = recv.recv(ctx).await { + let local_store = &stores[&from]; + let remote_store = &stores[&to]; + let first_needed = remote_store.queued().next(); + + loop { + // Stop going back if the target already has the block. + if number < first_needed { + break; } - Ok(()) - }); + // Stop if the source doesn't actually have this block to give. 
+ let Ok(Some(block)) = local_store.block(ctx, number).await else { + break; + }; + // Perform the storing operation asynchronously because `queue_block` will + // wait for all dependencies to be inserted first. + s.spawn_bg(async move { + let _ = remote_store.queue_block(ctx, block).await; + Ok(()) + }); + // Be pessimistic and try to insert all ancestors, to minimise the chance that + // for some reason a node doesn't get a finalized block from anyone. + // Without this some scenarios with twins actually fail. + if let Some(prev) = number.prev() { + number = prev; + } else { + break; + }; + } } Ok(()) }) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 02d3e04e..2c09e9d0 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -380,7 +380,7 @@ async fn run_twins( let leader_partition_sizes = leader_ports .iter() - .map(|lp| partitions.iter().find(|p| p.contains(&lp)).unwrap().len()) + .map(|lp| partitions.iter().find(|p| p.contains(lp)).unwrap().len()) .collect::>(); let leader_isolated = leader_partition_sizes From c13bcbfc1763ab2f3ee65f21d0e22fce64b1c2b0 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 30 May 2024 20:59:45 +0100 Subject: [PATCH 24/25] BFT-465: Fix some comments --- node/actors/bft/src/testonly/run.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 02976496..3effb94f 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -251,7 +251,7 @@ async fn run_nodes_twins( .instrument(tracing::info_span!("node", i)), ); } - // Taking these refeferences is necessary for the `scope::run!` environment lifetime rules to compile + // Taking these references is necessary for the `scope::run!` environment lifetime rules to compile // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`. // Potentially `ctx::NoCopy` could be used with `port`. let validator_ports = &validator_ports; @@ -264,8 +264,8 @@ async fn run_nodes_twins( // * identify the view they are in from the message // * identify the partition they are in based on their network id // * either broadcast to all other instances in the partition, or find out the network - // identity of the target validator and send to it _iff_ they are in the same partition - // * simulating the gossiping of finalized blockss + // identity of the target validator and send to it iff they are in the same partition + // * simulate the gossiping of finalized blockss scope::run!(ctx, |ctx, s| async move { for (port, recv) in recvs { let gossip_send = gossip_send.clone(); @@ -321,10 +321,7 @@ async fn twins_receive_loop( // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later. // The spec says that the network is expected to deliver messages eventually, potentially out of order, - // caveated by the fact that the actual implementation only keep retrying the last message. - // However with the actors instantiated in by this test, that would not be sufficient because - // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip, - // and will forever be unable to finalize blocks. + // caveated by the fact that the actual implementation only keeps retrying the last message.. 
// A separate issue is the definition of "later", without actually adding timing assumptions: // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas // don't move on from a view until they reach quorum, then "later" could be defined by so many From 23943505dadf4efaf65fd043b996a666ec0d5273 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 10 Jun 2024 10:27:04 +0100 Subject: [PATCH 25/25] BFT-465: Nits --- node/actors/bft/src/testonly/run.rs | 113 +++++++++++++++++----------- node/actors/bft/src/tests.rs | 4 +- node/actors/network/src/testonly.rs | 7 +- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 3effb94f..102b28f4 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,7 +1,7 @@ use super::{Behavior, Node}; use anyhow::bail; use network::{io, Config}; -use rand::prelude::SliceRandom; +use rand::seq::SliceRandom; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -15,9 +15,7 @@ use zksync_concurrency::{ oneshot, scope, }; use zksync_consensus_network as network; -use zksync_consensus_roles::validator::{ - self, BlockNumber, CommitQC, ConsensusMsg, Genesis, PublicKey, -}; +use zksync_consensus_roles::validator; use zksync_consensus_storage::{testonly::new_store, BlockStore}; use zksync_consensus_utils::pipe; @@ -61,7 +59,7 @@ impl Test { &self, ctx: &ctx::Ctx, nets: Vec, - genesis: &Genesis, + genesis: &validator::Genesis, ) -> anyhow::Result<()> { let mut nodes = vec![]; let mut honest = vec![]; @@ -92,8 +90,7 @@ impl Test { // Check that the stored blocks are consistent. for i in 0..self.blocks_to_finalize as u64 { let i = first + i; - // Only comparing the payload; the signatories might be different, - // at least with the simulated gossip of the twins network. + // Only comparing the payload; the justification might be different. let want = honest[0] .block(ctx, i) .await? @@ -275,8 +272,10 @@ async fn run_nodes_twins( splits, validator_ports, sends, - &gossip_targets[&port], - gossip_send, + TwinsGossipConfig { + targets: &gossip_targets[&port], + send: gossip_send, + }, port, recv, ) @@ -297,14 +296,12 @@ async fn run_nodes_twins( /// We have to simulate the gossip layer which isn't instantiated by these tests. /// If we don't, then if a replica misses a LeaderPrepare message it won't ever get the payload /// and won't be able to finalize the block, and won't participate further in the consensus. -#[allow(clippy::too_many_arguments)] async fn twins_receive_loop( ctx: &ctx::Ctx, splits: &PortSplitSchedule, - validator_ports: &HashMap>, + validator_ports: &HashMap>, sends: &HashMap>, - gossip_targets: &HashSet, - gossip_send: UnboundedSender<(Port, Port, BlockNumber)>, + gossip: TwinsGossipConfig<'_>, port: Port, mut recv: UnboundedReceiver, ) -> anyhow::Result<()> { @@ -313,7 +310,7 @@ async fn twins_receive_loop( // Finalized block number iff this node can gossip to the target and the message contains a QC. 
let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { - if !gossip_targets.contains(&target_port) { + if !gossip.targets.contains(&target_port) { return None; } output_msg_commit_qc(msg).map(|qc| qc.header().number) @@ -347,32 +344,41 @@ async fn twins_receive_loop( let view = output_msg_view_number(&msg); let kind = output_msg_label(&msg); - if can_send { - let s = &sends[&target_port]; + // Remove any previously stashed message of the same kind, because the network will only + // try to send the last one of each, not all pending messages. + stash.retain(|stashed| output_msg_label(stashed) != kind); - // Send after taking note of potentially gossipable blocks. - let send = |msg| { - if let Some(bn) = block_to_gossip(target_port, &msg) { - gossip_send.send((port, target_port, bn)); - } - s.send(msg); - }; + if !can_send { + tracing::info!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); + stash.push(msg); + return; + } - // Messages can be delivered in arbitrary order. - stash.shuffle(rng); + let s = &sends[&target_port]; - for unstashed in stash.drain(0..) { - let view = output_msg_view_number(&unstashed); - let kind = output_msg_label(&unstashed); - eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); - send(unstashed); + // Send after taking note of potentially gossipable blocks. + let send = |msg| { + if let Some(number) = block_to_gossip(target_port, &msg) { + gossip.send.send(TwinsGossipMessage { + from: port, + to: target_port, + number, + }); } - eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); - send(msg); - } else { - eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}"); - stash.push(msg); + s.send(msg); + }; + + // Messages can be delivered in arbitrary order. + stash.shuffle(rng); + + for unstashed in stash.drain(0..) 
{ + let view = output_msg_view_number(&unstashed); + let kind = output_msg_label(&unstashed); + tracing::info!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}"); + send(unstashed); } + tracing::info!(" >>> sending view={view} from={port} to={target_port} kind={kind}"); + send(msg); }; while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { @@ -398,7 +404,7 @@ async fn twins_receive_loop( match message.recipient { io::Target::Broadcast => match partitions_opt { None => { - eprintln!("broadcasting view={view_number} from={port} target=all"); + tracing::info!("broadcasting view={view_number} from={port} target=all"); for target_port in sends.keys() { send_or_stash(true, *target_port, msg()); } @@ -406,7 +412,7 @@ async fn twins_receive_loop( Some(ps) => { for p in ps { let can_send = p.contains(&port); - eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs()); + tracing::info!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs()); for target_port in p { send_or_stash(can_send, *target_port, msg()); } @@ -419,7 +425,7 @@ async fn twins_receive_loop( match partitions_opt { None => { for target_port in target_ports { - eprintln!( + tracing::info!( "unicasting view={view_number} from={port} target={target_port}" ); send_or_stash(true, *target_port, msg()); @@ -430,7 +436,7 @@ async fn twins_receive_loop( let can_send = p.contains(&port); for target_port in target_ports { if p.contains(target_port) { - eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs()); + tracing::info!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs()); send_or_stash(can_send, *target_port, msg()); } } @@ -454,10 +460,15 @@ async fn twins_receive_loop( async fn twins_gossip_loop( ctx: &ctx::Ctx, stores: &HashMap>, - mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>, + mut recv: UnboundedReceiver, ) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async move { - while let Ok((from, to, mut number)) = recv.recv(ctx).await { + while let Ok(TwinsGossipMessage { + from, + to, + mut number, + }) = recv.recv(ctx).await + { let local_store = &stores[&from]; let remote_store = &stores[&to]; let first_needed = remote_store.queued().next(); @@ -492,9 +503,9 @@ async fn twins_gossip_loop( .await } -fn output_msg_view_number(msg: &io::OutputMessage) -> u64 { +fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber { match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0, + io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number, } } @@ -504,7 +515,8 @@ fn output_msg_label(msg: &io::OutputMessage) -> &str { } } -fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> { +fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> { + use validator::ConsensusMsg; match msg { io::OutputMessage::Consensus(cr) => match &cr.msg.msg { ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(), @@ -514,3 +526,16 @@ fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> { }, } } + +struct TwinsGossipMessage { + from: Port, + to: Port, + number: validator::BlockNumber, +} + +struct TwinsGossipConfig<'a> { + /// Ports to which this node should gossip to. + targets: &'a HashSet, + /// Channel over which to send gossip messages. 
+    send: UnboundedSender<TwinsGossipMessage>,
+}
diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs
index 2c09e9d0..36d57fbd 100644
--- a/node/actors/bft/src/tests.rs
+++ b/node/actors/bft/src/tests.rs
@@ -363,7 +363,7 @@ async fn run_twins(
             })
             .collect();
 
-        eprintln!(
+        tracing::info!(
             "num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}",
             cluster.num_nodes()
         );
@@ -387,7 +387,7 @@ async fn run_twins(
                 .iter()
                 .all(|s| *s < cluster.quorum_size());
 
-            eprintln!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
+            tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
         }
 
         Test {
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index e7d452e6..897d1354 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -13,10 +13,7 @@ use std::{
     sync::Arc,
 };
 use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
-use zksync_consensus_roles::{
-    node,
-    validator::{self, SecretKey},
-};
+use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::BlockStore;
 use zksync_consensus_utils::pipe;
 
@@ -92,7 +89,7 @@ pub fn new_configs_for_validators<'a, I>(
     gossip_peers: usize,
 ) -> Vec<Config>
 where
-    I: Iterator<Item = &'a SecretKey>,
+    I: Iterator<Item = &'a validator::SecretKey>,
 {
     let configs = validator_keys.map(|validator_key| {
         let addr = net::tcp::testonly::reserve_listener();
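
For reference, a minimal usage sketch (not part of the patches above) of the run_twins helper these commits introduce in node/actors/bft/src/tests.rs. The test name, cluster size, twin count, scenario count and clock speed-up are illustrative assumptions chosen to mirror the tests added in this series:

#[tokio::test(flavor = "multi_thread")]
async fn twins_network_w1_twin_w_partitions_n8() {
    // Illustrative parameters only: n=8 implies f=1 and q=7 under the model used above.
    let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
    // 8 replicas, 1 twin, 5 randomly generated partition/leader scenarios.
    run_twins(ctx, 8, 1, 5).await.unwrap();
}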