Skip to content

Commit

Permalink
feat(db): added --flush-every-n-seconds cli flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Trantorian1 committed Nov 29, 2024
1 parent 4c0b1f8 commit 17eb84d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 7 deletions.
2 changes: 2 additions & 0 deletions crates/client/sync/src/fetch/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct FetchConfig {
pub n_blocks_to_sync: Option<u64>,
/// Number of blocks between db flushes
pub flush_every_n_blocks: u64,
/// Number of seconds between db flushes
pub flush_every_n_seconds: u64,
/// Stops the node once all blocks have been synced (for testing purposes)
pub stop_on_sync: bool,
/// Number of blocks to fetch in parallel during the sync process
Expand Down
14 changes: 11 additions & 3 deletions crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct L2VerifyApplyConfig {
block_import: Arc<BlockImporter>,
backup_every_n_blocks: Option<u64>,
flush_every_n_blocks: u64,
flush_every_n_seconds: u64,
stop_on_sync: bool,
telemetry: TelemetryHandle,
validation: BlockValidationContext,
Expand All @@ -62,23 +63,27 @@ async fn l2_verify_and_apply_task(
ctx: ServiceContext,
config: L2VerifyApplyConfig,
) -> anyhow::Result<()> {
let mut last_block_n = 0;

let L2VerifyApplyConfig {
block_import,
backup_every_n_blocks,
flush_every_n_blocks,
flush_every_n_seconds,
stop_on_sync,
telemetry,
validation,
mut block_conv_receiver,
} = config;

let mut last_block_n = 0;
let mut instant = std::time::Instant::now();
let target_duration = std::time::Duration::from_secs(flush_every_n_seconds);

while let Some(block) = channel_wait_or_graceful_shutdown(pin!(block_conv_receiver.recv()), &ctx).await {
let BlockImportResult { header, block_hash } = block_import.verify_apply(block, validation.clone()).await?;

if header.block_number - last_block_n >= flush_every_n_blocks {
if header.block_number - last_block_n >= flush_every_n_blocks || instant.elapsed() >= target_duration {
last_block_n = header.block_number;
instant = std::time::Instant::now();
backend.flush().context("Flushing database")?;
}

Expand Down Expand Up @@ -225,6 +230,7 @@ pub struct L2SyncConfig {
pub sync_polling_interval: Option<Duration>,
pub backup_every_n_blocks: Option<u64>,
pub flush_every_n_blocks: u64,
pub flush_every_n_seconds: u64,
pub pending_block_poll_interval: Duration,
pub ignore_block_order: bool,
pub warp_update: bool,
Expand Down Expand Up @@ -297,6 +303,7 @@ pub async fn sync(
block_import: Arc::clone(&config.block_importer),
backup_every_n_blocks: config.backup_every_n_blocks,
flush_every_n_blocks: config.flush_every_n_blocks,
flush_every_n_seconds: config.flush_every_n_seconds,
stop_on_sync: config.stop_on_sync,
telemetry: config.telemetry,
validation: validation.clone(),
Expand Down Expand Up @@ -374,6 +381,7 @@ mod tests {
block_import: block_import.clone(),
backup_every_n_blocks: Some(1),
flush_every_n_blocks: 1,
flush_every_n_seconds: 10,
stop_on_sync: false,
telemetry,
validation: validation.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/client/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub async fn l2_sync_worker(
sync_polling_interval: fetch_config.sync_polling_interval,
backup_every_n_blocks: sync_config.backup_every_n_blocks,
flush_every_n_blocks: fetch_config.flush_every_n_blocks,
flush_every_n_seconds: fetch_config.flush_every_n_seconds,
pending_block_poll_interval: sync_config.pending_block_poll_interval,
ignore_block_order,
sync_parallelism: fetch_config.sync_parallelism,
Expand Down
37 changes: 33 additions & 4 deletions crates/node/src/cli/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,46 @@ pub struct SyncParams {
#[clap(env = "MADARA_BACKUP_EVERY_N_BLOCKS", long, value_name = "NUMBER OF BLOCKS")]
pub backup_every_n_blocks: Option<u64>,

/// Periodically flushes the database from ram to disk. You can set this
/// based on how fast your machine is at synchronizing blocks and how much
/// ram it has available.
/// Periodically flushes the database from ram to disk based on the number
/// of blocks synchronized since the last flush. You can set this to a
/// higher number depending on how fast your machine is at synchronizing
/// blocks and how much ram it has available.
///
/// Be aware that blocks might still be flushed to db earlier based on the
/// value of --flush-every-n-seconds.
///
/// Note that keeping this value high could lead to blocks being stored in
/// ram for longer periods of time before they are written to disk. This
/// might be an issue for chains which synchronize slowly.
#[clap(
env = "MADARA_FLUSH_EVERY_N_BLOCKS",
value_name = "FLUSH EVERY N BLOCKS",
long,
value_parser = clap::value_parser!(u64).range(..10_000),
value_parser = clap::value_parser!(u64).range(..=10_000),
default_value_t = 1_000
)]
pub flush_every_n_blocks: u64,

/// Periodically flushes the database from ram to disk based on the elapsed
/// time since the last flush. You can set this to a higher number
/// depending on how fast your machine is at synchronizing blocks and how
/// much ram it has available.
///
/// Be aware that blocks might still be flushed to db earlier based on the
/// value of --flush-every-n-blocks.
///
/// Note that keeping this value high could lead to blocks being stored in
/// ram for longer periods of time before they are written to disk. This
/// might be an issue for chains which synchronize slowly.
#[clap(
env = "MADARA_FLUSH_EVERY_N_BLOCKS",
value_name = "FLUSH EVERY N BLOCKS",
long,
value_parser = clap::value_parser!(u64).range(..=3_600),
default_value_t = 5
)]
pub flush_every_n_seconds: u64,

/// Number of blocks to fetch in parallel. This only affects sync time, and
/// does not affect the node once it has reached the tip of the chain.
/// Increasing this can lead to lower sync times at the cost of higher cpu
Expand Down Expand Up @@ -135,6 +163,7 @@ impl SyncParams {
sync_polling_interval: polling,
n_blocks_to_sync: self.n_blocks_to_sync,
flush_every_n_blocks: self.flush_every_n_blocks,
flush_every_n_seconds: self.flush_every_n_seconds,
stop_on_sync: self.stop_on_sync,
sync_parallelism: self.sync_parallelism,
warp_update,
Expand Down

0 comments on commit 17eb84d

Please sign in to comment.