BFT-465: Twins tests #117

Merged (26 commits, Jun 10, 2024)

Commits (26)
ae67132
BFT-465: Skeleton for Twins tests
aakoshh May 24, 2024
1e4357f
BFT-465: Some comments to help understand the flow
aakoshh May 28, 2024
a93aa84
BFT-465: Split different network executions
aakoshh May 28, 2024
43fb874
BFT-465: Twins network skeleton
aakoshh May 28, 2024
a26d2cf
BFT-465: Make it compile with lifetimes
aakoshh May 28, 2024
0a413a4
BFT-465: Use break after partition is found
aakoshh May 28, 2024
34afa95
BFT-465: Add Network::Twins
aakoshh May 28, 2024
6360c12
BFT-465: Call twins test runner
aakoshh May 28, 2024
9c06ad4
BFT-465: Try with the simplest test
aakoshh May 29, 2024
7a2c516
BFT-465: Try with partitions
aakoshh May 29, 2024
8a838e3
BFT-465: Debugging messages
aakoshh May 29, 2024
9e6fd72
BFT-465: Stash messages and unstash when partition lifted
aakoshh May 29, 2024
e558c31
BFT-465: Comments about why the unstashing doesn't work with HotStuff
aakoshh May 29, 2024
7cc9dee
Merge remote-tracking branch 'origin/main' into bft-465-twins-test
aakoshh May 29, 2024
8709620
BFT-465: Helpful comments in gossip code
aakoshh May 30, 2024
9946567
BFT-465: Separate twins receive loop function
aakoshh May 30, 2024
9d47255
BFT-465: Twins gossip loop
aakoshh May 30, 2024
474a305
BFT-465: Fix replica ID
aakoshh May 30, 2024
78b81f2
BFT-465: Try with the maximum cluster size with 11 nodes and 2 twins
aakoshh May 30, 2024
683cc7e
BFT-465: Quit as soon as we run out of rounds
aakoshh May 30, 2024
47b34f9
BFT-465: Only compare the payload for equivalence at the end
aakoshh May 30, 2024
23a10e2
BFT-465: Allow more time to finish and run more scenarios until it fails
aakoshh May 30, 2024
c596aa2
BFT-465: Print multiple leader ports in debug
aakoshh May 30, 2024
ea10b1d
BFT-465: Gossip all predecessors
aakoshh May 30, 2024
c13bcbf
BFT-465: Fix some comments
aakoshh May 30, 2024
2394350
BFT-465: Nits
aakoshh Jun 10, 2024
Files changed
5 changes: 5 additions & 0 deletions node/actors/bft/src/testonly/node.rs
@@ -62,6 +62,7 @@ impl Node {
let mut con_recv = consensus_pipe.recv;
let con_send = consensus_pipe.send;
scope::run!(ctx, |ctx, s| async {
// Run the consensus actor
s.spawn(async {
let validator_key = self.net.validator_key.clone().unwrap();
crate::Config {
@@ -75,6 +76,8 @@ .await
.await
.context("consensus.run()")
});
// Forward input messages received from the network to the actor;
// turns output from others to input for this instance.
s.spawn(async {
while let Ok(network_message) = net_recv.recv(ctx).await {
match network_message {
@@ -85,6 +88,8 @@
}
Ok(())
});
// Forward output messages from the actor to the network;
// turns output from this to inputs for others.
// Get the next message from the channel. Our response depends on what type of replica we are.
while let Ok(msg) = con_recv.recv(ctx).await {
match msg {
530 changes: 462 additions & 68 deletions node/actors/bft/src/testonly/run.rs

Large diffs are not rendered by default.
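The run.rs diff itself is not rendered here, but the commit messages above ("Stash messages and unstash when partition lifted", "Twins gossip loop") and the PortSplitSchedule used in tests.rs outline the approach: per round, messages crossing a partition boundary are held back and only delivered once the sender and target share a partition again. The following is a minimal, self-contained sketch of that routing idea, not the PR's actual run.rs code; names such as `Router` and `route` are illustrative.

```rust
// Conceptual sketch of per-round partition routing with stashing.
use std::collections::{HashMap, HashSet};

type Port = u16;
/// For each round, a set of disjoint partitions, each a set of node ports.
type PortSplitSchedule = Vec<Vec<HashSet<Port>>>;

struct Router {
    schedule: PortSplitSchedule,
    /// Messages held back per target until the sender is co-partitioned with it again.
    stash: HashMap<Port, Vec<(Port, String)>>,
}

impl Router {
    fn same_partition(&self, round: usize, a: Port, b: Port) -> bool {
        // Assumes the schedule repeats if the run outlives it.
        self.schedule[round % self.schedule.len()]
            .iter()
            .any(|p| p.contains(&a) && p.contains(&b))
    }

    /// Returns the messages that can actually be delivered to `target` in this round:
    /// the new message plus any previously stashed ones whose partition has lifted.
    fn route(&mut self, round: usize, from: Port, target: Port, msg: String) -> Vec<String> {
        // Always stash first, then deliver whatever the current round's partitions allow.
        self.stash.entry(target).or_default().push((from, msg));
        let stashed = self.stash.remove(&target).unwrap_or_default();
        let (deliver, keep): (Vec<_>, Vec<_>) = stashed
            .into_iter()
            .partition(|(sender, _)| self.same_partition(round, *sender, target));
        if !keep.is_empty() {
            self.stash.insert(target, keep);
        }
        deliver.into_iter().map(|(_, m)| m).collect()
    }
}

fn main() {
    // Two rounds: in round 0 ports {1,2} and {3} are split; in round 1 everyone is together.
    let schedule = vec![
        vec![HashSet::from([1, 2]), HashSet::from([3])],
        vec![HashSet::from([1, 2, 3])],
    ];
    let mut router = Router { schedule, stash: HashMap::new() };
    assert!(router.route(0, 1, 3, "view change".into()).is_empty()); // stashed
    assert_eq!(router.route(1, 1, 3, "proposal".into()).len(), 2); // stash flushed
}
```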

212 changes: 209 additions & 3 deletions node/actors/bft/src/tests.rs
@@ -1,6 +1,20 @@
use crate::testonly::{ut_harness::UTHarness, Behavior, Network, Test};
use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_roles::validator;
use std::collections::HashMap;

use crate::testonly::{
twins::{Cluster, HasKey, ScenarioGenerator, Twin},
ut_harness::UTHarness,
Behavior, Network, PortSplitSchedule, Test,
};
use zksync_concurrency::{
ctx::{self, Ctx},
scope, time,
};
use zksync_consensus_network::testonly::new_configs_for_validators;
use zksync_consensus_roles::validator::{
self,
testonly::{Setup, SetupSpec},
LeaderSelectionMode, PublicKey, SecretKey,
};

async fn run_test(behavior: Behavior, network: Network) {
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
@@ -195,3 +209,195 @@ async fn non_proposing_leader() {
.await
.unwrap()
}

/// Run Twins scenarios without actual twins, and with so few nodes that all
/// of them are required for a quorum, which means (currently) there won't be
/// any partitions.
///
/// This should be a simple sanity check that the network works and consensus
/// is achieved under the most favourable conditions.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_wo_twins_wo_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n<6 implies f=0 and q=n
run_twins(ctx, 5, 0, 10).await.unwrap();
}

/// Run Twins scenarios without actual twins, but enough replicas that partitions
/// can play a role, isolating certain nodes (potentially the leader) in some
/// rounds.
///
/// This should be a sanity check that without Byzantine behaviour the consensus
/// is resilient to temporary network partitions.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_wo_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n=6 implies f=1 and q=5; 6 is the minimum where partitions are possible.
run_twins(ctx, 6, 0, 5).await.unwrap();
}

/// Run Twins scenarios with a varying number of nodes and 1 twin.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_w1_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n>=6 implies f>=1 and q=n-f
for num_replicas in 6..=10 {
// let num_honest = validator::threshold(num_replicas as u64) as usize;
// let max_faulty = num_replicas - num_honest;
// let num_twins = rng.gen_range(1..=max_faulty);
run_twins(ctx, num_replicas, 1, 5).await.unwrap();
}
}

/// Run Twins scenarios with a higher number of nodes and 2 twins.
#[tokio::test(flavor = "multi_thread")]
async fn twins_network_w2_twins_w_partitions() {
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
// n>=11 implies f>=2 and q=n-f
run_twins(ctx, 11, 2, 5).await.unwrap();
}

/// Create network configuration for a given number of replicas and twins and run [Test].
async fn run_twins(
ctx: &Ctx,
num_replicas: usize,
num_twins: usize,
num_scenarios: usize,
) -> anyhow::Result<()> {
zksync_concurrency::testonly::abort_on_panic();
// Use a single timeout for all scenarios to finish.
// A single scenario with 11 replicas took 3-5 seconds.
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60));

#[derive(PartialEq, Debug)]
struct Replica {
id: i64, // non-zero ID
public_key: PublicKey,
secret_key: SecretKey,
}

impl HasKey for Replica {
type Key = PublicKey;

fn key(&self) -> &Self::Key {
&self.public_key
}
}

impl Twin for Replica {
fn to_twin(&self) -> Self {
Self {
id: -self.id,
public_key: self.public_key.clone(),
secret_key: self.secret_key.clone(),
}
}
}

let rng = &mut ctx.rng();

// The existing test machinery uses the number of finalized blocks as an exit criterion.
let blocks_to_finalize = 3;
// The test is going to disrupt communication by partitioning nodes; the
// leader might not be in a partition with enough replicas to form a
// quorum, so to allow N blocks to be finalized we need to run for more rounds.
let num_rounds = blocks_to_finalize * 10;
// The paper considers 2 or 3 partitions enough.
let max_partitions = 3;

// Every validator has equal power of 1.
const WEIGHT: u64 = 1;
let mut spec = SetupSpec::new_with_weights(rng, vec![WEIGHT; num_replicas]);

let replicas = spec
.validator_weights
.iter()
.enumerate()
.map(|(i, (sk, _))| Replica {
id: i as i64 + 1,
public_key: sk.public(),
secret_key: sk.clone(),
})
.collect::<Vec<_>>();

let cluster = Cluster::new(replicas, num_twins);
let scenarios = ScenarioGenerator::new(&cluster, num_rounds, max_partitions);

// Create network config for all nodes in the cluster (assigns unique network addresses).
let nets = new_configs_for_validators(rng, cluster.nodes().iter().map(|r| &r.secret_key), 1);

let node_to_port = cluster
.nodes()
.iter()
.zip(nets.iter())
.map(|(node, net)| (node.id, net.server_addr.port()))
.collect::<HashMap<_, _>>();

assert_eq!(node_to_port.len(), cluster.num_nodes());

// Every node needs a behaviour. They are all honest; just some might be duplicated.
let nodes = vec![(Behavior::Honest, WEIGHT); cluster.num_nodes()];

// Reuse the same cluster and network setup to run a few scenarios.
for i in 0..num_scenarios {
// Generate a permutation of partitions and leaders for the given number of rounds.
let scenario = scenarios.generate_one(rng);

// Assign the leadership schedule to the consensus.
spec.leader_selection =
LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect());

// Generate a new setup with this leadership schedule.
let setup = Setup::from(spec.clone());

// Create a network with the partition schedule of the scenario.
let splits: PortSplitSchedule = scenario
.rounds
.iter()
.map(|rc| {
rc.partitions
.iter()
.map(|p| p.iter().map(|r| node_to_port[&r.id]).collect())
.collect()
})
.collect();

tracing::info!(
"num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}",
cluster.num_nodes()
);

for (r, rc) in scenario.rounds.iter().enumerate() {
let partitions = &splits[r];

let leader_ports = cluster
.nodes()
.iter()
.filter(|n| n.public_key == *rc.leader)
.map(|n| node_to_port[&n.id])
.collect::<Vec<_>>();

let leader_partition_sizes = leader_ports
.iter()
.map(|lp| partitions.iter().find(|p| p.contains(lp)).unwrap().len())
.collect::<Vec<_>>();

let leader_isolated = leader_partition_sizes
.iter()
.all(|s| *s < cluster.quorum_size());

tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
}

Test {
network: Network::Twins(splits),
nodes: nodes.clone(),
blocks_to_finalize,
}
.run_with_config(ctx, nets.clone(), &setup.genesis)
.await?
}

Ok(())
}
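The f/q comments in the tests above ("n<6 implies f=0 and q=n", "n=6 implies f=1 and q=5", "n>=11 implies f>=2") match an n = 5f + 1 fault model. Below is a minimal sketch of that arithmetic under that assumption; the helper names are illustrative, not the crate's API (the commented-out code hints at `validator::threshold` for the real computation).

```rust
// Fault/quorum arithmetic implied by the test comments, assuming n = 5f + 1.
fn max_faulty(num_replicas: u64) -> u64 {
    // n = 5f + 1 replicas tolerate f faults, i.e. f = (n - 1) / 5.
    (num_replicas - 1) / 5
}

fn quorum_size(num_replicas: u64) -> u64 {
    // A quorum is everyone minus the potentially faulty replicas: q = n - f.
    num_replicas - max_faulty(num_replicas)
}

fn main() {
    assert_eq!((max_faulty(5), quorum_size(5)), (0, 5)); // n < 6:  f = 0, q = n
    assert_eq!((max_faulty(6), quorum_size(6)), (1, 5)); // n = 6:  f = 1, q = 5
    assert_eq!((max_faulty(11), quorum_size(11)), (2, 9)); // n = 11: f = 2, q = n - f
}
```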
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/fetch.rs
@@ -64,7 +64,7 @@
) -> ctx::OrCanceled<Call> {
let sub = &mut self.0.subscribe();
while ctx.is_active() {
// Wait for the lowest requested block to be available.
// Wait for the lowest requested block to be available on the remote peer.
// This scope is always cancelled, so we ignore the result.
let mut block_number = None;
let _: Result<(), _> = scope::run!(ctx, |ctx, s| async {
2 changes: 2 additions & 0 deletions node/actors/network/src/gossip/mod.rs
@@ -49,6 +49,8 @@ pub(crate) struct Network {
/// Output pipe of the network actor.
pub(crate) sender: channel::UnboundedSender<io::OutputMessage>,
/// Queue of block fetching requests.
///
/// These are blocks that this node wants to request from remote peers via RPC.
pub(crate) fetch_queue: fetch::Queue,
/// Last viewed QC.
pub(crate) last_viewed_qc: Option<attester::BatchQC>,
1 change: 1 addition & 0 deletions node/actors/network/src/gossip/runner.rs
@@ -204,6 +204,7 @@ impl Network {

// Perform get_block calls to peer.
s.spawn::<()>(async {
// Gossiped state of what range of blocks is available on the remote peer.
let state = &mut push_block_store_state_server.state.subscribe();
loop {
let call = get_block_client.reserve(ctx).await?;
16 changes: 15 additions & 1 deletion node/actors/network/src/testonly.rs
@@ -77,7 +77,21 @@ pub fn new_configs(
setup: &validator::testonly::Setup,
gossip_peers: usize,
) -> Vec<Config> {
let configs = setup.validator_keys.iter().map(|validator_key| {
new_configs_for_validators(rng, setup.validator_keys.iter(), gossip_peers)
}

/// Construct configs for `n` validators of the consensus.
///
/// This version allows for repeating keys used in Twins tests.
pub fn new_configs_for_validators<'a, I>(
rng: &mut impl Rng,
validator_keys: I,
gossip_peers: usize,
) -> Vec<Config>
where
I: Iterator<Item = &'a validator::SecretKey>,
{
let configs = validator_keys.map(|validator_key| {
let addr = net::tcp::testonly::reserve_listener();
Config {
server_addr: addr,
18 changes: 18 additions & 0 deletions node/libs/utils/src/pipe.rs
@@ -5,12 +5,30 @@ use std::future::Future;
use zksync_concurrency::ctx::{self, channel, Ctx};

/// This is the end of the Pipe that should be held by the actor.
///
/// The actor can receive `In` and send back `Out` messages.
pub type ActorPipe<In, Out> = Pipe<In, Out>;

/// This is the end of the Pipe that should be held by the dispatcher.
///
/// The dispatcher can send `In` messages and receive `Out` back.
pub type DispatcherPipe<In, Out> = Pipe<Out, In>;

/// This is a generic Pipe end.
///
/// It is used to receive `In` and send `Out` messages.
///
/// When viewed from the perspective of [new]:
/// * messages of type `In` are going right-to-left
/// * messages of type `Out` are going left-to-right
///
/// ```text
/// In  <- -------- <- In
///        | Pipe |
/// Out -> -------- -> Out
///  ^                  ^
/// Actor          Dispatcher
/// ```
#[derive(Debug)]
pub struct Pipe<In, Out> {
/// This is the channel that receives messages.
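To make the In/Out directions in the Pipe documentation concrete, here is a small standalone sketch of the same wiring using plain std channels. `PipeEnd` and `new_pipe` are stand-ins, not the crate's actual Pipe API (which uses context-aware channels); only the direction of message flow is meant to match the diagram.

```rust
// Stand-in for the Pipe pattern: two ends sharing a pair of channels.
use std::sync::mpsc;

struct PipeEnd<In, Out> {
    recv: mpsc::Receiver<In>,
    send: mpsc::Sender<Out>,
}

/// Build both ends: the actor end receives `In` and sends `Out`,
/// the dispatcher end receives `Out` and sends `In`.
fn new_pipe<In, Out>() -> (PipeEnd<In, Out>, PipeEnd<Out, In>) {
    let (in_send, in_recv) = mpsc::channel::<In>();
    let (out_send, out_recv) = mpsc::channel::<Out>();
    (
        PipeEnd { recv: in_recv, send: out_send },  // actor end
        PipeEnd { recv: out_recv, send: in_send },  // dispatcher end
    )
}

fn main() {
    let (actor, dispatcher) = new_pipe::<&str, &str>();
    // Dispatcher -> actor: `In` messages flow right-to-left in the diagram.
    dispatcher.send.send("input for the actor").unwrap();
    assert_eq!(actor.recv.recv().unwrap(), "input for the actor");
    // Actor -> dispatcher: `Out` messages flow left-to-right.
    actor.send.send("output for the dispatcher").unwrap();
    assert_eq!(dispatcher.recv.recv().unwrap(), "output for the dispatcher");
}
```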