Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bft-465-twins-test
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 29, 2024
2 parents e558c31 + 410636c commit 7cc9dee
Show file tree
Hide file tree
Showing 23 changed files with 280 additions and 52 deletions.
3 changes: 1 addition & 2 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Handler of a ReplicaCommit message.
use std::collections::HashSet;

use super::StateMachine;
use crate::metrics;
use std::collections::HashSet;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
Expand Down
3 changes: 1 addition & 2 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Handler of a ReplicaPrepare message.
use std::collections::HashSet;

use super::StateMachine;
use std::collections::HashSet;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_roles::validator;
Expand Down
16 changes: 12 additions & 4 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,19 @@ impl StateMachine {
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
match err {
super::replica_prepare::Error::Internal(e) => {
return Err(e);
}
super::replica_prepare::Error::Old { .. }
| super::replica_prepare::Error::NotLeaderInView => {
tracing::info!("process_replica_prepare: {err:#}");
}
_ => {
tracing::warn!("process_replica_prepare: {err:#}");
}
}
Err(())
}
};
Expand Down
17 changes: 15 additions & 2 deletions node/actors/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::io::{InputMessage, OutputMessage};
use anyhow::Context;
pub use config::Config;
use std::sync::Arc;
use zksync_concurrency::{ctx, scope};
use zksync_concurrency::{ctx, oneshot, scope};
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::validator;
use zksync_consensus_utils::pipe::ActorPipe;

Expand Down Expand Up @@ -93,7 +94,19 @@ impl Config {
let InputMessage::Network(req) = pipe.recv.recv(ctx).await?;
use validator::ConsensusMsg as M;
match &req.msg.msg {
M::ReplicaPrepare(_) | M::ReplicaCommit(_) => leader_send.send(req),
M::ReplicaPrepare(_) => {
// This is a hacky way to do a clone. This is necessary since we don't want to derive
// Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
let (ack, _) = oneshot::channel();
let new_req = ConsensusReq {
msg: req.msg.clone(),
ack,
};

replica_send.send(new_req);
leader_send.send(req);
}
M::ReplicaCommit(_) => leader_send.send(req),
M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/actors/bft/src/replica/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ impl StateMachine {
/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it sends the finalized block to the executor.
/// It also updates the High QC in the replica state machine, if the received QC is
/// higher.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn save_block(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions node/actors/bft/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod block;
pub(crate) mod leader_commit;
pub(crate) mod leader_prepare;
mod new_view;
pub(crate) mod replica_prepare;
mod state_machine;
#[cfg(test)]
mod tests;
Expand Down
4 changes: 2 additions & 2 deletions node/actors/bft/src/replica/new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl StateMachine {
// Backup our state.
self.backup_state(ctx).await.wrap("backup_state()")?;

// Send the replica message to the next leader.
// Send the replica message.
let output_message = ConsensusInputMessage {
message: self
.config
Expand All @@ -39,7 +39,7 @@ impl StateMachine {
high_qc: self.high_qc.clone(),
},
)),
recipient: Target::Validator(self.config.genesis().view_leader(self.view)),
recipient: Target::Broadcast,
};
self.outbound_pipe.send(output_message.into());

Expand Down
105 changes: 105 additions & 0 deletions node/actors/bft/src/replica/replica_prepare.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//! Handler of a ReplicaPrepare message.
use super::StateMachine;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_roles::validator;

/// Errors that can occur when processing a "replica prepare" message.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Message signer isn't part of the validator set.
#[error("Message signer isn't part of the validator set (signer: {signer:?})")]
NonValidatorSigner {
/// Signer of the message.
signer: validator::PublicKey,
},
/// Past view or phase.
#[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
Old {
/// Current view.
current_view: validator::ViewNumber,
/// Current phase.
current_phase: validator::Phase,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] anyhow::Error),
/// Invalid message.
#[error(transparent)]
InvalidMessage(validator::ReplicaPrepareVerifyError),
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
}

impl Wrap for Error {
fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
self,
f: F,
) -> Self {
match self {
Error::Internal(err) => Error::Internal(err.with_wrap(f)),
err => err,
}
}
}

impl StateMachine {
#[instrument(level = "trace", skip(self), ret)]
pub(crate) async fn process_replica_prepare(
&mut self,
ctx: &ctx::Ctx,
signed_message: validator::Signed<validator::ReplicaPrepare>,
) -> Result<(), Error> {
// ----------- Checking origin of the message --------------

// Unwrap message.
let message = signed_message.msg.clone();
let author = &signed_message.key;

// Check that the message signer is in the validator set.
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone(),
});
}

