diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs index c201cfa3928e89..65d2c62b401ca2 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs @@ -111,7 +111,7 @@ pub(super) fn pruner( // (3) Prune chunk by chunk to avoid the task waiting on a long-running database // transaction, between tests for cancellation. - while !watermark.is_empty() { + while let Some((from, to)) = watermark.next_chunk(config.max_chunk_size) { if cancel.is_cancelled() { info!(pipeline = H::NAME, "Shutdown received"); break 'outer; @@ -135,10 +135,10 @@ pub(super) fn pruner( break; }; - let (from, to) = watermark.next_chunk(config.max_chunk_size); let affected = match H::prune(from, to, &mut conn).await { Ok(affected) => { guard.stop_and_record(); + // Since `to` is inclusive, we can directly set it to pruner_hi watermark. watermark.pruner_hi = to as i64; affected } diff --git a/crates/sui-indexer-alt-framework/src/watermarks.rs b/crates/sui-indexer-alt-framework/src/watermarks.rs index 88f7465a243e5f..7e8864beac5c9d 100644 --- a/crates/sui-indexer-alt-framework/src/watermarks.rs +++ b/crates/sui-indexer-alt-framework/src/watermarks.rs @@ -191,17 +191,17 @@ impl<'p> PrunerWatermark<'p> { (self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64)) } - /// Whether the pruner has any work left to do on the range in this watermark. - pub(crate) fn is_empty(&self) -> bool { - self.pruner_hi >= self.reader_lo - } - /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. - /// Returns a tuple (from, to) where `from` is inclusive and `to` is exclusive. - pub(crate) fn next_chunk(&mut self, size: u64) -> (u64, u64) { + /// If no more checkpoints to prune, returns `None`. + /// Otherwise, returns a tuple (from, to) where both `from` and `to` are inclusive. + pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> { + if self.pruner_hi >= self.reader_lo { + return None; + } + let from = self.pruner_hi as u64; - let to = (from + size).min(self.reader_lo as u64); - (from, to) + let to = (from + size).min(self.reader_lo as u64) - 1; + Some((from, to)) } /// Update the pruner high watermark (only) for an existing watermark row, as long as this