feat: backups (#115)
cchudant authored May 14, 2024
1 parent ace892e commit 07beabf
Showing 10 changed files with 242 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## Next release

- feat(db): backups
- fix: state root for nonce
- fix: store the first history in storage key
- perf: improved performance with parallelized iteration over tx hashes cache
2 changes: 2 additions & 0 deletions crates/client/db/src/error.rs
@@ -13,6 +13,8 @@ pub enum DbError {
Uuid(#[from] uuid::Error),
#[error("A value was queryied that was not initialized at column: `{0}` key: `{1}`")]
ValueNotInitialized(Column, String),
#[error("Format error: `{0}`")]
Format(String),
}

#[derive(Debug, Error)]
127 changes: 110 additions & 17 deletions crates/client/db/src/lib.rs
@@ -11,22 +11,23 @@
//! `paritydb` and `rocksdb` are both supported, behind the `kvdb-rocksdb` and `parity-db` feature
//! flags. Support for custom databases is possible but not supported yet.
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock, RwLock};
use std::{fmt, fs};

use anyhow::{bail, Context, Result};
use bonsai_db::{BonsaiDb, DatabaseKeyMapping};
use bonsai_trie::id::BasicId;
use bonsai_trie::{BonsaiStorage, BonsaiStorageConfig};
use mapping_db::MappingDb;
use meta_db::MetaDb;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use sc_client_db::DatabaseSource;

mod error;
mod mapping_db;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBCompressionType, MultiThreaded, OptimisticTransactionDB, Options,
BoundColumnFamily, ColumnFamilyDescriptor, DBCompressionType, Env, MultiThreaded, OptimisticTransactionDB, Options,
};
use starknet_api::hash::StarkHash;
use starknet_types_core::hash::{Pedersen, Poseidon};
@@ -38,6 +39,7 @@ pub mod storage_updates;
pub use error::{BonsaiDbError, DbError};
pub use mapping_db::MappingCommitment;
use storage_handler::bonsai_identifier;
use tokio::sync::{mpsc, oneshot};

const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
@@ -63,15 +65,26 @@ impl From<&DatabaseSettings> for BonsaiStorageConfig {

pub type DB = OptimisticTransactionDB<MultiThreaded>;

pub(crate) fn open_database(config: &DatabaseSettings) -> Result<DB> {
pub(crate) fn open_database(
config: &DatabaseSettings,
backup_dir: Option<PathBuf>,
restore_from_latest_backup: bool,
) -> Result<DB> {
Ok(match &config.source {
DatabaseSource::RocksDb { path, .. } => open_rocksdb(path, true)?,
DatabaseSource::Auto { paritydb_path: _, rocksdb_path, .. } => open_rocksdb(rocksdb_path, false)?,
DatabaseSource::RocksDb { path, .. } => open_rocksdb(path, true, backup_dir, restore_from_latest_backup)?,
DatabaseSource::Auto { paritydb_path: _, rocksdb_path, .. } => {
open_rocksdb(rocksdb_path, false, backup_dir, restore_from_latest_backup)?
}
_ => bail!("only the rocksdb database source is supported at the moment"),
})
}

pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result<OptimisticTransactionDB<MultiThreaded>> {
pub(crate) fn open_rocksdb(
path: &Path,
create: bool,
backup_dir: Option<PathBuf>,
restore_from_latest_backup: bool,
) -> Result<OptimisticTransactionDB<MultiThreaded>> {
let mut opts = Options::default();
opts.set_report_bg_io_stats(true);
opts.set_use_fsync(false);
@@ -81,8 +94,27 @@ pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result<OptimisticTransactionDB<MultiThreaded>> {
opts.set_keep_log_file_num(1);
opts.set_compression_type(DBCompressionType::Zstd);
let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1);
opts.increase_parallelism(i32::max(cores / 2, 1));
opts.increase_parallelism(cores);

if let Some(backup_dir) = backup_dir {
let (restored_cb_sender, restored_cb_recv) = std::sync::mpsc::channel();
// we use a channel from std because we're inside a tokio context but this function is not
// async, so we cannot await here
// TODO: make the function async or something similar

let (sender, receiver) = mpsc::channel(1);
let db_path = path.to_owned();
std::thread::spawn(move || {
spawn_backup_db_task(&backup_dir, restore_from_latest_backup, &db_path, restored_cb_sender, receiver)
.expect("database backup thread")
});
DB_BACKUP_SINGLETON.set(sender).ok().context("backend already initialized")?;

log::debug!("blocking on db restoration");
restored_cb_recv.recv().context("restoring database")?;
log::debug!("done blocking on db restoration");
}

log::debug!("creating db");
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
&opts,
path,
@@ -92,6 +124,44 @@ pub(crate) fn open_rocksdb(path: &Path, create: bool) -> Result<OptimisticTransactionDB<MultiThreaded>> {
Ok(db)
}

fn spawn_backup_db_task(
backup_dir: &Path,
restore_from_latest_backup: bool,
db_path: &Path,
db_restored_cb: std::sync::mpsc::Sender<()>,
mut recv: mpsc::Receiver<BackupRequest>,
) -> Result<()> {
// we run this on a dedicated thread, as the backup engine is not thread-safe

let mut backup_opts = BackupEngineOptions::new(backup_dir).context("creating backup options")?;
let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1);
backup_opts.set_max_background_operations(cores);

let mut engine = BackupEngine::open(&backup_opts, &Env::new().context("creating rocksdb env")?)
.context("opening backup engine")?;

if restore_from_latest_backup {
log::info!("⏳ Restoring latest backup");
log::debug!("restore path is {db_path:?}");
fs::create_dir_all(db_path).with_context(|| format!("creating directories {:?}", db_path))?;

let opts = rocksdb::backup::RestoreOptions::default();
engine.restore_from_latest_backup(db_path, db_path, &opts).context("restoring database")?;
log::debug!("restoring latest backup done");
}

db_restored_cb.send(()).ok().context("receiver dropped")?;
drop(db_restored_cb);

while let Some(BackupRequest(callback)) = recv.blocking_recv() {
let db = DB_SINGLETON.get().context("getting rocksdb instance")?;
engine.create_new_backup_flush(db, true).context("creating rocksdb backup")?;
let _ = callback.send(());
}

Ok(())
}
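
The function above is the consumer half of a simple pattern: a dedicated worker thread signals readiness exactly once over a std channel (here, after any restore), then serves requests from a tokio mpsc channel where each message carries a oneshot ack. A self-contained sketch of that pattern, with hypothetical names and an arbitrary channel capacity (not code from this commit):

```rust
use tokio::sync::{mpsc, oneshot};

// Mirrors BackupRequest: a request whose only payload is an ack channel.
struct Request(oneshot::Sender<()>);

#[tokio::main]
async fn main() {
    let (ready_tx, ready_rx) = std::sync::mpsc::channel();
    let (req_tx, mut req_rx) = mpsc::channel::<Request>(1);

    std::thread::spawn(move || {
        // ... any one-time restore work would happen here ...
        ready_tx.send(()).unwrap(); // unblock the opener exactly once
        while let Some(Request(ack)) = req_rx.blocking_recv() {
            // ... create a backup here ...
            let _ = ack.send(()); // tell the requester we are done
        }
    });

    ready_rx.recv().unwrap(); // block until the worker is ready

    let (ack_tx, ack_rx) = oneshot::channel();
    req_tx.send(Request(ack_tx)).await.unwrap();
    ack_rx.await.unwrap(); // backup completed
}
```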

#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Column {
Meta,
@@ -224,12 +294,6 @@ impl DatabaseExt for DB {
}
}

pub mod static_keys {
pub const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS";
pub const LAST_PROVED_BLOCK: &[u8] = b"LAST_PROVED_BLOCK";
pub const LAST_SYNCED_L1_EVENT_BLOCK: &[u8] = b"LAST_SYNCED_L1_EVENT_BLOCK";
}

/// Returns the Starknet database directory.
pub fn starknet_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf {
db_config_dir.join("starknet").join(db_path)
@@ -262,6 +326,9 @@ static BACKEND_SINGLETON: OnceLock<Arc<DeoxysBackend>> = OnceLock::new();

static DB_SINGLETON: OnceLock<Arc<DB>> = OnceLock::new();

struct BackupRequest(oneshot::Sender<()>);
static DB_BACKUP_SINGLETON: OnceLock<mpsc::Sender<BackupRequest>> = OnceLock::new();

pub struct DBDropHook; // TODO(HACK): the db really shouldn't be in a static
impl Drop for DBDropHook {
fn drop(&mut self) {
@@ -283,17 +350,36 @@ impl DeoxysBackend {
pub fn open(
database: &DatabaseSource,
db_config_dir: &Path,
backup_dir: Option<PathBuf>,
restore_from_latest_backup: bool,
cache_more_things: bool,
) -> Result<&'static Arc<DeoxysBackend>> {
// load db after restoration
BACKEND_SINGLETON
.set(Arc::new(Self::init(database, db_config_dir, cache_more_things).unwrap()))
.set(Arc::new(
Self::init(database, db_config_dir, cache_more_things, backup_dir, restore_from_latest_backup).unwrap(),
))
.ok()
.context("Backend already initialized")?;

Ok(BACKEND_SINGLETON.get().unwrap())
}

fn init(database: &DatabaseSource, db_config_dir: &Path, cache_more_things: bool) -> Result<Self> {
pub async fn backup() -> Result<()> {
let chann = DB_BACKUP_SINGLETON.get().context("backups are not enabled")?;
let (callback_sender, callback_recv) = oneshot::channel();
chann.send(BackupRequest(callback_sender)).await.context("backups are not enabled")?;
callback_recv.await.context("backup task died :(")?;
Ok(())
}

fn init(
database: &DatabaseSource,
db_config_dir: &Path,
cache_more_things: bool,
backup_dir: Option<PathBuf>,
restore_from_latest_backup: bool,
) -> Result<Self> {
Self::new(
&DatabaseSettings {
source: match database {
@@ -314,12 +400,19 @@
max_saved_snapshots: Some(0),
snapshot_interval: u64::MAX,
},
backup_dir,
restore_from_latest_backup,
cache_more_things,
)
}

fn new(config: &DatabaseSettings, cache_more_things: bool) -> Result<Self> {
DB_SINGLETON.set(Arc::new(open_database(config)?)).unwrap();
fn new(
config: &DatabaseSettings,
backup_dir: Option<PathBuf>,
restore_from_latest_backup: bool,
cache_more_things: bool,
) -> Result<Self> {
DB_SINGLETON.set(Arc::new(open_database(config, backup_dir, restore_from_latest_backup)?)).unwrap();
let db = DB_SINGLETON.get().unwrap();
let bonsai_config = BonsaiStorageConfig {
max_saved_trie_logs: Some(0),
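From a caller's point of view, the new surface in this file boils down to two calls. A minimal sketch, assuming a tokio runtime; the paths are made up, and the `DatabaseSource::RocksDb` fields follow the `sc_client_db` type:

```rust
use std::path::{Path, PathBuf};
use sc_client_db::DatabaseSource;

async fn open_and_backup() -> anyhow::Result<()> {
    // Passing a backup_dir spawns the background backup thread; with
    // restore_from_latest_backup = true, open() would block here until the
    // latest backup has been restored into the database path.
    let _backend = DeoxysBackend::open(
        &DatabaseSource::RocksDb { path: PathBuf::from("/data/deoxys/db"), cache_size: 1024 },
        Path::new("/data/deoxys"),
        Some(PathBuf::from("/data/deoxys/backups")),
        false, // restore_from_latest_backup
        false, // cache_more_things
    )?;

    // Sends a BackupRequest to the backup thread and waits for its oneshot ack.
    DeoxysBackend::backup().await?;
    Ok(())
}
```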
26 changes: 24 additions & 2 deletions crates/client/db/src/meta_db.rs
@@ -14,6 +14,9 @@ pub struct MetaDb {
pub(crate) db: Arc<DB>,
}

const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS";
const CURRENT_SYNC_BLOCK: &[u8] = b"CURRENT_SYNC_BLOCK";

impl MetaDb {
pub(crate) fn new(db: Arc<DB>) -> Self {
Self { db }
@@ -23,7 +26,7 @@ impl MetaDb {
pub fn current_syncing_tips(&self) -> Result<Vec<DHashT>, DbError> {
let column = self.db.get_column(Column::Meta);

match self.db.get_cf(&column, crate::static_keys::CURRENT_SYNCING_TIPS)? {
match self.db.get_cf(&column, CURRENT_SYNCING_TIPS)? {
Some(raw) => Ok(Vec::<DHashT>::decode(&mut &raw[..])?),
None => Ok(Vec::new()),
}
@@ -33,7 +36,26 @@ impl MetaDb {
pub fn write_current_syncing_tips(&self, tips: Vec<DHashT>) -> Result<(), DbError> {
let column = self.db.get_column(Column::Meta);

self.db.put_cf(&column, crate::static_keys::CURRENT_SYNCING_TIPS, tips.encode())?;
self.db.put_cf(&column, CURRENT_SYNCING_TIPS, tips.encode())?;
Ok(())
}

pub fn current_sync_block(&self) -> Result<u64, DbError> {
let res = self.db.get_cf(&self.db.get_column(Column::Meta), CURRENT_SYNC_BLOCK)?;
log::debug!("current_sync_block {res:?}");

if let Some(res) = res {
Ok(u64::from_be_bytes(
res.try_into().map_err(|_| DbError::Format("current sync block should be a u64".into()))?,
))
} else {
Ok(0)
}
}

pub fn set_current_sync_block(&self, sync_block: u64) -> Result<(), DbError> {
log::debug!("set_current_sync_block {sync_block}");
self.db.put_cf(&self.db.get_column(Column::Meta), CURRENT_SYNC_BLOCK, u64::to_be_bytes(sync_block))?;
Ok(())
}
}
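
The new counter round-trips through an 8-byte big-endian encoding, and a missing key reads back as 0. A minimal usage sketch (obtaining a `MetaDb` is elided; the block number is arbitrary):

```rust
fn sync_block_roundtrip(meta: &MetaDb) -> Result<(), DbError> {
    meta.set_current_sync_block(1234)?;           // stored as 1234u64.to_be_bytes()
    assert_eq!(meta.current_sync_block()?, 1234); // an absent key would yield 0
    Ok(())
}
```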
2 changes: 1 addition & 1 deletion crates/client/sync/src/fetch/mod.rs
@@ -19,7 +19,7 @@ pub async fn l2_fetch_task(
fetch_stream_sender: mpsc::Sender<L2BlockAndUpdates>,
provider: Arc<SequencerGatewayProvider>,
sync_polling_interval: Option<Duration>,
) -> Result<(), L2SyncError> {
) -> anyhow::Result<()> {
// First, catch up with the chain

let mut next_block = first_block;