From 7efa1054be04dcf7582906e46512800179eb6aa4 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Thu, 31 Oct 2024 16:41:07 -0700 Subject: [PATCH] [fastpath] supporting locking transactions and returning effects without signatures --- consensus/core/src/block_verifier.rs | 8 +- consensus/core/src/linearizer.rs | 4 +- consensus/core/src/transaction.rs | 17 +- .../src/embedded_reconfig_observer.rs | 13 +- .../src/fullnode_reconfig_observer.rs | 11 +- crates/sui-benchmark/src/lib.rs | 322 +++--------------- .../src/workloads/adversarial.rs | 2 +- .../benches/batch_verification_bench.rs | 3 +- crates/sui-core/src/authority.rs | 55 ++- .../authority/authority_per_epoch_store.rs | 56 +-- .../sui-core/src/authority/authority_store.rs | 6 +- crates/sui-core/src/authority_server.rs | 4 +- crates/sui-core/src/consensus_adapter.rs | 20 +- crates/sui-core/src/consensus_handler.rs | 41 ++- crates/sui-core/src/consensus_validator.rs | 111 ++++-- crates/sui-core/src/execution_cache.rs | 3 +- .../src/execution_cache/object_locks.rs | 61 +++- .../src/execution_cache/passthrough_cache.rs | 10 +- .../src/execution_cache/proxy_cache.rs | 6 +- .../unit_tests/writeback_cache_tests.rs | 28 +- .../src/execution_cache/writeback_cache.rs | 11 +- crates/sui-core/src/mock_consensus.rs | 18 +- .../sui-core/src/post_consensus_tx_reorder.rs | 9 +- crates/sui-core/src/quorum_driver/mod.rs | 35 +- .../src/quorum_driver/reconfig_observer.rs | 20 +- crates/sui-core/src/quorum_driver/tests.rs | 8 +- crates/sui-core/src/signature_verifier.rs | 22 +- .../sui-core/src/transaction_orchestrator.rs | 13 +- .../unit_tests/batch_verification_tests.rs | 24 +- .../src/unit_tests/mysticeti_manager_tests.rs | 2 +- crates/sui-node/src/lib.rs | 2 +- crates/sui-rest-api/proto/rest.proto | 1 + .../src/proto/generated/sui.rest.rs | 2 + crates/sui-rest-api/src/proto/mod.rs | 23 +- .../src/transactions/execution.rs | 10 + crates/sui-types/src/messages_consensus.rs | 10 +- crates/sui-types/src/quorum_driver_types.rs | 33 +- 37 files changed, 510 insertions(+), 514 deletions(-) diff --git a/consensus/core/src/block_verifier.rs b/consensus/core/src/block_verifier.rs index f794c96b7007d..9ca9fb386f812 100644 --- a/consensus/core/src/block_verifier.rs +++ b/consensus/core/src/block_verifier.rs @@ -264,6 +264,7 @@ mod test { struct TxnSizeVerifier {} + #[async_trait::async_trait] impl TransactionVerifier for TxnSizeVerifier { // Fails verification if any transaction is < 4 bytes. fn verify_batch(&self, transactions: &[&[u8]]) -> Result<(), ValidationError> { @@ -278,8 +279,11 @@ mod test { Ok(()) } - fn vote_batch(&self, _batch: &[&[u8]]) -> Vec { - vec![] + async fn verify_and_vote_batch( + &self, + _batch: &[&[u8]], + ) -> Result, ValidationError> { + Ok(vec![]) } } diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index dba030d05271e..ec87c2b41f453 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -540,7 +540,9 @@ mod tests { let num_authorities = 4; let (mut context, _keys) = Context::new_for_test(num_authorities); - context.protocol_config.set_gc_depth_for_testing(gc_depth); + context + .protocol_config + .set_consensus_gc_depth_for_testing(gc_depth); let context = Arc::new(context); let dag_state = Arc::new(RwLock::new(DagState::new( context.clone(), diff --git a/consensus/core/src/transaction.rs b/consensus/core/src/transaction.rs index fcf9903bbbd2d..4a04d7d28b429 100644 --- a/consensus/core/src/transaction.rs +++ b/consensus/core/src/transaction.rs @@ -210,6 +210,7 @@ impl TransactionClient { /// `TransactionVerifier` implementation is supplied by Sui to validate transactions in a block, /// before acceptance of the block. +#[async_trait::async_trait] pub trait TransactionVerifier: Send + Sync + 'static { /// Determines if this batch of transactions is valid. /// Fails if any one of the transactions is invalid. @@ -219,7 +220,10 @@ pub trait TransactionVerifier: Send + Sync + 'static { /// Currently only uncertified user transactions can be rejected. /// The rest of transactions are implicitly voted to accept. // TODO: add rejection reasons, add VoteError and wrap the return in Result<>. - fn vote_batch(&self, batch: &[&[u8]]) -> Vec; + async fn verify_and_vote_batch( + &self, + batch: &[&[u8]], + ) -> Result, ValidationError>; } #[derive(Debug, Error)] @@ -229,16 +233,21 @@ pub enum ValidationError { } /// `NoopTransactionVerifier` accepts all transactions. -#[allow(unused)] +#[cfg(test)] pub(crate) struct NoopTransactionVerifier; +#[cfg(test)] +#[async_trait::async_trait] impl TransactionVerifier for NoopTransactionVerifier { fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> { Ok(()) } - fn vote_batch(&self, _batch: &[&[u8]]) -> Vec { - vec![] + async fn verify_and_vote_batch( + &self, + _batch: &[&[u8]], + ) -> Result, ValidationError> { + Ok(vec![]) } } diff --git a/crates/sui-benchmark/src/embedded_reconfig_observer.rs b/crates/sui-benchmark/src/embedded_reconfig_observer.rs index 42df82d81ce3b..6ea06a0961964 100644 --- a/crates/sui-benchmark/src/embedded_reconfig_observer.rs +++ b/crates/sui-benchmark/src/embedded_reconfig_observer.rs @@ -5,9 +5,9 @@ use anyhow::anyhow; use async_trait::async_trait; use std::sync::Arc; use sui_core::authority_aggregator::AuthorityAggregator; +use sui_core::quorum_driver::AuthorityAggregatorUpdatable; use sui_core::{ - authority_client::NetworkAuthorityClient, - quorum_driver::{reconfig_observer::ReconfigObserver, QuorumDriver}, + authority_client::NetworkAuthorityClient, quorum_driver::reconfig_observer::ReconfigObserver, }; use sui_network::default_mysten_network_config; use sui_types::sui_system_state::SuiSystemStateTrait; @@ -79,12 +79,15 @@ impl ReconfigObserver for EmbeddedReconfigObserver { Box::new(self.clone()) } - async fn run(&mut self, quorum_driver: Arc>) { + async fn run( + &mut self, + updatable: Arc>, + ) { loop { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - let auth_agg = quorum_driver.authority_aggregator().load(); + let auth_agg = updatable.authority_aggregator(); match self.get_committee(auth_agg.clone()).await { - Ok(new_auth_agg) => quorum_driver.update_validators(new_auth_agg).await, + Ok(new_auth_agg) => updatable.update_authority_aggregator(new_auth_agg), Err(err) => { error!( "Failed to recreate authority aggregator with committee: {}", diff --git a/crates/sui-benchmark/src/fullnode_reconfig_observer.rs b/crates/sui-benchmark/src/fullnode_reconfig_observer.rs index bfd4c83b2aa12..14f2687fd82be 100644 --- a/crates/sui-benchmark/src/fullnode_reconfig_observer.rs +++ b/crates/sui-benchmark/src/fullnode_reconfig_observer.rs @@ -7,7 +7,7 @@ use sui_core::{ authority_aggregator::{AuthAggMetrics, AuthorityAggregator}, authority_client::NetworkAuthorityClient, epoch::committee_store::CommitteeStore, - quorum_driver::{reconfig_observer::ReconfigObserver, QuorumDriver}, + quorum_driver::{reconfig_observer::ReconfigObserver, AuthorityAggregatorUpdatable}, safe_client::SafeClientMetricsBase, }; use sui_sdk::{SuiClient, SuiClientBuilder}; @@ -56,7 +56,10 @@ impl ReconfigObserver for FullNodeReconfigObserver { Box::new(self.clone()) } - async fn run(&mut self, quorum_driver: Arc>) { + async fn run( + &mut self, + updatable: Arc>, + ) { loop { tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; match self @@ -67,7 +70,7 @@ impl ReconfigObserver for FullNodeReconfigObserver { { Ok(sui_system_state) => { let epoch_id = sui_system_state.epoch; - if epoch_id > quorum_driver.current_epoch() { + if epoch_id > updatable.epoch() { debug!(epoch_id, "Got SuiSystemState in newer epoch"); let new_committee = sui_system_state.get_sui_committee_for_benchmarking(); let _ = self @@ -80,7 +83,7 @@ impl ReconfigObserver for FullNodeReconfigObserver { self.auth_agg_metrics.clone(), Arc::new(HashMap::new()), ); - quorum_driver.update_validators(Arc::new(auth_agg)).await + updatable.update_authority_aggregator(Arc::new(auth_agg)); } else { trace!( epoch_id, diff --git a/crates/sui-benchmark/src/lib.rs b/crates/sui-benchmark/src/lib.rs index 67f9d6a57db27..81ae3877f262c 100644 --- a/crates/sui-benchmark/src/lib.rs +++ b/crates/sui-benchmark/src/lib.rs @@ -4,22 +4,16 @@ use anyhow::bail; use async_trait::async_trait; use embedded_reconfig_observer::EmbeddedReconfigObserver; use fullnode_reconfig_observer::FullNodeReconfigObserver; -use futures::{stream::FuturesUnordered, StreamExt}; -use mysten_metrics::GaugeGuard; use prometheus::Registry; use rand::Rng; -use roaring::RoaringBitmap; -use std::{ - collections::BTreeMap, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use sui_config::genesis::Genesis; use sui_core::{ authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder}, - authority_client::{AuthorityAPI, NetworkAuthorityClient}, + authority_client::NetworkAuthorityClient, quorum_driver::{ - QuorumDriver, QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics, + reconfig_observer::ReconfigObserver, QuorumDriver, QuorumDriverHandler, + QuorumDriverHandlerBuilder, QuorumDriverMetrics, }, }; use sui_json_rpc_types::{ @@ -27,10 +21,11 @@ use sui_json_rpc_types::{ SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponseOptions, }; use sui_sdk::{SuiClient, SuiClientBuilder}; -use sui_types::base_types::ConciseableName; -use sui_types::committee::CommitteeTrait; -use sui_types::effects::{CertifiedTransactionEffects, TransactionEffectsAPI, TransactionEvents}; +use sui_types::effects::{TransactionEffectsAPI, TransactionEvents}; +use sui_types::gas::GasCostSummary; use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder; +use sui_types::quorum_driver_types::EffectsFinalityInfo; +use sui_types::quorum_driver_types::FinalizedEffects; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::transaction::Argument; use sui_types::transaction::CallArg; @@ -38,13 +33,8 @@ use sui_types::transaction::ObjectArg; use sui_types::{ base_types::ObjectID, committee::{Committee, EpochId}, - crypto::{ - AggregateAuthenticator, AggregateAuthoritySignature, AuthorityQuorumSignInfo, - AuthoritySignature, - }, - message_envelope::Envelope, object::Object, - transaction::{CertifiedTransaction, Transaction}, + transaction::Transaction, }; use sui_types::{base_types::ObjectRef, crypto::AuthorityStrongQuorumSignInfo, object::Owner}; use sui_types::{base_types::SequenceNumber, gas_coin::GasCoin}; @@ -52,12 +42,8 @@ use sui_types::{ base_types::{AuthorityName, SuiAddress}, sui_system_state::SuiSystemStateTrait, }; -use sui_types::{error::SuiError, gas::GasCostSummary}; -use tokio::{ - task::JoinSet, - time::{sleep, timeout}, -}; -use tracing::{error, info}; +use tokio::time::sleep; +use tracing::{error, info, warn}; pub mod bank; pub mod benchmark_setup; @@ -69,8 +55,6 @@ pub mod options; pub mod system_state_observer; pub mod util; pub mod workloads; -use futures::FutureExt; -use sui_types::messages_grpc::{HandleCertificateResponseV2, TransactionStatus}; use sui_types::quorum_driver_types::{QuorumDriverError, QuorumDriverResponse}; #[derive(Debug)] @@ -78,15 +62,15 @@ use sui_types::quorum_driver_types::{QuorumDriverError, QuorumDriverResponse}; /// responses from LocalValidatorAggregatorProxy and FullNodeProxy #[allow(clippy::large_enum_variant)] pub enum ExecutionEffects { - CertifiedTransactionEffects(CertifiedTransactionEffects, TransactionEvents), + FinalizedTransactionEffects(FinalizedEffects, TransactionEvents), SuiTransactionBlockEffects(SuiTransactionBlockEffects), } impl ExecutionEffects { pub fn mutated(&self) -> Vec<(ObjectRef, Owner)> { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - certified_effects.data().mutated().to_vec() + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + effects.data().mutated().to_vec() } ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => sui_tx_effects .mutated() @@ -98,9 +82,7 @@ impl ExecutionEffects { pub fn created(&self) -> Vec<(ObjectRef, Owner)> { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - certified_effects.data().created() - } + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => effects.data().created(), ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => sui_tx_effects .created() .iter() @@ -111,8 +93,8 @@ impl ExecutionEffects { pub fn deleted(&self) -> Vec { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - certified_effects.data().deleted().to_vec() + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + effects.data().deleted().to_vec() } ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => sui_tx_effects .deleted() @@ -124,8 +106,11 @@ impl ExecutionEffects { pub fn quorum_sig(&self) -> Option<&AuthorityStrongQuorumSignInfo> { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - Some(certified_effects.auth_sig()) + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + match &effects.finality_info { + EffectsFinalityInfo::Certified(sig) => Some(sig), + _ => None, + } } ExecutionEffects::SuiTransactionBlockEffects(_) => None, } @@ -133,8 +118,8 @@ impl ExecutionEffects { pub fn gas_object(&self) -> (ObjectRef, Owner) { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - certified_effects.data().gas_object() + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + effects.data().gas_object() } ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => { let refe = &sui_tx_effects.gas_object(); @@ -152,8 +137,8 @@ impl ExecutionEffects { pub fn is_ok(&self) -> bool { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - certified_effects.data().status().is_ok() + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + effects.data().status().is_ok() } ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => { sui_tx_effects.status().is_ok() @@ -163,8 +148,8 @@ impl ExecutionEffects { pub fn status(&self) -> String { match self { - ExecutionEffects::CertifiedTransactionEffects(certified_effects, ..) => { - format!("{:#?}", certified_effects.data().status()) + ExecutionEffects::FinalizedTransactionEffects(effects, ..) => { + format!("{:#?}", effects.data().status()) } ExecutionEffects::SuiTransactionBlockEffects(sui_tx_effects) => { format!("{:#?}", sui_tx_effects.status()) @@ -174,7 +159,7 @@ impl ExecutionEffects { pub fn gas_cost_summary(&self) -> GasCostSummary { match self { - crate::ExecutionEffects::CertifiedTransactionEffects(a, _) => { + crate::ExecutionEffects::FinalizedTransactionEffects(a, _) => { a.data().gas_cost_summary().clone() } crate::ExecutionEffects::SuiTransactionBlockEffects(b) => { @@ -224,10 +209,6 @@ pub trait ValidatorProxy { async fn execute_transaction_block(&self, tx: Transaction) -> anyhow::Result; - /// This function is similar to `execute_transaction` but does not check any validator's - /// signature. It should only be used for benchmarks. - async fn execute_bench_transaction(&self, tx: Transaction) -> anyhow::Result; - fn clone_committee(&self) -> Arc; fn get_current_epoch(&self) -> EpochId; @@ -244,7 +225,6 @@ pub struct LocalValidatorAggregatorProxy { qd: Arc>, committee: Committee, clients: BTreeMap, - requests: Mutex>, } impl LocalValidatorAggregatorProxy { @@ -276,11 +256,10 @@ impl LocalValidatorAggregatorProxy { committee: Committee, ) -> Self { let quorum_driver_metrics = Arc::new(QuorumDriverMetrics::new(registry)); - let qd_handler = (if let Some(reconfig_fullnode_rpc_url) = reconfig_fullnode_rpc_url { - let qd_handler_builder = QuorumDriverHandlerBuilder::new( - Arc::new(aggregator.clone()), - quorum_driver_metrics, - ); + let (aggregator, reconfig_observer): ( + Arc<_>, + Arc + Sync + Send>, + ) = if let Some(reconfig_fullnode_rpc_url) = reconfig_fullnode_rpc_url { info!( "Using FullNodeReconfigObserver: {:?}", reconfig_fullnode_rpc_url @@ -295,28 +274,27 @@ impl LocalValidatorAggregatorProxy { ) .await, ); - qd_handler_builder.with_reconfig_observer(reconfig_observer) + (Arc::new(aggregator), reconfig_observer) } else { info!("Using EmbeddedReconfigObserver"); - let observer = EmbeddedReconfigObserver::new(); + let reconfig_observer = Arc::new(EmbeddedReconfigObserver::new()); // Get the latest committee from config observer - let new_agg = observer + let aggregator = reconfig_observer .get_committee(Arc::new(aggregator)) .await .expect("Failed to get latest committee"); - let qd_handler_builder = - QuorumDriverHandlerBuilder::new(new_agg, quorum_driver_metrics); - qd_handler_builder.with_reconfig_observer(Arc::new(EmbeddedReconfigObserver::new())) - }) - .start(); - + (aggregator, reconfig_observer) + }; + let qd_handler_builder = + QuorumDriverHandlerBuilder::new(aggregator.clone(), quorum_driver_metrics.clone()) + .with_reconfig_observer(reconfig_observer.clone()); + let qd_handler = qd_handler_builder.start(); let qd = qd_handler.clone_quorum_driver(); Self { _qd_handler: qd_handler, qd, clients, committee, - requests: Mutex::new(JoinSet::new()), } } } @@ -346,9 +324,6 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy { } async fn execute_transaction_block(&self, tx: Transaction) -> anyhow::Result { - if std::env::var("BENCH_MODE").is_ok() { - return self.execute_bench_transaction(tx).await; - } let tx_digest = *tx.digest(); let mut retry_cnt = 0; while retry_cnt < 3 { @@ -372,8 +347,8 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy { events, .. } = resp; - return Ok(ExecutionEffects::CertifiedTransactionEffects( - effects_cert.into(), + return Ok(ExecutionEffects::FinalizedTransactionEffects( + FinalizedEffects::new_from_effects_cert(effects_cert.into()), events.unwrap_or_default(), )); } @@ -382,7 +357,7 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy { } Err(err) => { let delay = Duration::from_millis(rand::thread_rng().gen_range(100..1000)); - error!( + warn!( ?tx_digest, retry_cnt, "Transaction failed with err: {:?}. Sleeping for {:?} ...", @@ -397,208 +372,6 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy { bail!("Transaction {:?} failed for {retry_cnt} times", tx_digest); } - async fn execute_bench_transaction(&self, tx: Transaction) -> anyhow::Result { - // Store the epoch number; we read it from the votes and use it later to create the certificate. - let mut epoch = 0; - let auth_agg = self.qd.authority_aggregator().load(); - - // Send the transaction to all validators. - let tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions); - let mut futures = FuturesUnordered::new(); - for (name, client) in self.clients.iter() { - let fut = client - .handle_transaction(tx.clone(), None) - .map(|r| (r, *name)); - futures.push(fut); - } - auth_agg - .metrics - .inflight_transaction_requests - .add(futures.len() as i64); - - // TODO: This following aggregation will not work well at epoch boundary. - - // Listen to the replies from the first 2f+1 votes. - let mut total_stake = 0; - let mut votes = Vec::new(); - let mut certificate = None; - while let Some((response, name)) = futures.next().await { - auth_agg.metrics.inflight_transaction_requests.dec(); - match response { - Ok(response) => match response.status { - // If all goes well, the authority returns a vote. - TransactionStatus::Signed(signature) => { - epoch = signature.epoch; - total_stake += self.committee.weight(&signature.authority); - votes.push(signature); - } - // The transaction may be submitted again in case the certificate's submission failed. - TransactionStatus::Executed(cert, _effects, _) => { - tracing::warn!("Transaction already submitted: {tx:?}"); - if let Some(cert) = cert { - certificate = Some(CertifiedTransaction::new_from_data_and_sig( - tx.data().clone(), - cert, - )); - } - } - }, - // This typically happens when the validators are overloaded and the transaction is - // immediately rejected. - Err(e) => { - self.qd - .authority_aggregator() - .load() - .metrics - .process_tx_errors - .with_label_values(&[&name.concise().to_string(), e.as_ref()]) - .inc(); - tracing::warn!("Failed to submit transaction: {e}") - } - } - - if total_stake >= self.committee.quorum_threshold() { - break; - } - - if certificate.is_some() { - break; - } - } - - // Assemble a certificate from the validator's replies. - let certified_transaction: CertifiedTransaction = match certificate { - Some(x) => x, - None => { - let signatures: BTreeMap<_, _> = votes - .into_iter() - .map(|a| (a.authority, a.signature)) - .collect(); - let mut signers_map = RoaringBitmap::new(); - for pk in signatures.keys() { - signers_map.insert( - self.committee - .authority_index(pk) - .ok_or(SuiError::UnknownSigner { - signer: Some(pk.concise().to_string()), - index: None, - committee: Box::new(self.committee.clone()), - }) - .expect("Received signature from unknown validator"), - ); - } - let sigs: Vec = signatures.into_values().collect(); - - let quorum_signature = AuthorityQuorumSignInfo { - epoch, - // Note: This function simply aggregates signatures (it does not check that they - // are individually valid). - signature: AggregateAuthoritySignature::aggregate(&sigs) - .map_err(|e| SuiError::InvalidSignature { - error: e.to_string(), - }) - .expect("Validator returned invalid signature"), - signers_map, - }; - - Envelope::new_from_data_and_sig(tx.into_data(), quorum_signature) - } - }; - auth_agg - .metrics - .inflight_transaction_requests - .sub(futures.len() as i64); - drop(tx_guard); - - // Send the certificate to all validators. - let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates); - let mut futures = FuturesUnordered::new(); - total_stake = 0; - let mut transaction_effects = None; - let mut transaction_events = None; - for (name, client) in self.clients.iter() { - let client = client.clone(); - let certificate = certified_transaction.clone(); - let name = *name; - futures.push(async move { - client - .handle_certificate_v2(certificate, None) - .map(move |r| (r, name)) - .await - }); - } - auth_agg - .metrics - .inflight_certificate_requests - .add(futures.len() as i64); - - // Wait for the replies from a quorum of validators. - while let Some((response, name)) = futures.next().await { - auth_agg.metrics.inflight_certificate_requests.dec(); - match response { - // If all goes well, the validators reply with signed effects. - Ok(HandleCertificateResponseV2 { - signed_effects, - events, - fastpath_input_objects: _, // unused field - }) => { - let author = signed_effects.auth_sig().authority; - transaction_effects = Some(signed_effects.data().clone()); - transaction_events = Some(events); - total_stake += self.committee.weight(&author); - } - - // This typically happens when the validators are overloaded and the certificate is - // immediately rejected. - Err(e) => { - auth_agg - .metrics - .process_cert_errors - .with_label_values(&[&name.concise().to_string(), e.as_ref()]) - .inc(); - tracing::warn!("Failed to submit certificate: {e}") - } - } - - if total_stake >= self.committee.quorum_threshold() { - break; - } - } - - // Abort if we failed to submit the certificate to enough validators. This typically - // happens when the validators are overloaded and the requests timed out. - if transaction_effects.is_none() || total_stake < self.committee.quorum_threshold() { - bail!("Failed to submit certificate to quorum of validators"); - } - - // Wait for 10 more seconds on remaining requests asynchronously. - const WAIT_TIMEOUT: Duration = Duration::from_secs(10); - { - let auth_agg = auth_agg.clone(); - let mut requests = self.requests.lock().unwrap(); - requests.spawn(async move { - let _ = timeout(WAIT_TIMEOUT, async { - while futures.next().await.is_some() { - auth_agg.metrics.inflight_certificate_requests.dec(); - } - }) - .await; - auth_agg - .metrics - .inflight_certificate_requests - .sub(futures.len() as i64); - }); - } - - // Package the certificate and effects to return. - let signed_material = certified_transaction.auth_sig().clone(); - let effects = ExecutionEffects::CertifiedTransactionEffects( - Envelope::new_from_data_and_sig(transaction_effects.unwrap(), signed_material), - transaction_events.unwrap(), - ); - Ok(effects) - } - fn clone_committee(&self) -> Arc { self.qd.clone_committee() } @@ -615,7 +388,6 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy { qd, clients: self.clients.clone(), committee: self.committee.clone(), - requests: Mutex::new(JoinSet::new()), }) } @@ -758,10 +530,6 @@ impl ValidatorProxy for FullNodeProxy { bail!("Transaction {:?} failed for {retry_cnt} times", tx_digest); } - async fn execute_bench_transaction(&self, tx: Transaction) -> anyhow::Result { - self.execute_transaction_block(tx).await - } - fn clone_committee(&self) -> Arc { self.committee.clone() } diff --git a/crates/sui-benchmark/src/workloads/adversarial.rs b/crates/sui-benchmark/src/workloads/adversarial.rs index eeb82dd150aea..39746b89c31d5 100644 --- a/crates/sui-benchmark/src/workloads/adversarial.rs +++ b/crates/sui-benchmark/src/workloads/adversarial.rs @@ -163,7 +163,7 @@ impl Payload for AdversarialTestPayload { fn make_new_payload(&mut self, effects: &ExecutionEffects) { // Sometimes useful when figuring out why things failed let stat = match effects { - ExecutionEffects::CertifiedTransactionEffects(e, _) => e.data().status(), + ExecutionEffects::FinalizedTransactionEffects(e, _) => e.data().status(), ExecutionEffects::SuiTransactionBlockEffects(_) => unimplemented!("Not impl"), }; diff --git a/crates/sui-core/benches/batch_verification_bench.rs b/crates/sui-core/benches/batch_verification_bench.rs index 92e62c68212f3..6f06c01ea2377 100644 --- a/crates/sui-core/benches/batch_verification_bench.rs +++ b/crates/sui-core/benches/batch_verification_bench.rs @@ -3,6 +3,7 @@ use criterion::*; +use itertools::Itertools as _; use rand::prelude::*; use rand::seq::SliceRandom; @@ -135,7 +136,7 @@ fn batch_verification_bench(c: &mut Criterion) { certs.shuffle(&mut thread_rng()); batch_verify_certificates( &committee, - &certs, + &certs.iter().collect_vec(), Arc::new(VerifiedDigestCache::new_empty()), ); }) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 9c1d442298cf2..8a3788a8deabb 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -924,8 +924,9 @@ impl AuthorityState { async fn handle_transaction_impl( &self, transaction: VerifiedTransaction, + sign: bool, epoch_store: &Arc, - ) -> SuiResult { + ) -> SuiResult> { // Ensure that validator cannot reconfigure while we are signing the tx let _execution_lock = self.execution_lock_for_signing().await; @@ -934,19 +935,29 @@ impl AuthorityState { let owned_objects = checked_input_objects.inner().filter_owned_objects(); - let signed_transaction = VerifiedSignedTransaction::new( - epoch_store.epoch(), - transaction, - self.name, - &*self.secret, - ); + let tx_digest = *transaction.digest(); + let signed_transaction = if sign { + Some(VerifiedSignedTransaction::new( + epoch_store.epoch(), + transaction, + self.name, + &*self.secret, + )) + } else { + None + }; // Check and write locks, to signed transaction, into the database // The call to self.set_transaction_lock checks the lock is not conflicting, // and returns ConflictingTransaction error in case there is a lock on a different // existing transaction. self.get_cache_writer() - .acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone()) + .acquire_transaction_locks( + epoch_store, + &owned_objects, + tx_digest, + signed_transaction.clone(), + ) .await?; Ok(signed_transaction) @@ -973,9 +984,11 @@ impl AuthorityState { .start_timer(); self.metrics.tx_orders.inc(); - let signed = self.handle_transaction_impl(transaction, epoch_store).await; + let signed = self + .handle_transaction_impl(transaction, false, epoch_store) + .await; match signed { - Ok(s) => { + Ok(Some(s)) => { if self.is_validator(epoch_store) { if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer { let tx = s.clone(); @@ -991,6 +1004,7 @@ impl AuthorityState { status: TransactionStatus::Signed(s.into_inner().into_sig()), }) } + Ok(None) => panic!("handle_transaction_impl should return a signed transaction"), // It happens frequently that while we are checking the validity of the transaction, it // has just been executed. // In that case, we could still return Ok to avoid showing confusing errors. @@ -1034,10 +1048,14 @@ impl AuthorityState { return Err(SuiError::ValidatorHaltedAtEpochEnd); } - match self.handle_transaction_impl(transaction, epoch_store).await { - // TODO(fastpath): We don't actually need the signed transaction here but just call - // into this function to acquire locks. Consider refactoring to avoid the extra work. - Ok(_signed) => Ok(None), + match self + .handle_transaction_impl(transaction, false, epoch_store) + .await + { + Ok(Some(_)) => { + panic!("handle_transaction_impl should not return a signed transaction") + } + Ok(None) => Ok(None), // It happens frequently that while we are checking the validity of the transaction, it // has just been executed. // In that case, we could still return Ok to avoid showing confusing errors. @@ -2973,7 +2991,14 @@ impl AuthorityState { &self.transaction_manager } - /// Adds certificates to transaction manager for ordered execution. + /// Adds transactions / certificates to transaction manager for ordered execution. + pub fn enqueue_transactions_for_execution( + &self, + txns: Vec, + epoch_store: &Arc, + ) { + self.transaction_manager.enqueue(txns, epoch_store) + } pub fn enqueue_certificates_for_execution( &self, certs: Vec, diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index b49ae87c33881..27f2e9467e0f0 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -749,7 +749,7 @@ impl AuthorityEpochTables { pub fn write_transaction_locks( &self, - transaction: VerifiedSignedTransaction, + signed_transaction: Option, locks_to_write: impl Iterator, ) -> SuiResult { let mut batch = self.owned_object_locked_transactions.batch(); @@ -757,10 +757,15 @@ impl AuthorityEpochTables { &self.owned_object_locked_transactions, locks_to_write.map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))), )?; - batch.insert_batch( - &self.signed_transactions, - std::iter::once((*transaction.digest(), transaction.serializable_ref())), - )?; + if let Some(signed_transaction) = signed_transaction { + batch.insert_batch( + &self.signed_transactions, + std::iter::once(( + *signed_transaction.digest(), + signed_transaction.serializable_ref(), + )), + )?; + } batch.write()?; Ok(()) } @@ -791,16 +796,24 @@ impl AuthorityEpochTables { let shared_input_object_ids: BTreeSet<_> = transactions .iter() - .filter_map(|tx| { - if let SequencedConsensusTransactionKind::External(ConsensusTransaction { + .filter_map(|tx| match &tx.0.transaction { + SequencedConsensusTransactionKind::External(ConsensusTransaction { kind: ConsensusTransactionKind::CertifiedTransaction(tx), .. - }) = &tx.0.transaction - { - Some(tx.shared_input_objects().map(|obj| obj.id)) - } else { - None - } + }) => Some( + tx.shared_input_objects() + .map(|obj| obj.id) + .collect::>(), + ), + SequencedConsensusTransactionKind::External(ConsensusTransaction { + kind: ConsensusTransactionKind::UserTransaction(tx), + .. + }) => Some( + tx.shared_input_objects() + .map(|obj| obj.id) + .collect::>(), + ), + _ => None, }) .flatten() .collect(); @@ -1971,10 +1984,10 @@ impl AuthorityPerEpochStore { self.tables()? .pending_consensus_transactions .multi_remove(keys)?; - // TODO: lock once for all remove() calls. + let mut pending_consensus_certificates = self.pending_consensus_certificates.write(); for key in keys { - if let ConsensusTransactionKey::Certificate(cert) = key { - self.pending_consensus_certificates.write().remove(cert); + if let ConsensusTransactionKey::Certificate(digest) = key { + pending_consensus_certificates.remove(digest); } } Ok(()) @@ -2005,17 +2018,6 @@ impl AuthorityPerEpochStore { .is_empty() } - /// Check whether certificate was processed by consensus. - /// For shared lock certificates, if this function returns true means shared locks for this certificate are set - pub fn is_tx_cert_consensus_message_processed( - &self, - certificate: &CertifiedTransaction, - ) -> SuiResult { - self.is_consensus_message_processed(&SequencedConsensusTransactionKey::External( - ConsensusTransactionKey::Certificate(*certificate.digest()), - )) - } - /// Check whether any certificates were processed by consensus. /// This handles multiple certificates at once. pub fn is_any_tx_certs_consensus_message_processed<'a>( diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index cb4dc555d975c..6b23b3fc53d80 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -993,9 +993,9 @@ impl AuthorityStore { &self, epoch_store: &AuthorityPerEpochStore, owned_input_objects: &[ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> SuiResult { - let tx_digest = *transaction.digest(); let epoch = epoch_store.epoch(); // Other writers may be attempting to acquire locks on the same objects, so a mutex is // required. @@ -1067,7 +1067,7 @@ impl AuthorityStore { if !locks_to_write.is_empty() { trace!(?locks_to_write, "Writing locks"); - epoch_tables.write_transaction_locks(transaction, locks_to_write.into_iter())?; + epoch_tables.write_transaction_locks(signed_transaction, locks_to_write.into_iter())?; } Ok(()) diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 0e9aeb30ee1ef..274677feb5d69 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -781,7 +781,7 @@ impl ValidatorService { ConsensusTransactionKind::CertifiedTransaction(tx) => { tx.contains_shared_object() } - ConsensusTransactionKind::UserTransaction(_) => true, + ConsensusTransactionKind::UserTransaction(tx) => tx.contains_shared_object(), _ => false, }) { Some(self.metrics.consensus_latency.start_timer()) @@ -830,7 +830,7 @@ impl ValidatorService { ConsensusTransactionKind::CertifiedTransaction(certificate) => { // Certificates already verified by callers of this function. let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone())); - self.state + self.state .execute_certificate(&certificate, epoch_store) .await? } diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index 22b94b87f1f9a..e54eefca610d9 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -341,6 +341,9 @@ impl ConsensusAdapter { ConsensusTransactionKind::CertifiedTransaction(certificate) => { Some(certificate.digest()) } + ConsensusTransactionKind::UserTransaction(transaction) => { + Some(transaction.digest()) + } _ => None, }) .min(); @@ -563,6 +566,7 @@ impl ConsensusAdapter { ), SuiError::InvalidTxKindInSoftBundle ); + // TODO(fastpath): support batch of UserTransaction. } } @@ -804,6 +808,10 @@ impl ConsensusAdapter { || matches!( transactions[0].kind, ConsensusTransactionKind::CertifiedTransaction(_) + ) + || matches!( + transactions[0].kind, + ConsensusTransactionKind::UserTransaction(_) ); let send_end_of_publish = if is_user_tx { // If we are in RejectUserCerts state and we just drained the list we need to @@ -852,13 +860,11 @@ impl ConsensusAdapter { ) -> ProcessedMethod { let notifications = FuturesUnordered::new(); for transaction_key in transaction_keys { - let transaction_digests = if let SequencedConsensusTransactionKey::External( - ConsensusTransactionKey::Certificate(digest), - ) = transaction_key - { - vec![digest] - } else { - vec![] + let transaction_digests = match transaction_key { + SequencedConsensusTransactionKey::External( + ConsensusTransactionKey::Certificate(digest), + ) => vec![digest], + _ => vec![], }; let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External( diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 45e1a269a7876..5638d4529e6d1 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -654,20 +654,18 @@ impl SequencedConsensusTransactionKind { pub fn is_executable_transaction(&self) -> bool { match self { - SequencedConsensusTransactionKind::External(ext) => ext.is_certified_transaction(), + SequencedConsensusTransactionKind::External(ext) => ext.is_executable_transaction(), SequencedConsensusTransactionKind::System(_) => true, } } pub fn executable_transaction_digest(&self) -> Option { match self { - SequencedConsensusTransactionKind::External(ext) => { - if let ConsensusTransactionKind::CertifiedTransaction(txn) = &ext.kind { - Some(*txn.digest()) - } else { - None - } - } + SequencedConsensusTransactionKind::External(ext) => match &ext.kind { + ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()), + ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()), + _ => None, + }, SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()), } } @@ -712,14 +710,17 @@ impl SequencedConsensusTransaction { // which will eventually fail when the randomness state object is not found. return false; } - let SequencedConsensusTransactionKind::External(ConsensusTransaction { - kind: ConsensusTransactionKind::CertifiedTransaction(certificate), - .. - }) = &self.transaction - else { - return false; - }; - certificate.transaction_data().uses_randomness() + match &self.transaction { + SequencedConsensusTransactionKind::External(ConsensusTransaction { + kind: ConsensusTransactionKind::CertifiedTransaction(cert), + .. + }) => cert.transaction_data().uses_randomness(), + SequencedConsensusTransactionKind::External(ConsensusTransaction { + kind: ConsensusTransactionKind::UserTransaction(txn), + .. + }) => txn.transaction_data().uses_randomness(), + _ => false, + } } pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> { @@ -728,6 +729,10 @@ impl SequencedConsensusTransaction { kind: ConsensusTransactionKind::CertifiedTransaction(certificate), .. }) if certificate.contains_shared_object() => Some(certificate.data()), + SequencedConsensusTransactionKind::External(ConsensusTransaction { + kind: ConsensusTransactionKind::UserTransaction(txn), + .. + }) if txn.contains_shared_object() => Some(txn.data()), SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => { Some(txn.data()) } @@ -968,6 +973,7 @@ mod tests { crypto::deterministic_random_account_key, messages_consensus::{ AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind, + TransactionIndex, }, object::Object, supported_protocol_versions::SupportedProtocolVersions, @@ -1382,6 +1388,9 @@ mod tests { ConsensusTransactionKind::CertifiedTransaction(txn) => { format!("user({})", txn.transaction_data().gas_price()) } + ConsensusTransactionKind::UserTransaction(txn) => { + format!("user({})", txn.transaction_data().gas_price()) + } _ => unreachable!(), }, SequencedConsensusTransactionKind::System(_) => unreachable!(), diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index ad47a537b2110..37ce53b675d01 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -16,14 +16,14 @@ use tap::TapFallible; use tracing::{info, warn}; use crate::{ - authority::authority_per_epoch_store::AuthorityPerEpochStore, - checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager, + authority::AuthorityState, checkpoints::CheckpointServiceNotify, + transaction_manager::TransactionManager, }; /// Allows verifying the validity of transactions #[derive(Clone)] pub struct SuiTxValidator { - epoch_store: Arc, + authority_state: Arc, checkpoint_service: Arc, _transaction_manager: Arc, metrics: Arc, @@ -31,44 +31,38 @@ pub struct SuiTxValidator { impl SuiTxValidator { pub fn new( - epoch_store: Arc, + authority_state: Arc, checkpoint_service: Arc, transaction_manager: Arc, metrics: Arc, ) -> Self { + let epoch_store = authority_state.load_epoch_store_one_call_per_task().clone(); info!( "SuiTxValidator constructed for epoch {}", epoch_store.epoch() ); Self { - epoch_store, + authority_state, checkpoint_service, _transaction_manager: transaction_manager, metrics, } } - fn validate_transactions( - &self, - txs: Vec, - ) -> Result<(), eyre::Report> { + fn validate_transactions(&self, txs: &[ConsensusTransactionKind]) -> Result<(), eyre::Report> { + let epoch_store = self.authority_state.load_epoch_store_one_call_per_task(); + let mut cert_batch = Vec::new(); let mut ckpt_messages = Vec::new(); let mut ckpt_batch = Vec::new(); - for tx in txs.into_iter() { + for tx in txs.iter() { match tx { ConsensusTransactionKind::CertifiedTransaction(certificate) => { - cert_batch.push(*certificate); - - // if !certificate.contains_shared_object() { - // // new_unchecked safety: we do not use the certs in this list until all - // // have had their signatures verified. - // owned_tx_certs.push(VerifiedCertificate::new_unchecked(*certificate)); - // } + cert_batch.push(certificate.as_ref()); } ConsensusTransactionKind::CheckpointSignature(signature) => { - ckpt_messages.push(signature.clone()); - ckpt_batch.push(signature.summary); + ckpt_messages.push(signature.as_ref()); + ckpt_batch.push(&signature.summary); } ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => { if bytes.len() > dkg::DKG_MESSAGES_MAX_SIZE { @@ -91,13 +85,12 @@ impl SuiTxValidator { | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {} ConsensusTransactionKind::UserTransaction(_tx) => { - if !self.epoch_store.protocol_config().mysticeti_fastpath() { + if !epoch_store.protocol_config().mysticeti_fastpath() { return Err(SuiError::UnexpectedMessage( "ConsensusTransactionKind::UserTransaction is unsupported".to_string(), ) .into()); } - // TODO(fastpath): implement verification for uncertified user transactions. } } } @@ -106,7 +99,7 @@ impl SuiTxValidator { let cert_count = cert_batch.len(); let ckpt_count = ckpt_batch.len(); - self.epoch_store + epoch_store .signature_verifier .verify_certs_and_checkpoints(cert_batch, ckpt_batch) .tap_err(|e| warn!("batch verification error: {}", e)) @@ -115,7 +108,7 @@ impl SuiTxValidator { // All checkpoint sigs have been verified, forward them to the checkpoint service for ckpt in ckpt_messages { self.checkpoint_service - .notify_checkpoint_signature(&self.epoch_store, &ckpt)?; + .notify_checkpoint_signature(&epoch_store, ckpt)?; } self.metrics @@ -125,15 +118,45 @@ impl SuiTxValidator { .checkpoint_signatures_verified .inc_by(ckpt_count as u64); Ok(()) + } + + async fn vote_transactions(&self, txs: Vec) -> Vec { + let epoch_store = self.authority_state.load_epoch_store_one_call_per_task(); + if !epoch_store.protocol_config().mysticeti_fastpath() { + return vec![]; + } - // todo - we should un-comment line below once we have a way to revert those transactions at the end of epoch - // all certificates had valid signatures, schedule them for execution prior to sequencing - // which is unnecessary for owned object transactions. - // It is unnecessary to write to pending_certificates table because the certs will be written - // via consensus output. - // self.transaction_manager - // .enqueue_certificates(owned_tx_certs, &self.epoch_store) - // .wrap_err("Failed to schedule certificates for execution") + let mut result = Vec::new(); + for (i, tx) in txs.into_iter().enumerate() { + let ConsensusTransactionKind::UserTransaction(tx) = tx else { + continue; + }; + + // Currently validity_check() and verify_transaction() are not required to be consistent across validators, + // so they do not run in validate_transactions(). They can run there once we confirm it is safe. + if tx + .validity_check(epoch_store.protocol_config(), epoch_store.epoch()) + .is_err() + { + result.push(i as TransactionIndex); + continue; + } + let Ok(tx) = epoch_store.verify_transaction(*tx.clone()) else { + result.push(i as TransactionIndex); + continue; + }; + + if self + .authority_state + .handle_transaction_v2(&epoch_store, tx) + .await + .is_err() + { + result.push(i as TransactionIndex); + } + } + + result } } @@ -142,6 +165,7 @@ fn tx_from_bytes(tx: &[u8]) -> Result { .wrap_err("Malformed transaction (failed to deserialize)") } +#[async_trait::async_trait] impl TransactionVerifier for SuiTxValidator { fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> { let _scope = monitored_scope("ValidateBatch"); @@ -155,12 +179,29 @@ impl TransactionVerifier for SuiTxValidator { }) .collect::, _>>()?; - self.validate_transactions(txs) + self.validate_transactions(&txs) .map_err(|e| ValidationError::InvalidTransaction(e.to_string())) } - fn vote_batch(&self, _batch: &[&[u8]]) -> Vec { - vec![] + async fn verify_and_vote_batch( + &self, + batch: &[&[u8]], + ) -> Result, ValidationError> { + let _scope = monitored_scope("VerifyAndVoteBatch"); + + let txs = batch + .iter() + .map(|tx| { + tx_from_bytes(tx) + .map(|tx| tx.kind) + .map_err(|e| ValidationError::InvalidTransaction(e.to_string())) + }) + .collect::, _>>()?; + + self.validate_transactions(&txs) + .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))?; + + Ok(self.vote_transactions(txs).await) } } @@ -234,7 +275,7 @@ mod tests { let metrics = SuiTxValidatorMetrics::new(&Default::default()); let validator = SuiTxValidator::new( - state.epoch_store_for_testing().clone(), + state.clone(), Arc::new(CheckpointServiceNoop {}), state.transaction_manager().clone(), metrics, diff --git a/crates/sui-core/src/execution_cache.rs b/crates/sui-core/src/execution_cache.rs index c26c57134568c..16739e5e8990a 100644 --- a/crates/sui-core/src/execution_cache.rs +++ b/crates/sui-core/src/execution_cache.rs @@ -642,7 +642,8 @@ pub trait ExecutionCacheWrite: Send + Sync { &'a self, epoch_store: &'a AuthorityPerEpochStore, owned_input_objects: &'a [ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> BoxFuture<'a, SuiResult>; } diff --git a/crates/sui-core/src/execution_cache/object_locks.rs b/crates/sui-core/src/execution_cache/object_locks.rs index f77b9fde6155a..b9889c0094572 100644 --- a/crates/sui-core/src/execution_cache/object_locks.rs +++ b/crates/sui-core/src/execution_cache/object_locks.rs @@ -6,6 +6,7 @@ use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use mysten_common::*; use sui_types::base_types::{ObjectID, ObjectRef}; +use sui_types::digests::TransactionDigest; use sui_types::error::{SuiError, SuiResult, UserInputError}; use sui_types::object::Object; use sui_types::storage::ObjectStore; @@ -192,10 +193,9 @@ impl ObjectLocks { cache: &WritebackCache, epoch_store: &AuthorityPerEpochStore, owned_input_objects: &[ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> SuiResult { - let tx_digest = *transaction.digest(); - let object_ids = owned_input_objects.iter().map(|o| o.0).collect::>(); let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?; @@ -243,7 +243,7 @@ impl ObjectLocks { // commit all writes to DB epoch_store .tables()? - .write_transaction_locks(transaction, locks_to_write.iter().cloned())?; + .write_transaction_locks(signed_transaction, locks_to_write.iter().cloned())?; // remove pending locks from unbounded storage self.clear_cached_locks(&locks_to_write); @@ -279,7 +279,7 @@ mod tests { let tx1 = s.make_signed_transaction(&outputs.transaction); s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1) + .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx1.digest(), Some(tx1)) .await .expect("locks should be available"); @@ -291,19 +291,34 @@ mod tests { // both locks are held by tx1, so this should fail s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone()) + .acquire_transaction_locks( + &s.epoch_store, + &[new1, new2], + *tx2.digest(), + Some(tx2.clone()), + ) .await .unwrap_err(); // new3 is lockable, but new2 is not, so this should fail s.cache - .acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone()) + .acquire_transaction_locks( + &s.epoch_store, + &[new3, new2], + *tx2.digest(), + Some(tx2.clone()), + ) .await .unwrap_err(); // new3 is unlocked s.cache - .acquire_transaction_locks(&s.epoch_store, &[new3], tx2) + .acquire_transaction_locks( + &s.epoch_store, + &[new3], + *tx2.digest(), + Some(tx2.clone()), + ) .await .expect("new3 should be unlocked"); }) @@ -332,14 +347,24 @@ mod tests { // fails because we are referring to an old object s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone()) + .acquire_transaction_locks( + &s.epoch_store, + &[new1, old2], + *tx.digest(), + Some(tx.clone()), + ) .await .unwrap_err(); // succeeds because the above call releases the lock on new1 after failing // to get the lock on old2 s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx) + .acquire_transaction_locks( + &s.epoch_store, + &[new1, new2], + *tx.digest(), + Some(tx.clone()), + ) .await .expect("new1 should be unlocked after revert"); }) @@ -368,7 +393,12 @@ mod tests { // fails because we are referring to an old object s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx) + .acquire_transaction_locks( + &s.epoch_store, + &[new1, old2], + *tx.digest(), + Some(tx.clone()), + ) .await .unwrap_err(); @@ -381,7 +411,7 @@ mod tests { // succeeds because the above call releases the lock on new1 after failing // to get the lock on old2 s.cache - .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2) + .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx2.digest(), Some(tx2)) .await .expect("new1 should be unlocked after revert"); }) @@ -407,7 +437,12 @@ mod tests { // assert that acquire_transaction_locks is sync in non-simtest, which causes the // fail_point_async! macros above to be elided s.cache - .acquire_transaction_locks(&s.epoch_store, &objects, tx2) + .acquire_transaction_locks( + &s.epoch_store, + &objects, + *tx2.digest(), + Some(tx2.clone()), + ) .now_or_never() .unwrap() .unwrap(); diff --git a/crates/sui-core/src/execution_cache/passthrough_cache.rs b/crates/sui-core/src/execution_cache/passthrough_cache.rs index 1518bda18a0d6..9707ecb5ff594 100644 --- a/crates/sui-core/src/execution_cache/passthrough_cache.rs +++ b/crates/sui-core/src/execution_cache/passthrough_cache.rs @@ -281,10 +281,16 @@ impl ExecutionCacheWrite for PassthroughCache { &'a self, epoch_store: &'a AuthorityPerEpochStore, owned_input_objects: &'a [ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> BoxFuture<'a, SuiResult> { self.store - .acquire_transaction_locks(epoch_store, owned_input_objects, transaction) + .acquire_transaction_locks( + epoch_store, + owned_input_objects, + tx_digest, + signed_transaction, + ) .boxed() } } diff --git a/crates/sui-core/src/execution_cache/proxy_cache.rs b/crates/sui-core/src/execution_cache/proxy_cache.rs index def5bf824190f..6aa61e15b0f3c 100644 --- a/crates/sui-core/src/execution_cache/proxy_cache.rs +++ b/crates/sui-core/src/execution_cache/proxy_cache.rs @@ -249,12 +249,14 @@ impl ExecutionCacheWrite for ProxyCache { &'a self, epoch_store: &'a AuthorityPerEpochStore, owned_input_objects: &'a [ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> BoxFuture<'a, SuiResult> { delegate_method!(self.acquire_transaction_locks( epoch_store, owned_input_objects, - transaction + tx_digest, + signed_transaction )) } } diff --git a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index 5df5de61ca47f..4275ab758c461 100644 --- a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -1108,7 +1108,12 @@ async fn test_concurrent_lockers() { for (tx1, _, a_ref, b_ref) in txns { results.push( cache - .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1) + .acquire_transaction_locks( + &epoch_store, + &[a_ref, b_ref], + *tx1.digest(), + Some(tx1.clone()), + ) .await, ); barrier.wait().await; @@ -1127,7 +1132,12 @@ async fn test_concurrent_lockers() { for (_, tx2, a_ref, b_ref) in txns { results.push( cache - .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx2) + .acquire_transaction_locks( + &epoch_store, + &[a_ref, b_ref], + *tx2.digest(), + Some(tx2.clone()), + ) .await, ); barrier.wait().await; @@ -1181,7 +1191,12 @@ async fn test_concurrent_lockers_same_tx() { for (tx1, a_ref, b_ref) in txns { results.push( cache - .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1) + .acquire_transaction_locks( + &epoch_store, + &[a_ref, b_ref], + *tx1.digest(), + Some(tx1.clone()), + ) .await, ); barrier.wait().await; @@ -1200,7 +1215,12 @@ async fn test_concurrent_lockers_same_tx() { for (tx1, a_ref, b_ref) in txns { results.push( cache - .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1) + .acquire_transaction_locks( + &epoch_store, + &[a_ref, b_ref], + *tx1.digest(), + Some(tx1.clone()), + ) .await, ); barrier.wait().await; diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index 5ff74be1c0182..f512902e68f01 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -1845,10 +1845,17 @@ impl ExecutionCacheWrite for WritebackCache { &'a self, epoch_store: &'a AuthorityPerEpochStore, owned_input_objects: &'a [ObjectRef], - transaction: VerifiedSignedTransaction, + tx_digest: TransactionDigest, + signed_transaction: Option, ) -> BoxFuture<'a, SuiResult> { self.object_locks - .acquire_transaction_locks(self, epoch_store, owned_input_objects, transaction) + .acquire_transaction_locks( + self, + epoch_store, + owned_input_objects, + tx_digest, + signed_transaction, + ) .boxed() } diff --git a/crates/sui-core/src/mock_consensus.rs b/crates/sui-core/src/mock_consensus.rs index 6d784c22b406a..5a47a90785e56 100644 --- a/crates/sui-core/src/mock_consensus.rs +++ b/crates/sui-core/src/mock_consensus.rs @@ -9,8 +9,9 @@ use crate::consensus_handler::SequencedConsensusTransaction; use prometheus::Registry; use std::sync::{Arc, Weak}; use sui_types::error::{SuiError, SuiResult}; +use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}; -use sui_types::transaction::VerifiedCertificate; +use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction}; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tracing::debug; @@ -73,10 +74,21 @@ impl MockConsensusClient { .unwrap(); } } - if let ConsensusTransactionKind::CertifiedTransaction(tx) = tx.kind { + if let ConsensusTransactionKind::CertifiedTransaction(tx) = &tx.kind { if tx.contains_shared_object() { validator.enqueue_certificates_for_execution( - vec![VerifiedCertificate::new_unchecked(*tx)], + vec![VerifiedCertificate::new_unchecked(*tx.clone())], + &epoch_store, + ); + } + } + if let ConsensusTransactionKind::UserTransaction(tx) = &tx.kind { + if tx.contains_shared_object() { + validator.enqueue_transactions_for_execution( + vec![VerifiedExecutableTransaction::new_from_consensus( + VerifiedTransaction::new_unchecked(*tx.clone()), + 0, + )], &epoch_store, ); } diff --git a/crates/sui-core/src/post_consensus_tx_reorder.rs b/crates/sui-core/src/post_consensus_tx_reorder.rs index 7f6d05fd7d61f..54019346e1a06 100644 --- a/crates/sui-core/src/post_consensus_tx_reorder.rs +++ b/crates/sui-core/src/post_consensus_tx_reorder.rs @@ -6,7 +6,10 @@ use crate::consensus_handler::{ }; use mysten_metrics::monitored_scope; use sui_protocol_config::ConsensusTransactionOrdering; -use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}; +use sui_types::{ + messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}, + transaction::TransactionDataAPI as _, +}; pub struct PostConsensusTxReorder {} @@ -34,6 +37,10 @@ impl PostConsensusTxReorder { tracking_id: _, kind: ConsensusTransactionKind::CertifiedTransaction(cert), }) => cert.gas_price(), + SequencedConsensusTransactionKind::External(ConsensusTransaction { + tracking_id: _, + kind: ConsensusTransactionKind::UserTransaction(txn), + }) => txn.transaction_data().gas_price(), // Non-user transactions are considered to have gas price of MAX u64 and are // put to the beginning. _ => u64::MAX, diff --git a/crates/sui-core/src/quorum_driver/mod.rs b/crates/sui-core/src/quorum_driver/mod.rs index 69923bac8c26e..2b67a32026669 100644 --- a/crates/sui-core/src/quorum_driver/mod.rs +++ b/crates/sui-core/src/quorum_driver/mod.rs @@ -51,6 +51,12 @@ const TASK_QUEUE_SIZE: usize = 2000; const EFFECTS_QUEUE_SIZE: usize = 10000; const TX_MAX_RETRY_TIMES: u32 = 10; +pub trait AuthorityAggregatorUpdatable: Send + Sync + 'static { + fn epoch(&self) -> EpochId; + fn authority_aggregator(&self) -> Arc>; + fn update_authority_aggregator(&self, new_authorities: Arc>); +} + #[derive(Clone)] pub struct QuorumDriverTask { pub request: ExecuteTransactionRequestV3, @@ -509,14 +515,6 @@ where Ok(response) } - pub async fn update_validators(&self, new_validators: Arc>) { - info!( - "Quorum Driver updating AuthorityAggregator with committee {}", - new_validators.committee - ); - self.validators.store(new_validators); - } - /// Returns Some(true) if the conflicting transaction is executed successfully /// (or already executed), or Some(false) if it did not. #[instrument(level = "trace", skip_all)] @@ -610,6 +608,27 @@ where } } +impl AuthorityAggregatorUpdatable for QuorumDriver +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + fn epoch(&self) -> EpochId { + self.validators.load().committee.epoch + } + + fn authority_aggregator(&self) -> Arc> { + self.validators.load_full() + } + + fn update_authority_aggregator(&self, new_authorities: Arc>) { + info!( + "Quorum Driver updating AuthorityAggregator with committee {}", + new_authorities.committee + ); + self.validators.store(new_authorities); + } +} + pub struct QuorumDriverHandler { quorum_driver: Arc>, effects_subscriber: tokio::sync::broadcast::Receiver, diff --git a/crates/sui-core/src/quorum_driver/reconfig_observer.rs b/crates/sui-core/src/quorum_driver/reconfig_observer.rs index 896b4c712d32b..aca926b42f86e 100644 --- a/crates/sui-core/src/quorum_driver/reconfig_observer.rs +++ b/crates/sui-core/src/quorum_driver/reconfig_observer.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use super::QuorumDriver; +use super::AuthorityAggregatorUpdatable; use crate::{ authority_aggregator::AuthAggMetrics, authority_client::{AuthorityAPI, NetworkAuthorityClient}, @@ -19,7 +19,7 @@ use tracing::{info, warn}; #[async_trait] pub trait ReconfigObserver { - async fn run(&mut self, quorum_driver: Arc>); + async fn run(&mut self, epoch_updatable: Arc>); fn clone_boxed(&self) -> Box + Send + Sync>; } @@ -64,21 +64,21 @@ impl ReconfigObserver for OnsiteReconfigObserver { }) } - async fn run(&mut self, quorum_driver: Arc>) { + async fn run( + &mut self, + updatable: Arc>, + ) { loop { match self.reconfig_rx.recv().await { Ok(system_state) => { let epoch_start_state = system_state.into_epoch_start_state(); let committee = epoch_start_state.get_sui_committee(); info!("Got reconfig message. New committee: {}", committee); - if committee.epoch() > quorum_driver.current_epoch() { - let new_auth_agg = quorum_driver + if committee.epoch() > updatable.epoch() { + let new_auth_agg = updatable .authority_aggregator() - .load() .recreate_with_new_epoch_start_state(&epoch_start_state); - quorum_driver - .update_validators(Arc::new(new_auth_agg)) - .await; + updatable.update_authority_aggregator(Arc::new(new_auth_agg)); } else { // This should only happen when the node just starts warn!("Epoch number decreased - ignoring committee: {}", committee); @@ -112,5 +112,5 @@ where Box::new(Self {}) } - async fn run(&mut self, _quorum_driver: Arc>) {} + async fn run(&mut self, _quorum_driver: Arc>) {} } diff --git a/crates/sui-core/src/quorum_driver/tests.rs b/crates/sui-core/src/quorum_driver/tests.rs index 3c721d4667ef8..665ee2cd2a1b8 100644 --- a/crates/sui-core/src/quorum_driver/tests.rs +++ b/crates/sui-core/src/quorum_driver/tests.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::quorum_driver::reconfig_observer::DummyReconfigObserver; -use crate::quorum_driver::{AuthorityAggregator, QuorumDriverHandlerBuilder}; +use crate::quorum_driver::{ + AuthorityAggregator, AuthorityAggregatorUpdatable as _, QuorumDriverHandlerBuilder, +}; use crate::test_authority_clients::LocalAuthorityClient; use crate::test_authority_clients::LocalAuthorityClientFaultConfig; use crate::test_utils::make_transfer_sui_transaction; @@ -215,9 +217,7 @@ async fn test_quorum_driver_update_validators_and_max_retry_times() { let mut committee = aggregator.clone_inner_committee_test_only(); committee.epoch = 10; aggregator.committee = Arc::new(committee); - quorum_driver_clone - .update_validators(Arc::new(aggregator)) - .await; + quorum_driver_clone.update_authority_aggregator(Arc::new(aggregator)); assert_eq!( quorum_driver_handler.clone_quorum_driver().current_epoch(), 10 diff --git a/crates/sui-core/src/signature_verifier.rs b/crates/sui-core/src/signature_verifier.rs index b82de0c315c11..80acf6e871249 100644 --- a/crates/sui-core/src/signature_verifier.rs +++ b/crates/sui-core/src/signature_verifier.rs @@ -7,7 +7,7 @@ use fastcrypto_zkp::bn254::zk_login::{OIDCProvider, JWK}; use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv; use futures::pin_mut; use im::hashmap::HashMap as ImHashMap; -use itertools::izip; +use itertools::{izip, Itertools as _}; use mysten_metrics::monitored_scope; use parking_lot::{Mutex, MutexGuard, RwLock}; use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; @@ -190,8 +190,8 @@ impl SignatureVerifier { /// Verifies all certs, returns Ok only if all are valid. pub fn verify_certs_and_checkpoints( &self, - certs: Vec, - checkpoints: Vec, + certs: Vec<&CertifiedTransaction>, + checkpoints: Vec<&SignedCheckpointSummary>, ) -> SuiResult { let certs: Vec<_> = certs .into_iter() @@ -329,7 +329,11 @@ impl SignatureVerifier { ) { let _scope = monitored_scope("BatchCertificateVerifier::process_queue"); - let results = batch_verify_certificates(&committee, &buffer.certs, zklogin_inputs_cache); + let results = batch_verify_certificates( + &committee, + &buffer.certs.iter().collect_vec(), + zklogin_inputs_cache, + ); izip!( results.into_iter(), buffer.certs.into_iter(), @@ -516,8 +520,8 @@ impl SignatureVerifierMetrics { /// Verifies all certificates - if any fail return error. pub fn batch_verify_all_certificates_and_checkpoints( committee: &Committee, - certs: &[CertifiedTransaction], - checkpoints: &[SignedCheckpointSummary], + certs: &[&CertifiedTransaction], + checkpoints: &[&SignedCheckpointSummary], ) -> SuiResult { // certs.data() is assumed to be verified already by the caller. @@ -531,7 +535,7 @@ pub fn batch_verify_all_certificates_and_checkpoints( /// Verifies certificates in batch mode, but returns a separate result for each cert. pub fn batch_verify_certificates( committee: &Committee, - certs: &[CertifiedTransaction], + certs: &[&CertifiedTransaction], zk_login_cache: Arc>, ) -> Vec { // certs.data() is assumed to be verified already by the caller. @@ -555,8 +559,8 @@ pub fn batch_verify_certificates( fn batch_verify( committee: &Committee, - certs: &[CertifiedTransaction], - checkpoints: &[SignedCheckpointSummary], + certs: &[&CertifiedTransaction], + checkpoints: &[&SignedCheckpointSummary], ) -> SuiResult { let mut obligation = VerificationObligation::default(); diff --git a/crates/sui-core/src/transaction_orchestrator.rs b/crates/sui-core/src/transaction_orchestrator.rs index 9dd2af551bff3..b6da3daef4842 100644 --- a/crates/sui-core/src/transaction_orchestrator.rs +++ b/crates/sui-core/src/transaction_orchestrator.rs @@ -98,15 +98,14 @@ where prometheus_registry: &Registry, reconfig_observer: OnsiteReconfigObserver, ) -> Self { + let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry)); let notifier = Arc::new(NotifyRead::new()); + let reconfig_observer = Arc::new(reconfig_observer); let quorum_driver_handler = Arc::new( - QuorumDriverHandlerBuilder::new( - validators, - Arc::new(QuorumDriverMetrics::new(prometheus_registry)), - ) - .with_notifier(notifier.clone()) - .with_reconfig_observer(Arc::new(reconfig_observer)) - .start(), + QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone()) + .with_notifier(notifier.clone()) + .with_reconfig_observer(reconfig_observer.clone()) + .start(), ); let effects_receiver = quorum_driver_handler.subscribe_to_effects(); diff --git a/crates/sui-core/src/unit_tests/batch_verification_tests.rs b/crates/sui-core/src/unit_tests/batch_verification_tests.rs index 7dec8510969ac..fbbe2fe1cd2c8 100644 --- a/crates/sui-core/src/unit_tests/batch_verification_tests.rs +++ b/crates/sui-core/src/unit_tests/batch_verification_tests.rs @@ -5,6 +5,7 @@ use crate::signature_verifier::*; use crate::test_utils::{make_cert_with_large_committee, make_dummy_tx}; use fastcrypto::traits::KeyPair; use futures::future::join_all; +use itertools::Itertools as _; use prometheus::Registry; use rand::{thread_rng, Rng}; use std::sync::Arc; @@ -81,12 +82,22 @@ async fn test_batch_verify() { let certs = gen_certs(&committee, &key_pairs, 16); let ckpts = gen_ckpts(&committee, &key_pairs, 16); - batch_verify_all_certificates_and_checkpoints(&committee, &certs, &ckpts).unwrap(); + batch_verify_all_certificates_and_checkpoints( + &committee, + &certs.iter().collect_vec(), + &ckpts.iter().collect_vec(), + ) + .unwrap(); { let mut ckpts = gen_ckpts(&committee, &key_pairs, 16); *ckpts[0].auth_sig_mut_for_testing() = ckpts[1].auth_sig().clone(); - batch_verify_all_certificates_and_checkpoints(&committee, &certs, &ckpts).unwrap_err(); + batch_verify_all_certificates_and_checkpoints( + &committee, + &certs.iter().collect_vec(), + &ckpts.iter().collect_vec(), + ) + .unwrap_err(); } let (other_sender, other_sender_sec): (_, AccountKeyPair) = get_key_pair(); @@ -98,11 +109,16 @@ async fn test_batch_verify() { let other_tx = make_dummy_tx(receiver, other_sender, &other_sender_sec); let other_cert = make_cert_with_large_committee(&committee, &key_pairs, &other_tx); *certs[i].auth_sig_mut_for_testing() = other_cert.auth_sig().clone(); - batch_verify_all_certificates_and_checkpoints(&committee, &certs, &ckpts).unwrap_err(); + batch_verify_all_certificates_and_checkpoints( + &committee, + &certs.iter().collect_vec(), + &ckpts.iter().collect_vec(), + ) + .unwrap_err(); let results = batch_verify_certificates( &committee, - &certs, + &certs.iter().collect_vec(), Arc::new(VerifiedDigestCache::new_empty()), ); results[i].as_ref().unwrap_err(); diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index f14a3eb560a0d..cfe9a3dd70a32 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -96,7 +96,7 @@ async fn test_mysticeti_manager() { epoch_store.clone(), consensus_handler_initializer, SuiTxValidator::new( - epoch_store.clone(), + state.clone(), Arc::new(CheckpointServiceNoop {}), state.transaction_manager().clone(), SuiTxValidatorMetrics::new(&Registry::new()), diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 9fd3db906fd60..09f46d40d16be 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -1348,7 +1348,7 @@ impl SuiNode { epoch_store.clone(), consensus_handler_initializer, SuiTxValidator::new( - epoch_store.clone(), + state.clone(), checkpoint_service.clone(), state.transaction_manager().clone(), sui_tx_validator_metrics.clone(), diff --git a/crates/sui-rest-api/proto/rest.proto b/crates/sui-rest-api/proto/rest.proto index 099efa57e81bc..b94e5b73d2637 100644 --- a/crates/sui-rest-api/proto/rest.proto +++ b/crates/sui-rest-api/proto/rest.proto @@ -112,6 +112,7 @@ message BalanceChange { message EffectsFinality { optional ValidatorAggregatedSignature signature = 1; optional uint64 checkpoint = 2; + optional bool quorum_executed = 3; } message TransactionExecutionResponse { diff --git a/crates/sui-rest-api/src/proto/generated/sui.rest.rs b/crates/sui-rest-api/src/proto/generated/sui.rest.rs index 6a0f29adb7181..1605838672557 100644 --- a/crates/sui-rest-api/src/proto/generated/sui.rest.rs +++ b/crates/sui-rest-api/src/proto/generated/sui.rest.rs @@ -151,6 +151,8 @@ pub struct EffectsFinality { pub signature: ::core::option::Option, #[prost(uint64, optional, tag = "2")] pub checkpoint: ::core::option::Option, + #[prost(bool, optional, tag = "3")] + pub quorum_executed: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TransactionExecutionResponse { diff --git a/crates/sui-rest-api/src/proto/mod.rs b/crates/sui-rest-api/src/proto/mod.rs index 546cd7f0d01b0..3ed3a4dd28419 100644 --- a/crates/sui-rest-api/src/proto/mod.rs +++ b/crates/sui-rest-api/src/proto/mod.rs @@ -788,18 +788,20 @@ impl TryFrom<&crate::transactions::EffectsFinality> for EffectsFinality { type Error = bcs::Error; fn try_from(value: &crate::transactions::EffectsFinality) -> Result { - let (signature, checkpoint) = match value { + let (signature, checkpoint, quorum_executed) = match value { crate::transactions::EffectsFinality::Certified { signature } => { - (Some(signature.try_into()?), None) + (Some(signature.try_into()?), None, None) } crate::transactions::EffectsFinality::Checkpointed { checkpoint } => { - (None, Some(*checkpoint)) + (None, Some(*checkpoint), None) } + crate::transactions::EffectsFinality::QuorumExecuted => (None, None, Some(true)), }; Ok(Self { signature, checkpoint, + quorum_executed, }) } } @@ -813,16 +815,15 @@ impl TryFrom<&EffectsFinality> for crate::transactions::EffectsFinality { .as_ref() .map(sui_sdk_types::types::ValidatorAggregatedSignature::try_from) .transpose()?; - match (signature, value.checkpoint) { - (Some(signature), _) => crate::transactions::EffectsFinality::Certified { signature }, - (None, Some(checkpoint)) => { - crate::transactions::EffectsFinality::Checkpointed { checkpoint } + match (signature, value.checkpoint, value.quorum_executed) { + (Some(signature), None, None) => { + crate::transactions::EffectsFinality::Certified { signature } } - (None, None) => { - return Err(bcs::Error::Custom( - "missing signature or checkpoint field".into(), - )) + (None, Some(checkpoint), None) => { + crate::transactions::EffectsFinality::Checkpointed { checkpoint } } + (None, None, Some(true)) => crate::transactions::EffectsFinality::QuorumExecuted, + _ => return Err(bcs::Error::Custom("invalid EffectsFinality message".into())), } .pipe(Ok) } diff --git a/crates/sui-rest-api/src/transactions/execution.rs b/crates/sui-rest-api/src/transactions/execution.rs index 0e445f687cb6b..ed22b5c466e2e 100644 --- a/crates/sui-rest-api/src/transactions/execution.rs +++ b/crates/sui-rest-api/src/transactions/execution.rs @@ -109,6 +109,9 @@ async fn execute_transaction( _epoch, checkpoint, ) => EffectsFinality::Checkpointed { checkpoint }, + sui_types::quorum_driver_types::EffectsFinalityInfo::QuorumExecuted(_) => { + EffectsFinality::QuorumExecuted + } }; (effects.try_into()?, finality) @@ -217,6 +220,7 @@ pub enum EffectsFinality { Checkpointed { checkpoint: CheckpointSequenceNumber, }, + QuorumExecuted, } impl serde::Serialize for EffectsFinality { @@ -232,6 +236,7 @@ impl serde::Serialize for EffectsFinality { EffectsFinality::Checkpointed { checkpoint } => { ReadableEffectsFinality::Checkpointed { checkpoint } } + EffectsFinality::QuorumExecuted => ReadableEffectsFinality::QuorumExecuted, }; readable.serialize(serializer) } else { @@ -242,6 +247,7 @@ impl serde::Serialize for EffectsFinality { EffectsFinality::Checkpointed { checkpoint } => { BinaryEffectsFinality::Checkpointed { checkpoint } } + EffectsFinality::QuorumExecuted => BinaryEffectsFinality::QuorumExecuted, }; binary.serialize(serializer) } @@ -261,6 +267,7 @@ impl<'de> serde::Deserialize<'de> for EffectsFinality { ReadableEffectsFinality::Checkpointed { checkpoint } => { EffectsFinality::Checkpointed { checkpoint } } + ReadableEffectsFinality::QuorumExecuted => EffectsFinality::QuorumExecuted, }) } else { BinaryEffectsFinality::deserialize(deserializer).map(|binary| match binary { @@ -270,6 +277,7 @@ impl<'de> serde::Deserialize<'de> for EffectsFinality { BinaryEffectsFinality::Checkpointed { checkpoint } => { EffectsFinality::Checkpointed { checkpoint } } + BinaryEffectsFinality::QuorumExecuted => EffectsFinality::QuorumExecuted, }) } } @@ -298,6 +306,7 @@ enum ReadableEffectsFinality { #[schemars(with = "crate::_schemars::U64")] checkpoint: CheckpointSequenceNumber, }, + QuorumExecuted, } #[derive(serde::Serialize, serde::Deserialize)] @@ -309,6 +318,7 @@ enum BinaryEffectsFinality { Checkpointed { checkpoint: CheckpointSequenceNumber, }, + QuorumExecuted, } fn coins(objects: &[Object]) -> impl Iterator)> + '_ { diff --git a/crates/sui-types/src/messages_consensus.rs b/crates/sui-types/src/messages_consensus.rs index 29a4cc08dcda5..432c5c71fd23d 100644 --- a/crates/sui-types/src/messages_consensus.rs +++ b/crates/sui-types/src/messages_consensus.rs @@ -111,7 +111,6 @@ pub enum ConsensusTransactionKey { NewJWKFetched(Box<(AuthorityName, JwkId, JWK)>), RandomnessDkgMessage(AuthorityName), RandomnessDkgConfirmation(AuthorityName), - UserTransaction(TransactionDigest), } impl Debug for ConsensusTransactionKey { @@ -144,7 +143,6 @@ impl Debug for ConsensusTransactionKey { Self::RandomnessDkgConfirmation(name) => { write!(f, "RandomnessDkgConfirmation({:?})", name.concise()) } - Self::UserTransaction(digest) => write!(f, "UserTransaction({:?})", digest), } } } @@ -555,13 +553,17 @@ impl ConsensusTransaction { ConsensusTransactionKey::RandomnessDkgConfirmation(*authority) } ConsensusTransactionKind::UserTransaction(tx) => { - ConsensusTransactionKey::UserTransaction(*tx.digest()) + // Use the same key format as ConsensusTransactionKind::CertifiedTransaction, + // because existing usages of ConsensusTransactionKey should not differentiate + // between CertifiedTransaction and UserTransaction. + ConsensusTransactionKey::Certificate(*tx.digest()) } } } - pub fn is_certified_transaction(&self) -> bool { + pub fn is_executable_transaction(&self) -> bool { matches!(self.kind, ConsensusTransactionKind::CertifiedTransaction(_)) + || matches!(self.kind, ConsensusTransactionKind::UserTransaction(_)) } pub fn is_user_transaction(&self) -> bool { diff --git a/crates/sui-types/src/quorum_driver_types.rs b/crates/sui-types/src/quorum_driver_types.rs index 6a0b6e39d1a07..e35e9804839c9 100644 --- a/crates/sui-types/src/quorum_driver_types.rs +++ b/crates/sui-types/src/quorum_driver_types.rs @@ -84,6 +84,7 @@ pub enum TransactionType { pub enum EffectsFinalityInfo { Certified(AuthorityStrongQuorumSignInfo), Checkpointed(EpochId, CheckpointSequenceNumber), + QuorumExecuted(EpochId), } /// When requested to execute a transaction with WaitForLocalExecution, @@ -120,22 +121,6 @@ pub struct QuorumDriverResponse { pub auxiliary_data: Option>, } -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct ExecuteTransactionRequest { - pub transaction: Transaction, - pub request_type: ExecuteTransactionRequestType, -} - -impl ExecuteTransactionRequest { - pub fn transaction_type(&self) -> TransactionType { - if self.transaction.contains_shared_object() { - TransactionType::SharedObject - } else { - TransactionType::SingleWriter - } - } -} - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ExecuteTransactionRequestV3 { pub transaction: Transaction, @@ -146,17 +131,6 @@ pub struct ExecuteTransactionRequestV3 { pub include_auxiliary_data: bool, } -#[derive(Clone, Debug)] -pub struct VerifiedExecuteTransactionResponseV3 { - pub effects: VerifiedCertifiedTransactionEffects, - pub events: Option, - // Input objects will only be populated in the happy path - pub input_objects: Option>, - // Output objects will only be populated in the happy path - pub output_objects: Option>, - pub auxiliary_data: Option>, -} - impl ExecuteTransactionRequestV3 { pub fn new_v2>(transaction: T) -> Self { Self { @@ -200,6 +174,11 @@ impl FinalizedEffects { match &self.finality_info { EffectsFinalityInfo::Certified(cert) => cert.epoch, EffectsFinalityInfo::Checkpointed(epoch, _) => *epoch, + EffectsFinalityInfo::QuorumExecuted(epoch) => *epoch, } } + + pub fn data(&self) -> &TransactionEffects { + &self.effects + } }