Skip to content

Commit

Permalink
Made view timeout computation saturating. (#88)
Browse files Browse the repository at this point in the history
The current version was overflowing quite fast. It might have been the
cause why consensus component is broken on stage right now. Additionally
I've added some more debugging info and view_number metric. Unrelated:
for consistency, I've hardcoded the secret key constructor for bls to
use good entropy source.
  • Loading branch information
pompon0 authored Apr 9, 2024
1 parent f04a3bf commit d17c018
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 38 deletions.
3 changes: 3 additions & 0 deletions node/actors/bft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub(crate) struct ConsensusMetrics {
pub(crate) leader_processing_latency: Family<ProcessingLatencyLabels, Histogram<Duration>>,
/// Number of the last finalized block observed by the node.
pub(crate) finalized_block_number: Gauge<u64>,
/// Number of the current view of the replica.
#[metrics(unit = Unit::Seconds)]
pub(crate) replica_view_number: Gauge<u64>,
}

/// Global instance of [`ConsensusMetrics`].
Expand Down
6 changes: 4 additions & 2 deletions node/actors/bft/src/replica/new_view.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::StateMachine;
use crate::metrics;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
Expand All @@ -8,10 +9,11 @@ impl StateMachine {
/// This blocking method is used whenever we start a new view.
#[instrument(level = "trace", err)]
pub(crate) async fn start_new_view(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
tracing::info!("Starting view {}", self.view.next().0);

// Update the state machine.
self.view = self.view.next();
tracing::info!("Starting view {}", self.view);
metrics::METRICS.replica_view_number.set(self.view.0);

self.phase = validator::Phase::Prepare;
if let Some(qc) = self.high_qc.as_ref() {
// Clear the block cache.
Expand Down
15 changes: 14 additions & 1 deletion node/actors/bft/src/replica/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use zksync_consensus_roles::validator;
impl StateMachine {
/// The base duration of the timeout.
pub(crate) const BASE_DURATION: time::Duration = time::Duration::milliseconds(2000);
/// Max duration of the timeout.
/// Consensus is unusable with this range of timeout anyway,
/// however to make debugging easier we bound it to a specific value.
pub(crate) const MAX_DURATION: time::Duration = time::Duration::seconds(1000000);

/// Resets the timer. On every timeout we double the duration, starting from a given base duration.
/// This is a simple exponential backoff.
Expand All @@ -16,7 +20,16 @@ impl StateMachine {
Some(qc) => qc.view().number.next(),
None => validator::ViewNumber(0),
};
let timeout = Self::BASE_DURATION * 2u32.pow((self.view.0 - final_view.0) as u32);
let f = self
.view
.0
.saturating_sub(final_view.0)
.try_into()
.unwrap_or(u32::MAX);
let f = 2u64.saturating_pow(f).try_into().unwrap_or(i32::MAX);
let timeout = Self::BASE_DURATION
.saturating_mul(f)
.min(Self::MAX_DURATION);

metrics::METRICS.replica_view_timeout.set_latency(timeout);
self.timeout_deadline = time::Deadline::Finite(ctx.now() + timeout);
Expand Down
25 changes: 20 additions & 5 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Consensus network is a full graph of connections between all validators.
//! BFT consensus messages are exchanged over this network.
use crate::rpc::Rpc as _;
use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc};
use anyhow::Context as _;
use rand::seq::SliceRandom;
use std::{collections::HashSet, sync::Arc};
use tracing::Instrument as _;
use zksync_concurrency::{ctx, oneshot, scope, sync, time};
use zksync_consensus_roles::validator;
use zksync_protobuf::kB;
Expand Down Expand Up @@ -89,7 +91,11 @@ impl Network {
for (peer, conn) in &outbound {
s.spawn(async {
if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await {
tracing::info!("send({:?},<ConsensusMsg>): {err:#}", &*peer);
tracing::info!(
"send({:?},{}): {err:#}",
&*peer,
rpc::consensus::Rpc::submethod(&req)
);
}
Ok(())
});
Expand All @@ -107,12 +113,14 @@ impl Network {
msg: validator::Signed<validator::ConsensusMsg>,
) -> anyhow::Result<()> {
let outbound = self.outbound.current();
let req = rpc::consensus::Req(msg);
outbound
.get(key)
.context("not an active validator")?
.context("not reachable")?
.consensus
.call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE)
.await?;
.call(ctx, &req, RESP_MAX_SIZE)
.await
.with_context(|| rpc::consensus::Rpc::submethod(&req))?;
Ok(())
}

Expand All @@ -126,6 +134,7 @@ impl Network {
let peer =
handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
self.inbound.insert(peer.clone(), ()).await?;
tracing::info!("inbound connection from {peer:?}");
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
Expand Down Expand Up @@ -166,6 +175,7 @@ impl Network {
consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate),
});
self.outbound.insert(peer.clone(), conn.clone()).await?;
tracing::info!("outbound connection to {peer:?}");
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
Expand Down Expand Up @@ -221,6 +231,7 @@ impl Network {
format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
})?;
self.run_outbound_stream(ctx, &self.key.public(), addr)
.instrument(tracing::info_span!("{addr}"))
.await
}

Expand Down Expand Up @@ -250,7 +261,11 @@ impl Network {
addr = new.get(peer).map(|x| x.msg.addr);
}
let Some(addr) = addr else { continue };
if let Err(err) = self.run_outbound_stream(ctx, peer, addr).await {
if let Err(err) = self
.run_outbound_stream(ctx, peer, addr)
.instrument(tracing::info_span!("{addr}"))
.await
{
tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
}
}
Expand Down
1 change: 0 additions & 1 deletion node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::*;
use crate::{io, metrics, preface, rpc, testonly};
use assert_matches::assert_matches;
use rand::Rng;
use tracing::Instrument as _;
use zksync_concurrency::{ctx, net, scope, testonly::abort_on_panic};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::testonly::new_store;
Expand Down
7 changes: 6 additions & 1 deletion node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Network actor maintaining a pool of outbound and inbound connections to other nodes.
use anyhow::Context as _;
use std::sync::Arc;
use tracing::Instrument as _;
use zksync_concurrency::{ctx, ctx::channel, limiter, scope, time};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe::ActorPipe;
Expand Down Expand Up @@ -172,7 +173,10 @@ impl Runner {
.await?
.context("accept()")?;
s.spawn(async {
// This is a syscall which should always succeed on a correctly opened socket.
let addr = stream.peer_addr().context("peer_addr()")?;
let res = async {
tracing::info!("new inbound TCP connection from");
let (stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
Expand All @@ -194,9 +198,10 @@ impl Runner {
}
anyhow::Ok(())
}
.instrument(tracing::info_span!("{addr}"))
.await;
if let Err(err) = res {
tracing::info!("{err:#}");
tracing::info!("{addr}: {err:#}");
}
Ok(())
});
Expand Down
7 changes: 7 additions & 0 deletions node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ impl MeteredStream {
}
}

impl std::ops::Deref for MeteredStream {
type Target = net::tcp::Stream;
fn deref(&self) -> &Self::Target {
&self.stream
}
}

impl io::AsyncRead for MeteredStream {
#[inline(always)]
fn poll_read(
Expand Down
7 changes: 7 additions & 0 deletions node/actors/network/src/noise/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ pub(crate) struct Stream<S = MeteredStream> {
write_buf: Box<Buffer>,
}

impl<S> std::ops::Deref for Stream<S> {
type Target = S;
fn deref(&self) -> &S {
&self.inner
}
}

impl<S> Stream<S>
where
S: io::AsyncRead + io::AsyncWrite + Unpin,
Expand Down
13 changes: 6 additions & 7 deletions node/libs/crypto/src/bls12_381/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::ByteFmt;
use anyhow::{anyhow, bail};
use blst::{min_pk as bls, BLST_ERROR};
use rand::Rng as _;
use std::collections::BTreeMap;

#[cfg(test)]
Expand All @@ -25,23 +26,21 @@ pub const INFINITY_PUBLIC_KEY: [u8; PUBLIC_KEY_BYTES_LEN] = [
pub struct SecretKey(bls::SecretKey);

impl SecretKey {
/// Generates a secret key from provided key material
pub fn generate(key_material: [u8; 32]) -> Self {
/// Generates a secret key from a cryptographically-secure entropy source.
pub fn generate() -> Self {
// This unwrap is safe as the blst library method will only error if provided less than 32 bytes of key material
Self(bls::SecretKey::key_gen_v4_5(&key_material, &[], &[]).unwrap())
Self(bls::SecretKey::key_gen_v4_5(&rand::rngs::OsRng.gen::<[u8; 32]>(), &[], &[]).unwrap())
}

/// Produces a signature using this [`SecretKey`]
pub fn sign(&self, msg: &[u8]) -> Signature {
let signature = self.0.sign(msg, &[], &[]);
Signature(signature)
Signature(self.0.sign(msg, &[], &[]))
}

/// Gets the corresponding [`PublicKey`] for this [`SecretKey`]
#[inline]
pub fn public(&self) -> PublicKey {
let public_key = self.0.sk_to_pk();
PublicKey(public_key)
PublicKey(self.0.sk_to_pk())
}
}

Expand Down
4 changes: 2 additions & 2 deletions node/libs/crypto/src/bls12_381/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Random key generation, intended for use in testing
use super::{AggregateSignature, PublicKey, SecretKey, Signature};
use super::{bls, AggregateSignature, PublicKey, SecretKey, Signature};
use rand::{distributions::Standard, prelude::Distribution, Rng};

/// Generates a random SecretKey. This is meant for testing purposes.
impl Distribution<SecretKey> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> SecretKey {
SecretKey::generate(rng.gen())
SecretKey(bls::SecretKey::key_gen_v4_5(&rng.gen::<[u8; 32]>(), &[], &[]).unwrap())
}
}

Expand Down
25 changes: 7 additions & 18 deletions node/libs/crypto/src/bls12_381/tests.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
use super::*;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::iter::repeat_with;

// Test signing and verifying a random message
#[test]
fn signature_smoke() {
let mut rng = StdRng::seed_from_u64(29483920);

let sk = SecretKey::generate(rng.gen());
let pk = sk.public();
let sk: SecretKey = rng.gen();
let msg: [u8; 32] = rng.gen();

let sig = sk.sign(&msg);

sig.verify(&msg, &pk).unwrap()
sig.verify(&msg, &sk.public()).unwrap()
}

#[test]
Expand All @@ -27,14 +23,11 @@ fn infinity_public_key_failure() {
fn signature_failure_smoke() {
let mut rng = StdRng::seed_from_u64(29483920);

let sk1 = SecretKey::generate(rng.gen());
let sk2 = SecretKey::generate(rng.gen());
let pk2 = sk2.public();
let sk1: SecretKey = rng.gen();
let sk2: SecretKey = rng.gen();
let msg: [u8; 32] = rng.gen();

let sig = sk1.sign(&msg);

assert!(sig.verify(&msg, &pk2).is_err())
assert!(sig.verify(&msg, &sk2.public()).is_err())
}

// Test signing and verifying a random message using aggregate signatures
Expand All @@ -43,9 +36,7 @@ fn aggregate_signature_smoke() {
let mut rng = StdRng::seed_from_u64(29483920);

// Use an arbitrary 5 keys for the smoke test
let sks: Vec<SecretKey> = repeat_with(|| SecretKey::generate(rng.gen()))
.take(5)
.collect();
let sks: Vec<SecretKey> = (0..5).map(|_| rng.gen()).collect();
let pks: Vec<PublicKey> = sks.iter().map(|k| k.public()).collect();
let msg: [u8; 32] = rng.gen();

Expand All @@ -61,9 +52,7 @@ fn aggregate_signature_failure_smoke() {
let mut rng = StdRng::seed_from_u64(29483920);

// Use an arbitrary 5 keys for the smoke test
let sks: Vec<SecretKey> = repeat_with(|| SecretKey::generate(rng.gen()))
.take(5)
.collect();
let sks: Vec<SecretKey> = (0..5).map(|_| rng.gen()).collect();
let pks: Vec<PublicKey> = sks.iter().map(|k| k.public()).collect();
let msg: [u8; 32] = rng.gen();

Expand Down
2 changes: 1 addition & 1 deletion node/libs/crypto/src/ed25519/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct SecretKey(ed::SigningKey);
impl SecretKey {
/// Generates a secret key from a cryptographically-secure entropy source.
pub fn generate() -> Self {
Self(ed::SigningKey::generate(&mut rand::rngs::OsRng {}))
Self(ed::SigningKey::generate(&mut rand::rngs::OsRng))
}

/// Signs a message.
Expand Down
6 changes: 6 additions & 0 deletions node/libs/roles/src/validator/messages/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ impl ViewNumber {
}
}

impl fmt::Display for ViewNumber {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, formatter)
}
}

/// An enum that represents the current phase of the consensus.
#[allow(missing_docs)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down

0 comments on commit d17c018

Please sign in to comment.