diff --git a/CHANGELOG.md b/CHANGELOG.md index 1aa7a504b..b44a050bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next release +- feat(db): backups - fix: state root for nonce - fix: store the first history in storage ket - perf: improved perfs with parallelized iteration over tx hashes cache diff --git a/crates/client/db/src/error.rs b/crates/client/db/src/error.rs index 38dcb200d..7aa8b0197 100644 --- a/crates/client/db/src/error.rs +++ b/crates/client/db/src/error.rs @@ -13,6 +13,8 @@ pub enum DbError { Uuid(#[from] uuid::Error), #[error("A value was queryied that was not initialized at column: `{0}` key: `{1}`")] ValueNotInitialized(Column, String), + #[error("Format error: `{0}`")] + Format(String), } #[derive(Debug, Error)] diff --git a/crates/client/db/src/lib.rs b/crates/client/db/src/lib.rs index 1b89a5b1d..a3c21bd3c 100644 --- a/crates/client/db/src/lib.rs +++ b/crates/client/db/src/lib.rs @@ -11,9 +11,9 @@ //! `paritydb` and `rocksdb` are both supported, behind the `kvdb-rocksd` and `parity-db` feature //! flags. Support for custom databases is possible but not supported yet. -use std::fmt; use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock, RwLock}; +use std::{fmt, fs}; use anyhow::{bail, Context, Result}; use bonsai_db::{BonsaiDb, DatabaseKeyMapping}; @@ -21,12 +21,13 @@ use bonsai_trie::id::BasicId; use bonsai_trie::{BonsaiStorage, BonsaiStorageConfig}; use mapping_db::MappingDb; use meta_db::MetaDb; +use rocksdb::backup::{BackupEngine, BackupEngineOptions}; use sc_client_db::DatabaseSource; mod error; mod mapping_db; use rocksdb::{ - BoundColumnFamily, ColumnFamilyDescriptor, DBCompressionType, MultiThreaded, OptimisticTransactionDB, Options, + BoundColumnFamily, ColumnFamilyDescriptor, DBCompressionType, Env, MultiThreaded, OptimisticTransactionDB, Options, }; use starknet_api::hash::StarkHash; use starknet_types_core::hash::{Pedersen, Poseidon}; @@ -38,6 +39,7 @@ pub mod storage_updates; pub use error::{BonsaiDbError, DbError}; pub use mapping_db::MappingCommitment; use storage_handler::bonsai_identifier; +use tokio::sync::{mpsc, oneshot}; const DB_HASH_LEN: usize = 32; /// Hash type that this backend uses for the database. @@ -63,15 +65,26 @@ impl From<&DatabaseSettings> for BonsaiStorageConfig { pub type DB = OptimisticTransactionDB; -pub(crate) fn open_database(config: &DatabaseSettings) -> Result { +pub(crate) fn open_database( + config: &DatabaseSettings, + backup_dir: Option, + restore_from_latest_backup: bool, +) -> Result { Ok(match &config.source { - DatabaseSource::RocksDb { path, .. } => open_rocksdb(path, true)?, - DatabaseSource::Auto { paritydb_path: _, rocksdb_path, .. } => open_rocksdb(rocksdb_path, false)?, + DatabaseSource::RocksDb { path, .. } => open_rocksdb(path, true, backup_dir, restore_from_latest_backup)?, + DatabaseSource::Auto { paritydb_path: _, rocksdb_path, .. } => { + open_rocksdb(rocksdb_path, false, backup_dir, restore_from_latest_backup)? + } _ => bail!("only the rocksdb database source is supported at the moment"), }) } -pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result> { +pub(crate) fn open_rocksdb( + path: &Path, + create: bool, + backup_dir: Option, + restore_from_latest_backup: bool, +) -> Result> { let mut opts = Options::default(); opts.set_report_bg_io_stats(true); opts.set_use_fsync(false); @@ -81,8 +94,27 @@ pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result::open_cf_descriptors( &opts, path, @@ -92,6 +124,44 @@ pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result, + mut recv: mpsc::Receiver, +) -> Result<()> { + // we use a thread to do that as backup engine is not thread safe + + let mut backup_opts = BackupEngineOptions::new(backup_dir).context("creating backup options")?; + let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1); + backup_opts.set_max_background_operations(cores); + + let mut engine = BackupEngine::open(&backup_opts, &Env::new().context("creating rocksdb env")?) + .context("opening backup engine")?; + + if restore_from_latest_backup { + log::info!("⏳ Restoring latest backup"); + log::debug!("restore path is {db_path:?}"); + fs::create_dir_all(db_path).with_context(|| format!("creating directories {:?}", db_path))?; + + let opts = rocksdb::backup::RestoreOptions::default(); + engine.restore_from_latest_backup(db_path, db_path, &opts).context("restoring database")?; + log::debug!("restoring latest backup done"); + } + + db_restored_cb.send(()).ok().context("receiver dropped")?; + drop(db_restored_cb); + + while let Some(BackupRequest(callback)) = recv.blocking_recv() { + let db = DB_SINGLETON.get().context("getting rocksdb instance")?; + engine.create_new_backup_flush(db, true).context("creating rocksdb backup")?; + let _ = callback.send(()); + } + + Ok(()) +} + #[derive(Clone, Copy, PartialEq, Eq)] pub enum Column { Meta, @@ -224,12 +294,6 @@ impl DatabaseExt for DB { } } -pub mod static_keys { - pub const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS"; - pub const LAST_PROVED_BLOCK: &[u8] = b"LAST_PROVED_BLOCK"; - pub const LAST_SYNCED_L1_EVENT_BLOCK: &[u8] = b"LAST_SYNCED_L1_EVENT_BLOCK"; -} - /// Returns the Starknet database directory. pub fn starknet_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf { db_config_dir.join("starknet").join(db_path) @@ -262,6 +326,9 @@ static BACKEND_SINGLETON: OnceLock> = OnceLock::new(); static DB_SINGLETON: OnceLock> = OnceLock::new(); +struct BackupRequest(oneshot::Sender<()>); +static DB_BACKUP_SINGLETON: OnceLock> = OnceLock::new(); + pub struct DBDropHook; // TODO(HACK): db really really shouldnt be in a static impl Drop for DBDropHook { fn drop(&mut self) { @@ -283,17 +350,36 @@ impl DeoxysBackend { pub fn open( database: &DatabaseSource, db_config_dir: &Path, + backup_dir: Option, + restore_from_latest_backup: bool, cache_more_things: bool, ) -> Result<&'static Arc> { + // load db after restoration BACKEND_SINGLETON - .set(Arc::new(Self::init(database, db_config_dir, cache_more_things).unwrap())) + .set(Arc::new( + Self::init(database, db_config_dir, cache_more_things, backup_dir, restore_from_latest_backup).unwrap(), + )) .ok() .context("Backend already initialized")?; Ok(BACKEND_SINGLETON.get().unwrap()) } - fn init(database: &DatabaseSource, db_config_dir: &Path, cache_more_things: bool) -> Result { + pub async fn backup() -> Result<()> { + let chann = DB_BACKUP_SINGLETON.get().context("backups are not enabled")?; + let (callback_sender, callback_recv) = oneshot::channel(); + chann.send(BackupRequest(callback_sender)).await.context("backups are not enabled")?; + callback_recv.await.context("backups task died :(")?; + Ok(()) + } + + fn init( + database: &DatabaseSource, + db_config_dir: &Path, + cache_more_things: bool, + backup_dir: Option, + restore_from_latest_backup: bool, + ) -> Result { Self::new( &DatabaseSettings { source: match database { @@ -314,12 +400,19 @@ impl DeoxysBackend { max_saved_snapshots: Some(0), snapshot_interval: u64::MAX, }, + backup_dir, + restore_from_latest_backup, cache_more_things, ) } - fn new(config: &DatabaseSettings, cache_more_things: bool) -> Result { - DB_SINGLETON.set(Arc::new(open_database(config)?)).unwrap(); + fn new( + config: &DatabaseSettings, + backup_dir: Option, + restore_from_latest_backup: bool, + cache_more_things: bool, + ) -> Result { + DB_SINGLETON.set(Arc::new(open_database(config, backup_dir, restore_from_latest_backup)?)).unwrap(); let db = DB_SINGLETON.get().unwrap(); let bonsai_config = BonsaiStorageConfig { max_saved_trie_logs: Some(0), diff --git a/crates/client/db/src/meta_db.rs b/crates/client/db/src/meta_db.rs index 71bc660ca..9f9350dfb 100644 --- a/crates/client/db/src/meta_db.rs +++ b/crates/client/db/src/meta_db.rs @@ -14,6 +14,9 @@ pub struct MetaDb { pub(crate) db: Arc, } +const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS"; +const CURRENT_SYNC_BLOCK: &[u8] = b"CURRENT_SYNC_BLOCK"; + impl MetaDb { pub(crate) fn new(db: Arc) -> Self { Self { db } @@ -23,7 +26,7 @@ impl MetaDb { pub fn current_syncing_tips(&self) -> Result, DbError> { let column = self.db.get_column(Column::Meta); - match self.db.get_cf(&column, crate::static_keys::CURRENT_SYNCING_TIPS)? { + match self.db.get_cf(&column, CURRENT_SYNCING_TIPS)? { Some(raw) => Ok(Vec::::decode(&mut &raw[..])?), None => Ok(Vec::new()), } @@ -33,7 +36,26 @@ impl MetaDb { pub fn write_current_syncing_tips(&self, tips: Vec) -> Result<(), DbError> { let column = self.db.get_column(Column::Meta); - self.db.put_cf(&column, crate::static_keys::CURRENT_SYNCING_TIPS, tips.encode())?; + self.db.put_cf(&column, CURRENT_SYNCING_TIPS, tips.encode())?; + Ok(()) + } + + pub fn current_sync_block(&self) -> Result { + let res = self.db.get_cf(&self.db.get_column(Column::Meta), CURRENT_SYNC_BLOCK)?; + log::debug!("current_sync_block {res:?}"); + + if let Some(res) = res { + Ok(u64::from_be_bytes( + res.try_into().map_err(|_| DbError::Format("current sync block should be a u64".into()))?, + )) + } else { + Ok(0) + } + } + + pub fn set_current_sync_block(&self, sync_block: u64) -> Result<(), DbError> { + log::debug!("set_current_sync_block {sync_block}"); + self.db.put_cf(&self.db.get_column(Column::Meta), CURRENT_SYNC_BLOCK, u64::to_be_bytes(sync_block))?; Ok(()) } } diff --git a/crates/client/sync/src/fetch/mod.rs b/crates/client/sync/src/fetch/mod.rs index 065905dcc..9f3d16054 100644 --- a/crates/client/sync/src/fetch/mod.rs +++ b/crates/client/sync/src/fetch/mod.rs @@ -19,7 +19,7 @@ pub async fn l2_fetch_task( fetch_stream_sender: mpsc::Sender, provider: Arc, sync_polling_interval: Option, -) -> Result<(), L2SyncError> { +) -> anyhow::Result<()> { // First, catch up with the chain let mut next_block = first_block; diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 9d7fbebed..2dfb0c00b 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -3,7 +3,7 @@ use std::pin::pin; use std::str::FromStr; use std::sync::{Arc, RwLock}; -use anyhow::Context; +use anyhow::{bail, Context}; use futures::{stream, StreamExt, TryStreamExt}; use lazy_static::lazy_static; use mc_db::storage_handler::primitives::contract_class::{ClassUpdateWrapper, ContractClassData}; @@ -137,7 +137,8 @@ async fn l2_verify_and_apply_task( block_sender: Sender, mut command_sink: CommandSink, verify: bool, -) -> Result<(), L2SyncError> { + backup_every_n_blocks: Option, +) -> anyhow::Result<()> { let block_sender = Arc::new(block_sender); let mut last_block_hash = None; @@ -150,16 +151,26 @@ async fn l2_verify_and_apply_task( let state_update_1 = Arc::clone(&state_update); let global_state_root = block.header().global_state_root; - spawn_compute(move || { + let state_root = spawn_compute(move || { let sw = PerfStopwatch::new(); - let state_root = verify_l2(block_n, &state_update); + let state_root = verify_l2(block_n, &state_update)?; stopwatch_end!(sw, "verify_l2: {:?}"); - if global_state_root != state_root { - log::info!("❗ Verified state: {} doesn't match fetched state: {}", state_root, global_state_root); - } + anyhow::Ok(state_root) }) - .await; + .await?; + + if global_state_root != state_root { + // TODO(fault tolerance): we should have a single rocksdb transaction for the whole l2 update. + // let prev_block = block_n.checked_sub(1).expect("no block to revert to"); + + // storage_handler::contract_trie_mut().revert_to(prev_block); + // storage_handler::contract_storage_trie_mut().revert_to(prev_block); + // storage_handler::contract_class_trie_mut().revert_to(prev_block); + // TODO(charpa): make other stuff revertible, maybe history? + + bail!("Verified state: {} doesn't match fetched state: {}", state_root, global_state_root); + } // UNWRAP: we need a 'static future as we are spawning tokio tasks further down the line // this is a hack to achieve that, we put the update in an arc and then unwrap it at the end @@ -203,10 +214,19 @@ async fn l2_verify_and_apply_task( } ); + DeoxysBackend::meta().set_current_sync_block(block_n).context("setting current sync block")?; + // compact DB every 1k blocks if block_n % 1000 == 0 { DeoxysBackend::compact(); } + + if backup_every_n_blocks.is_some_and(|backup_every_n_blocks| block_n % backup_every_n_blocks as u64 == 0) { + log::info!("⏳ Backing up database at block {block_n}..."); + let sw = PerfStopwatch::new(); + DeoxysBackend::backup().await.context("backing up database")?; + log::info!("✅ Database backup is done ({:?})", sw.elapsed()); + } } Ok(()) @@ -222,7 +242,7 @@ pub struct L2ConvertedBlockAndUpdates { async fn l2_block_conversion_task( updates_receiver: mpsc::Receiver, output: mpsc::Sender, -) -> Result<(), L2SyncError> { +) -> anyhow::Result<()> { // Items of this stream are futures that resolve to blocks, which becomes a regular stream of blocks // using futures buffered. let conversion_stream = stream::unfold(updates_receiver, |mut updates_recv| async { @@ -253,6 +273,7 @@ pub struct L2SyncConfig { pub n_blocks_to_sync: Option, pub verify: bool, pub sync_polling_interval: Option, + pub backup_every_n_blocks: Option, } /// Spawns workers to fetch blocks and state updates from the feeder. @@ -289,8 +310,13 @@ where config.sync_polling_interval, )); let mut block_conversion_task = tokio::spawn(l2_block_conversion_task(fetch_stream_receiver, block_conv_sender)); - let mut verify_and_apply_task = - tokio::spawn(l2_verify_and_apply_task(block_conv_receiver, block_sender, command_sink, config.verify)); + let mut verify_and_apply_task = tokio::spawn(l2_verify_and_apply_task( + block_conv_receiver, + block_sender, + command_sink, + config.verify, + config.backup_every_n_blocks, + )); tokio::select!( // update highest block hash and number, update pending block and state update @@ -353,7 +379,7 @@ pub fn update_l2(state_update: L2StateUpdate) { } /// Verify and update the L2 state according to the latest state update -pub fn verify_l2(block_number: u64, state_update: &StateUpdate) -> StarkFelt { +pub fn verify_l2(block_number: u64, state_update: &StateUpdate) -> anyhow::Result { let csd = build_commitment_state_diff(state_update); let state_root = update_state_root(csd, block_number); let block_hash = state_update.block_hash; @@ -364,7 +390,7 @@ pub fn verify_l2(block_number: u64, state_update: &StateUpdate) -> StarkFelt { block_hash: Felt252Wrapper::from(block_hash).into(), }); - state_root.into() + Ok(state_root.into()) } async fn update_starknet_data(provider: &SequencerGatewayProvider, client: &C) -> Result<(), String> diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index 919b35dd6..40db0d8d1 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -46,6 +46,7 @@ pub mod starknet_sync_worker { l1_url: Url, client: Arc, starting_block: u32, + backup_every_n_blocks: Option, ) -> anyhow::Result<()> where C: HeaderBackend + 'static, @@ -68,7 +69,7 @@ pub mod starknet_sync_worker { .await .context("getting state update for genesis block")? .to_state_update_core(); - verify_l2(0, &state_update); + verify_l2(0, &state_update)?; } tokio::select!( @@ -82,7 +83,8 @@ pub mod starknet_sync_worker { first_block: starting_block.into(), n_blocks_to_sync: fetch_config.n_blocks_to_sync, verify: fetch_config.verify, - sync_polling_interval: fetch_config.sync_polling_interval + sync_polling_interval: fetch_config.sync_polling_interval, + backup_every_n_blocks, }, ) => res.context("syncing L2 state")? ); diff --git a/crates/client/sync/src/utils/mod.rs b/crates/client/sync/src/utils/mod.rs index 6497e9af3..61903b3a0 100644 --- a/crates/client/sync/src/utils/mod.rs +++ b/crates/client/sync/src/utils/mod.rs @@ -1,6 +1,6 @@ #![macro_use] #![allow(clippy::new_without_default)] -use std::time::Instant; +use std::time::{Duration, Instant}; pub mod constant; pub mod convert; @@ -14,11 +14,15 @@ impl PerfStopwatch { pub fn new() -> PerfStopwatch { PerfStopwatch(Instant::now()) } + + pub fn elapsed(&self) -> Duration { + self.0.elapsed() + } } #[macro_export] macro_rules! stopwatch_end { ($stopwatch:expr, $($arg:tt)+) => { - log::debug!($($arg)+, $stopwatch.0.elapsed()) + log::debug!($($arg)+, $stopwatch.elapsed()) } } diff --git a/crates/node/src/commands/run.rs b/crates/node/src/commands/run.rs index 98378d53a..548f15f47 100644 --- a/crates/node/src/commands/run.rs +++ b/crates/node/src/commands/run.rs @@ -162,6 +162,15 @@ pub struct ExtendedRunCmd { #[cfg(feature = "tui")] #[clap(long)] pub tui: bool, + + #[clap(long)] + pub backup_every_n_blocks: Option, + + #[clap(long)] + pub backup_dir: Option, + + #[clap(long, default_value = "false")] + pub restore_from_latest_backup: bool, } pub fn run_node(mut cli: Cli) -> Result<()> { @@ -209,8 +218,19 @@ pub fn run_node(mut cli: Cli) -> Result<()> { let genesis_block = fetch_apply_genesis_block(fetch_block_config.clone()).await.unwrap(); - service::new_full(config, sealing, l1_endpoint, cache, fetch_block_config, genesis_block, starting_block) - .map_err(sc_cli::Error::Service) + service::new_full( + config, + sealing, + l1_endpoint, + cache, + fetch_block_config, + genesis_block, + starting_block, + cli.run.backup_every_n_blocks, + cli.run.backup_dir, + cli.run.restore_from_latest_backup, + ) + .map_err(sc_cli::Error::Service) }) } diff --git a/crates/node/src/service.rs b/crates/node/src/service.rs index 7470e6206..e7e9fab69 100644 --- a/crates/node/src/service.rs +++ b/crates/node/src/service.rs @@ -21,7 +21,7 @@ use parity_scale_codec::Encode; use prometheus_endpoint::Registry; use reqwest::Url; use sc_basic_authorship::ProposerFactory; -use sc_client_api::{BlockchainEvents, HeaderBackend}; +use sc_client_api::BlockchainEvents; use sc_consensus::{BasicQueue, BlockImportParams}; use sc_consensus_manual_seal::{ConsensusDataProvider, Error}; pub use sc_executor::NativeElseWasmExecutor; @@ -74,6 +74,8 @@ pub fn new_partial( build_import_queue: BIQ, cache_more_things: bool, genesis_block: DeoxysBlock, + backup_dir: Option, + restore_from_latest_backup: bool, ) -> Result< sc_service::PartialComponents< FullClient, @@ -94,7 +96,14 @@ where &TaskManager, ) -> Result<(BasicImportQueue, BoxBlockImport), ServiceError>, { - let deoxys_backend = DeoxysBackend::open(&config.database, &db_config_dir(config), cache_more_things).unwrap(); + let deoxys_backend = DeoxysBackend::open( + &config.database, + &db_config_dir(config), + backup_dir, + restore_from_latest_backup, + cache_more_things, + ) + .unwrap(); let telemetry = config .telemetry_endpoints @@ -189,6 +198,7 @@ where /// # Arguments /// /// - `cache`: whether more information should be cached when storing the block in the database. +#[allow(clippy::too_many_arguments)] // grr pub fn new_full( config: Configuration, sealing: SealingMode, @@ -197,6 +207,9 @@ pub fn new_full( fetch_config: FetchConfig, genesis_block: DeoxysBlock, starting_block: Option, + backup_every_n_blocks: Option, + backup_dir: Option, + restore_from_latest_backup: bool, ) -> Result { let build_import_queue = build_manual_seal_import_queue; @@ -209,7 +222,14 @@ pub fn new_full( select_chain, transaction_pool, other: (block_import, mut telemetry, deoxys_backend), - } = new_partial(&config, build_import_queue, cache_more_things, genesis_block)?; + } = new_partial( + &config, + build_import_queue, + cache_more_things, + genesis_block, + backup_dir, + restore_from_latest_backup, + )?; let net_config = sc_network::config::FullNetworkConfiguration::new(&config.network); @@ -228,7 +248,7 @@ pub fn new_full( let prometheus_registry = config.prometheus_registry().cloned(); - let best_block = client.info().best_number; + let best_block = DeoxysBackend::meta().current_sync_block().expect("getting current sync block") as _; let on_block = if starting_block.is_some() && starting_block >= Some(best_block) { starting_block } else { Some(best_block) }; @@ -310,6 +330,7 @@ pub fn new_full( l1_url, Arc::clone(&client), on_block.unwrap(), + backup_every_n_blocks, ); async { fut.await.unwrap() } }); @@ -473,7 +494,13 @@ type ChainOpsResult = pub fn new_chain_ops(config: &mut Configuration, cache_more_things: bool) -> ChainOpsResult { config.keystore = sc_service::config::KeystoreConfig::InMemory; - let sc_service::PartialComponents { client, backend, import_queue, task_manager, other, .. } = - new_partial::<_>(config, build_manual_seal_import_queue, cache_more_things, DeoxysBlock::default())?; + let sc_service::PartialComponents { client, backend, import_queue, task_manager, other, .. } = new_partial::<_>( + config, + build_manual_seal_import_queue, + cache_more_things, + DeoxysBlock::default(), + None, + false, + )?; Ok((client, backend, import_queue, task_manager, other.2)) }