Skip to content

Commit

Permalink
chore: preload current block number (#371)
Browse files Browse the repository at this point in the history
* chore: preload current block number

* lint
  • Loading branch information
renancloudwalk authored Mar 16, 2024
1 parent 837af39 commit abea843
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
# storage
sled = "0.34.7"
sqlx = { version = "0.7.3", features = ["runtime-tokio", "postgres", "bigdecimal", "time"] }
num-traits = "0.2.18"

# containers
testcontainers = "0.15.0"
Expand Down
19 changes: 17 additions & 2 deletions src/eth/storage/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use std::time::Duration;
use async_trait::async_trait;
use indexmap::IndexMap;
use metrics::atomics::AtomicU64;
use num_traits::cast::ToPrimitive;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
use serde_json::Value;
use sqlx::postgres::PgPoolOptions;
use sqlx::Pool;
use tokio::sync::mpsc;
use tokio::sync::mpsc::channel;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -95,13 +97,26 @@ impl HybridPermanentStorage {
Self::worker(task_receiver, worker_pool).await;
});

let block_number = Self::preload_block_number(connection_pool.clone()).await?;
let state = RwLock::new(HybridPermanentStorageState::default());

Ok(Self {
state: RwLock::new(HybridPermanentStorageState::default()),
block_number: Default::default(),
state,
block_number,
task_sender,
})
}

async fn preload_block_number(pool: Pool<sqlx::Postgres>) -> anyhow::Result<AtomicU64> {
let blocks = sqlx::query!("SELECT block_number FROM neo_blocks ORDER BY block_number DESC LIMIT 1")
.fetch_all(&pool)
.await?;

let last_block_number = blocks.last().map(|b| b.block_number.to_u64()).unwrap_or(Some(0)).unwrap_or(0);

Ok(last_block_number.into())
}

async fn worker(mut receiver: tokio::sync::mpsc::Receiver<BlockTask>, pool: Arc<sqlx::Pool<sqlx::Postgres>>) {
tracing::info!("Starting worker");
while let Some(block_task) = receiver.recv().await {
Expand Down

0 comments on commit abea843

Please sign in to comment.