From 3c99eb435b339d3cc5cdfb31c6ea810b624721fb Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 17 Dec 2024 13:30:58 -0800 Subject: [PATCH] [indexer-alt] Do not prune in the case of object creation (#20641) ## Description We do not need to prune anything when we create or unwrap objects in a checkpoint, since we know that the object did not exist prior to this checkpoint in the table. This saves a lot of unnecessary pruning requests. ## Test plan Deployed in prod. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../src/handlers/coin_balance_buckets.rs | 5 +- .../handlers/coin_balance_buckets_pruner.rs | 67 +++++++++++++++---- .../sui-indexer-alt/src/handlers/obj_info.rs | 9 --- .../src/handlers/obj_info_pruner.rs | 59 ++++++++++++---- 4 files changed, 103 insertions(+), 37 deletions(-) diff --git a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs index b1bba4ec74ba8..0c4d8c96ffcb6 100644 --- a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs +++ b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs @@ -187,7 +187,7 @@ impl TryInto for &ProcessedCoinBalanceBucket { /// Get the owner kind and address of a coin, if it is owned by a single address, /// either through fast-path ownership or ConsensusV2 ownership. 
-fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> { +pub(crate) fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> { match object.owner() { Owner::AddressOwner(owner_id) => Some((StoredCoinOwnerKind::Fastpath, *owner_id)), Owner::ConsensusV2 { authenticator, .. } => Some(( @@ -198,8 +198,9 @@ fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> } } -fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result { +pub(crate) fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result { let Some(coin) = coin.as_coin_maybe() else { + // TODO: We should make this an invariant violation. bail!("Failed to deserialize Coin for {}", coin.id()); }; let balance = coin.balance.value(); diff --git a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs index f6644534cf7e1..f3a9619692464 100644 --- a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs +++ b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs @@ -6,22 +6,62 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Result; use diesel::sql_query; use diesel_async::RunQueryDsl; +use sui_field_count::FieldCount; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_pg_db as db; -use sui_types::full_checkpoint_content::CheckpointData; +use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData}; -use super::coin_balance_buckets::{ - CoinBalanceBucketChangeKind, CoinBalanceBuckets, ProcessedCoinBalanceBucket, -}; +use super::coin_balance_buckets::{get_coin_balance_bucket, get_coin_owner}; pub(crate) struct CoinBalanceBucketsPruner; +pub(crate) struct CoinBalanceBucketToBePruned { + pub object_id: ObjectID, + pub cp_sequence_number_exclusive: u64, +} + impl Processor for CoinBalanceBucketsPruner { const NAME: &'static str = "coin_balance_buckets_pruner"; - type Value = 
ProcessedCoinBalanceBucket; + type Value = CoinBalanceBucketToBePruned; fn process(&self, checkpoint: &Arc) -> Result> { - CoinBalanceBuckets.process(checkpoint) + let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number; + let checkpoint_input_objects = checkpoint.checkpoint_input_objects(); + let latest_live_output_objects: BTreeMap<_, _> = checkpoint + .latest_live_output_objects() + .into_iter() + .map(|o| (o.id(), o)) + .collect(); + let mut values = Vec::new(); + for (object_id, input_object) in checkpoint_input_objects { + // This loop processes all coins that were owned by a single address prior to the checkpoint, + // but are now deleted/wrapped, or changed owner or coin balance bucket in the checkpoint. + if !input_object.is_coin() { + continue; + } + let Some(input_coin_owner) = get_coin_owner(input_object) else { + continue; + }; + let input_coin_balance_bucket = get_coin_balance_bucket(input_object)?; + if let Some(output_object) = latest_live_output_objects.get(&object_id) { + let output_coin_owner = get_coin_owner(output_object); + let output_coin_balance_bucket = get_coin_balance_bucket(output_object)?; + if (output_coin_owner, output_coin_balance_bucket) + != (Some(input_coin_owner), input_coin_balance_bucket) + { + values.push(CoinBalanceBucketToBePruned { + object_id, + cp_sequence_number_exclusive: cp_sequence_number, + }); + } + } else { + values.push(CoinBalanceBucketToBePruned { + object_id, + cp_sequence_number_exclusive: cp_sequence_number + 1, + }); + } + } + Ok(values) } } @@ -32,13 +72,8 @@ impl Handler for CoinBalanceBucketsPruner { let mut to_prune = BTreeMap::new(); for v in values { - let object_id = v.object_id; - let cp_sequence_number_exclusive = match v.change { - CoinBalanceBucketChangeKind::Insert { .. 
} => v.cp_sequence_number, - CoinBalanceBucketChangeKind::Delete => v.cp_sequence_number + 1, - } as i64; - let cp = to_prune.entry(object_id).or_default(); - *cp = std::cmp::max(*cp, cp_sequence_number_exclusive); + let cp = to_prune.entry(v.object_id).or_default(); + *cp = std::cmp::max(*cp, v.cp_sequence_number_exclusive); } let values = to_prune .iter() @@ -66,3 +101,9 @@ impl Handler for CoinBalanceBucketsPruner { Ok(rows_deleted) } } + +impl FieldCount for CoinBalanceBucketToBePruned { + // This does not really matter since we are not limited by postgres' bound variable limit, because + // we don't bind parameters in the deletion statement. + const FIELD_COUNT: usize = 1; +} diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs index 1da35917fedf0..a472c275daf83 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_info.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs @@ -95,15 +95,6 @@ impl Handler for ObjInfo { } } -impl ProcessedObjInfo { - pub fn object_id(&self) -> ObjectID { - match &self.update { - ProcessedObjInfoUpdate::Insert(object) => object.id(), - ProcessedObjInfoUpdate::Delete(object_id) => *object_id, - } - } -} - impl FieldCount for ProcessedObjInfo { const FIELD_COUNT: usize = StoredObjInfo::FIELD_COUNT; } diff --git a/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs b/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs index fc6f25306b839..eabbaeb6944d8 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs @@ -6,20 +6,51 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Result; use diesel::sql_query; use diesel_async::RunQueryDsl; +use sui_field_count::FieldCount; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_pg_db as db; -use sui_types::full_checkpoint_content::CheckpointData; - -use super::obj_info::{ObjInfo, ProcessedObjInfo, 
ProcessedObjInfoUpdate}; +use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData}; pub(crate) struct ObjInfoPruner; +pub(crate) struct ObjInfoToBePruned { + pub object_id: ObjectID, + pub cp_sequence_number_exclusive: u64, +} + impl Processor for ObjInfoPruner { const NAME: &'static str = "obj_info_pruner"; - type Value = ProcessedObjInfo; + type Value = ObjInfoToBePruned; fn process(&self, checkpoint: &Arc) -> Result> { - ObjInfo.process(checkpoint) + let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number; + let checkpoint_input_objects = checkpoint.checkpoint_input_objects(); + let latest_live_output_objects = checkpoint + .latest_live_output_objects() + .into_iter() + .map(|o| (o.id(), o)) + .collect::>(); + let mut values = Vec::with_capacity(checkpoint_input_objects.len()); + // We only need to prune if an object is removed, or its owner changed. + // We do not need to prune when an object is created or unwrapped, since there would not have + // been an entry for it in the table prior to this checkpoint. + // This makes the logic different from the one in obj_info.rs. + for (object_id, input_object) in checkpoint_input_objects { + if let Some(output_object) = latest_live_output_objects.get(&object_id) { + if output_object.owner() != input_object.owner() { + values.push(ObjInfoToBePruned { + object_id, + cp_sequence_number_exclusive: cp_sequence_number, + }); + } + } else { + values.push(ObjInfoToBePruned { + object_id, + cp_sequence_number_exclusive: cp_sequence_number + 1, + }); + } + } + Ok(values) } } @@ -30,16 +61,12 @@ impl Handler for ObjInfoPruner { // For each (object_id, cp_sequence_number_exclusive), delete all entries in obj_info with // cp_sequence_number less than cp_sequence_number_exclusive that match the object_id. - // For each object_id, we first get the highest cp_sequence_number_exclusive. + + // Minor optimization: For each object_id, we first get the highest cp_sequence_number_exclusive. 
let mut to_prune = BTreeMap::new(); for v in values { - let object_id = v.object_id(); - let cp_sequence_number_exclusive = match v.update { - ProcessedObjInfoUpdate::Insert(_) => v.cp_sequence_number, - ProcessedObjInfoUpdate::Delete(_) => v.cp_sequence_number + 1, - } as i64; - let cp = to_prune.entry(object_id).or_default(); - *cp = std::cmp::max(*cp, cp_sequence_number_exclusive); + let cp = to_prune.entry(v.object_id).or_default(); + *cp = std::cmp::max(*cp, v.cp_sequence_number_exclusive); } let values = to_prune .iter() @@ -67,3 +94,9 @@ impl Handler for ObjInfoPruner { Ok(rows_deleted) } } + +impl FieldCount for ObjInfoToBePruned { + // This does not really matter since we are not limited by postgres' bound variable limit, because + // we don't bind parameters in the deletion statement. + const FIELD_COUNT: usize = 1; +}