Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new object_per_epoch_marker_table that includes consensus start versions #20822

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/simulacrum/src/store/in_mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ impl ChildObjectResolver for InMemoryStore {
receiving_object_id: &ObjectID,
receive_object_at_version: SequenceNumber,
_epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
_use_object_per_epoch_marker_table_v2: bool,
) -> sui_types::error::SuiResult<Option<Object>> {
let recv_object = match crate::store::SimulatorStore::get_object(self, receiving_object_id)
{
Expand Down
25 changes: 24 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_objects_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
Expand Down Expand Up @@ -1556,7 +1560,14 @@ impl AuthorityState {
inner_temporary_store,
);
self.get_cache_writer()
.write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
.write_transaction_outputs(
epoch_store.epoch(),
transaction_outputs.into(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

if certificate.transaction_data().is_end_of_epoch_tx() {
Expand Down Expand Up @@ -1796,6 +1807,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -1982,6 +1997,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -2144,6 +2163,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// Create and use a dummy gas object if there is no gas object provided.
Expand Down
11 changes: 6 additions & 5 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use sui_storage::mutex_table::{MutexGuard, MutexTable};
use sui_types::accumulator::Accumulator;
use sui_types::authenticator_state::{get_authenticator_state, ActiveJwk};
use sui_types::base_types::{
AuthorityName, ConsensusObjectSequenceKey, EpochId, ObjectID, SequenceNumber, TransactionDigest,
AuthorityName, ConsensusObjectSequenceKey, EpochId, FullObjectID, ObjectID, SequenceNumber,
TransactionDigest,
};
use sui_types::base_types::{ConciseableName, ObjectRef};
use sui_types::committee::Committee;
Expand Down Expand Up @@ -1437,7 +1438,7 @@ impl AuthorityPerEpochStore {
error: "no assigned shared versions".to_string(),
})?;

let initial_shared_version =
let modified_initial_shared_version =
if self.epoch_start_config().use_version_assignment_tables_v3() {
*initial_shared_version
} else {
Expand All @@ -1447,21 +1448,21 @@ impl AuthorityPerEpochStore {
};
// If we found assigned versions, but they are missing the assignment for
// this object, it indicates a serious inconsistency!
let Some(version) = assigned_shared_versions.get(&(*id, initial_shared_version)) else {
let Some(version) = assigned_shared_versions.get(&(*id, modified_initial_shared_version)) else {
panic!(
"Shared object version should have been assigned. key: {key:?}, \
obj id: {id:?}, initial_shared_version: {initial_shared_version:?}, \
assigned_shared_versions: {assigned_shared_versions:?}",
)
};
InputKey::VersionedObject {
id: *id,
id: FullObjectID::new(*id, Some(*initial_shared_version)),
version: *version,
}
}
InputObjectKind::MovePackage(id) => InputKey::Package { id: *id },
InputObjectKind::ImmOrOwnedMoveObject(objref) => InputKey::VersionedObject {
id: objref.0,
id: FullObjectID::new(objref.0, None),
version: objref.1,
},
})
Expand Down
118 changes: 85 additions & 33 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use sui_types::error::UserInputError;
use sui_types::execution::TypeLayoutStore;
use sui_types::message_envelope::Message;
use sui_types::storage::{
get_module, BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
get_module, BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone,
ObjectStore,
};
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
Expand Down Expand Up @@ -212,9 +213,12 @@ impl AuthorityStore {
// We can safely delete all entries in the per epoch marker table since this is only called
// at epoch boundaries (during reconfiguration). Therefore any entries that currently
// exist can be removed. Because of this we can use the `schedule_delete_all` method.
self.perpetual_tables
.object_per_epoch_marker_table
.schedule_delete_all()?;
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.object_per_epoch_marker_table_v2
.schedule_delete_all()?)
}

Expand Down Expand Up @@ -412,40 +416,70 @@ impl AuthorityStore {

pub fn get_marker_value(
&self,
object_id: &ObjectID,
version: &SequenceNumber,
object_key: FullObjectKey,
epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<MarkerValue>> {
let object_key = (epoch_id, ObjectKey(*object_id, *version));
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&object_key)?)
if use_object_per_epoch_marker_table_v2 {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table_v2
.get(&(epoch_id, object_key))?)
} else {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&(epoch_id, object_key.into_object_key()))?)
}
}

