BFT-465: Separate twins receive loop function
aakoshh committed May 30, 2024
1 parent 8709620 commit 9946567
Showing 1 changed file with 137 additions and 116 deletions.
253 changes: 137 additions & 116 deletions node/actors/bft/src/testonly/run.rs
@@ -3,9 +3,15 @@ use network::{io, Config};
use rand::prelude::SliceRandom;
use std::collections::{HashMap, HashSet};
use tracing::Instrument as _;
use zksync_concurrency::{ctx, oneshot, scope};
use zksync_concurrency::{
ctx::{
self,
channel::{UnboundedReceiver, UnboundedSender},
},
oneshot, scope,
};
use zksync_consensus_network as network;
use zksync_consensus_roles::validator::{self, Genesis};
use zksync_consensus_roles::validator::{self, Genesis, PublicKey};
use zksync_consensus_storage::testonly::new_store;
use zksync_consensus_utils::pipe;

@@ -218,141 +224,156 @@ async fn run_nodes_twins(
}
// Taking these references is necessary for the `scope::run!` environment lifetime rules to compile
// with `async move`, which in turn is necessary because otherwise the spawned process could not borrow `port`.
// Potentially `ctx::NoCopy` could be used with `port`.
let sends = &sends;
let validator_ports = &validator_ports;
let start = &ctx.now();

// Run networks by receiving from all consensus instances:
// * identify the view they are in from the message
// * identify the partition they are in based on their network id
// * either broadcast to all other instances in the partition, or find out the network
// identity of the target validator and send to it _iff_ they are in the same partition
scope::run!(ctx, |ctx, s| async move {
for (port, mut recv) in recvs {
for (port, recv) in recvs {
s.spawn(async move {
use zksync_consensus_network::io;
let rng = &mut ctx.rng();
twins_receive_loop(ctx, port, recv, sends, validator_ports, splits).await
});
}
anyhow::Ok(())
})
.await
})
.await
}

// We need to buffer messages that cannot be delivered due to partitioning, and deliver them later.
// The spec says that the network is expected to deliver messages eventually, potentially out of order,
// caveated by the fact that the actual implementation only keeps retrying the last message.
// However, with the actors instantiated by this test, that would not be sufficient because
// there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip,
// and will forever be unable to finalize blocks.
// A separate issue is the definition of "later", without actually adding timing assumptions:
// * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas
// don't move on from a view until they reach quorum, then "later" could be defined by so many
// messages in a view that it indicates a timeout loop. NB the consensus might not have
// an actual timeout loop and might rely on the network trying to re-broadcast forever.
// * OTOH if all partitions are at least quorum sized, then there will be a subset of nodes that
// can move on to the next view, in which a new partition configuration will allow them to broadcast
// to previously isolated peers.
// * One idea is to wait until replica A wants to send to replica B in a view when they are no longer
// partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff
// out of the box, because replicas only communicate with their leader, so if for example B missed
// a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because
// they can't commit the earlier block until they get a new message from A. However since
// https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted,
// so we shouldn't have to wait long for A to unstash its messages to B.
// * If that wouldn't be acceptable then we could have some kind of global view of stashed messages
// and unstash them as soon as someone moves on to a new view.
let mut stashes: HashMap<Port, Vec<io::OutputMessage>> = HashMap::new();
/// Receive input messages from the consensus actor and send them to the others
/// according to the partition schedule of the port associated with this instance.
async fn twins_receive_loop(
ctx: &ctx::Ctx,
port: Port,
mut recv: UnboundedReceiver<io::InputMessage>,
sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
validator_ports: &HashMap<PublicKey, Vec<Port>>,
splits: &PortSplitSchedule,
) -> anyhow::Result<()> {
let rng = &mut ctx.rng();
let start = &ctx.now();

// Either stash a message, or unstash all messages and send them to the target along with the new one.
let mut send_or_stash =
|can_send: bool, target_port: Port, msg: io::OutputMessage| {
let stash = stashes.entry(target_port).or_default();
let view = output_msg_view_number(&msg);
let kind = output_msg_label(&msg);
// We need to buffer messages that cannot be delivered due to partitioning, and deliver them later.
// The spec says that the network is expected to deliver messages eventually, potentially out of order,
// caveated by the fact that the actual implementation only keeps retrying the last message.
// However, with the actors instantiated by this test, that would not be sufficient because
// there is no gossip network here, and if a replica misses a proposal, it won't get it via gossip,
// and will forever be unable to finalize blocks.
// A separate issue is the definition of "later", without actually adding timing assumptions:
// * If we want to allow partitions which don't have enough replicas for a quorum, and the replicas
// don't move on from a view until they reach quorum, then "later" could be defined by so many
// messages in a view that it indicates a timeout loop. NB the consensus might not have
// an actual timeout loop and might rely on the network trying to re-broadcast forever.
// * OTOH if all partitions are at least quorum sized, then there will be a subset of nodes that
// can move on to the next view, in which a new partition configuration will allow them to broadcast
// to previously isolated peers.
// * One idea is to wait until replica A wants to send to replica B in a view when they are no longer
// partitioned, and then unstash all previous A-to-B messages. This would _not_ work with HotStuff
// out of the box, because replicas only communicate with their leader, so if for example B missed
// a LeaderCommit from A in an earlier view, B will not respond to the LeaderPrepare from C because
// they can't commit the earlier block until they get a new message from A. However since
// https://github.com/matter-labs/era-consensus/pull/119 the ReplicaPrepare messages are broadcasted,
// so we shouldn't have to wait long for A to unstash its messages to B.
// * If that wouldn't be acceptable then we could have some kind of global view of stashed messages
// and unstash them as soon as someone moves on to a new view.
let mut stashes: HashMap<Port, Vec<io::OutputMessage>> = HashMap::new();

if can_send {
let s = &sends[&target_port];
// Either stash a message, or unstash all messages and send them to the target along with the new one.
let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| {
let stash = stashes.entry(target_port).or_default();
let view = output_msg_view_number(&msg);
let kind = output_msg_label(&msg);

// Messages can be delivered in arbitrary order.
stash.shuffle(rng);
if can_send {
let s = &sends[&target_port];

for unstashed in stash.drain(0..) {
let view = output_msg_view_number(&unstashed);
let kind = output_msg_label(&unstashed);
eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
s.send(unstashed);
}
eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
s.send(msg);
} else {
eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
stash.push(msg)
}
};
// Messages can be delivered in arbitrary order.
stash.shuffle(rng);

while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
let view_number = message.message.msg.view().number.0 as usize;
// Here we assume that all instances start from view 0 in the tests.
// If the view is higher than what we have planned for, assume no partitions.
// Every node is guaranteed to be present in only one partition.
let partitions_opt = splits.get(view_number);
for unstashed in stash.drain(0..) {
let view = output_msg_view_number(&unstashed);
let kind = output_msg_label(&unstashed);
eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
s.send(unstashed);
}
eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
s.send(msg);
// TODO: If the message is a LeaderPrepare then remember the block sent,
// then if the next message is a LeaderCommit with a CommitQC then
// prepare a FinalBlock and pretend that we have a gossip layer by
// calling block_store.queue_block on every other node.
} else {
eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
stash.push(msg)
}
};

let msg = || {
io::OutputMessage::Consensus(io::ConsensusReq {
msg: message.message.clone(),
ack: oneshot::channel().0,
})
};
while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
let view_number = message.message.msg.view().number.0 as usize;
// Here we assume that all instances start from view 0 in the tests.
// If the view is higher than what we have planned for, assume no partitions.
// Every node is guaranteed to be present in only one partition.
let partitions_opt = splits.get(view_number);

match message.recipient {
io::Target::Broadcast => match partitions_opt {
None => {
eprintln!("broadcasting view={view_number} from={port} target=all");
for target_port in sends.keys() {
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
for target_port in p {
send_or_stash(can_send, *target_port, msg());
}
}
}
},
io::Target::Validator(v) => {
let target_ports = &validator_ports[&v];
let msg = || {
io::OutputMessage::Consensus(io::ConsensusReq {
msg: message.message.clone(),
ack: oneshot::channel().0,
})
};

match message.recipient {
io::Target::Broadcast => match partitions_opt {
None => {
eprintln!("broadcasting view={view_number} from={port} target=all");
for target_port in sends.keys() {
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_seconds_f64());
for target_port in p {
send_or_stash(can_send, *target_port, msg());
}
}
}
},
io::Target::Validator(v) => {
let target_ports = &validator_ports[&v];

match partitions_opt {
None => {
for target_port in target_ports {
eprintln!("unicasting view={view_number} from={port} target={target_port}");
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
for target_port in target_ports {
if p.contains(target_port) {
eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64());
send_or_stash(
can_send,
*target_port,
msg(),
);
}
}
}
}
match partitions_opt {
None => {
for target_port in target_ports {
eprintln!(
"unicasting view={view_number} from={port} target={target_port}"
);
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
for target_port in target_ports {
if p.contains(target_port) {
eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_seconds_f64());
send_or_stash(can_send, *target_port, msg());
}
}
}
}
Ok(())
});
}
}
anyhow::Ok(())
})
.await
})
.await
}
}
Ok(())
}
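
To make the view-indexed partition lookup in `twins_receive_loop` concrete, here is a small self-contained sketch (an illustration, not part of this commit). The `Port` and `PortSplitSchedule` aliases and the `can_deliver` helper are assumptions made for the example: the schedule is taken to be a per-view list of partitions, each a list of ports, which matches how `splits.get(view_number)` and `p.contains(&port)` are used above, and views past the end of the schedule are treated as unpartitioned.

// Hypothetical aliases for illustration; the real definitions live elsewhere
// in the test utilities.
type Port = u16;
type PortSplitSchedule = Vec<Vec<Vec<Port>>>;

/// Can `from` deliver a message to `to` in `view`? Views beyond the planned
/// schedule are treated as unpartitioned, matching the "assume no partitions"
/// comment in `twins_receive_loop`.
fn can_deliver(splits: &PortSplitSchedule, view: usize, from: Port, to: Port) -> bool {
    match splits.get(view) {
        None => true,
        Some(partitions) => partitions
            .iter()
            .any(|p| p.contains(&from) && p.contains(&to)),
    }
}

fn main() {
    // Two views over four ports: view 0 splits them 2+2, view 1 heals the split.
    let splits: PortSplitSchedule = vec![
        vec![vec![1000, 1001], vec![1002, 1003]],
        vec![vec![1000, 1001, 1002, 1003]],
    ];
    assert!(can_deliver(&splits, 0, 1000, 1001)); // same partition in view 0
    assert!(!can_deliver(&splits, 0, 1000, 1002)); // across the split: would be stashed
    assert!(can_deliver(&splits, 1, 1000, 1002)); // reachable again in view 1
    assert!(can_deliver(&splits, 9, 1000, 1003)); // past the schedule: no partitions
}

In the real loop the same condition is evaluated per message, and a negative result routes the message into the stash instead of dropping it.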

@@ -363,6 +384,6 @@ fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {

fn output_msg_label(msg: &io::OutputMessage) -> &str {
match msg {
io::OutputMessage::Consensus(cr) => cr.msg.msg.label()
io::OutputMessage::Consensus(cr) => cr.msg.msg.label(),
}
}
}
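
The stash-and-flush behaviour motivated by the long comment in `twins_receive_loop` can also be shown in isolation. The sketch below (again only an illustration, not part of the commit) mirrors the `send_or_stash` closure with hypothetical stand-ins: `Msg` replaces `io::OutputMessage` and a `delivered` vector replaces the per-port sender; the real closure additionally shuffles the stash before draining it, to model out-of-order delivery.

use std::collections::HashMap;

type Port = u16;
type Msg = &'static str; // stand-in for io::OutputMessage

/// Minimal stand-in for the `send_or_stash` closure: deliver a message if the
/// current partition allows it, flushing anything previously stashed for the
/// same target first; otherwise buffer it for later.
fn send_or_stash(
    stashes: &mut HashMap<Port, Vec<Msg>>,
    can_send: bool,
    target: Port,
    msg: Msg,
    delivered: &mut Vec<(Port, Msg)>,
) {
    let stash = stashes.entry(target).or_default();
    if can_send {
        for old in stash.drain(..) {
            delivered.push((target, old));
        }
        delivered.push((target, msg));
    } else {
        stash.push(msg);
    }
}

fn main() {
    let mut stashes: HashMap<Port, Vec<Msg>> = HashMap::new();
    let mut delivered: Vec<(Port, Msg)> = Vec::new();

    // View 0: port 1002 sits across the partition, so the message is buffered.
    send_or_stash(&mut stashes, false, 1002, "ReplicaPrepare(v0)", &mut delivered);
    // View 1: the split changes and 1002 becomes reachable; the stashed message
    // is flushed before the new one, so nothing is lost, only delayed.
    send_or_stash(&mut stashes, true, 1002, "ReplicaPrepare(v1)", &mut delivered);

    assert_eq!(
        delivered,
        vec![(1002u16, "ReplicaPrepare(v0)"), (1002, "ReplicaPrepare(v1)")]
    );
}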
