Skip to content

Commit

Permalink
[kv store] add timeout to bigtable ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Nov 8, 2024
1 parent 7758e33 commit 26344f7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
3 changes: 3 additions & 0 deletions crates/sui-data-ingestion-core/src/progress_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ impl<P: ProgressStore> ProgressStore for ProgressStoreWrapper<P> {
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> Result<()> {
if checkpoint_number % 1000 != 0 {
return Ok(());
}
self.progress_store
.save(task_name.clone(), checkpoint_number)
.await?;
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use prometheus::Registry;
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
Expand Down Expand Up @@ -45,6 +46,7 @@ struct ProgressStoreConfig {
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BigTableTaskConfig {
instance_id: String,
timeout_secs: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -154,7 +156,12 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?;
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
let worker_pool = WorkerPool::new(
KvWorker { client },
task_config.name,
Expand Down

0 comments on commit 26344f7

Please sign in to comment.