// We only accept this type of message from the future.
if message.view.number <= self.view {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

// Extract the QC and verify it.
let Some(high_qc) = message.high_qc else {
return Ok(());
};

high_qc.verify(self.config.genesis()).map_err(|err| {
Error::InvalidMessage(validator::ReplicaPrepareVerifyError::HighQC(err))
})?;

// ----------- All checks finished. Now we process the message. --------------

let qc_view = high_qc.view().number;

// Try to create a finalized block with this CommitQC and our block proposal cache.
// It will also update our high QC, if necessary.
self.save_block(ctx, &high_qc).await.wrap("save_block()")?;

// Skip to a new view, if necessary.
if qc_view >= self.view {
self.view = qc_view;
self.start_new_view(ctx).await.wrap("start_new_view()")?;
}

Ok(())
}
}
54 changes: 48 additions & 6 deletions node/actors/bft/src/replica/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,51 @@ impl StateMachine {

let now = ctx.now();
let label = match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) => {
let res = match self
.process_replica_prepare(ctx, req.msg.cast().unwrap())
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(err) => {
match err {
super::replica_prepare::Error::Internal(e) => {
return Err(e);
}
super::replica_prepare::Error::Old { .. } => {
tracing::info!("process_replica_prepare: {err:#}");
}
_ => {
tracing::warn!("process_replica_prepare: {err:#}");
}
}
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::LeaderPrepare(_) => {
let res = match self
.process_leader_prepare(ctx, req.msg.cast().unwrap())
.await
.wrap("process_leader_prepare()")
{
Err(super::leader_prepare::Error::Internal(err)) => return Err(err),
Ok(()) => Ok(()),
Err(err) => {
tracing::warn!("process_leader_prepare(): {err:#}");
match err {
super::leader_prepare::Error::Internal(e) => {
return Err(e);
}
super::leader_prepare::Error::Old { .. } => {
tracing::info!("process_leader_prepare: {err:#}");
}
_ => {
tracing::warn!("process_leader_prepare: {err:#}");
}
}
Err(())
}
Ok(()) => Ok(()),
};
metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res)
}
Expand All @@ -127,12 +160,21 @@ impl StateMachine {
.await
.wrap("process_leader_commit()")
{
Err(super::leader_commit::Error::Internal(err)) => return Err(err),
Ok(()) => Ok(()),
Err(err) => {
tracing::warn!("process_leader_commit(): {err:#}");
match err {
super::leader_commit::Error::Internal(e) => {
return Err(e);
}
super::leader_commit::Error::Old { .. } => {
tracing::info!("process_leader_commit: {err:#}");
}
_ => {
tracing::warn!("process_leader_commit: {err:#}");
}
}
Err(())
}
Ok(()) => Ok(()),
};
metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res)
}
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config};
use anyhow::Context as _;
use im::HashMap;
Expand All @@ -21,8 +22,6 @@ use zksync_concurrency::{ctx, ctx::channel, scope, sync};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::BlockStore;

use self::batch_votes::BatchVotesWatch;

mod batch_votes;
mod fetch;
mod handshake;
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/rpc/push_batch_votes.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! Defines RPC for passing consensus messages.
use std::sync::Arc;

use crate::{mux, proto::gossip as proto};
use anyhow::Context as _;
use std::sync::Arc;
use zksync_consensus_roles::attester::{self, Batch};
use zksync_protobuf::ProtoFmt;

Expand Down
9 changes: 4 additions & 5 deletions node/libs/roles/src/attester/conv.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::{
AggregateSignature, Batch, BatchNumber, BatchQC, Msg, MsgHash, PublicKey, Signature, Signed,
Signers, WeightedAttester,
};
use crate::proto::attester::{self as proto};
use anyhow::Context as _;
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_utils::enum_util::Variant;
use zksync_protobuf::{read_required, required, ProtoFmt};

use super::{
AggregateSignature, Batch, BatchNumber, BatchQC, Msg, MsgHash, PublicKey, Signature, Signed,
Signers, WeightedAttester,
};

impl ProtoFmt for Batch {
type Proto = proto::Batch;
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/keys/aggregate_signature.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::attester::{Batch, MsgHash};

use super::{PublicKey, Signature};
use crate::attester::{Batch, MsgHash};
use std::fmt;
use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::Variant;
Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/keys/signature.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::attester::{Msg, MsgHash};

use super::PublicKey;
use crate::attester::{Msg, MsgHash};
use std::fmt;
use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};

Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/messages/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{attester, validator::Genesis};

use super::{Signed, Signers};
use crate::{attester, validator::Genesis};
use anyhow::{ensure, Context as _};

#[derive(Clone, Debug, PartialEq, Eq, Hash, Default, PartialOrd)]
Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/messages/msg.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::{collections::BTreeMap, fmt};

use crate::{attester, validator};
use anyhow::Context as _;
use bit_vec::BitVec;
use std::{collections::BTreeMap, fmt};
use zksync_consensus_crypto::{keccak256, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::{BadVariantError, Variant};

Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ mod keys;
mod messages;
mod testonly;

pub use self::keys::*;
pub use self::messages::*;
pub use self::{keys::*, messages::*};
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::validator::testonly::Setup;

use super::*;
use crate::validator::testonly::Setup;
use assert_matches::assert_matches;
use rand::Rng;
use zksync_concurrency::ctx;
Expand Down
Loading

0 comments on commit 7cc9dee

Please sign in to comment.