From 0226d344c0bce1f71e1870b1abd5daf6c3b2a4ba Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Wed, 23 Oct 2024 17:25:17 -0700 Subject: [PATCH] WIP rebase --- .../authority/authority_per_epoch_store.rs | 179 ++++++------------ .../shared_object_congestion_tracker.rs | 104 +++++----- .../unit_tests/congestion_control_tests.rs | 4 +- 3 files changed, 124 insertions(+), 163 deletions(-) diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 84e4a32ff5a52..839cc148410a2 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -52,7 +52,7 @@ use typed_store::{ use super::authority_store_tables::ENV_VAR_LOCKS_BLOCK_CACHE_SIZE; use super::epoch_start_configuration::EpochStartConfigTrait; use super::shared_object_congestion_tracker::{ - CongestionPerObjectDebt, SharedObjectCongestionTracker, + CongestionPerObjectDebt, Debt, SharedObjectCongestionTracker, }; use super::transaction_deferral::{transaction_deferral_within_limit, DeferralKey, DeferralReason}; use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration}; @@ -729,63 +729,6 @@ impl AuthorityEpochTables { Ok(()) } - // TODO(quarantine) - pub fn load_initial_object_debts( - &self, - current_round: Round, - for_randomness: bool, - protocol_config: &ProtocolConfig, - transactions: &[VerifiedSequencedConsensusTransaction], - ) -> SuiResult> { - let default_per_commit_budget = protocol_config - .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option() - .unwrap_or(0); - let (table, per_commit_budget) = if for_randomness { - ( - &self.congestion_control_randomness_object_debts, - protocol_config - .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option() - .unwrap_or(default_per_commit_budget), - ) - } else { - ( - &self.congestion_control_object_debts, - default_per_commit_budget, - ) - }; - - let shared_input_object_ids: BTreeSet<_> = transactions - .iter() - .filter_map(|tx| { - if let SequencedConsensusTransactionKind::External(ConsensusTransaction { - kind: ConsensusTransactionKind::CertifiedTransaction(tx), - .. - }) = &tx.0.transaction - { - Some(tx.shared_input_objects().map(|obj| obj.id)) - } else { - None - } - }) - .flatten() - .collect(); - Ok(table - .multi_get(shared_input_object_ids.iter())? - .into_iter() - .zip(shared_input_object_ids) - .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id))) - .map(move |(debt, object_id)| { - let (round, debt) = debt.into_v1(); - ( - object_id, - // Stored debts already account for the budget of the round in which - // they were accumulated. Application of budget from future rounds to - // the debt is handled here. - debt.saturating_sub(per_commit_budget * (current_round - round - 1)), - ) - })) - } - fn get_all_deferred_transactions( &self, ) -> SuiResult>> { @@ -2870,21 +2813,26 @@ impl AuthorityPerEpochStore { // We track transaction execution cost separately for regular transactions and transactions using randomness, since // they will be in different PendingCheckpoints. - let tables = self.tables()?; let shared_object_congestion_tracker = SharedObjectCongestionTracker::from_protocol_config( - &tables, + self.consensus_quarantine.read().load_initial_object_debts( + self, + consensus_commit_info.round, + false, + &sequenced_transactions, + )?, self.protocol_config(), - consensus_commit_info.round, false, - &sequenced_transactions, )?; let shared_object_using_randomness_congestion_tracker = SharedObjectCongestionTracker::from_protocol_config( - &tables, + self.consensus_quarantine.read().load_initial_object_debts( + self, + consensus_commit_info.round, + true, + &sequenced_transactions, + )?, self.protocol_config(), - consensus_commit_info.round, true, - &sequenced_randomness_transactions, )?; // We always order transactions using randomness last. @@ -4272,6 +4220,8 @@ impl AuthorityPerEpochStore { mod quarantine { use mysten_common::fatal; + use crate::authority::shared_object_congestion_tracker::Debt; + use super::*; /// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints @@ -4294,8 +4244,9 @@ mod quarantine { // The most recent congestion control debts for objects. Uses a ref-count to track // which objects still exist in some element of output_queue. - congestion_control_randomness_object_debts: RefCountedHashMap, - congestion_control_object_debts: RefCountedHashMap, + congestion_control_randomness_object_debts: + RefCountedHashMap, + congestion_control_object_debts: RefCountedHashMap, } impl ConsensusOutputQuarantine { @@ -4306,9 +4257,9 @@ mod quarantine { output_queue: VecDeque::new(), builder_checkpoint_summary: BTreeMap::new(), builder_digest_to_checkpoint: HashMap::new(), - shared_object_next_versions: Default::default(), - congestion_control_randomness_object_debts: Default::default(), - congestion_control_object_debts: Default::default(), + shared_object_next_versions: RefCountedHashMap::new(), + congestion_control_randomness_object_debts: RefCountedHashMap::new(), + congestion_control_object_debts: RefCountedHashMap::new(), } } } @@ -4540,12 +4491,20 @@ mod quarantine { } fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) { + let current_round = output.consensus_round; + for (object_id, debt) in output.congestion_control_object_debts.iter() { - self.congestion_control_object_debts.insert(*object_id, *debt); + self.congestion_control_object_debts.insert( + *object_id, + CongestionPerObjectDebt::new(current_round, *debt), + ); } for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() { - self.congestion_control_randomness_object_debts.insert(*object_id, *debt); + self.congestion_control_randomness_object_debts.insert( + *object_id, + CongestionPerObjectDebt::new(current_round, *debt), + ); } } @@ -4662,14 +4621,27 @@ mod quarantine { epoch_store: &AuthorityPerEpochStore, current_round: Round, for_randomness: bool, - per_commit_budget: u64, transactions: &[VerifiedSequencedConsensusTransaction], - ) -> SuiResult> { + ) -> SuiResult> { + let protocol_config = epoch_store.protocol_config(); let tables = epoch_store.tables()?; - let (hash_table, db_table) = if for_randomness { - (&self.congestion_control_randomness_object_debts, &tables.congestion_control_randomness_object_debts) + let default_per_commit_budget = protocol_config + .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option() + .unwrap_or(0); + let (hash_table, db_table, per_commit_budget) = if for_randomness { + ( + &self.congestion_control_randomness_object_debts, + &tables.congestion_control_randomness_object_debts, + protocol_config + .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option() + .unwrap_or(default_per_commit_budget), + ) } else { - (&self.congestion_control_object_debts, &tables.congestion_control_object_debts) + ( + &self.congestion_control_object_debts, + &tables.congestion_control_object_debts, + default_per_commit_budget, + ) }; let shared_input_object_ids: BTreeSet<_> = transactions .iter() @@ -4694,7 +4666,7 @@ mod quarantine { for (i, object_id) in shared_input_object_ids.iter().enumerate() { if let Some(debt) = hash_table.get(object_id) { - results.push(Some(*debt)); + results.push(Some(debt.into_v1())); } else { results.push(None); fallback_keys.push(object_id); @@ -4705,41 +4677,20 @@ mod quarantine { let fallback_results = db_table.multi_get(fallback_keys)?; assert_eq!(fallback_results.len(), fallback_indices.len()); for (i, result) in fallback_indices.into_iter().zip(fallback_results) { - results[i] = result; + results[i] = result.map(|debt| debt.into_v1()); } - Ok(results.into_iter().zip(shared_input_object_ids.into_iter()) - .map(|(debt, object_id)| { - let (round, debt) = debt.into_v1(); - ( - object_id, + Ok(results + .into_iter() + .zip(shared_input_object_ids) + .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id))) + .map(move |((round, debt), object_id)| { // Stored debts already account for the budget of the round in which // they were accumulated. Application of budget from future rounds to // the debt is handled here. - debt.saturating_sub(per_commit_budget * (current_round - round - 1)), - ) - })) - - ) - - Ok(results) - - - - Ok(hash_table - .multi_get(shared_input_object_ids.iter())? - .into_iter() - .flatten() - .zip(shared_input_object_ids) - .map(move |(debt, object_id)| { - let (round, debt) = debt.into_v1(); - ( - object_id, - // Stored debts already account for the budget of the round in which - // they were accumulated. Application of budget from future rounds to - // the debt is handled here. - debt.saturating_sub(per_commit_budget * (current_round - round - 1)), - ) + assert!(current_round > round); + let num_rounds = current_round - round - 1; + (object_id, debt.dec_by(per_commit_budget * num_rounds)) })) } } @@ -4780,8 +4731,8 @@ pub(crate) struct ConsensusCommitOutput { active_jwks: BTreeSet<(u64, (JwkId, JWK))>, // congestion control state - congestion_control_object_debts: Vec<(ObjectID, u64)>, - congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>, + congestion_control_object_debts: Vec<(ObjectID, Debt)>, + congestion_control_randomness_object_debts: Vec<(ObjectID, Debt)>, } impl ConsensusCommitOutput { @@ -4911,13 +4862,13 @@ impl ConsensusCommitOutput { self.active_jwks.insert((round, key)); } - fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) { + fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, Debt)>) { self.congestion_control_object_debts = object_debts; } fn set_congestion_control_randomness_object_debts( &mut self, - object_debts: Vec<(ObjectID, u64)>, + object_debts: Vec<(ObjectID, Debt)>, ) { self.congestion_control_randomness_object_debts = object_debts; } @@ -5164,8 +5115,4 @@ where pub fn get(&self, key: &K) -> Option<&V> { self.map.get(key).map(|(_, v)| v) } - - pub fn iter(&self) -> impl Iterator { - self.map.iter().map(|(k, (_, v))| (k, v)) - } } diff --git a/crates/sui-core/src/authority/shared_object_congestion_tracker.rs b/crates/sui-core/src/authority/shared_object_congestion_tracker.rs index 0d2b8cf1ed1bf..ca39d6978c688 100644 --- a/crates/sui-core/src/authority/shared_object_congestion_tracker.rs +++ b/crates/sui-core/src/authority/shared_object_congestion_tracker.rs @@ -1,9 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use super::authority_per_epoch_store::AuthorityEpochTables; use crate::authority::transaction_deferral::DeferralKey; -use crate::consensus_handler::VerifiedSequencedConsensusTransaction; use narwhal_types::Round; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -38,15 +36,17 @@ pub struct SharedObjectCongestionTracker { impl SharedObjectCongestionTracker { pub fn new( - initial_object_debts: impl IntoIterator, + initial_object_debts: impl IntoIterator, mode: PerObjectCongestionControlMode, max_accumulated_txn_cost_per_object_in_commit: Option, gas_budget_based_txn_cost_cap_factor: Option, gas_budget_based_txn_cost_absolute_cap_commit_count: Option, max_txn_cost_overage_per_object_in_commit: u64, ) -> Self { - let object_execution_cost: HashMap = - initial_object_debts.into_iter().collect(); + let object_execution_cost: HashMap = initial_object_debts + .into_iter() + .map(|(obj_id, debt)| (obj_id, debt.0)) + .collect(); let max_accumulated_txn_cost_per_object_in_commit = if mode == PerObjectCongestionControlMode::None { 0 @@ -79,21 +79,14 @@ impl SharedObjectCongestionTracker { } pub fn from_protocol_config( - tables: &AuthorityEpochTables, + initial_object_debts: impl IntoIterator, protocol_config: &ProtocolConfig, - round: Round, for_randomness: bool, - transactions: &[VerifiedSequencedConsensusTransaction], ) -> SuiResult { let max_accumulated_txn_cost_per_object_in_commit = protocol_config.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option(); Ok(Self::new( - tables.load_initial_object_debts( - round, - for_randomness, - protocol_config, - transactions, - )?, + initial_object_debts, protocol_config.per_object_congestion_control_mode(), if for_randomness { protocol_config @@ -223,7 +216,7 @@ impl SharedObjectCongestionTracker { // Returns accumulated debts for objects whose budgets have been exceeded over the course // of the commit. Consumes the tracker object, since this should only be called once after // all tx have been processed. - pub fn accumulated_debts(self) -> Vec<(ObjectID, u64)> { + pub fn accumulated_debts(self) -> Vec<(ObjectID, Debt)> { if self.max_txn_cost_overage_per_object_in_commit == 0 { return vec![]; // early-exit if overage is not allowed } @@ -239,6 +232,7 @@ impl SharedObjectCongestionTracker { None } }) + .map(|(obj_id, debt)| (obj_id, Debt(debt))) .collect() } @@ -278,19 +272,28 @@ impl SharedObjectCongestionTracker { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum CongestionPerObjectDebt { V1(Round, u64), } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Debt(pub u64); + +impl Debt { + pub fn dec_by(self, amount: u64) -> Self { + Self(self.0.saturating_sub(amount)) + } +} + impl CongestionPerObjectDebt { - pub fn new(round: Round, debt: u64) -> Self { - Self::V1(round, debt) + pub fn new(round: Round, debt: Debt) -> Self { + Self::V1(round, debt.0) } - pub fn into_v1(self) -> (Round, u64) { + pub fn into_v1(self) -> (Round, Debt) { match self { - Self::V1(round, debt) => (round, debt), + Self::V1(round, debt) => (round, Debt(debt)), } } } @@ -325,7 +328,7 @@ mod object_cost_tests { let object_id_2 = ObjectID::random(); let shared_object_congestion_tracker = SharedObjectCongestionTracker::new( - [(object_id_0, 5), (object_id_1, 10)], + [(object_id_0, Debt(5)), (object_id_1, Debt(10))], PerObjectCongestionControlMode::TotalGasBudget, Some(0), // not part of this test None, @@ -476,7 +479,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 10), (shared_obj_1, 1)], + [(shared_obj_0, Debt(10)), (shared_obj_1, Debt(1))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -490,7 +493,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 2), (shared_obj_1, 1)], + [(shared_obj_0, Debt(2)), (shared_obj_1, Debt(1))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -504,7 +507,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 10), (shared_obj_1, 1)], + [(shared_obj_0, Debt(10)), (shared_obj_1, Debt(1))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), Some(45), // Make the cap just less than the gas budget, there are 1 objects in tx. @@ -693,7 +696,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 102), (shared_obj_1, 90)], + [(shared_obj_0, Debt(102)), (shared_obj_1, Debt(90))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -707,7 +710,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 3), (shared_obj_1, 2)], + [(shared_obj_0, Debt(3)), (shared_obj_1, Debt(2))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -721,7 +724,7 @@ mod object_cost_tests { // object 0: | // object 1: | SharedObjectCongestionTracker::new( - [(shared_obj_0, 100), (shared_obj_1, 90)], + [(shared_obj_0, Debt(100)), (shared_obj_1, Debt(90))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), Some(45), // Make the cap just less than the gas budget, there are 1 objects in tx. @@ -787,7 +790,7 @@ mod object_cost_tests { let cap_factor = Some(1); let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new( - [(object_id_0, 5), (object_id_1, 10)], + [(object_id_0, Debt(5)), (object_id_1, Debt(10))], mode, Some(0), // not part of this test cap_factor, @@ -802,7 +805,7 @@ mod object_cost_tests { assert_eq!( shared_object_congestion_tracker, SharedObjectCongestionTracker::new( - [(object_id_0, 5), (object_id_1, 10)], + [(object_id_0, Debt(5)), (object_id_1, Debt(10))], mode, Some(0), // not part of this test cap_factor, @@ -824,7 +827,10 @@ mod object_cost_tests { assert_eq!( shared_object_congestion_tracker, SharedObjectCongestionTracker::new( - [(object_id_0, expected_object_0_cost), (object_id_1, 10)], + [ + (object_id_0, Debt(expected_object_0_cost)), + (object_id_1, Debt(10)) + ], mode, Some(0), // not part of this test cap_factor, @@ -857,9 +863,9 @@ mod object_cost_tests { shared_object_congestion_tracker, SharedObjectCongestionTracker::new( [ - (object_id_0, expected_object_cost), - (object_id_1, expected_object_cost), - (object_id_2, expected_object_cost) + (object_id_0, Debt(expected_object_cost)), + (object_id_1, Debt(expected_object_cost)), + (object_id_2, Debt(expected_object_cost)) ], mode, Some(0), // not part of this test @@ -894,9 +900,9 @@ mod object_cost_tests { shared_object_congestion_tracker, SharedObjectCongestionTracker::new( [ - (object_id_0, expected_object_cost), - (object_id_1, expected_object_cost), - (object_id_2, expected_object_cost) + (object_id_0, Debt(expected_object_cost)), + (object_id_1, Debt(expected_object_cost)), + (object_id_2, Debt(expected_object_cost)) ], mode, Some(0), // not part of this test @@ -942,7 +948,7 @@ mod object_cost_tests { PerObjectCongestionControlMode::TotalGasBudget => { // Starting with two objects with accumulated cost 80. SharedObjectCongestionTracker::new( - [(shared_obj_0, 80), (shared_obj_1, 80)], + [(shared_obj_0, Debt(80)), (shared_obj_1, Debt(80))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -953,7 +959,7 @@ mod object_cost_tests { PerObjectCongestionControlMode::TotalGasBudgetWithCap => { // Starting with two objects with accumulated cost 80. SharedObjectCongestionTracker::new( - [(shared_obj_0, 80), (shared_obj_1, 80)], + [(shared_obj_0, Debt(80)), (shared_obj_1, Debt(80))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), Some(45), @@ -964,7 +970,7 @@ mod object_cost_tests { PerObjectCongestionControlMode::TotalTxCount => { // Starting with two objects with accumulated tx count 2. SharedObjectCongestionTracker::new( - [(shared_obj_0, 2), (shared_obj_1, 2)], + [(shared_obj_0, Debt(2)), (shared_obj_1, Debt(2))], mode, Some(max_accumulated_txn_cost_per_object_in_commit), None, @@ -986,13 +992,13 @@ mod object_cost_tests { match mode { PerObjectCongestionControlMode::None => unreachable!(), PerObjectCongestionControlMode::TotalGasBudget => { - assert_eq!(accumulated_debts[0], (shared_obj_0, 90)); // init 80 + cost 100 - budget 90 = 90 + assert_eq!(accumulated_debts[0], (shared_obj_0, Debt(90))); // init 80 + cost 100 - budget 90 = 90 } PerObjectCongestionControlMode::TotalGasBudgetWithCap => { - assert_eq!(accumulated_debts[0], (shared_obj_0, 80)); // init 80 + capped cost 90 - budget 90 = 80 + assert_eq!(accumulated_debts[0], (shared_obj_0, Debt(80))); // init 80 + capped cost 90 - budget 90 = 80 } PerObjectCongestionControlMode::TotalTxCount => { - assert_eq!(accumulated_debts[0], (shared_obj_0, 1)); // init 2 + 1 tx - budget 2 = 1 + assert_eq!(accumulated_debts[0], (shared_obj_0, Debt(1))); // init 2 + 1 tx - budget 2 = 1 } } } @@ -1004,7 +1010,11 @@ mod object_cost_tests { let object_id_2 = ObjectID::random(); let shared_object_congestion_tracker = SharedObjectCongestionTracker::new( - [(object_id_0, 5), (object_id_1, 10), (object_id_2, 100)], + [ + (object_id_0, Debt(5)), + (object_id_1, Debt(10)), + (object_id_2, Debt(100)), + ], PerObjectCongestionControlMode::TotalGasBudget, Some(100), None, @@ -1025,7 +1035,11 @@ mod object_cost_tests { let tx_gas_budget = 2000; let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new( - [(object_id_0, 5), (object_id_1, 10), (object_id_2, 100)], + [ + (object_id_0, Debt(5)), + (object_id_1, Debt(10)), + (object_id_2, Debt(100)), + ], PerObjectCongestionControlMode::TotalGasBudgetWithCap, Some(100), Some(1000), @@ -1056,6 +1070,6 @@ mod object_cost_tests { // Verify accumulated debts still uses the per-commit budget to decrement. let accumulated_debts = shared_object_congestion_tracker.accumulated_debts(); assert_eq!(accumulated_debts.len(), 1); - assert_eq!(accumulated_debts[0], (object_id_2, 200)); + assert_eq!(accumulated_debts[0], (object_id_2, Debt(200))); } } diff --git a/crates/sui-core/src/unit_tests/congestion_control_tests.rs b/crates/sui-core/src/unit_tests/congestion_control_tests.rs index 53016c0b38ef9..afcc377ce6185 100644 --- a/crates/sui-core/src/unit_tests/congestion_control_tests.rs +++ b/crates/sui-core/src/unit_tests/congestion_control_tests.rs @@ -2,7 +2,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::authority::shared_object_congestion_tracker::SharedObjectCongestionTracker; +use crate::authority::shared_object_congestion_tracker::{Debt, SharedObjectCongestionTracker}; use crate::{ authority::{ authority_tests::{ @@ -298,7 +298,7 @@ async fn test_congestion_control_execution_cancellation() { // Initialize shared object queue so that any transaction touches shared_object_1 should result in congestion and cancellation. register_fail_point_arg("initial_congestion_tracker", move || { Some(SharedObjectCongestionTracker::new( - [(shared_object_1.0, 10)], + [(shared_object_1.0, Debt(10))], PerObjectCongestionControlMode::TotalGasBudget, Some( test_setup