# BFT-465: Twins tests (#117)
## What ❔

Add tests that run multiple Twins scenarios with a random number of
replicas, some of which act as twins.

- [x] Add `Network::Twins` type with a partition schedule (see the sketch after this list)
- [x] Add `run_nodes_twins` to execute the test with arbitrary network
partitions and potentially multiple ports per validator
- [x] Buffer messages that can't be delivered and deliver them later
(potentially dropping ones superseded by a newer message from the same
source to the same target)
- [x] Simulate the gossiping of finalized blocks
- [x] Run test with no twins and no partitions
- [x] Run test with no twins but random partitions 
- [x] Run test with twins and random partitions 
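
A rough illustration of the first two items: the partition schedule is a list of per-round splits of the node ports (a validator with a twin runs two instances with the same key on different ports). This is a minimal sketch; apart from `Network::Twins`, `PortSplitSchedule` and `run_nodes_twins`, which appear in the changes below, the type and variant names are assumptions rather than the exact definitions.

```rust
use std::collections::HashSet;

/// Nodes are identified by their network port in the mock network
/// (a validator and its twin share a key but listen on different ports).
type Port = u16;
/// One partition within a round: the ports that can reach each other.
type PortPartition = HashSet<Port>;
/// How the network is split up in a single round.
type PortSplit = Vec<PortPartition>;
/// The partition schedule: one split per consecutive round (view).
type PortSplitSchedule = Vec<PortSplit>;

/// Which kind of network the test should run with.
enum Network {
    /// Pre-existing fully connected test network (other variants elided).
    Real,
    /// Mock network that applies the given partition schedule round by
    /// round; executed by `run_nodes_twins`.
    Twins(PortSplitSchedule),
}
```
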
### Investigation

```shell
cargo nextest run -p zksync_consensus_bft twins_network --no-capture
```

Initially the test failed when partitions were introduced. I'll try to
understand whether this is because the leader got isolated and an important
message got lost. I would also like to understand whether eventual delivery
is an absolute requirement even if all partitions are at least as large as
the quorum size.

🔍 I think the test fails because it checks that all nodes have persisted a
certain number of blocks, but if a proposal carrying the payload is missed
because a partition prevented the message from being delivered, then the
machinery instantiated by the test has no mechanism to procure those blocks
later, and those nodes are stuck.

🔧 I implemented a message stashing mechanism in the mock network but it
still doesn't finalise blocks 👀

🔍 The problem seems to be that my unstashing mechanism only kicked in when
node A tried to send a new message to node B and was no longer blocked. If A
didn't try to send, the messages stashed for B were never delivered, which
caused longer and longer timeouts as nobody was making progress for one
reason or another. Meanwhile, for example, C could already be in a new view;
if we see that, we could conclude that the A-to-B stash should be flushed as
well, even if there are no messages from A to B in that round.
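
To make the failure mode above concrete, here is a minimal sketch of the stashing idea (the names and signatures are illustrative, not the actual `run_nodes_twins` implementation): messages blocked by the current round's partition are stashed per (sender, target) pair, and a stash is only flushed when that same sender sends to that target again while unblocked, which is exactly why a sender that stays silent leaves its stash undelivered.

```rust
use std::collections::{HashMap, HashSet};

type Port = u16;
/// Stand-in for a consensus network message.
#[derive(Clone, Debug)]
struct Msg(String);

/// Undelivered messages, keyed by (sender, target) port pair.
#[derive(Default)]
struct Stash(HashMap<(Port, Port), Vec<Msg>>);

impl Stash {
    /// Called whenever `from` produces a message for `to` in the current round.
    /// `partition` is the set of ports that `from` can currently reach.
    fn send(
        &mut self,
        partition: &HashSet<Port>,
        from: Port,
        to: Port,
        msg: Msg,
        deliver: &mut impl FnMut(Port, Msg),
    ) {
        if partition.contains(&to) {
            // The link is open: flush anything stashed for this pair first,
            // then deliver the new message.
            for stashed in self.0.remove(&(from, to)).unwrap_or_default() {
                deliver(to, stashed);
            }
            deliver(to, msg);
        } else {
            // Blocked in this round: stash for later. If `from` never sends
            // to `to` again, nothing flushes this pair; that is the problem
            // described above, which the per-round broadcast from #119
            // works around.
            self.0.entry((from, to)).or_default().push(msg);
        }
    }
}
```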

🔧 I'll try testing after merging
#119 which should
trigger unstashes in each round.

🔍 It's still failing after adding the replica-to-replica broadcast. For
example, replica A is isolated in round 1 and doesn't get a LeaderCommit;
then in the next round replica B is isolated, and A gets all the missing
messages from round 1, plus the new LeaderPrepare, but it doesn't respond
with a ReplicaCommit because it doesn't have the block from round 1 and
therefore cannot even store proposal 2. The consensus relies on the external
gossiping mechanism to eventually propagate the FinalBlock to it; until then
the node is stuck. I need to simulate the effect of gossiping in the test.

🔧 I implemented a simulated gossip using the following mechanism: if one
node is about to send/unstash a message to another and they are in a
gossip relationship, and the message contains a CommitQC, and the sender
has the finalized block for the committed height, then it inserts the
block directly into the target store.

🔍 The tests now pass without twins, but fail with 1 or 2 twins, albeit not
in every scenario.

🔧 Changed the simulated gossip to push all ancestors of a finalized block
into the target block store, not just the block referenced by the CommitQC
in the latest message. This simulates the target's ability to fetch all
missing blocks.
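
Putting the last two points together, here is a self-contained sketch of the simulated gossip (again with illustrative types, not the real harness API): when a message about to be sent or unstashed to a gossip peer carries a CommitQC, the sender pushes the committed block and every ancestor the target is missing straight into the target's store, oldest first.

```rust
use std::collections::{BTreeMap, HashSet};

type Port = u16;
type BlockNumber = u64;

/// Stand-in for a finalized block (payload and CommitQC elided).
#[derive(Clone)]
struct FinalBlock {
    number: BlockNumber,
}

/// Stand-in for a node's block store in the mock network.
#[derive(Default)]
struct MockStore {
    blocks: BTreeMap<BlockNumber, FinalBlock>,
}

/// Called when `from` is about to send/unstash a message to `to`.
/// `committed` is the block number referenced by a CommitQC in the message, if any.
fn simulate_gossip(
    gossip_pairs: &HashSet<(Port, Port)>,
    from: Port,
    from_store: &MockStore,
    to: Port,
    to_store: &mut MockStore,
    committed: Option<BlockNumber>,
) {
    // Only nodes in a gossip relationship exchange finalized blocks.
    if !gossip_pairs.contains(&(from, to)) {
        return;
    }
    let Some(height) = committed else { return };
    // Push the committed block *and all its ancestors* that the target is
    // missing, oldest first, simulating the target eventually fetching them.
    for number in 0..=height {
        if !to_store.blocks.contains_key(&number) {
            if let Some(block) = from_store.blocks.get(&number) {
                to_store.blocks.insert(number, block.clone());
            }
        }
    }
}
```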

## Why ❔

To check that the consensus does not fail as long as the number of twins
does not exceed the tolerable number of Byzantine nodes.
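
For reference, the comments in the new tests ("n<6 implies f=0 and q=n", "n=6 implies f=1 and q=5", "n>=11 implies f>=2") are consistent with f = (n - 1) / 5 (integer division) tolerated faulty replicas and a quorum of q = n - f, i.e. the n >= 5f + 1 regime. A small worked example assuming that formula; the exact helpers in the codebase may differ.

```rust
/// Assumed fault/quorum formulas implied by the comments in the tests.
fn max_faulty(n: u64) -> u64 {
    (n - 1) / 5
}

fn quorum(n: u64) -> u64 {
    n - max_faulty(n)
}

fn main() {
    assert_eq!((max_faulty(5), quorum(5)), (0, 5)); // n < 6:  f = 0, q = n
    assert_eq!((max_faulty(6), quorum(6)), (1, 5)); // n = 6:  f = 1, q = 5
    assert_eq!((max_faulty(11), quorum(11)), (2, 9)); // n = 11: f = 2, q = n - f
}
```
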
aakoshh authored Jun 10, 2024
1 parent 410636c commit 90b893c
Showing 8 changed files with 713 additions and 73 deletions.
5 changes: 5 additions & 0 deletions node/actors/bft/src/testonly/node.rs
@@ -62,6 +62,7 @@ impl Node {
let mut con_recv = consensus_pipe.recv;
let con_send = consensus_pipe.send;
scope::run!(ctx, |ctx, s| async {
// Run the consensus actor
s.spawn(async {
let validator_key = self.net.validator_key.clone().unwrap();
crate::Config {
@@ -75,6 +76,8 @@
.await
.context("consensus.run()")
});
// Forward input messages received from the network to the actor;
// turns output from others to input for this instance.
s.spawn(async {
while let Ok(network_message) = net_recv.recv(ctx).await {
match network_message {
@@ -85,6 +88,8 @@
}
Ok(())
});
// Forward output messages from the actor to the network;
// turns output from this to inputs for others.
// Get the next message from the channel. Our response depends on what type of replica we are.
while let Ok(msg) = con_recv.recv(ctx).await {
match msg {
530 changes: 462 additions & 68 deletions node/actors/bft/src/testonly/run.rs

Large diffs are not rendered by default.

212 changes: 209 additions & 3 deletions node/actors/bft/src/tests.rs
@@ -1,6 +1,20 @@
use crate::testonly::{ut_harness::UTHarness, Behavior, Network, Test};
use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_roles::validator;
use std::collections::HashMap;

use crate::testonly::{
twins::{Cluster, HasKey, ScenarioGenerator, Twin},
ut_harness::UTHarness,
Behavior, Network, PortSplitSchedule, Test,
};
use zksync_concurrency::{
ctx::{self, Ctx},
scope, time,
};
use zksync_consensus_network::testonly::new_configs_for_validators;
use zksync_consensus_roles::validator::{
self,
testonly::{Setup, SetupSpec},
LeaderSelectionMode, PublicKey, SecretKey,
};

async fn run_test(behavior: Behavior, network: Network) {
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
@@ -195,3 +209,195 @@ async fn non_proposing_leader() {
.await
.unwrap()
}

/// Run Twins scenarios without actual twins, and with so few nodes that all
/// of them are required for a quorum, which means (currently) there won't be
/// any partitions.
///
/// This should be a simple sanity check that the network works and consensus
/// is achieved under the most favourable conditions.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_wo_twins_wo_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n<6 implies f=0 and q=n
run_twins(ctx, 5, 0, 10).await.unwrap();
}

/// Run Twins scenarios without actual twins, but enough replicas that partitions
/// can play a role, isolating certain nodes (potentially the leader) in some
/// rounds.
///
/// This should be a sanity check that without Byzantine behaviour the consensus
/// is resilient to temporary network partitions.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_wo_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible.
run_twins(ctx, 6, 0, 5).await.unwrap();
}

/// Run Twins scenarios with a random number of nodes and 1 twin.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_w1_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n>=6 implies f>=1 and q=n-f
for num_replicas in 6..=10 {
// let num_honest = validator::threshold(num_replicas as u64) as usize;
// let max_faulty = num_replicas - num_honest;
// let num_twins = rng.gen_range(1..=max_faulty);
run_twins(ctx, num_replicas, 1, 5).await.unwrap();
}
}

/// Run Twins scenarios with a higher number of nodes and 2 twins.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_w2_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n>=11 implies f>=2 and q=n-f
run_twins(ctx, 11, 2, 5).await.unwrap();
}

/// Create network configuration for a given number of replicas and twins and run [Test].
async fn run_twins(
ctx: &Ctx,
num_replicas: usize,
num_twins: usize,
num_scenarios: usize,
) -> anyhow::Result<()> {
zksync_concurrency::testonly::abort_on_panic();
// Use a single timeout for all scenarios to finish.
// A single scenario with 11 replicas took 3-5 seconds.
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60));

#[derive(PartialEq, Debug)]
struct Replica {
id: i64, // non-zero ID
public_key: PublicKey,
secret_key: SecretKey,
}

impl HasKey for Replica {
type Key = PublicKey;

fn key(&self) -> &Self::Key {
&self.public_key
}
}

impl Twin for Replica {
fn to_twin(&self) -> Self {
Self {
id: -self.id,
public_key: self.public_key.clone(),
secret_key: self.secret_key.clone(),
}
}
}

let rng = &mut ctx.rng();

// The existing test machinery uses the number of finalized blocks as an exit criterion.
let blocks_to_finalize = 3;
// The test is going to disrupt the communication by partitioning nodes,
// where the leader might not be in a partition with enough replicas to
// form a quorum, therefore to allow N blocks to be finalized we need to
// go longer.
let num_rounds = blocks_to_finalize * 10;
// The paper considers 2 or 3 partitions enough.
let max_partitions = 3;

// Every validator has equal power of 1.
const WEIGHT: u64 = 1;
let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]);

let replicas = spec
.validator_weights
.iter()
.enumerate()
.map(|(i, (sk, _))| Replica {
id: i as i64 + 1,
public_key: sk.public(),
secret_key: sk.clone(),
})
.collect::<Vec<_>>();

let cluster = Cluster::new(replicas, num_twins);
let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions);

// Create network config for all nodes in the cluster (assigns unique network addresses).
let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1);

let node_to_port = cluster
.nodes()
.iter()
.zip(nets.iter())
.map(|(node, net)| (node.id, net.server_addr.port()))
.collect::<HashMap<_, _>>();

assert_eq!(node_to_port.len(), cluster.num_nodes());

// Every network needs a behaviour. They are all honest, just some might be duplicated.
let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()];

// Reuse the same cluster and network setup to run a few scenarios.
for i in 0..num_scenarios {
// Generate a permutation of partitions and leaders for the given number of rounds.
let scenario = scenarios.generate_one(rng);

// Assign the leadership schedule to the consensus.
spec.leader_selection =
LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect());

// Generate a new setup with this leadership schedule.
let setup = Setup::from(spec.clone());

// Create a network with the partition schedule of the scenario.
let splits: PortSplitSchedule = scenario
.rounds
.iter()
.map(|rc| {
rc.partitions
.iter()
.map(|p| p.iter().map(|r| node_to_port[&r.id]).collect())
.collect()
})
.collect();

tracing::info!(
"num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}",
cluster.num_nodes()
);

for (r, rc) in scenario.rounds.iter().enumerate() {
let partitions = &splits[r];

let leader_ports = cluster
.nodes()
.iter()
.filter(|n| n.public_key == *rc.leader)
.map(|n| node_to_port[&n.id])
.collect::<Vec<_>>();

let leader_partition_sizes = leader_ports
.iter()
.map(|lp| partitions.iter().find(|p| p.contains(lp)).unwrap().len())
.collect::<Vec<_>>();

let leader_isolated = leader_partition_sizes
.iter()
.all(|s| *s < cluster.quorum_size());

tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
}

Test {
network: Network::Twins(splits),
nodes: nodes.clone(),
blocks_to_finalize,
}
.run_with_config(ctx, nets.clone(), &setup.genesis)
.await?
}

Ok(())
}
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/fetch.rs
@@ -64,7 +64,7 @@ impl Queue {
) -> ctx::OrCanceled<Call> {
let sub = &mut self.0.subscribe();
while ctx.is_active() {
// Wait for the lowest requested block to be available.
// Wait for the lowest requested block to be available on the remote peer.
// This scope is always cancelled, so we ignore the result.
let mut block_number = None;
let _: Result<(), _> = scope::run!(ctx, |ctx, s| async {
2 changes: 2 additions & 0 deletions node/actors/network/src/gossip/mod.rs
@@ -49,6 +49,8 @@ pub(crate) struct Network {
/// Output pipe of the network actor.
pub(crate) sender: channel::UnboundedSender<io::OutputMessage>,
/// Queue of block fetching requests.
///
/// These are blocks that this node wants to request from remote peers via RPC.
pub(crate) fetch_queue: fetch::Queue,
/// Last viewed QC.
pub(crate) last_viewed_qc: Option<attester::BatchQC>,
1 change: 1 addition & 0 deletions node/actors/network/src/gossip/runner.rs
@@ -204,6 +204,7 @@ impl Network {

// Perform get_block calls to peer.
s.spawn::<()>(async {
// Gossiped state of what range of blocks is available on the remote peer.
let state = &mut push_block_store_state_server.state.subscribe();
loop {
let call = get_block_client.reserve(ctx).await?;
16 changes: 15 additions & 1 deletion node/actors/network/src/testonly.rs
@@ -77,7 +77,21 @@ pub fn new_configs(
setup: &validator::testonly::Setup,
gossip_peers: usize,
) -> Vec<Config> {
let configs = setup.validator_keys.iter().map(|validator_key| {
new_configs_for_validators(rng, setup.validator_keys.iter(), gossip_peers)
}

/// Construct configs for `n` validators of the consensus.
///
/// This version allows for repeating keys used in Twins tests.
pub fn new_configs_for_validators<'a, I>(
rng: &mut impl Rng,
validator_keys: I,
gossip_peers: usize,
) -> Vec<Config>
where
I: Iterator<Item = &'a validator::SecretKey>,
{
let configs = validator_keys.map(|validator_key| {
let addr = net::tcp::testonly::reserve_listener();
Config {
server_addr: addr,
18 changes: 18 additions & 0 deletions node/libs/utils/src/pipe.rs
@@ -5,12 +5,30 @@ use std::future::Future;
use zksync_concurrency::ctx::{self, channel, Ctx};

/// This is the end of the Pipe that should be held by the actor.
///
/// The actor can receive `In` and send back `Out` messages.
pub type ActorPipe<In, Out> = Pipe<In, Out>;

/// This is the end of the Pipe that should be held by the dispatcher.
///
/// The dispatcher can send `In` messages and receive `Out` back.
pub type DispatcherPipe<In, Out> = Pipe<Out, In>;

/// This is a generic Pipe end.
///
/// It is used to receive `In` and send `Out` messages.
///
/// When viewed from the perspective of [new]:
/// * messages of type `In` are going right-to-left
/// * messages of type `Out` are going left-to-right
///
/// ```text
///  In  <- -------- <-  In
///        |  Pipe  |
///  Out -> -------- ->  Out
///   ^                   ^
///  Actor            Dispatcher
/// ```
#[derive(Debug)]
pub struct Pipe<In, Out> {
/// This is the channel that receives messages.
