diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index 71e49acb6f154..74fec8c31f2ea 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -6,6 +6,7 @@ use prometheus::Registry; use serde::{Deserialize, Serialize}; use std::env; use std::path::PathBuf; +use std::time::Duration; use sui_data_ingestion::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker, @@ -45,6 +46,7 @@ struct ProgressStoreConfig { #[derive(Serialize, Deserialize, Clone, Debug)] struct BigTableTaskConfig { instance_id: String, + timeout_secs: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -62,6 +64,8 @@ struct IndexerConfig { metrics_host: String, #[serde(default = "default_metrics_port")] metrics_port: u16, + #[serde(default)] + is_backfill: bool, } fn default_metrics_host() -> String { @@ -119,6 +123,7 @@ async fn main() -> Result<()> { &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, + config.is_backfill, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); @@ -154,7 +159,12 @@ async fn main() -> Result<()> { executor.register(worker_pool).await?; } Task::BigTableKV(kv_config) => { - let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?; + let client = BigTableClient::new_remote( + kv_config.instance_id, + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?; let worker_pool = WorkerPool::new( KvWorker { client }, task_config.name, diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index b82d91c85b2ab..02857becfc626 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -16,6 +16,7 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub struct DynamoDBProgressStore { client: Client, table_name: String, + is_backfill: bool, } impl DynamoDBProgressStore { @@ -24,6 +25,7 @@ impl DynamoDBProgressStore { aws_secret_access_key: &str, aws_region: String, table_name: String, + is_backfill: bool, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -44,7 +46,11 @@ impl DynamoDBProgressStore { .load() .await; let client = Client::new(&aws_config); - Self { client, table_name } + Self { + client, + table_name, + is_backfill, + } } } @@ -70,6 +76,9 @@ impl ProgressStore for DynamoDBProgressStore { task_name: String, checkpoint_number: CheckpointSequenceNumber, ) -> Result<()> { + if self.is_backfill && checkpoint_number % 1000 != 0 { + return Ok(()); + } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self