Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into storage-proofs
Browse files Browse the repository at this point in the history
  • Loading branch information
cchudant committed Dec 2, 2024
2 parents df0e376 + 73691f4 commit 8be1356
Show file tree
Hide file tree
Showing 47 changed files with 1,349 additions and 429 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Next release

- feat(v0.8.0-rc0): storage proofs for rpc version v0.8.0
- feat(warp): added warp update to madara
- docs(readme): updated README.md docs and added Docker Compose support
- fix(log): define RUST_LOG=info by default
- fix(tracing): RUST_LOG filtering support
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Madara is a powerful Starknet client written in Rust.
- [Starknet Compliant](#starknet-compliant)
- [Feeder-Gateway State Synchronization](#feeder-gateway-state-synchronization)
- [State Commitment Computation](#state-commitment-computation)
- [Database Migration](#database-migration)
- 💬 [Get in touch](#-get-in-touch)
- [Contributing](#contributing)
- [Partnerships](#partnerships)
Expand Down Expand Up @@ -573,6 +574,54 @@ Besu Bonsai Merkle Tries. See the [bonsai lib](https://github.com/madara-allianc
You can read more about Starknet Block structure and how it affects state
commitment [here](https://docs.starknet.io/architecture-and-concepts/network-architecture/block-structure/).
### Database Migration
When migrating to a newer version of Madara you might need to update your
database. Instead of re-synchronizing the entirety of your chain's state from
genesis, you can use Madara's **warp update** feature.
> [!NOTE]
> Warp update requires an already synchronized _local_ node with a working
> database.
To begin the database migration, you will need to start an existing node with
[admin methods](#madara-specific-json-rpc-methods) and
[feeder gateway](#feeder-gateway-state-synchronization) enabled. This will be
the _source_ of the migration. You can do this with the `--warp-update-sender`
[preset](#4.-presets):
```bash
cargo run --release -- \
--name Sender \
--full \ # This also works with other types of nodes
--network mainnet \
--warp-update-sender
```
You will then need to start a second node to synchronize the state of your
database:
```bash
cargo run --release -- \
--name Receiver \
--base-path /tmp/madara_new \ # Where you want the new database to be stored
--full \
--network mainnet \
--l1-endpoint https://*** \
--warp-update-receiver
```
This will start generating a new up-to-date database under `/tmp/madara_new`.
Once this process is over, the warp update sender node will automatically
shut down while the warp update receiver will take its place.
> [!WARNING]
> As of now, the warp update receiver has its RPC disabled, even after the
> migration process has completed. This will be fixed in the future, so that
> services that would otherwise conflict with the sender node will automatically
> start after the migration has finished, allowing for migrations with zero
> downtime.
## 💬 Get in touch
[⬅️ back to top](#-madara-starknet-client)
Expand Down
13 changes: 1 addition & 12 deletions crates/client/block_import/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,11 @@ pub struct BlockImporter {
backend: Arc<MadaraBackend>,
verify_apply: VerifyApply,
metrics: BlockMetrics,
always_force_flush: bool,
}

impl BlockImporter {
/// The starting block is used for metrics. Setting it to None means it will look at the database latest block number.
pub fn new(
backend: Arc<MadaraBackend>,
starting_block: Option<u64>,
always_force_flush: bool,
) -> anyhow::Result<Self> {
pub fn new(backend: Arc<MadaraBackend>, starting_block: Option<u64>) -> anyhow::Result<Self> {
let pool = Arc::new(RayonPool::new());
let starting_block = if let Some(n) = starting_block {
n
Expand All @@ -145,7 +140,6 @@ impl BlockImporter {
pool,
metrics: BlockMetrics::register(starting_block).context("Registering metrics for block import")?,
backend,
always_force_flush,
})
}

Expand Down Expand Up @@ -176,11 +170,6 @@ impl BlockImporter {
validation: BlockValidationContext,
) -> Result<BlockImportResult, BlockImportError> {
let result = self.verify_apply.verify_apply(block, validation).await?;
// Flush step.
let force = self.always_force_flush;
self.backend
.maybe_flush(force)
.map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?;
self.metrics.update(&result.header, &self.backend);
Ok(result)
}
Expand Down
57 changes: 24 additions & 33 deletions crates/client/db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
//! Madara database
use anyhow::{Context, Result};
use anyhow::Context;
use block_db::get_latest_block_n;
use bonsai_db::{BonsaiDb, DatabaseKeyMapping};
use bonsai_trie::{BonsaiStorage, BonsaiStorageConfig};
use db_metrics::DbMetrics;
use mp_chain_config::ChainConfig;
use mp_utils::service::Service;
use mp_utils::service::{MadaraService, Service};
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, Env, FlushOptions, MultiThreaded};
use rocksdb_options::rocksdb_global_options;
use snapshots::Snapshots;
use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::sync::Arc;
use std::{fmt, fs};
use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -43,7 +42,7 @@ pub type WriteBatchWithTransaction = rocksdb::WriteBatchWithTransaction<false>;

const DB_UPDATES_BATCH_SIZE: usize = 1024;

pub fn open_rocksdb(path: &Path) -> Result<Arc<DB>> {
pub fn open_rocksdb(path: &Path) -> anyhow::Result<Arc<DB>> {
let opts = rocksdb_global_options()?;
tracing::debug!("opening db at {:?}", path.display());
let db = DB::open_cf_descriptors(
Expand All @@ -55,14 +54,14 @@ pub fn open_rocksdb(path: &Path) -> Result<Arc<DB>> {
Ok(Arc::new(db))
}

/// This runs in anothr thread as the backup engine is not thread safe
/// This runs in another thread as the backup engine is not thread safe
fn spawn_backup_db_task(
backup_dir: &Path,
restore_from_latest_backup: bool,
db_path: &Path,
db_restored_cb: oneshot::Sender<()>,
mut recv: mpsc::Receiver<BackupRequest>,
) -> Result<()> {
) -> anyhow::Result<()> {
let mut backup_opts = BackupEngineOptions::new(backup_dir).context("Creating backup options")?;
let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1);
backup_opts.set_max_background_operations(cores);
Expand Down Expand Up @@ -264,7 +263,6 @@ impl Default for TrieLogConfig {
pub struct MadaraBackend {
backup_handle: Option<mpsc::Sender<BackupRequest>>,
db: Arc<DB>,
last_flush_time: Mutex<Option<Instant>>,
chain_config: Arc<ChainConfig>,
db_metrics: DbMetrics,
snapshots: Arc<Snapshots>,
Expand Down Expand Up @@ -323,7 +321,11 @@ impl DatabaseService {
}
}

impl Service for DatabaseService {}
// Register `DatabaseService` under the `MadaraService::Database` identifier so
// the service framework can distinguish it from other Madara services.
// NOTE(review): the start/stop semantics tied to this id live in `mp_utils::service`;
// confirm there — only the identifier mapping is visible here.
impl Service for DatabaseService {
    /// Returns the unique identifier for this service: `MadaraService::Database`.
    fn id(&self) -> MadaraService {
        MadaraService::Database
    }
}

struct BackupRequest {
callback: oneshot::Sender<()>,
Expand All @@ -333,7 +335,7 @@ struct BackupRequest {
impl Drop for MadaraBackend {
fn drop(&mut self) {
tracing::info!("⏳ Gracefully closing the database...");
self.maybe_flush(true).expect("Error when flushing the database"); // flush :)
self.flush().expect("Error when flushing the database"); // flush :)
}
}

Expand All @@ -350,7 +352,6 @@ impl MadaraBackend {
Arc::new(Self {
backup_handle: None,
db,
last_flush_time: Default::default(),
chain_config,
db_metrics: DbMetrics::register().unwrap(),
snapshots,
Expand All @@ -367,7 +368,7 @@ impl MadaraBackend {
restore_from_latest_backup: bool,
chain_config: Arc<ChainConfig>,
trie_log_config: TrieLogConfig,
) -> Result<Arc<MadaraBackend>> {
) -> anyhow::Result<Arc<MadaraBackend>> {
let db_path = db_config_dir.join("db");

// when backups are enabled, a thread is spawned that owns the rocksdb BackupEngine (it is not thread safe) and it receives backup requests using a mpsc channel
Expand Down Expand Up @@ -404,7 +405,6 @@ impl MadaraBackend {
db_metrics: DbMetrics::register().context("Registering db metrics")?,
backup_handle,
db,
last_flush_time: Default::default(),
chain_config: Arc::clone(&chain_config),
snapshots,
trie_log_config,
Expand All @@ -417,30 +417,21 @@ impl MadaraBackend {
Ok(backend)
}

pub fn maybe_flush(&self, force: bool) -> Result<bool> {
let mut inst = self.last_flush_time.lock().expect("poisoned mutex");
let will_flush = force
|| match *inst {
Some(inst) => inst.elapsed() >= Duration::from_secs(5),
None => true,
};
if will_flush {
tracing::debug!("doing a db flush");
let mut opts = FlushOptions::default();
opts.set_wait(true);
// we have to collect twice here :/
let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::<Vec<_>>();
let columns = columns.iter().collect::<Vec<_>>();
self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?;

*inst = Some(Instant::now());
}
pub fn flush(&self) -> anyhow::Result<()> {
tracing::debug!("doing a db flush");
let mut opts = FlushOptions::default();
opts.set_wait(true);
// we have to collect twice here :/
let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::<Vec<_>>();
let columns = columns.iter().collect::<Vec<_>>();

Ok(will_flush)
self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?;

Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn backup(&self) -> Result<()> {
pub async fn backup(&self) -> anyhow::Result<()> {
let (callback_sender, callback_recv) = oneshot::channel();
let _res = self
.backup_handle
Expand Down
2 changes: 1 addition & 1 deletion crates/client/devnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ mod tests {
let chain_config = Arc::new(ChainConfig::madara_devnet());
let block = g.build(&chain_config).unwrap();
let backend = MadaraBackend::open_for_testing(Arc::clone(&chain_config));
let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None, true).unwrap());
let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None).unwrap());

println!("{:?}", block.state_diff);
tokio::runtime::Runtime::new()
Expand Down
1 change: 1 addition & 0 deletions crates/client/eth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ httpmock = { workspace = true }
tracing-test = "0.2.5"
serial_test = { workspace = true }
lazy_static = { workspace = true }
mp-utils = { workspace = true, features = ["testing"] }
10 changes: 5 additions & 5 deletions crates/client/eth/src/l1_gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Context;
use mc_mempool::{GasPriceProvider, L1DataProvider};
use std::time::{Duration, UNIX_EPOCH};

use mp_utils::wait_or_graceful_shutdown;
use mp_utils::{service::ServiceContext, wait_or_graceful_shutdown};
use std::time::SystemTime;

pub async fn gas_price_worker_once(
Expand Down Expand Up @@ -36,12 +36,12 @@ pub async fn gas_price_worker(
eth_client: &EthereumClient,
l1_gas_provider: GasPriceProvider,
gas_price_poll_ms: Duration,
cancellation_token: tokio_util::sync::CancellationToken,
ctx: ServiceContext,
) -> anyhow::Result<()> {
l1_gas_provider.update_last_update_timestamp();
let mut interval = tokio::time::interval(gas_price_poll_ms);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
while wait_or_graceful_shutdown(interval.tick(), &cancellation_token).await.is_some() {
while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() {
gas_price_worker_once(eth_client, l1_gas_provider.clone(), gas_price_poll_ms).await?;
}
Ok(())
Expand Down Expand Up @@ -135,7 +135,7 @@ mod eth_client_gas_price_worker_test {
&eth_client,
l1_gas_provider,
Duration::from_millis(200),
tokio_util::sync::CancellationToken::new(),
ServiceContext::new_for_testing(),
)
.await
}
Expand Down Expand Up @@ -280,7 +280,7 @@ mod eth_client_gas_price_worker_test {
&eth_client,
l1_gas_provider.clone(),
Duration::from_millis(200),
tokio_util::sync::CancellationToken::new(),
ServiceContext::new_for_testing(),
),
)
.await;
Expand Down
Loading

0 comments on commit 8be1356

Please sign in to comment.