From 26344f7229fd83e95360f91ad6f9b1816391adb6 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:17:48 -0800 Subject: [PATCH] [kv store] add timeout to bigtable ingestion --- crates/sui-data-ingestion-core/src/progress_store/mod.rs | 3 +++ crates/sui-data-ingestion/src/main.rs | 9 ++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/sui-data-ingestion-core/src/progress_store/mod.rs b/crates/sui-data-ingestion-core/src/progress_store/mod.rs index c7358eb1175804..4f0023e4095029 100644 --- a/crates/sui-data-ingestion-core/src/progress_store/mod.rs +++ b/crates/sui-data-ingestion-core/src/progress_store/mod.rs @@ -38,6 +38,9 @@ impl ProgressStore for ProgressStoreWrapper

{ task_name: String, checkpoint_number: CheckpointSequenceNumber, ) -> Result<()> { + if checkpoint_number % 1000 != 0 { + return Ok(()); + } self.progress_store .save(task_name.clone(), checkpoint_number) .await?; diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index 71e49acb6f1547..261a5c2b65c9ed 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -6,6 +6,7 @@ use prometheus::Registry; use serde::{Deserialize, Serialize}; use std::env; use std::path::PathBuf; +use std::time::Duration; use sui_data_ingestion::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker, @@ -45,6 +46,7 @@ struct ProgressStoreConfig { #[derive(Serialize, Deserialize, Clone, Debug)] struct BigTableTaskConfig { instance_id: String, + timeout_secs: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -154,7 +156,12 @@ async fn main() -> Result<()> { executor.register(worker_pool).await?; } Task::BigTableKV(kv_config) => { - let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?; + let client = BigTableClient::new_remote( + kv_config.instance_id, + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?; let worker_pool = WorkerPool::new( KvWorker { client }, task_config.name,