pub fn get_latest_marker(
&self,
object_id: &ObjectID,
object_id: FullObjectID,
epoch_id: EpochId,
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
if use_object_per_epoch_marker_table_v2 {
let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, *object_id);
Ok(Some((key.1, marker)))
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table_v2
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.id(), object_id);
Ok(Some((key.version(), marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
} else {
let min_key = (epoch_id, ObjectKey::min_for_id(&object_id.id()));
let max_key = (epoch_id, ObjectKey::max_for_id(&object_id.id()));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, object_id.id());
Ok(Some((key.1, marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}

Expand Down Expand Up @@ -824,6 +858,8 @@ impl AuthorityStore {
&self,
epoch_id: EpochId,
tx_outputs: &[Arc<TransactionOutputs>],
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let mut written = Vec::with_capacity(tx_outputs.len());
for outputs in tx_outputs {
Expand All @@ -834,7 +870,12 @@ impl AuthorityStore {

let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
self.write_one_transaction_outputs(
&mut write_batch,
epoch_id,
outputs,
use_object_per_epoch_marker_table_v2,
)?;
}
// test crashing before writing the batch
fail_point_async!("crash");
Expand All @@ -859,6 +900,8 @@ impl AuthorityStore {
write_batch: &mut DBBatch,
epoch_id: EpochId,
tx_outputs: &TransactionOutputs,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let TransactionOutputs {
transaction,
Expand All @@ -883,12 +926,21 @@ impl AuthorityStore {
// Add batched writes for objects and locks.
let effects_digest = effects.digest();

write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
if use_object_per_epoch_marker_table_v2 {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table_v2,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
} else {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, key.into_object_key()), *marker_value)),
)?;
}

write_batch.insert_batch(
&self.perpetual_tables.objects,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::accumulator::Accumulator;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::TransactionEffects;
use sui_types::storage::MarkerValue;
use sui_types::storage::{FullObjectKey, MarkerValue};
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::util::{empty_compaction_filter, reference_count_merge_operator};
use typed_store::rocks::{
Expand Down Expand Up @@ -136,6 +136,7 @@ pub struct AuthorityPerpetualTables {
/// objects that have been deleted. This table is meant to be pruned per-epoch, and all
/// previous epochs other than the current epoch may be pruned safely.
pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
}

impl AuthorityPerpetualTables {
Expand Down Expand Up @@ -459,6 +460,7 @@ impl AuthorityPerpetualTables {
self.expected_network_sui_amount.unsafe_clear()?;
self.expected_storage_fund_imbalance.unsafe_clear()?;
self.object_per_epoch_marker_table.unsafe_clear()?;
self.object_per_epoch_marker_table_v2.unsafe_clear()?;
self.objects.rocksdb.flush()?;
Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,14 @@ impl<'a> TestAuthorityBuilder<'a> {

state
.get_cache_commit()
.commit_transaction_outputs(epoch_store.epoch(), &[*genesis.transaction().digest()])
.commit_transaction_outputs(
epoch_store.epoch(),
&[*genesis.transaction().digest()],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

// We want to insert these objects directly instead of relying on genesis because
Expand Down
18 changes: 16 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,14 @@ impl CheckpointExecutor {
let cache_commit = self.state.get_cache_commit();
debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
cache_commit
.commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
.commit_transaction_outputs(
epoch_store.epoch(),
all_tx_digests,
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

epoch_store
Expand Down Expand Up @@ -657,7 +664,14 @@ impl CheckpointExecutor {

let cache_commit = self.state.get_cache_commit();
cache_commit
.commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
.commit_transaction_outputs(
cur_epoch,
&[change_epoch_tx_digest],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;
fail_point_async!("prune-and-compact");

Expand Down
Loading
Loading