Skip to content

Commit

Permalink
Remove GetSharedLocks trait
Browse files Browse the repository at this point in the history
It's only ever populated with `AuthorityPerEpochStore`.

Also corrects the misnomer of "shared locks" everywhere to
"shared version assignments".
  • Loading branch information
aschran committed Jan 6, 2025
1 parent a56deae commit 9779d24
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 133 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ impl AuthorityState {
.start_timer();
let input_objects = &certificate.data().transaction_data().input_objects()?;
self.input_loader.read_objects_for_execution(
epoch_store.as_ref(),
epoch_store,
&certificate.key(),
tx_lock,
input_objects,
Expand Down
83 changes: 38 additions & 45 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use sui_types::messages_consensus::{
VersionedDkgConfirmation,
};
use sui_types::signature::GenericSignature;
use sui_types::storage::GetSharedLocks;
use sui_types::storage::{BackingPackageStore, InputKey, ObjectStore};
use sui_types::sui_system_state::epoch_start_sui_system_state::{
EpochStartSystemState, EpochStartSystemStateTrait,
Expand Down Expand Up @@ -1413,7 +1412,7 @@ impl AuthorityPerEpochStore {
key: &TransactionKey,
objects: &[InputObjectKind],
) -> SuiResult<BTreeSet<InputKey>> {
let shared_locks = once_cell::unsync::OnceCell::<
let assigned_shared_versions = once_cell::unsync::OnceCell::<
Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
>::new();
objects
Expand All @@ -1425,34 +1424,34 @@ impl AuthorityPerEpochStore {
initial_shared_version,
..
} => {
let shared_locks = shared_locks
let assigned_shared_versions = assigned_shared_versions
.get_or_init(|| {
self.get_shared_locks(key)
.expect("reading shared locks should not fail")
.map(|locks| locks.into_iter().collect())
self.get_assigned_shared_object_versions(key)
.expect("reading assigned shared versions should not fail")
.map(|versions| versions.into_iter().collect())
})
.as_ref()
// Shared version assignments could have been deleted if the tx just
// finished executing concurrently.
.ok_or(SuiError::GenericAuthorityError {
error: "no shared locks".to_string(),
error: "no assigned shared versions".to_string(),
})?;

let initial_shared_version =
if self.epoch_start_config().use_version_assignment_tables_v3() {
*initial_shared_version
} else {
// (before ConsensusV2 objects, we didn't track initial shared
// version for shared object locks)
// version for shared object version assignments)
SequenceNumber::UNKNOWN
};
// If we found locks, but they are missing the assignment for this object,
// it indicates a serious inconsistency!
let Some(version) = shared_locks.get(&(*id, initial_shared_version)) else {
// If we found assigned versions, but they are missing the assignment for
// this object, it indicates a serious inconsistency!
let Some(version) = assigned_shared_versions.get(&(*id, initial_shared_version)) else {
panic!(
"Shared object locks should have been set. key: {key:?}, obj \
id: {id:?}, initial_shared_version: {initial_shared_version:?}, \
shared_locks: {shared_locks:?}",
"Shared object version should have been assigned. key: {key:?}, \
obj id: {id:?}, initial_shared_version: {initial_shared_version:?}, \
assigned_shared_versions: {assigned_shared_versions:?}",
)
};
InputKey::VersionedObject {
Expand Down Expand Up @@ -1715,7 +1714,7 @@ impl AuthorityPerEpochStore {
// created the shared object originally - which transaction may not yet have been executed on
// this node).
//
// Because all paths that assign shared locks for a shared object transaction call this
// Because all paths that assign shared versions for a shared object transaction call this
// function, it is impossible for parent_sync to be updated before this function completes
// successfully for each affected object id.
pub(crate) async fn get_or_init_next_object_versions(
Expand Down Expand Up @@ -1813,6 +1812,26 @@ impl AuthorityPerEpochStore {
Ok(ret)
}

pub fn get_assigned_shared_object_versions(
&self,
key: &TransactionKey,
) -> SuiResult<Option<Vec<(ConsensusObjectSequenceKey, SequenceNumber)>>> {
if self.epoch_start_config().use_version_assignment_tables_v3() {
Ok(self.tables()?.assigned_shared_object_versions_v3.get(key)?)
} else {
Ok(self
.tables()?
.assigned_shared_object_versions_v2
.get(key)?
.map(|result| {
result
.into_iter()
.map(|(id, v)| ((id, SequenceNumber::UNKNOWN), v))
.collect()
}))
}
}

async fn set_assigned_shared_object_versions_with_db_batch(
&self,
versions: AssignedTxAndVersions,
Expand Down Expand Up @@ -2007,12 +2026,12 @@ impl AuthorityPerEpochStore {
}
}

/// Lock a sequence number for the shared objects of the input transaction based on the effects
/// of that transaction.
/// Assign a sequence number for the shared objects of the input transaction based on the
/// effects of that transaction.
/// Used by full nodes who don't listen to consensus, and validators who catch up by state sync.
// TODO: We should be able to pass in a vector of certs/effects and lock them all at once.
// TODO: We should be able to pass in a vector of certs/effects and acquire them all at once.
#[instrument(level = "trace", skip_all)]
pub async fn acquire_shared_locks_from_effects(
pub async fn acquire_shared_version_assignments_from_effects(
&self,
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
Expand Down Expand Up @@ -4569,32 +4588,6 @@ impl ConsensusCommitOutput {
}
}

impl GetSharedLocks for AuthorityPerEpochStore {
fn get_shared_locks(
&self,
key: &TransactionKey,
) -> SuiResult<Option<Vec<(ConsensusObjectSequenceKey, SequenceNumber)>>> {
if self.epoch_start_config().use_version_assignment_tables_v3() {
Ok(self.tables()?.assigned_shared_object_versions_v3.get(key)?)
} else {
Ok(self
.tables()?
.assigned_shared_object_versions_v2
.get(key)?
.map(|result| {
result
.into_iter()
.map(|(id, v)| ((id, SequenceNumber::UNKNOWN), v))
.collect()
}))
}
}

fn is_initial_shared_version_unknown(&self) -> bool {
!self.epoch_start_config().use_version_assignment_tables_v3()
}
}

impl ExecutionComponents {
fn new(
protocol_config: &ProtocolConfig,
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,13 @@ impl CheckpointExecutor {

if change_epoch_tx.contains_shared_object() {
epoch_store
.acquire_shared_locks_from_effects(
.acquire_shared_version_assignments_from_effects(
&change_epoch_tx,
&change_epoch_fx,
self.object_cache_reader.as_ref(),
)
.await
.expect("Acquiring shared locks for change_epoch tx cannot fail");
.expect("Acquiring shared version assignments for change_epoch tx cannot fail");
}

self.tx_manager.enqueue_with_expected_effects_digest(
Expand Down Expand Up @@ -1237,7 +1237,7 @@ async fn execute_transactions(
for (tx, _) in &executable_txns {
if tx.contains_shared_object() {
epoch_store
.acquire_shared_locks_from_effects(
.acquire_shared_version_assignments_from_effects(
tx,
digest_to_effects.get(tx.digest()).unwrap(),
object_cache_reader,
Expand Down
58 changes: 34 additions & 24 deletions crates/sui-core/src/transaction_input_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
authority::authority_per_epoch_store::CertLockGuard, execution_cache::ObjectCacheRead,
authority::{
authority_per_epoch_store::{AuthorityPerEpochStore, CertLockGuard},
epoch_start_configuration::EpochStartConfigTrait,
},
execution_cache::ObjectCacheRead,
};
use itertools::izip;
use mysten_common::fatal;
Expand All @@ -12,7 +16,7 @@ use std::sync::Arc;
use sui_types::{
base_types::{EpochId, ObjectRef, SequenceNumber, TransactionDigest},
error::{SuiError, SuiResult, UserInputError},
storage::{GetSharedLocks, ObjectKey},
storage::ObjectKey,
transaction::{
InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
Expand Down Expand Up @@ -112,7 +116,7 @@ impl TransactionInputLoader {

/// Read the inputs for a transaction that is ready to be executed.
///
/// shared_lock_store is used to resolve the versions of any shared input objects.
/// epoch_store is used to resolve the versions of any shared input objects.
///
/// This function panics if any inputs are not available, as TransactionManager should already
/// have verified that the transaction is ready to be executed.
Expand All @@ -127,13 +131,13 @@ impl TransactionInputLoader {
#[instrument(level = "trace", skip_all)]
pub fn read_objects_for_execution(
&self,
shared_lock_store: &impl GetSharedLocks,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx_key: &TransactionKey,
_tx_lock: &CertLockGuard, // see below for why this is needed
input_object_kinds: &[InputObjectKind],
epoch_id: EpochId,
) -> SuiResult<InputObjects> {
let shared_locks_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
let assigned_shared_versions_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();

let mut results = vec![None; input_object_kinds.len()];
let mut object_keys = Vec::with_capacity(input_object_kinds.len());
Expand Down Expand Up @@ -161,32 +165,38 @@ impl TransactionInputLoader {
initial_shared_version,
..
} => {
let shared_locks = shared_locks_cell
let assigned_shared_versions = assigned_shared_versions_cell
.get_or_init(|| {
shared_lock_store
.get_shared_locks(tx_key)
.expect("loading shared locks should not fail")
.map(|locks| locks.into_iter().collect())
epoch_store
.get_assigned_shared_object_versions(tx_key)
.expect("loading assigned shared versions should not fail")
.map(|versions| versions.into_iter().collect())
})
.as_ref()
.unwrap_or_else(|| {
// Important to hold the _tx_lock here - otherwise it would be possible
// for a concurrent execution of the same tx to enter this point after the
// first execution has finished and the shared locks have been deleted.
fatal!("Failed to get shared locks for transaction {tx_key:?}");
// for a concurrent execution of the same tx to enter this point after
// the first execution has finished and the assigned shared versions
// have been deleted.
fatal!(
"Failed to get assigned shared versions for transaction {tx_key:?}"
);
});

let initial_shared_version =
if shared_lock_store.is_initial_shared_version_unknown() {
// (before ConsensusV2 objects, we didn't track initial shared
// version for shared object locks)
SequenceNumber::UNKNOWN
} else {
*initial_shared_version
};
// If we find a set of locks but an object is missing, it indicates a serious inconsistency:
let version = shared_locks.get(&(*id, initial_shared_version)).unwrap_or_else(|| {
panic!("Shared object locks should have been set. key: {tx_key:?}, obj id: {id:?}")
let initial_shared_version = if epoch_store
.epoch_start_config()
.use_version_assignment_tables_v3()
{
*initial_shared_version
} else {
// (before ConsensusV2 objects, we didn't track initial shared
// version for shared object locks)
SequenceNumber::UNKNOWN
};
// If we find a set of assigned versions but an object is missing, it indicates
// a serious inconsistency:
let version = assigned_shared_versions.get(&(*id, initial_shared_version)).unwrap_or_else(|| {
panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
});
if version.is_cancelled() {
// Do not need to fetch shared object for cancelled transaction.
Expand Down
23 changes: 11 additions & 12 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use sui_types::messages_consensus::{
use sui_types::object::Data;
use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
use sui_types::randomness_state::get_randomness_state_obj_initial_shared_version;
use sui_types::storage::GetSharedLocks;
use sui_types::sui_system_state::SuiSystemStateWrapper;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use sui_types::utils::{
Expand Down Expand Up @@ -4582,9 +4581,9 @@ async fn test_shared_object_transaction_ok() {
// Verify shared locks are now set for the transaction.
let shared_object_version = authority
.epoch_store_for_testing()
.get_shared_locks(&certificate.key())
.expect("Reading shared locks should not fail")
.expect("Locks should be set")
.get_assigned_shared_object_versions(&certificate.key())
.expect("Reading shared version assignments should not fail")
.expect("Versions should be set")
.into_iter()
.find_map(|((object_id, initial_shared_version), version)| {
if object_id == shared_object_id
Expand All @@ -4595,7 +4594,7 @@ async fn test_shared_object_transaction_ok() {
None
}
})
.expect("Shared object must be locked");
.expect("Shared object must be assigned a version");
assert_eq!(shared_object_version, OBJECT_START_VERSION);

// Finally (Re-)execute the contract should succeed.
Expand Down Expand Up @@ -4700,9 +4699,9 @@ async fn test_consensus_commit_prologue_generation() {
let get_assigned_version = |txn_key: &TransactionKey| -> SequenceNumber {
authority_state
.epoch_store_for_testing()
.get_shared_locks(txn_key)
.get_assigned_shared_object_versions(txn_key)
.unwrap()
.expect("locks should be set")
.expect("versions should be set")
.iter()
.filter_map(|((id, initial_shared_version), seq)| {
if id == &SUI_CLOCK_OBJECT_ID
Expand Down Expand Up @@ -4808,7 +4807,7 @@ async fn test_consensus_message_processed() {
} else {
let epoch_store = authority2.epoch_store_for_testing();
epoch_store
.acquire_shared_locks_from_effects(
.acquire_shared_version_assignments_from_effects(
&VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
&effects1,
authority2.get_object_cache_reader().as_ref(),
Expand Down Expand Up @@ -6071,9 +6070,9 @@ async fn test_consensus_handler_congestion_control_transaction_cancellation() {
// Check cancelled transaction shared locks.
let shared_object_version = authority
.epoch_store_for_testing()
.get_shared_locks(&cancelled_txn.key())
.expect("Reading shared locks should not fail")
.expect("locks should be set")
.get_assigned_shared_object_versions(&cancelled_txn.key())
.expect("Reading shared version assignments should not fail")
.expect("Versions should be set")
.into_iter()
.collect::<HashMap<_, _>>();
assert_eq!(
Expand Down Expand Up @@ -6102,7 +6101,7 @@ async fn test_consensus_handler_congestion_control_transaction_cancellation() {
let input_loader = TransactionInputLoader::new(authority.get_object_cache_reader().clone());
let input_objects = input_loader
.read_objects_for_execution(
authority.epoch_store_for_testing().as_ref(),
&authority.epoch_store_for_testing(),
&cancelled_txn.key(),
&CertLockGuard::dummy_for_tests(),
&cancelled_txn
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/congestion_control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async fn test_congestion_control_execution_cancellation() {
.unwrap();
authority_state_2
.epoch_store_for_testing()
.acquire_shared_locks_from_effects(
.acquire_shared_version_assignments_from_effects(
&VerifiedExecutableTransaction::new_from_certificate(cert.clone()),
&effects,
authority_state_2.get_object_cache_reader().as_ref(),
Expand Down
Loading

0 comments on commit 9779d24

Please sign in to comment.