From af1664ab6f813d2d06a78894bf747bc4b00560bc Mon Sep 17 00:00:00 2001 From: cchudant Date: Thu, 18 Apr 2024 18:24:31 +0000 Subject: [PATCH] fix(sync): Fix end condition of the l2 sync --- CHANGELOG.md | 1 + crates/client/sync/src/l2.rs | 24 ++++++++++++++++++------ crates/client/sync/src/lib.rs | 2 +- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f199a274d..2252fb0b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ git # Deoxys Changelog ## Next release +- fix(sync): Fix end condition of the l2 sync - fix(rpc): fix chain id method for mainnet - fix(class): Fix Sierra classes conversion (missing abis) - fix(compute): Fixed prepare_data_availability_modes computation diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 5522c9971..fc20dbc18 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -137,8 +137,14 @@ pub struct SenderConfig { } /// Spawns workers to fetch blocks and state updates from the feeder. -pub async fn sync(mut sender_config: SenderConfig, fetch_config: FetchConfig, first_block: u64, client: Arc) -where +/// `n_blocks` is optionally the total number of blocks to sync, for debugging/benchmark purposes. +pub async fn sync( + mut sender_config: SenderConfig, + fetch_config: FetchConfig, + first_block: u64, + n_blocks: Option, + client: Arc, +) where C: HeaderBackend + 'static, { let SenderConfig { block_sender, state_update_sender, class_sender, command_sink, overrides } = &mut sender_config; @@ -166,7 +172,7 @@ where } }); // Have 10 fetches in parallel at once, using futures Buffered - let fetch_stream = stream::iter(fetch_stream).buffered(10); + let fetch_stream = stream::iter(fetch_stream.take(n_blocks.unwrap_or(usize::MAX))).buffered(10); let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(10); tokio::select!( @@ -182,9 +188,15 @@ where } } => {}, // fetch blocks and updates in parallel - _ = fetch_stream.for_each(|val| async { - fetch_stream_sender.send(val).await.expect("receiver is closed"); - }) => {}, + _ = async { + fetch_stream.for_each(|val| async { + fetch_stream_sender.send(val).await.expect("receiver is closed"); + }).await; + + drop(fetch_stream_sender); // dropping the channel makes the recieving task stop once the queue is empty. + + std::future::pending().await + } => {}, // apply blocks and updates sequentially _ = async { let mut block_n = first_block; diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index 117836c71..a15919eb0 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -41,7 +41,7 @@ pub mod starknet_sync_worker { let _ = tokio::join!( l1::sync(l1_url.clone()), - l2::sync(sender_config, fetch_config.clone(), starting_block.into(), client) + l2::sync(sender_config, fetch_config.clone(), starting_block.into(), None, client) ); } }