Skip to content

Commit

Permalink
[indexer-alt] Fix starting checkpoint for consistent pipelines (#20880)
Browse files Browse the repository at this point in the history
## Description 

Fix a bug where we should start at pruner watermark without the `+1`.
For instance, when pruner watermark is 0, we need to start at checkpoint
0.

## Test plan 

I will add a few e2e tests for this scenario.
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
lxfind authored Jan 14, 2025
1 parent 11e5ec9 commit 4ef192d
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 3 deletions.
4 changes: 2 additions & 2 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ impl Indexer {
PrunerWatermark::get(&mut conn, P::NAME, Default::default())
.await
.with_context(|| format!("Failed to get pruner watermark for {}", P::NAME))?
.map(|w| (w.pruner_hi as u64) + 1)
.map(|w| w.pruner_hi as u64)
.unwrap_or_default()
} else {
watermark
Expand Down Expand Up @@ -577,6 +577,6 @@ mod tests {
.concurrent_pipeline(ConcurrentPipeline3, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 6);
assert_eq!(indexer.first_checkpoint_from_watermark, 5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sui_pg_db::{self as db, Db};
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{metrics::IndexerMetrics, watermarks::CommitterWatermark};

Expand Down Expand Up @@ -195,6 +196,10 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> JoinHandle<()> {
info!(
pipeline = H::NAME,
"Starting pipeline with config: {:?}", config
);
let ConcurrentConfig {
committer: committer_config,
pruner: pruner_config,
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
return;
};

info!(
pipeline = H::NAME,
"Starting pruner with config: {:?}", config
);

// The pruner can pause for a while, waiting for the delay imposed by the
// `pruner_timestamp` to expire. In that case, the period between ticks should not be
// compressed to make up for missed ticks.
Expand Down Expand Up @@ -136,6 +141,11 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
break;
};

debug!(
pipeline = H::NAME,
"Pruning from {} to {}", from, to_exclusive
);

let affected = match handler.prune(from, to_exclusive, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub trait Merge {
}

#[DefaultConfig]
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct IndexerConfig {
/// How checkpoints are read by the indexer.
pub ingestion: IngestionLayer,
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_pg_db::reset_database;
use tokio::fs;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -33,6 +34,7 @@ async fn main() -> Result<()> {
config,
} => {
let indexer_config = read_config(&config).await?;
info!("Starting indexer with config: {:?}", indexer_config);

start_indexer(
args.db_args,
Expand Down

0 comments on commit 4ef192d

Please sign in to comment.