[bridge-indexer] move current checkpoint metric to retrieval stage (#19570)

## Description 

Currently the indexer reports `current_checkpoint` when saving progress to the
DB. Because progress is cached in memory before being saved, the metric can go
stale. This PR:
1. moves the metric to the data retrieval stage,
2. renames it to `latest_retrieved_checkpoints`, and
3. in the EthSubscription task, periodically fetches the latest block height
and updates the metric (see the sketch below).
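
For context, here is a minimal, self-contained sketch of the resulting flow (illustrative only, not the crate's actual code): a per-task gauge registered with `task_name`/`task_type` labels is set from the retrieval loop, and for the subscription task a 5-second timer keeps it fresh even when no logs arrive. The `latest_block_number` helper, the label values, and the stream type are hypothetical stand-ins; the sketch assumes the `tokio`, `tokio-stream`, and `prometheus` crates.

```rust
use std::time::Duration;

use prometheus::{register_int_gauge_vec_with_registry, IntGauge, IntGaugeVec, Registry};
use tokio::select;
use tokio_stream::{Stream, StreamExt};

// Retrieval loop: update the gauge both when data arrives and on a fixed timer.
async fn run_retrieval_loop(
    mut log_stream: impl Stream<Item = u64> + Unpin, // yields block numbers (stand-in)
    progress_metric: IntGauge,
) {
    // Refresh the latest block height every 5 seconds, even if no logs arrive.
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        select! {
            maybe_log = log_stream.next() => {
                match maybe_log {
                    Some(block_number) => progress_metric.set(block_number as i64),
                    None => break, // stream ended
                }
            }
            _ = interval.tick() => {
                // Hypothetical latest-block fetch; the real task asks the WS provider.
                let latest = latest_block_number().await;
                progress_metric.set(latest as i64);
            }
        }
    }
}

async fn latest_block_number() -> u64 {
    0 // placeholder for a provider call such as get_block_number()
}

fn main() {
    let registry = Registry::new();
    // Per-task gauge vec, mirroring the renamed metric in this PR.
    let gauges: IntGaugeVec = register_int_gauge_vec_with_registry!(
        "bridge_indexer_tasks_latest_retrieved_checkpoints",
        "latest retrieved checkpoint for each task",
        &["task_name", "task_type"],
        registry
    )
    .unwrap();
    // Label values here are illustrative.
    let _progress_metric = gauges.with_label_values(&["eth_subscription", "live"]);
    // run_retrieval_loop(stream, _progress_metric) would run inside a spawned tokio task.
}
```

Updating the gauge at retrieval time, rather than when progress is flushed to the DB, keeps it accurate even while progress sits in the in-memory cache.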

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
longbowlu authored Sep 28, 2024
1 parent 2b1a4e8 commit 3864dca
Showing 5 changed files with 150 additions and 106 deletions.
206 changes: 126 additions & 80 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
@@ -9,15 +9,16 @@ use anyhow::Error;
use async_trait::async_trait;
use ethers::prelude::Transaction;
use ethers::providers::{Http, Middleware, Provider, StreamExt, Ws};
use ethers::types::{Address as EthAddress, Block, Filter, H256};
use prometheus::{IntCounterVec, IntGaugeVec};
use ethers::types::{Address as EthAddress, Block, Filter, Log, H256};
use prometheus::{IntCounterVec, IntGauge, IntGaugeVec};
use sui_bridge::error::BridgeError;
use sui_bridge::eth_client::EthClient;
use sui_bridge::eth_syncer::EthSyncer;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use sui_bridge::retry_with_max_elapsed_time;
use sui_indexer_builder::Task;
use tap::tap::TapFallible;
use tokio::select;
use tokio::task::JoinHandle;
use tracing::{info, warn};

@@ -90,86 +91,37 @@ impl Datasource<RawEthData> for EthSubscriptionDatasource {

let eth_ws_url = self.eth_ws_url.clone();
let task_name = task.task_name.clone();
let task_name_clone = task_name.clone();
let progress_metric = self
.indexer_metrics
.tasks_latest_retrieved_checkpoints
.with_label_values(&[task.name_prefix(), task.type_str()]);
let handle = spawn_monitored_task!(async move {
let eth_ws_client = Provider::<Ws>::connect(&eth_ws_url).await.tap_err(|e| {
tracing::error!("Failed to connect to websocket: {:?}", e);
})?;

// TODO: enable a shared cache for blocks that can be used by both the subscription and finalized sync
let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();

let mut stream = eth_ws_client.subscribe_logs(&filter).await.tap_err(|e| {
let mut log_stream = eth_ws_client.subscribe_logs(&filter).await.tap_err(|e| {
tracing::error!("Failed to subscribe logs: {:?}", e);
})?;
while let Some(log) = stream.next().await {
tracing::info!(
task_name,
"EthSubscriptionDatasource retrieved log: {:?}",
log
);
let raw_log = RawEthLog {
block_number: log
.block_number
.ok_or(BridgeError::ProviderError(
"Provider returns log without block_number".into(),
))
.unwrap()
.as_u64(),
tx_hash: log
.transaction_hash
.ok_or(BridgeError::ProviderError(
"Provider returns log without transaction_hash".into(),
))
.unwrap(),
log,
};

let block_number = raw_log.block_number();

let block = if let Some(cached_block) = cached_blocks.get(&block_number) {
cached_block.clone()
} else {
let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
eth_ws_client.get_block(block_number),
Duration::from_secs(30000)
) else {
panic!("Unable to get block from provider");
};

cached_blocks.insert(block_number, block.clone());
block
};

let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
eth_ws_client.get_transaction(raw_log.tx_hash),
Duration::from_secs(30000)
) else {
panic!("Unable to get transaction from provider");
};
tracing::info!(
task_name,
"Sending data: {:?}",
(raw_log.tx_hash, block_number)
);
let raw_eth_data = vec![RawEthData {
log: raw_log,
block,
transaction,
is_finalized: false,
}];
data_sender
.send((block_number, raw_eth_data))
.await
.unwrap_or_else(|e| {
tracing::error!(
task_name,
"Failed to send data from EthSubscriptionDatasource: {:?}",
e
);
});
// Check latest block height every 5 sec
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
select! {
log = log_stream.next() => {
if let Some(log) = log {
Self::handle_log(&task_name_clone, log, &eth_ws_client, &data_sender).await;
} else {
panic!("EthSubscriptionDatasource log stream ended unexpectedly");
}
}
_ = interval.tick() => {
let latest_block = eth_ws_client.get_block_number().await?.as_u64();
progress_metric.set(latest_block as i64);
}
}
}
            // We do not expect the EthSubscriptionDatasource live task to exit
panic!("EthSubscriptionDatasource stream ended unexpectedly");
});
Ok(handle)
}
@@ -198,6 +150,83 @@ impl Datasource<RawEthData> for EthSubscriptionDatasource {
}
}

impl EthSubscriptionDatasource {
async fn handle_log(
task_name: &str,
log: Log,
eth_ws_client: &Provider<Ws>,
data_sender: &DataSender<RawEthData>,
) {
tracing::info!(
task_name,
"EthSubscriptionDatasource retrieved log: {:?}",
log
);
// TODO: enable a shared cache for blocks that can be used by both the subscription and finalized sync
let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();
let raw_log = RawEthLog {
block_number: log
.block_number
.ok_or(BridgeError::ProviderError(
"Provider returns log without block_number".into(),
))
.unwrap()
.as_u64(),
tx_hash: log
.transaction_hash
.ok_or(BridgeError::ProviderError(
"Provider returns log without transaction_hash".into(),
))
.unwrap(),
log,
};

let block_number = raw_log.block_number();

let block = if let Some(cached_block) = cached_blocks.get(&block_number) {
cached_block.clone()
} else {
let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
eth_ws_client.get_block(block_number),
Duration::from_secs(30000)
) else {
panic!("Unable to get block from provider");
};

cached_blocks.insert(block_number, block.clone());
block
};

let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
eth_ws_client.get_transaction(raw_log.tx_hash),
Duration::from_secs(30000)
) else {
panic!("Unable to get transaction from provider");
};
tracing::info!(
task_name,
"Sending data from EthSubscriptionDatasource: {:?}",
(raw_log.tx_hash, block_number)
);
let raw_eth_data = vec![RawEthData {
log: raw_log,
block,
transaction,
is_finalized: false,
}];
data_sender
.send((block_number, raw_eth_data))
.await
.unwrap_or_else(|e| {
tracing::error!(
task_name,
"Failed to send data from EthSubscriptionDatasource: {:?}",
e
);
});
}
}

pub struct EthFinalizedSyncDatasource {
bridge_addresses: Vec<EthAddress>,
eth_http_url: String,
@@ -237,6 +266,10 @@ impl Datasource<RawEthData> for EthFinalizedSyncDatasource {
Provider::<Http>::try_from(&self.eth_http_url)?
.interval(std::time::Duration::from_millis(2000)),
);
let progress_metric = self
.indexer_metrics
.tasks_latest_retrieved_checkpoints
.with_label_values(&[task.name_prefix(), task.type_str()]);
let bridge_addresses = self.bridge_addresses.clone();
let client = self.eth_client.clone();
let provider = provider.clone();
@@ -250,6 +283,7 @@ impl Datasource<RawEthData> for EthFinalizedSyncDatasource {
bridge_addresses,
data_sender,
bridge_metrics,
progress_metric,
)
.await?;
} else {
@@ -259,6 +293,7 @@ impl Datasource<RawEthData> for EthFinalizedSyncDatasource {
provider,
bridge_addresses,
data_sender,
progress_metric,
)
.await?;
}
@@ -299,6 +334,7 @@ async fn loop_retrieve_and_process_live_finalized_logs(
addresses: Vec<EthAddress>,
data_sender: DataSender<RawEthData>,
bridge_metrics: Arc<BridgeMetrics>,
progress_metric: IntGauge,
) -> Result<(), Error> {
let task_name = task.task_name.clone();
let starting_checkpoint = task.start_checkpoint;
@@ -307,13 +343,13 @@ async fn loop_retrieve_and_process_live_finalized_logs(
.iter()
.map(|address| (*address, starting_checkpoint)),
);

let (_, mut eth_events_rx, _) = EthSyncer::new(client.clone(), eth_contracts_to_watch)
.run(bridge_metrics.clone())
.await
.expect("Failed to start eth syncer");

// forward received events to the data sender
    // EthSyncer sends items even when there are no matching events.
    // We leverage this to update the progress metric.
while let Some((_, block, logs)) = eth_events_rx.recv().await {
let raw_logs: Vec<RawEthLog> = logs
.into_iter()
@@ -334,6 +370,7 @@ async fn loop_retrieve_and_process_live_finalized_logs(
)
.await
.expect("Failed to process logs");
progress_metric.set(block as i64);
}

panic!("Eth finalized syncer live task stopped unexpectedly");
@@ -345,6 +382,7 @@ async fn loop_retrieve_and_process_log_range(
provider: Arc<Provider<Http>>,
addresses: Vec<EthAddress>,
data_sender: DataSender<RawEthData>,
progress_metric: IntGauge,
) -> Result<(), Error> {
let task_name = task.task_name.clone();
let starting_checkpoint = task.start_checkpoint;
@@ -388,7 +426,9 @@ async fn loop_retrieve_and_process_log_range(
})
.tap_err(|e| {
tracing::error!(task_name, "Failed to process logs: {:?}", e);
})?;
})
.expect("Process logs should not fail");
progress_metric.set(target_checkpoint as i64);
Ok::<_, Error>(())
}

@@ -437,9 +477,15 @@ async fn process_logs(
.iter()
.map(|data| (data.log.tx_hash, data.block.number.map(|n| n.as_u64())))
.collect::<Vec<(H256, Option<u64>)>>();
tracing::info!(task_name, "Sending data: {:?}", tx_hashes);
data_sender.send((block_height, data)).await?;

tracing::info!(
task_name,
"Sending data from EthFinalizedSyncDatasource: {:?}",
tx_hashes
);
data_sender
.send((block_height, data))
.await
.expect("Failed to send data");
Ok::<_, Error>(())
}

2 changes: 0 additions & 2 deletions crates/sui-bridge-indexer/src/main.rs
@@ -85,14 +85,12 @@ async fn main() -> Result<()> {
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);
let datastore_with_out_of_order_source = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);

let eth_client: Arc<EthClient<MeteredEthHttpProvier>> = Arc::new(
8 changes: 4 additions & 4 deletions crates/sui-bridge-indexer/src/metrics.rs
@@ -20,7 +20,7 @@ pub struct BridgeIndexerMetrics {
pub(crate) last_committed_sui_checkpoint: IntGauge,
pub(crate) backfill_tasks_remaining_checkpoints: IntGaugeVec,
pub(crate) tasks_processed_checkpoints: IntCounterVec,
pub(crate) tasks_current_checkpoints: IntGaugeVec,
pub(crate) tasks_latest_retrieved_checkpoints: IntGaugeVec,
pub(crate) inflight_live_tasks: IntGaugeVec,
}

@@ -95,9 +95,9 @@ impl BridgeIndexerMetrics {
registry,
)
.unwrap(),
tasks_current_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_current_checkpoints",
"Current checkpoint for each task",
tasks_latest_retrieved_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_latest_retrieved_checkpoints",
"latest retrieved checkpoint for each task",
&["task_name", "task_type"],
registry,
)
15 changes: 1 addition & 14 deletions crates/sui-bridge-indexer/src/storage.rs
@@ -12,7 +12,6 @@ use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;

use crate::metrics::BridgeIndexerMetrics;
use crate::postgres_manager::PgPool;
use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
@@ -27,19 +26,13 @@ use sui_indexer_builder::{
pub struct PgBridgePersistent {
pool: PgPool,
save_progress_policy: ProgressSavingPolicy,
indexer_metrics: BridgeIndexerMetrics,
}

impl PgBridgePersistent {
pub fn new(
pool: PgPool,
save_progress_policy: ProgressSavingPolicy,
indexer_metrics: BridgeIndexerMetrics,
) -> Self {
pub fn new(pool: PgPool, save_progress_policy: ProgressSavingPolicy) -> Self {
Self {
pool,
save_progress_policy,
indexer_metrics,
}
}
}
@@ -176,8 +169,6 @@ impl IndexerProgressStore for PgBridgePersistent {
return Ok(None);
}
let task_name = task.task_name.clone();
let task_name_prefix = task.name_prefix();
let task_type_label = task.type_str();
if let Some(checkpoint_to_save) = self
.save_progress_policy
.cache_progress(task, checkpoint_numbers)
@@ -200,10 +191,6 @@
))
.execute(&mut conn)
.await?;
self.indexer_metrics
.tasks_current_checkpoints
.with_label_values(&[task_name_prefix, task_type_label])
.set(checkpoint_to_save as i64);
return Ok(Some(checkpoint_to_save));
}
Ok(None)