Skip to content

Commit

Permalink
[indexer-alt] Fix a bug in consistent pruner due to partial pruning (#…
Browse files Browse the repository at this point in the history
…20888)

## Description 

If we cannot prune some checkpoint due to checkpoint not yet processed,
we must not remove the any of the prune information from the same batch,
since we will need to retry them all together later.
Given how tricky this is, we should definitely make this part of
framework, instead of re-implementing it again.

## Test plan 

Added tests.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
lxfind authored Jan 15, 2025
1 parent 527a475 commit 0598968
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 13 deletions.
32 changes: 21 additions & 11 deletions crates/sui-indexer-alt/src/consistent_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl PruningLookupTable {
/// Given a range of checkpoints to prune (from inclusive, to_exclusive exclusive), return the set of objects
/// that should be pruned, and for each object the checkpoint upper bound (exclusive) that
/// the objects should be pruned at.
pub fn take(
pub fn get_prune_info(
&self,
cp_from: u64,
cp_to_exclusive: u64,
Expand All @@ -73,21 +73,27 @@ impl PruningLookupTable {
for cp in cp_from..cp_to_exclusive {
let info = self
.table
.remove(&cp)
.ok_or_else(|| anyhow::anyhow!("Prune info for checkpoint {cp} not found"))?
.1
.info;
for (object_id, update_kind) in info {
.get(&cp)
.ok_or_else(|| anyhow::anyhow!("Prune info for checkpoint {cp} not found"))?;
for (object_id, update_kind) in &info.value().info {
let prune_checkpoint = match update_kind {
UpdateKind::Mutate => cp,
UpdateKind::Delete => cp + 1,
};
let entry = result.entry(object_id).or_default();
let entry = result.entry(*object_id).or_default();
*entry = (*entry).max(prune_checkpoint);
}
}
Ok(result)
}

// Remove prune info for checkpoints that we no longer need.
// NOTE: Only call this when we have successfully pruned all the checkpoints in the range.
pub fn gc_prune_info(&self, cp_from: u64, cp_to_exclusive: u64) {
for cp in cp_from..cp_to_exclusive {
self.table.remove(&cp);
}
}
}

#[cfg(test)]
Expand All @@ -111,10 +117,14 @@ mod tests {
table.insert(2, info2);

// Prune checkpoints 1-2
let result = table.take(1, 3).unwrap();
let result = table.get_prune_info(1, 3).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[&obj1], 1);
assert_eq!(result[&obj2], 2);

// Remove prune info for checkpoints 1-2
table.gc_prune_info(1, 3);
assert!(table.table.is_empty());
}

#[test]
Expand All @@ -133,7 +143,7 @@ mod tests {
table.insert(2, info2);

// Prune checkpoints 1-2
let result = table.take(1, 3).unwrap();
let result = table.get_prune_info(1, 3).unwrap();
assert_eq!(result.len(), 1);
// For deleted objects, we prune up to and including the deletion checkpoint
assert_eq!(result[&obj], 3);
Expand All @@ -149,7 +159,7 @@ mod tests {
table.insert(1, info);

// Try to prune checkpoint that doesn't exist in the lookup table.
assert!(table.take(2, 3).is_err());
assert!(table.get_prune_info(2, 3).is_err());
}

#[test]
Expand All @@ -168,7 +178,7 @@ mod tests {
table.insert(2, info2);

// Prune checkpoints 1-2
let result = table.take(1, 3).unwrap();
let result = table.get_prune_info(1, 3).unwrap();
assert_eq!(result.len(), 1);
// Should use the latest mutation checkpoint
assert_eq!(result[&obj], 2);
Expand Down
6 changes: 5 additions & 1 deletion crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ impl Handler for CoinBalanceBuckets {
) -> anyhow::Result<usize> {
use sui_indexer_alt_schema::schema::coin_balance_buckets::dsl;

let to_prune = self.pruning_lookup_table.take(from, to_exclusive)?;
let to_prune = self
.pruning_lookup_table
.get_prune_info(from, to_exclusive)?;

if to_prune.is_empty() {
self.pruning_lookup_table.gc_prune_info(from, to_exclusive);
return Ok(0);
}

Expand Down Expand Up @@ -212,6 +215,7 @@ impl Handler for CoinBalanceBuckets {
dsl::cp_sequence_number,
);
let rows_deleted = sql_query(query).execute(conn).await?;
self.pruning_lookup_table.gc_prune_info(from, to_exclusive);
Ok(rows_deleted)
}
}
Expand Down
58 changes: 57 additions & 1 deletion crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ impl Handler for ObjInfo {
) -> Result<usize> {
use sui_indexer_alt_schema::schema::obj_info::dsl;

let to_prune = self.pruning_lookup_table.take(from, to_exclusive)?;
let to_prune = self
.pruning_lookup_table
.get_prune_info(from, to_exclusive)?;

if to_prune.is_empty() {
self.pruning_lookup_table.gc_prune_info(from, to_exclusive);
return Ok(0);
}

Expand Down Expand Up @@ -146,6 +149,7 @@ impl Handler for ObjInfo {
dsl::cp_sequence_number,
);
let rows_deleted = sql_query(query).execute(conn).await?;
self.pruning_lookup_table.gc_prune_info(from, to_exclusive);
Ok(rows_deleted)
}
}
Expand Down Expand Up @@ -601,4 +605,56 @@ mod tests {
let all_obj_info = get_all_obj_info(&mut conn).await.unwrap();
assert_eq!(all_obj_info.len(), 0);
}

#[tokio::test]
async fn test_obj_info_prune_with_missing_data() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let obj_info = ObjInfo::default();
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_owned_object(0)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = obj_info.process(&Arc::new(checkpoint)).unwrap();
ObjInfo::commit(&values, &mut conn).await.unwrap();

// Cannot prune checkpoint 1 yet since we haven't processed the checkpoint 1 data.
// This should not yet remove the prune info for checkpoint 0.
assert!(obj_info.prune(0, 2, &mut conn).await.is_err());

builder = builder
.start_transaction(0)
.transfer_object(0, 1)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = obj_info.process(&Arc::new(checkpoint)).unwrap();
ObjInfo::commit(&values, &mut conn).await.unwrap();

// Now we can prune both checkpoints 0 and 1.
obj_info.prune(0, 2, &mut conn).await.unwrap();

builder = builder
.start_transaction(1)
.transfer_object(0, 0)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = obj_info.process(&Arc::new(checkpoint)).unwrap();
ObjInfo::commit(&values, &mut conn).await.unwrap();

// Checkpoint 3 is missing, so we can not prune it.
assert!(obj_info.prune(2, 4, &mut conn).await.is_err());

builder = builder
.start_transaction(2)
.delete_object(0)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = obj_info.process(&Arc::new(checkpoint)).unwrap();
ObjInfo::commit(&values, &mut conn).await.unwrap();

// Now we can prune checkpoint 2, as well as 3.
obj_info.prune(2, 4, &mut conn).await.unwrap();
}
}

0 comments on commit 0598968

Please sign in to comment.