Skip to content

Commit

Permalink
added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Apr 11, 2024
1 parent 1b28fd7 commit c8d06ef
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 26 deletions.
28 changes: 15 additions & 13 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const RESP_MAX_SIZE: usize = kB;
/// is down.
const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10);

type MsgPoolInner = BTreeMap<usize, io::ConsensusInputMessage>;
type MsgPoolInner = BTreeMap<usize, Arc<io::ConsensusInputMessage>>;

/// Pool of messages to send.
/// It stores the newest message (with the highest view) of each type.
Expand All @@ -44,7 +44,7 @@ impl MsgPool {
}

/// Inserts a message to the pool.
pub(crate) fn send(&self, msg: io::ConsensusInputMessage) {
pub(crate) fn send(&self, msg: Arc<io::ConsensusInputMessage>) {
self.0.send_if_modified(|msgs| {
// Select a unique ID for the new message: using `last ID+1` is ok (will NOT cause
// an ID to be reused), because whenever we remove a message, we also insert a message.
Expand Down Expand Up @@ -92,21 +92,15 @@ impl MsgPool {
}

impl MsgPoolRecv {
/// Awaits a message addressed to `peer`.
/// Awaits the next message.
pub(crate) async fn recv(
&mut self,
ctx: &ctx::Ctx,
peer: &validator::PublicKey,
) -> ctx::OrCanceled<validator::Signed<validator::ConsensusMsg>> {
) -> ctx::OrCanceled<Arc<io::ConsensusInputMessage>> {
loop {
for (k, v) in self.recv.borrow().range(self.next..) {
if let Some((k, v)) = self.recv.borrow().range(self.next..).next() {
self.next = k + 1;
match &v.recipient {
io::Target::Broadcast => {}
io::Target::Validator(x) if x == peer => {}
_ => continue,
}
return Ok(v.message.clone());
return Ok(v.clone());
}
sync::changed(ctx, &mut self.recv).await?;
}
Expand Down Expand Up @@ -235,7 +229,15 @@ impl Network {
let mut sub = self.msg_pool.subscribe();
loop {
let call = consensus_cli.reserve(ctx).await?;
let msg = sub.recv(ctx, peer).await?;
let msg = loop {
let msg = sub.recv(ctx).await?;
match &msg.recipient {
io::Target::Broadcast => {}
io::Target::Validator(recipient) if recipient == peer => {}
_ => continue,
}
break msg.message.clone();
};
s.spawn(async {
let req = rpc::consensus::Req(msg);
let res = call.call(ctx, &req, RESP_MAX_SIZE).await;
Expand Down
106 changes: 97 additions & 9 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,90 @@ use super::*;
use crate::{io, metrics, preface, rpc, testonly};
use assert_matches::assert_matches;
use rand::Rng;
use std::collections::HashSet;
use zksync_concurrency::{ctx, net, scope, testonly::abort_on_panic};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::testonly::new_store;
use zksync_consensus_utils::enum_util::Variant as _;

#[tokio::test]
async fn test_msg_pool() {
use validator::ConsensusMsg as M;
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let pool = MsgPool::new();

// Generate signed consensus messages of different types and views.
let key: validator::SecretKey = rng.gen();
let gen = |f: &mut dyn FnMut() -> M| {
let mut x: Vec<_> = (0..5).map(|_| key.sign_msg(f())).collect();
x.sort_by_key(|m| m.msg.view().number);
x
};
// We keep them sorted by type and view, so that it is easy to
// compute the expected state of the pool after insertions.
let msgs = [
gen(&mut || M::ReplicaPrepare(rng.gen())),
gen(&mut || M::ReplicaCommit(rng.gen())),
gen(&mut || M::LeaderPrepare(rng.gen())),
gen(&mut || M::LeaderCommit(rng.gen())),
];

// Insert messages at random.
let mut want = vec![None; msgs.len()];
for _ in 0..30 {
// Select a random message from `msgs` and insert it.
// Recompute the expected state.
let i = rng.gen_range(0..msgs.len());
let j = rng.gen_range(0..msgs[i].len());
want[i] = Some(want[i].unwrap_or(0).max(j));
pool.send(Arc::new(io::ConsensusInputMessage {
message: msgs[i][j].clone(),
recipient: io::Target::Broadcast,
}));
// Here we compare the internal state of the pool to the expected state.
// Note that we compare sets of crypto hashes of messages, because the messages themselves do not
// implement Hash trait. As a result the error message won't be very helpful.
// If that's problematic, we can either make all the values implement Hash/PartialOrd.
let want: HashSet<_> = want
.iter()
.enumerate()
.filter_map(|(i, j)| j.map(|j| msgs[i][j].msg.clone().insert().hash()))
.collect();
let mut recv = pool.subscribe();
let mut got = HashSet::new();
for _ in 0..want.len() {
got.insert(
recv.recv(ctx)
.await
.unwrap()
.message
.msg
.clone()
.insert()
.hash(),
);
}
assert_eq!(got, want);
}
}

#[tokio::test]
async fn test_msg_pool_recv() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let mut msgs: Vec<io::ConsensusInputMessage> = (0..20).map(|_| rng.gen()).collect();
msgs.sort_by_key(|m| m.message.msg.view().number);

let pool = MsgPool::new();
let mut recv = pool.subscribe();
for m in msgs {
let m = Arc::new(m);
pool.send(m.clone());
assert_eq!(m, recv.recv(ctx).await.unwrap());
}
}

#[tokio::test]
async fn test_one_connection_per_validator() {
Expand Down Expand Up @@ -261,22 +342,25 @@ async fn test_retransmission() {
s.spawn_bg(runner.run(ctx));

// Spawn the first node.
let (node0,runner) = testonly::Instance::new(cfgs[0].clone(), store.clone());
let (node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone());
s.spawn_bg(runner.run(ctx));

// Make first node broadcast a message.
let want: validator::Signed<validator::ConsensusMsg> = rng.gen();
node0.pipe.send(io::ConsensusInputMessage {
message: want.clone(),
recipient: io::Target::Broadcast,
}.into());
node0.pipe.send(
io::ConsensusInputMessage {
message: want.clone(),
recipient: io::Target::Broadcast,
}
.into(),
);

// Spawn the second node multiple times.
// Each time the node should reconnect and re-receive the broadcasted consensus message.
for i in 0..2 {
tracing::info!("iteration {i}");
scope::run!(ctx, |ctx,s| async {
let (mut node1,runner) = testonly::Instance::new(cfgs[1].clone(), store.clone());
scope::run!(ctx, |ctx, s| async {
let (mut node1, runner) = testonly::Instance::new(cfgs[1].clone(), store.clone());
s.spawn_bg(runner.run(ctx));
loop {
if let io::OutputMessage::Consensus(got) = node1.pipe.recv(ctx).await.unwrap() {
Expand All @@ -286,8 +370,12 @@ async fn test_retransmission() {
}
}
Ok(())
}).await.unwrap();
})
.await
.unwrap();
}
Ok(())
}).await.unwrap();
})
.await
.unwrap();
}
2 changes: 1 addition & 1 deletion node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Network {
.as_ref()
.context("not a validator node")?
.msg_pool
.send(message);
.send(Arc::new(message));
}
io::InputMessage::SyncBlocks(io::SyncBlocksInputMessage::GetBlock {
recipient,
Expand Down
28 changes: 26 additions & 2 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
//! Testonly utilities.
#![allow(dead_code)]
use crate::{Config, GossipConfig, Network, RpcConfig, Runner};
use rand::Rng;
use crate::{
io::{ConsensusInputMessage, Target},
Config, GossipConfig, Network, RpcConfig, Runner,
};
use rand::{
distributions::{Distribution, Standard},
Rng,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -11,6 +17,24 @@ use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe;

impl Distribution<Target> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Target {
match rng.gen_range(0..2) {
0 => Target::Broadcast,
_ => Target::Validator(rng.gen()),
}
}
}

impl Distribution<ConsensusInputMessage> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> ConsensusInputMessage {
ConsensusInputMessage {
message: rng.gen(),
recipient: rng.gen(),
}
}
}

/// Synchronously forwards data from one stream to another.
pub(crate) async fn forward(
ctx: &ctx::Ctx,
Expand Down
2 changes: 1 addition & 1 deletion node/libs/roles/src/validator/messages/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Variant<Msg> for NetAddress {
}

/// Hash of a message.
#[derive(Clone, Copy, PartialEq, Eq)]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct MsgHash(pub(crate) keccak256::Keccak256);

impl ByteFmt for MsgHash {
Expand Down

0 comments on commit c8d06ef

Please sign in to comment.