diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index 388489c1..88e95116 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -3,9 +3,15 @@
 use network::{io, Config};
 use rand::prelude::SliceRandom;
 use std::collections::{HashMap, HashSet};
 use tracing::Instrument as _;
-use zksync_concurrency::{ctx, oneshot, scope};
+use zksync_concurrency::{
+    ctx::{
+        self,
+        channel::{UnboundedReceiver, UnboundedSender},
+    },
+    oneshot, scope,
+};
 use zksync_consensus_network as network;
-use zksync_consensus_roles::validator::{self, Genesis};
+use zksync_consensus_roles::validator::{self, Genesis, PublicKey};
 use zksync_consensus_storage::testonly::new_store;
 use zksync_consensus_utils::pipe;
@@ -218,141 +224,156 @@ async fn run_nodes_twins(
         }
         // Taking these refeferences is necessary for the `scope::run!` environment lifetime rules to compile
         // with `async move`, which in turn is necessary otherwise it the spawned process could not borrow `port`.
+        // Potentially `ctx::NoCopy` could be used with `port`.
         let sends = &sends;
         let validator_ports = &validator_ports;
-        let start = &ctx.now();
+
         // Run networks by receiving from all consensus instances:
         // * identify the view they are in from the message
         // * identify the partition they are in based on their network id
         // * either broadcast to all other instances in the partition, or find out the network
         //   identity of the target validator and send to it _iff_ they are in the same partition
         scope::run!(ctx, |ctx, s| async move {
-            for (port, mut recv) in recvs {
+            for (port, recv) in recvs {
                 s.spawn(async move {
-                    use zksync_consensus_network::io;
-                    let rng = &mut ctx.rng();
-
-                    // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later.
-                    // The spec says that the network is expected to deliver messages eventually, potentially out of order,
-                    // caveated by the fact that the actual implementation only keep retrying the last message.
-                    // However with the actors instantiated in by this test, that would not be sufficient because
-                    // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip,
-                    // and will forever be unable to finalize blocks.
-                    // A separate issue is the definition of "later", without actually adding timing assumptions:
-                    // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas
-                    //   don't move on from a view until they reach quorum, then "later" could be defined by so many
-                    //   messages in a view that it indicates a timeout loop. NB the consensus might not actually have
-                    //   an actual timeout loop and might rely on the network trying to re-broadcast forever.
-                    // * OTOH if all partitions are at least quorum sized, then we there will be a subset of nodes that
-                    //   can move on to the next view, in which a new partition configuration will allow them to broadcast
-                    //   to previously isolated peers.
-                    // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer
-                    //   partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff
-                    //   out of the box, because replicas only communicate with their leader, so if for example B missed
-                    //   a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because
-                    //   they can't commit the earlier block until they get a new message from A. However since
-                    //   https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted,
-                    //   so we shouldn't have to wait long for A to unstash its messages to B.
-                    // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages
-                    //   and unstash them as soon as someone moves on to a new view.
-                    let mut stashes: HashMap<Port, Vec<io::OutputMessage>> = HashMap::new();
-
-                    // Either stash a message, or unstash all messages and send them to the target along with the new one.
-                    let mut send_or_stash =
-                        |can_send: bool, target_port: Port, msg: io::OutputMessage| {
-                            let stash = stashes.entry(target_port).or_default();
-                            let view = output_msg_view_number(&msg);
-                            let kind = output_msg_label(&msg);
-
-                            if can_send {
-                                let s = &sends[&target_port];
-
-                                // Messages can be delivered in arbitrary order.
-                                stash.shuffle(rng);
-
-                                for unstashed in stash.drain(0..) {
-                                    let view = output_msg_view_number(&unstashed);
-                                    let kind = output_msg_label(&unstashed);
-                                    eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
-                                    s.send(unstashed);
-                                }
-                                eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
-                                s.send(msg);
-                            } else {
-                                eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
-                                stash.push(msg)
-                            }
-                        };
-
-                    while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
-                        let view_number = message.message.msg.view().number.0 as usize;
-                        // Here we assume that all instances start from view 0 in the tests.
-                        // If the view is higher than what we have planned for, assume no partitions.
-                        // Every node is guaranteed to be present in only one partition.
-                        let partitions_opt = splits.get(view_number);
-
-                        let msg = || {
-                            io::OutputMessage::Consensus(io::ConsensusReq {
-                                msg: message.message.clone(),
-                                ack: oneshot::channel().0,
-                            })
-                        };
-
-                        match message.recipient {
-                            io::Target::Broadcast => match partitions_opt {
-                                None => {
-                                    eprintln!("broadcasting view={view_number} from={port} target=all");
-                                    for target_port in sends.keys() {
-                                        send_or_stash(true, *target_port, msg());
-                                    }
-                                }
-                                Some(ps) => {
-                                    for p in ps {
-                                        let can_send = p.contains(&port);
-                                        eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
-                                        for target_port in p {
-                                            send_or_stash(can_send, *target_port, msg());
-                                        }
-                                    }
-                                }
-                            },
-                            io::Target::Validator(v) => {
-                                let target_ports = &validator_ports[&v];
-
-                                match partitions_opt {
-                                    None => {
-                                        for target_port in target_ports {
-                                            eprintln!("unicasting view={view_number} from={port} target={target_port}");
-                                            send_or_stash(true, *target_port, msg());
-                                        }
-                                    }
-                                    Some(ps) => {
-                                        for p in ps {
-                                            let can_send = p.contains(&port);
-                                            for target_port in target_ports {
-                                                if p.contains(target_port) {
-                                                    eprintln!("unicasting view={view_number} from={port} target={target_port} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
-                                                    send_or_stash(
-                                                        can_send,
-                                                        *target_port,
-                                                        msg(),
-                                                    );
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    Ok(())
+                    twins_receive_loop(ctx, port, recv, sends, validator_ports, splits).await
                 });
             }
             anyhow::Ok(())
         })
         .await
     })
     .await
 }
+
+/// Receive input messages from the consensus actor and send them to the others
+/// according to the partition schedule of the port associated with this instance.
+async fn twins_receive_loop(
+    ctx: &ctx::Ctx,
+    port: Port,
+    mut recv: UnboundedReceiver<io::InputMessage>,
+    sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
+    validator_ports: &HashMap<PublicKey, Vec<Port>>,
+    splits: &PortSplitSchedule,
+) -> anyhow::Result<()> {
+    let rng = &mut ctx.rng();
+    let start = &ctx.now();
+
+    // We need to buffer messages that cannot be delivered due to partitioning, and deliver them later.
+    // The spec says that the network is expected to deliver messages eventually, potentially out of order,
+    // caveated by the fact that the actual implementation only keeps retrying the last message.
+    // However, with the actors instantiated by this test, that would not be sufficient because
+    // there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip,
+    // and will forever be unable to finalize blocks.
+    // A separate issue is the definition of "later", without actually adding timing assumptions:
+    // * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas
+    //   don't move on from a view until they reach quorum, then "later" could be defined by so many
+    //   messages in a view that it indicates a timeout loop. NB the consensus might not actually have
+    //   an actual timeout loop and might rely on the network trying to re-broadcast forever.
+    // * OTOH if all partitions are at least quorum sized, then there will be a subset of nodes that
+    //   can move on to the next view, in which a new partition configuration will allow them to broadcast
+    //   to previously isolated peers.
+    // * One idea is to wait until replica A wants to send to replica B in a view when they are no longer
+    //   partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff
+    //   out of the box, because replicas only communicate with their leader, so if for example B missed
+    //   a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because
+    //   they can't commit the earlier block until they get a new message from A. However, since
+    //   https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted,
+    //   so we shouldn't have to wait long for A to unstash its messages to B.
+    // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages
+    //   and unstash them as soon as someone moves on to a new view.
+    let mut stashes: HashMap<Port, Vec<io::OutputMessage>> = HashMap::new();
+
+    // Either stash a message, or unstash all messages and send them to the target along with the new one.
+    let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| {
+        let stash = stashes.entry(target_port).or_default();
+        let view = output_msg_view_number(&msg);
+        let kind = output_msg_label(&msg);
+
+        if can_send {
+            let s = &sends[&target_port];
+
+            // Messages can be delivered in arbitrary order.
+            stash.shuffle(rng);
+
+            for unstashed in stash.drain(0..) {
+                let view = output_msg_view_number(&unstashed);
+                let kind = output_msg_label(&unstashed);
+                eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
+                s.send(unstashed);
+            }
+            eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
+            s.send(msg);
+            // TODO: If the message is a LeaderPrepare then remember the block sent,
+            // then if the next message is a LeaderCommit with a CommitQC then
+            // prepare a FinalBlock and pretend that we have a gossip layer
+            // by calling block_store.queue_block on every other node.
+        } else {
+            eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
+            stash.push(msg)
+        }
+    };
+
+    while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
+        let view_number = message.message.msg.view().number.0 as usize;
+        // Here we assume that all instances start from view 0 in the tests.
+        // If the view is higher than what we have planned for, assume no partitions.
+        // Every node is guaranteed to be present in only one partition.
+        let partitions_opt = splits.get(view_number);
+
+        let msg = || {
+            io::OutputMessage::Consensus(io::ConsensusReq {
+                msg: message.message.clone(),
+                ack: oneshot::channel().0,
+            })
+        };
+
+        match message.recipient {
+            io::Target::Broadcast => match partitions_opt {
+                None => {
+                    eprintln!("broadcasting view={view_number} from={port} target=all");
+                    for target_port in sends.keys() {
+                        send_or_stash(true, *target_port, msg());
+                    }
+                }
+                Some(ps) => {
+                    for p in ps {
+                        let can_send = p.contains(&port);
+                        eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
+                        for target_port in p {
+                            send_or_stash(can_send, *target_port, msg());
+                        }
+                    }
+                }
+            },
+            io::Target::Validator(v) => {
+                let target_ports = &validator_ports[&v];
+
+                match partitions_opt {
+                    None => {
+                        for target_port in target_ports {
+                            eprintln!(
+                                "unicasting view={view_number} from={port} target={target_port}"
+                            );
+                            send_or_stash(true, *target_port, msg());
+                        }
+                    }
+                    Some(ps) => {
+                        for p in ps {
+                            let can_send = p.contains(&port);
+                            for target_port in target_ports {
+                                if p.contains(target_port) {
+                                    eprintln!("unicasting view={view_number} from={port} target={target_port} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
+                                    send_or_stash(can_send, *target_port, msg());
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    Ok(())
+}
 
 fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {
@@ -363,6 +384,6 @@ fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {
 
 fn output_msg_label(msg: &io::OutputMessage) -> &str {
     match msg {
-        io::OutputMessage::Consensus(cr) => cr.msg.msg.label()
+        io::OutputMessage::Consensus(cr) => cr.msg.msg.label(),
     }
-}
\ No newline at end of file
+}