
Implement consistency pruning in the built-in pruner #20702

Merged · 8 commits · Jan 6, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/Cargo.toml
@@ -21,6 +21,7 @@ futures.workspace = true
prometheus.workspace = true
reqwest.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
188 changes: 179 additions & 9 deletions crates/sui-indexer-alt-framework/src/lib.rs
@@ -16,12 +16,13 @@ use pipeline::{
sequential::{self, SequentialConfig},
Processor,
};
use sui_pg_db::{Db, DbArgs};
use sui_pg_db::{temp::TempDb, Db, DbArgs};
use task::graceful_shutdown;
use tempfile::tempdir;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use watermarks::CommitterWatermark;
use watermarks::{CommitterWatermark, PrunerWatermark};

pub mod handlers;
pub mod ingestion;
@@ -175,6 +176,25 @@ impl Indexer {
})
}

pub async fn new_for_testing() -> (Self, TempDb) {
let temp_db = TempDb::new().unwrap();
let db_args = DbArgs::new_for_testing(temp_db.database().url().clone());
let indexer = Indexer::new(
db_args,
IndexerArgs::default(),
ClientArgs {
remote_store_url: None,
local_ingestion_path: Some(tempdir().unwrap().into_path()),
},
IngestionConfig::default(),
&MIGRATIONS,
CancellationToken::new(),
)
.await
.unwrap();
(indexer, temp_db)
}

/// The database connection pool used by the indexer.
pub fn db(&self) -> &Db {
&self.db
@@ -196,7 +216,8 @@ impl Indexer {
handler: H,
config: ConcurrentConfig,
) -> Result<()> {
let Some(watermark) = self.add_pipeline::<H>().await? else {
let start_from_pruner_watermark = H::PRUNING_REQUIRES_PROCESSED_VALUES;
let Some(watermark) = self.add_pipeline::<H>(start_from_pruner_watermark).await? else {
return Ok(());
};

@@ -236,7 +257,7 @@ impl Indexer {
handler: H,
config: SequentialConfig,
) -> Result<()> {
let Some(watermark) = self.add_pipeline::<H>().await? else {
let Some(watermark) = self.add_pipeline::<H>(false).await? else {
return Ok(());
};

@@ -366,8 +387,13 @@ impl Indexer {
/// handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled,
/// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and
/// `Ok(Some(Some(watermark)))` if the pipeline is enabled and the watermark is found.
///
/// If `start_from_pruner_watermark` is true, the indexer will start ingestion from just after
/// the pruner watermark, so that the pruner has access to the processed values for any
/// unpruned checkpoints.
async fn add_pipeline<P: Processor + 'static>(
&mut self,
start_from_pruner_watermark: bool,
) -> Result<Option<Option<CommitterWatermark<'static>>>> {
ensure!(
self.added_pipelines.insert(P::NAME),
@@ -388,11 +414,24 @@
.await
.with_context(|| format!("Failed to get watermark for {}", P::NAME))?;

// TODO(amnn): Test this (depends on supporting migrations and tempdb).
self.first_checkpoint_from_watermark = watermark
.as_ref()
.map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1)
.min(self.first_checkpoint_from_watermark);
let expected_first_checkpoint = if start_from_pruner_watermark {
// If the pruner of this pipeline requires processed values in order to prune,
// we must start ingestion from just after the pruner watermark,
// so that we can process all values needed by the pruner.
PrunerWatermark::get(&mut conn, P::NAME, Default::default())
.await
.with_context(|| format!("Failed to get pruner watermark for {}", P::NAME))?
.map(|w| (w.pruner_hi as u64) + 1)
.unwrap_or_default()
} else {
watermark
.as_ref()
.map(|w| w.checkpoint_hi_inclusive as u64 + 1)
.unwrap_or_default()
};

self.first_checkpoint_from_watermark =
expected_first_checkpoint.min(self.first_checkpoint_from_watermark);

Ok(Some(watermark))
}
@@ -409,3 +448,134 @@ impl Default for IndexerArgs {
}
}
}

#[cfg(test)]
mod tests {
use async_trait::async_trait;
use sui_field_count::FieldCount;
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;

use super::*;

#[derive(FieldCount)]
struct V {
_v: u64,
}

macro_rules! define_test_concurrent_pipeline {
($name:ident) => {
define_test_concurrent_pipeline!($name, false);
};
($name:ident, $pruning_requires_processed_values:expr) => {
struct $name;
impl Processor for $name {
const NAME: &'static str = stringify!($name);
type Value = V;
fn process(
&self,
_checkpoint: &Arc<CheckpointData>,
) -> anyhow::Result<Vec<Self::Value>> {
todo!()
}
}

#[async_trait]
impl concurrent::Handler for $name {
const PRUNING_REQUIRES_PROCESSED_VALUES: bool = $pruning_requires_processed_values;
async fn commit(
_values: &[Self::Value],
_conn: &mut db::Connection<'_>,
) -> anyhow::Result<usize> {
todo!()
}
}
};
}

define_test_concurrent_pipeline!(ConcurrentPipeline1);
define_test_concurrent_pipeline!(ConcurrentPipeline2);
define_test_concurrent_pipeline!(ConcurrentPipeline3, true);

#[tokio::test]
async fn test_add_new_pipeline() {
let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
Contributor: if Indexer::new_for_testing is meant for just these tests, we could have a function that spawns an indexer in the test module rather than a pub fn in the source code

Contributor (Author): I think it's just because we have not written any tests that would start the indexer so far?

indexer
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 0);
}

#[tokio::test]
async fn test_add_existing_pipeline() {
let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
let watermark = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
watermark
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap();
indexer
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 11);
}

#[tokio::test]
async fn test_add_multiple_pipelines() {
let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
watermark1
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap();
let watermark2 = CommitterWatermark::new_for_testing(ConcurrentPipeline2::NAME, 20);
watermark2
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap();

indexer
.concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 21);
indexer
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 11);
}

#[tokio::test]
async fn test_add_multiple_pipelines_pruning_requires_processed_values() {
let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
watermark1
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap();
indexer
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 11);

let watermark3 = CommitterWatermark::new_for_testing(ConcurrentPipeline3::NAME, 20);
watermark3
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap();
let pruner_watermark = PrunerWatermark::new_for_testing(ConcurrentPipeline3::NAME, 5);
assert!(pruner_watermark
.update(&mut indexer.db().connect().await.unwrap())
.await
.unwrap());
indexer
.concurrent_pipeline(ConcurrentPipeline3, ConcurrentConfig::default())
.await
.unwrap();
assert_eq!(indexer.first_checkpoint_from_watermark, 6);
Contributor: we could also assert that the other pipeline is not affected

}
}
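The core of this change is the watermark arithmetic in `add_pipeline`. Below is a minimal, self-contained restatement of that logic in plain Rust, using hypothetical stand-in structs rather than the framework's actual watermark types, so the two resume points and the min-across-pipelines rule can be checked against the tests above:

use std::cmp::min;

struct CommitterWatermark { checkpoint_hi_inclusive: i64 }
struct PrunerWatermark { pruner_hi: i64 }

fn expected_first_checkpoint(
    committer: Option<&CommitterWatermark>,
    pruner: Option<&PrunerWatermark>,
    pruning_requires_processed_values: bool,
) -> u64 {
    if pruning_requires_processed_values {
        // Re-ingest everything the pruner has not pruned yet, so that
        // processed values exist for all unpruned checkpoints.
        pruner.map(|w| w.pruner_hi as u64 + 1).unwrap_or_default()
    } else {
        // Resume from just after the last committed checkpoint.
        committer.map(|w| w.checkpoint_hi_inclusive as u64 + 1).unwrap_or_default()
    }
}

fn main() {
    let committer = CommitterWatermark { checkpoint_hi_inclusive: 20 };
    let pruner = PrunerWatermark { pruner_hi: 5 };

    // Matches test_add_multiple_pipelines_pruning_requires_processed_values:
    // a committer hi of 20 would resume at 21, but the pruner-aware path
    // resumes at 6 (pruner_hi + 1).
    assert_eq!(expected_first_checkpoint(Some(&committer), Some(&pruner), false), 21);
    assert_eq!(expected_first_checkpoint(Some(&committer), Some(&pruner), true), 6);

    // The indexer then keeps the minimum across all registered pipelines, so
    // ingestion starts early enough for every one of them.
    assert_eq!(min(21u64, 6u64), 6);
}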
18 changes: 16 additions & 2 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
@@ -57,6 +57,18 @@ pub trait Handler: Processor<Value: FieldCount> {
/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;

/// Whether the pruner requires processed values in order to prune.
/// This will determine the first checkpoint to process when we start the pipeline.
/// If this is true, when the pipeline starts, it will process all checkpoints from the
/// pruner watermark, so that the pruner has access to the processed values for any unpruned
/// checkpoints.
/// If this is false, when the pipeline starts, it will process all checkpoints from the
/// committer watermark.
// TODO: There are two issues with this:
// 1. There is no static guarantee that this flag is set correctly when the pruner needs processed values.
// 2. The name is a bit abstract.
const PRUNING_REQUIRES_PROCESSED_VALUES: bool = false;

/// Take a chunk of values and commit them to the database, returning the number of rows
/// affected.
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>)
@@ -65,6 +77,7 @@ pub trait Handler: Processor<Value: FieldCount> {
/// Clean up data between checkpoints `_from` (inclusive) and `_to_exclusive` (exclusive) in the
/// database, returning the number of rows affected. This function is optional, and defaults to
/// not pruning at all.
async fn prune(
&self,
_from: u64,
_to_exclusive: u64,
_conn: &mut db::Connection<'_>,
@@ -205,9 +218,10 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
// the global cancel signal. We achieve this by creating a child cancel token that we call
// cancel on once the committer tasks have shutdown.
let pruner_cancel = cancel.child_token();
let handler = Arc::new(handler);

let processor = processor(
handler,
handler.clone(),
checkpoint_rx,
processor_tx,
metrics.clone(),
@@ -250,7 +264,7 @@
pruner_cancel.clone(),
);

let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());
let pruner = pruner(handler, pruner_config, db, metrics, pruner_cancel.clone());

tokio::spawn(async move {
let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);
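Taken together, the trait changes above mean a pipeline opts into consistency pruning by flipping one constant and implementing `prune` on `&self`. A minimal sketch, assuming the imports visible in this diff and using `todo!()` bodies just like the PR's own test pipelines (`MyPipeline` and `MyRow` are hypothetical names, not part of the PR):

use std::sync::Arc;
use async_trait::async_trait;
use sui_field_count::FieldCount;
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;

#[derive(FieldCount)]
struct MyRow {
    _v: u64,
}

struct MyPipeline;

impl Processor for MyPipeline {
    const NAME: &'static str = "my_pipeline";
    type Value = MyRow;

    fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<MyRow>> {
        todo!()
    }
}

#[async_trait]
impl Handler for MyPipeline {
    // Opt in: on startup, ingestion restarts from just after the pruner
    // watermark, so `prune` can see processed values for unpruned checkpoints.
    const PRUNING_REQUIRES_PROCESSED_VALUES: bool = true;

    async fn commit(_values: &[MyRow], _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
        todo!()
    }

    // `&self` (new in this PR) dispatches through the shared handler
    // instance, so a pruner may now carry state.
    async fn prune(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut db::Connection<'_>,
    ) -> anyhow::Result<usize> {
        todo!()
    }
}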
@@ -35,7 +35,8 @@ use super::{Handler, PrunerConfig};
///
/// The task will shut down if the `cancel` token is signalled. If the `config` is `None`, the task
/// will shut down immediately.
pub(super) fn pruner<H: Handler + 'static>(
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
handler: Arc<H>,
config: Option<PrunerConfig>,
db: Db,
metrics: Arc<IndexerMetrics>,
@@ -135,7 +136,7 @@
break;
};

let affected = match H::prune(from, to_exclusive, &mut conn).await {
let affected = match handler.prune(from, to_exclusive, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
watermark.pruner_hi = to_exclusive as i64;
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt-framework/src/pipeline/processor.rs
@@ -44,7 +44,7 @@ pub trait Processor {
/// The task will shutdown if the `cancel` token is cancelled, or if any of the workers encounters
/// an error -- there is no retry logic at this level.
pub(super) fn processor<P: Processor + Send + Sync + 'static>(
processor: P,
processor: Arc<P>,
rx: mpsc::Receiver<Arc<CheckpointData>>,
tx: mpsc::Sender<IndexedCheckpoint<P>>,
metrics: Arc<IndexerMetrics>,
@@ -57,7 +57,6 @@
&metrics.latest_processed_checkpoint_timestamp_lag_ms,
&metrics.latest_processed_checkpoint,
);
let processor = Arc::new(processor);

match ReceiverStream::new(rx)
.try_for_each_spawned(P::FANOUT, |checkpoint| {
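The enabling refactor here is small: the pipeline now wraps the handler in an `Arc` once and hands clones of it to both the processor and the pruner. A generic sketch of the pattern, with hypothetical task functions standing in for the framework's `processor` and `pruner` tasks:

use std::sync::Arc;

struct MyHandler;

async fn run_processor(_handler: Arc<MyHandler>) { /* process checkpoints */ }
async fn run_pruner(_handler: Arc<MyHandler>) { /* call handler.prune(..) per watermark */ }

#[tokio::main]
async fn main() {
    let handler = Arc::new(MyHandler);
    // An Arc clone is just a refcount bump; both tasks share one instance,
    // which is what lets `prune` take `&self` rather than being an
    // associated function called as `H::prune`.
    let processor = tokio::spawn(run_processor(handler.clone()));
    let pruner = tokio::spawn(run_pruner(handler));
    let _ = tokio::join!(processor, pruner);
}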
@@ -106,7 +106,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);

let processor = processor(
handler,
Arc::new(handler),
checkpoint_rx,
processor_tx,
metrics.clone(),
21 changes: 21 additions & 0 deletions crates/sui-indexer-alt-framework/src/watermarks.rs
@@ -101,6 +101,17 @@ impl<'p> CommitterWatermark<'p> {
}
}

#[cfg(test)]
pub(crate) fn new_for_testing(pipeline: &'p str, checkpoint_hi_inclusive: u64) -> Self {
CommitterWatermark {
pipeline: pipeline.into(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
}
}
Comment on lines +105 to +113
Contributor: same here, we could have this in the test module instead of on the struct? idk how big of a deal / difference it would be

/// The consensus timestamp associated with this checkpoint.
pub(crate) fn timestamp(&self) -> DateTime<Utc> {
DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive).unwrap_or_default()
@@ -185,6 +196,16 @@ impl PrunerWatermark<'static> {
}

impl<'p> PrunerWatermark<'p> {
#[cfg(test)]
pub(crate) fn new_for_testing(pipeline: &'p str, pruner_hi: u64) -> Self {
PrunerWatermark {
pipeline: pipeline.into(),
wait_for: 0,
reader_lo: 0,
pruner_hi: pruner_hi as i64,
}
}

/// How long to wait before the pruner can act on this information, or `None`, if there is no
/// need to wait.
pub(crate) fn wait_for(&self) -> Option<Duration> {
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
@@ -15,6 +15,7 @@ anyhow.workspace = true
async-trait.workspace = true
bcs.workspace = true
clap.workspace = true
dashmap.workspace = true
diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations.workspace = true