Skip to content

Commit

Permalink
[kv store] add watermark table to bigtable (#20390)
Browse files Browse the repository at this point in the history
## Description 

This PR adds a watermark table to BigTable. The internal workflow
progress store now writes watermarks to both DynamoDB and BigTable. In
the future, the DynamoDB progress store will be deprecated.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Dec 12, 2024
1 parent 1488986 commit 7eadb13
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 17 deletions.
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod progress_store;
mod workers;

pub use progress_store::DynamoDBProgressStore;
pub use progress_store::IngestionWorkflowsProgressStore;
pub use workers::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig,
KVStoreWorker,
Expand Down
19 changes: 17 additions & 2 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
IngestionWorkflowsProgressStore, KVStoreTaskConfig, KVStoreWorker,
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
Expand Down Expand Up @@ -119,12 +119,27 @@ async fn main() -> Result<()> {
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);

let progress_store = DynamoDBProgressStore::new(
let mut bigtable_client = None;
for task in &config.tasks {
if let Task::BigTableKV(kv_config) = &task.task {
bigtable_client = Some(
BigTableClient::new_remote(
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?,
);
}
}

let progress_store = IngestionWorkflowsProgressStore::new(
&config.progress_store.aws_access_key_id,
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
bigtable_client,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::{BigTableClient, KeyValueStoreWriter};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct DynamoDBProgressStore {
pub struct IngestionWorkflowsProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
}

impl DynamoDBProgressStore {
impl IngestionWorkflowsProgressStore {
pub async fn new(
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: String,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -50,12 +53,13 @@ impl DynamoDBProgressStore {
client,
table_name,
is_backfill,
bigtable_client,
}
}
}

#[async_trait]
impl ProgressStore for DynamoDBProgressStore {
impl ProgressStore for IngestionWorkflowsProgressStore {
async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
let item = self
.client
Expand All @@ -79,6 +83,11 @@ impl ProgressStore for DynamoDBProgressStore {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
if let Some(ref mut bigtable_client) = self.bigtable_client {
bigtable_client
.save_watermark(&task_name, checkpoint_number)
.await?;
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down
49 changes: 39 additions & 10 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ const OBJECTS_TABLE: &str = "objects";
const TRANSACTIONS_TABLE: &str = "transactions";
const CHECKPOINTS_TABLE: &str = "checkpoints";
const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest";
const WATERMARK_TABLE: &str = "watermark";

const COLUMN_FAMILY_NAME: &str = "sui";
const DEFAULT_COLUMN_QUALIFIER: &str = "";
const AGGREGATED_WATERMARK_NAME: &str = "bigtable";
const CHECKPOINT_SUMMARY_COLUMN_QUALIFIER: &str = "s";
const CHECKPOINT_SIGNATURES_COLUMN_QUALIFIER: &str = "sg";
const CHECKPOINT_CONTENTS_COLUMN_QUALIFIER: &str = "c";
Expand Down Expand Up @@ -131,6 +133,21 @@ impl KeyValueStoreWriter for BigTableClient {
)
.await
}

/// Persists `watermark` as the progress marker for `name` in the watermark table.
///
/// The row key is the UTF-8 bytes of `name`, the cell value is the big-endian
/// encoding of `watermark`, and the cell timestamp is set to the watermark
/// itself instead of the server time.
///
/// # Errors
///
/// Returns an error if `watermark` does not fit in the `i64` cell timestamp,
/// or if the underlying write fails.
async fn save_watermark(
    &mut self,
    name: &str,
    watermark: CheckpointSequenceNumber,
) -> Result<()> {
    // A plain `as i64` cast would silently wrap values above `i64::MAX` into
    // negative timestamps; `u64::MAX` would even become -1, which `multi_set`
    // passes to request the current server time. Fail loudly instead.
    let timestamp = i64::try_from(watermark)?;
    let key = name.as_bytes().to_vec();
    let value = watermark.to_be_bytes().to_vec();
    self.multi_set_with_timestamp(
        WATERMARK_TABLE,
        [(key, vec![(DEFAULT_COLUMN_QUALIFIER, value)])],
        timestamp,
    )
    .await
}
}

#[async_trait]
Expand Down Expand Up @@ -237,15 +254,7 @@ impl KeyValueStoreReader for BigTableClient {
}

async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.await?
.pop()
{
Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)),
None => Ok(0),
}
self.get_watermark(AGGREGATED_WATERMARK_NAME).await
}

async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>> {
Expand All @@ -257,6 +266,17 @@ impl KeyValueStoreReader for BigTableClient {
}
Ok(None)
}

/// Reads the watermark stored under `watermark_name`.
///
/// Fetches the row keyed by the UTF-8 bytes of the name from the watermark
/// table and decodes the first cell of the last returned row as a big-endian
/// `u64`. Returns `0` when no row or no cell is found.
///
/// # Errors
///
/// Returns an error if the read fails or the stored value is not exactly
/// eight bytes long.
async fn get_watermark(&mut self, watermark_name: &str) -> Result<CheckpointSequenceNumber> {
    let row_key = watermark_name.as_bytes().to_vec();
    let mut rows = self.multi_get(WATERMARK_TABLE, vec![row_key]).await?;
    let first_cell = rows.pop().and_then(|row| row.into_iter().next());
    match first_cell {
        Some((_, bytes)) => Ok(u64::from_be_bytes(bytes.as_slice().try_into()?)),
        // No watermark recorded yet: report checkpoint 0.
        None => Ok(0),
    }
}
}

impl BigTableClient {
Expand Down Expand Up @@ -382,6 +402,15 @@ impl BigTableClient {
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
) -> Result<()> {
self.multi_set_with_timestamp(table_name, values, -1).await
}

async fn multi_set_with_timestamp(
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
timestamp: i64,
) -> Result<()> {
let mut entries = vec![];
for (row_key, cells) in values {
Expand All @@ -393,7 +422,7 @@ impl BigTableClient {
column_qualifier: column_name.to_owned().into_bytes(),
// The timestamp of the cell into which new data should be written.
// Use -1 for current Bigtable server time.
timestamp_micros: -1,
timestamp_micros: timestamp,
value,
})),
})
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-kvstore/src/bigtable/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
command+=(-project emulator)
fi

for table in objects transactions checkpoints checkpoints_by_digest; do
for table in objects transactions checkpoints checkpoints_by_digest watermark; do
(
set -x
"${command[@]}" createtable $table
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ pub trait KeyValueStoreReader {
) -> Result<Option<Checkpoint>>;
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber>;
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
async fn get_watermark(&mut self, task_name: &str) -> Result<u64>;
}

/// Write-side interface of the key-value store.
///
/// Every method is async, takes `&mut self`, and returns `Result<()>`.
#[async_trait]
pub trait KeyValueStoreWriter {
/// Saves the given objects.
/// NOTE(review): storage layout is implementation-defined and not visible here.
async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>;
/// Saves the given transactions.
/// NOTE(review): storage layout is implementation-defined and not visible here.
async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>;
/// Saves the given checkpoint.
/// NOTE(review): which parts of `CheckpointData` are stored is not visible here — confirm.
async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>;
/// Records `watermark` (a checkpoint sequence number) as the progress
/// marker associated with `name`.
async fn save_watermark(
&mut self,
name: &str,
watermark: CheckpointSequenceNumber,
) -> Result<()>;
}

#[derive(Clone, Debug)]
Expand Down

0 comments on commit 7eadb13

Please sign in to comment.