Skip to content

Commit

Permalink
[indexer-alt] Fix comments in pruner to (#20740)
Browse files Browse the repository at this point in the history
## Description 

Clarified in the doc comment for `prune` that `to` should be exclusive
instead of inclusive.
Also updated the prune loop to make it clearer.

## Test plan 

We probably want to write some tests.
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
lxfind authored Jan 2, 2025
1 parent 820db16 commit f180f90
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ pub trait Handler: Processor<Value: FieldCount> {
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>)
-> anyhow::Result<usize>;

/// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
/// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
async fn prune(
_from: u64,
_to_exclusive: u64,
_conn: &mut db::Connection<'_>,
) -> anyhow::Result<usize> {
Ok(0)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub(super) fn pruner<H: Handler + 'static>(

// (3) Prune chunk by chunk to avoid the task waiting on a long-running database
// transaction, between tests for cancellation.
while !watermark.is_empty() {
while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
if cancel.is_cancelled() {
info!(pipeline = H::NAME, "Shutdown received");
break 'outer;
Expand All @@ -135,11 +135,10 @@ pub(super) fn pruner<H: Handler + 'static>(
break;
};

let (from, to) = watermark.next_chunk(config.max_chunk_size);
let affected = match H::prune(from, to, &mut conn).await {
let affected = match H::prune(from, to_exclusive, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
watermark.pruner_hi = to as i64;
watermark.pruner_hi = to_exclusive as i64;
affected
}

Expand Down
18 changes: 9 additions & 9 deletions crates/sui-indexer-alt-framework/src/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ impl<'p> PrunerWatermark<'p> {
(self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64))
}

/// Whether the pruner has any work left to do on the range in this watermark.
pub(crate) fn is_empty(&self) -> bool {
self.pruner_hi >= self.reader_lo
}

/// The next chunk of checkpoints that the pruner should work on, to advance the watermark.
/// Returns a tuple (from, to) where `from` is inclusive and `to` is exclusive.
pub(crate) fn next_chunk(&mut self, size: u64) -> (u64, u64) {
/// If no more checkpoints to prune, returns `None`.
/// Otherwise, returns a tuple (from, to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive.
pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
if self.pruner_hi >= self.reader_lo {
return None;
}

let from = self.pruner_hi as u64;
let to = (from + size).min(self.reader_lo as u64);
(from, to)
let to_exclusive = (from + size).min(self.reader_lo as u64);
Some((from, to_exclusive))
}

/// Update the pruner high watermark (only) for an existing watermark row, as long as this
Expand Down

0 comments on commit f180f90

Please sign in to comment.