Skip to content

Commit

Permalink
fix(sync): Fix end condition of the l2 sync
Browse files Browse the repository at this point in the history
  • Loading branch information
cchudant committed Apr 18, 2024
1 parent d76e81d commit ae10f35
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ git # Deoxys Changelog

## Next release

- fix(sync): Fix end condition of the l2 sync
- fix(class): Fix Sierra classes conversion (missing abis)
- fix(compute): Fixed prepare_data_availability_modes computation
- feat(rpc): add pending block to `get_block_with_receipts` rpc call
Expand Down
17 changes: 12 additions & 5 deletions crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ pub struct SenderConfig {
}

/// Spawns workers to fetch blocks and state updates from the feeder.
pub async fn sync<C>(mut sender_config: SenderConfig, fetch_config: FetchConfig, first_block: u64, client: Arc<C>)
/// `n_blocks` is optionally the total number of blocks to sync, for debugging/benchmark purposes.
pub async fn sync<C>(mut sender_config: SenderConfig, fetch_config: FetchConfig, first_block: u64, n_blocks: Option<usize>, client: Arc<C>)
where
C: HeaderBackend<DBlockT> + 'static,
{
Expand Down Expand Up @@ -166,7 +167,7 @@ where
}
});
// Have 10 fetches in parallel at once, using futures Buffered
let fetch_stream = stream::iter(fetch_stream).buffered(10);
let fetch_stream = stream::iter(fetch_stream.take(n_blocks.unwrap_or(usize::MAX))).buffered(10);
let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(10);

tokio::select!(
Expand All @@ -182,9 +183,15 @@ where
}
} => {},
// fetch blocks and updates in parallel
_ = fetch_stream.for_each(|val| async {
fetch_stream_sender.send(val).await.expect("receiver is closed");
}) => {},
_ = async {
fetch_stream.for_each(|val| async {
fetch_stream_sender.send(val).await.expect("receiver is closed");
}).await;

drop(fetch_stream_sender); // dropping the channel makes the recieving task stop once the queue is empty.

std::future::pending().await
} => {},
// apply blocks and updates sequentially
_ = async {
let mut block_n = first_block;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod starknet_sync_worker {

let _ = tokio::join!(
l1::sync(l1_url.clone()),
l2::sync(sender_config, fetch_config.clone(), starting_block.into(), client)
l2::sync(sender_config, fetch_config.clone(), starting_block.into(), None, client)
);
}
}

0 comments on commit ae10f35

Please sign in to comment.