diff --git a/crates/client/sync/src/fetch/fetchers.rs b/crates/client/sync/src/fetch/fetchers.rs index 76e9aa9fc..36e3f6e7f 100644 --- a/crates/client/sync/src/fetch/fetchers.rs +++ b/crates/client/sync/src/fetch/fetchers.rs @@ -43,6 +43,8 @@ pub struct FetchConfig { pub n_blocks_to_sync: Option, /// Number of blocks between db flushes pub flush_every_n_blocks: u64, + /// Number of seconds between db flushes + pub flush_every_n_seconds: u64, /// Stops the node once all blocks have been synced (for testing purposes) pub stop_on_sync: bool, /// Number of blocks to fetch in parallel during the sync process diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index b0bdd76a4..356edc08b 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -50,6 +50,7 @@ pub struct L2VerifyApplyConfig { block_import: Arc, backup_every_n_blocks: Option, flush_every_n_blocks: u64, + flush_every_n_seconds: u64, stop_on_sync: bool, telemetry: TelemetryHandle, validation: BlockValidationContext, @@ -62,23 +63,27 @@ async fn l2_verify_and_apply_task( ctx: ServiceContext, config: L2VerifyApplyConfig, ) -> anyhow::Result<()> { - let mut last_block_n = 0; - let L2VerifyApplyConfig { block_import, backup_every_n_blocks, flush_every_n_blocks, + flush_every_n_seconds, stop_on_sync, telemetry, validation, mut block_conv_receiver, } = config; + let mut last_block_n = 0; + let mut instant = std::time::Instant::now(); + let target_duration = std::time::Duration::from_secs(flush_every_n_seconds); + while let Some(block) = channel_wait_or_graceful_shutdown(pin!(block_conv_receiver.recv()), &ctx).await { let BlockImportResult { header, block_hash } = block_import.verify_apply(block, validation.clone()).await?; - if header.block_number - last_block_n >= flush_every_n_blocks { + if header.block_number - last_block_n >= flush_every_n_blocks || instant.elapsed() >= target_duration { last_block_n = header.block_number; + instant = std::time::Instant::now(); backend.flush().context("Flushing database")?; } @@ -225,6 +230,7 @@ pub struct L2SyncConfig { pub sync_polling_interval: Option, pub backup_every_n_blocks: Option, pub flush_every_n_blocks: u64, + pub flush_every_n_seconds: u64, pub pending_block_poll_interval: Duration, pub ignore_block_order: bool, pub warp_update: bool, @@ -297,6 +303,7 @@ pub async fn sync( block_import: Arc::clone(&config.block_importer), backup_every_n_blocks: config.backup_every_n_blocks, flush_every_n_blocks: config.flush_every_n_blocks, + flush_every_n_seconds: config.flush_every_n_seconds, stop_on_sync: config.stop_on_sync, telemetry: config.telemetry, validation: validation.clone(), @@ -374,6 +381,7 @@ mod tests { block_import: block_import.clone(), backup_every_n_blocks: Some(1), flush_every_n_blocks: 1, + flush_every_n_seconds: 10, stop_on_sync: false, telemetry, validation: validation.clone(), diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index dc2a4295e..6bb874f9e 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -68,6 +68,7 @@ pub async fn l2_sync_worker( sync_polling_interval: fetch_config.sync_polling_interval, backup_every_n_blocks: sync_config.backup_every_n_blocks, flush_every_n_blocks: fetch_config.flush_every_n_blocks, + flush_every_n_seconds: fetch_config.flush_every_n_seconds, pending_block_poll_interval: sync_config.pending_block_poll_interval, ignore_block_order, sync_parallelism: fetch_config.sync_parallelism, diff --git a/crates/node/src/cli/sync.rs b/crates/node/src/cli/sync.rs index 00044cdfd..930b08c9f 100644 --- a/crates/node/src/cli/sync.rs +++ b/crates/node/src/cli/sync.rs @@ -84,18 +84,46 @@ pub struct SyncParams { #[clap(env = "MADARA_BACKUP_EVERY_N_BLOCKS", long, value_name = "NUMBER OF BLOCKS")] pub backup_every_n_blocks: Option, - /// Periodically flushes the database from ram to disk. You can set this - /// based on how fast your machine is at synchronizing blocks and how much - /// ram it has available. + /// Periodically flushes the database from ram to disk based on the number + /// of blocks synchronized since the last flush. You can set this to a + /// higher number depending on how fast your machine is at synchronizing + /// blocks and how much ram it has available. + /// + /// Be aware that blocks might still be flushed to db earlier based on the + /// value of --flush-every-n-seconds. + /// + /// Note that keeping this value high could lead to blocks being stored in + /// ram for longer periods of time before they are written to disk. This + /// might be an issue for chains which synchronize slowly. #[clap( env = "MADARA_FLUSH_EVERY_N_BLOCKS", value_name = "FLUSH EVERY N BLOCKS", long, - value_parser = clap::value_parser!(u64).range(..10_000), + value_parser = clap::value_parser!(u64).range(..=10_000), default_value_t = 1_000 )] pub flush_every_n_blocks: u64, + /// Periodically flushes the database from ram to disk based on the elapsed + /// time since the last flush. You can set this to a higher number + /// depending on how fast your machine is at synchronizing blocks and how + /// much ram it has available. + /// + /// Be aware that blocks might still be flushed to db earlier based on the + /// value of --flush-every-n-blocks. + /// + /// Note that keeping this value high could lead to blocks being stored in + /// ram for longer periods of time before they are written to disk. This + /// might be an issue for chains which synchronize slowly. + #[clap( + env = "MADARA_FLUSH_EVERY_N_BLOCKS", + value_name = "FLUSH EVERY N BLOCKS", + long, + value_parser = clap::value_parser!(u64).range(..=3_600), + default_value_t = 5 + )] + pub flush_every_n_seconds: u64, + /// Number of blocks to fetch in parallel. This only affects sync time, and /// does not affect the node once it has reached the tip of the chain. /// Increasing this can lead to lower sync times at the cost of higher cpu @@ -135,6 +163,7 @@ impl SyncParams { sync_polling_interval: polling, n_blocks_to_sync: self.n_blocks_to_sync, flush_every_n_blocks: self.flush_every_n_blocks, + flush_every_n_seconds: self.flush_every_n_seconds, stop_on_sync: self.stop_on_sync, sync_parallelism: self.sync_parallelism, warp_update,