Skip to content

Commit

Permalink
[kv store] add timeout to bigtable ingestion + backfill mode
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Nov 12, 2024
1 parent 7758e33 commit b364e4d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
12 changes: 11 additions & 1 deletion crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use prometheus::Registry;
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
Expand Down Expand Up @@ -45,6 +46,7 @@ struct ProgressStoreConfig {
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BigTableTaskConfig {
instance_id: String,
timeout_secs: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -62,6 +64,8 @@ struct IndexerConfig {
metrics_host: String,
#[serde(default = "default_metrics_port")]
metrics_port: u16,
#[serde(default)]
is_backfill: bool,
}

fn default_metrics_host() -> String {
Expand Down Expand Up @@ -119,6 +123,7 @@ async fn main() -> Result<()> {
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down Expand Up @@ -154,7 +159,12 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?;
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
let worker_pool = WorkerPool::new(
KvWorker { client },
task_config.name,
Expand Down
11 changes: 10 additions & 1 deletion crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber;
pub struct DynamoDBProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
}

impl DynamoDBProgressStore {
Expand All @@ -24,6 +25,7 @@ impl DynamoDBProgressStore {
aws_secret_access_key: &str,
aws_region: String,
table_name: String,
is_backfill: bool,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -44,7 +46,11 @@ impl DynamoDBProgressStore {
.load()
.await;
let client = Client::new(&aws_config);
Self { client, table_name }
Self {
client,
table_name,
is_backfill,
}
}
}

Expand All @@ -70,6 +76,9 @@ impl ProgressStore for DynamoDBProgressStore {
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> Result<()> {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down

0 comments on commit b364e4d

Please sign in to comment.