From 4ba4b29a0ca9c473e89a1ba5b01caf71c31c5c21 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Tue, 12 Nov 2024 12:47:34 -0500 Subject: [PATCH] [kv store] add timeout to bigtable ingestion + backfill mode (#20193) ## Description adds timeouts to the Bigtable ingestion pipeline and introduces a backfill mode that skips most progress updates to DynamoDB during backfilling --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-data-ingestion/src/main.rs | 12 +++++++++++- crates/sui-data-ingestion/src/progress_store.rs | 11 ++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index 71e49acb6f154..74fec8c31f2ea 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -6,6 +6,7 @@ use prometheus::Registry; use serde::{Deserialize, Serialize}; use std::env; use std::path::PathBuf; +use std::time::Duration; use sui_data_ingestion::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker, @@ -45,6 +46,7 @@ struct ProgressStoreConfig { #[derive(Serialize, Deserialize, Clone, Debug)] struct BigTableTaskConfig { instance_id: String, + timeout_secs: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -62,6 +64,8 @@ struct IndexerConfig { metrics_host: String, #[serde(default = "default_metrics_port")] metrics_port: u16, + #[serde(default)] + is_backfill: bool, } fn default_metrics_host() -> String { @@ -119,6 +123,7 @@ async fn main() -> Result<()> { &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, + config.is_backfill, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); @@ -154,7 +159,12 @@ async fn main() -> Result<()> { executor.register(worker_pool).await?; } Task::BigTableKV(kv_config) => { - let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?; + let client = BigTableClient::new_remote( + kv_config.instance_id, + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?; let worker_pool = WorkerPool::new( KvWorker { client }, task_config.name, diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index b82d91c85b2ab..02857becfc626 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -16,6 +16,7 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub struct DynamoDBProgressStore { client: Client, table_name: String, + is_backfill: bool, } impl DynamoDBProgressStore { @@ -24,6 +25,7 @@ impl DynamoDBProgressStore { aws_secret_access_key: &str, aws_region: String, table_name: String, + is_backfill: bool, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -44,7 +46,11 @@ impl DynamoDBProgressStore { .load() .await; let client = Client::new(&aws_config); - Self { client, table_name } + Self { + client, + table_name, + is_backfill, + } } } @@ -70,6 +76,9 @@ impl ProgressStore for DynamoDBProgressStore { task_name: String, checkpoint_number: CheckpointSequenceNumber, ) -> Result<()> { + if self.is_backfill && checkpoint_number % 1000 != 0 { + return Ok(()); + } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self