Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): Fix end condition of the l2 sync #63

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ git # Deoxys Changelog

## Next release

- fix(sync): Fix end condition of the l2 sync
- fix(rpc): fix chain id method for mainnet
- fix(class): Fix Sierra classes conversion (missing abis)
- fix(compute): Fixed prepare_data_availability_modes computation
Expand Down
24 changes: 18 additions & 6 deletions crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,14 @@ pub struct SenderConfig {
}

/// Spawns workers to fetch blocks and state updates from the feeder.
pub async fn sync<C>(mut sender_config: SenderConfig, fetch_config: FetchConfig, first_block: u64, client: Arc<C>)
where
/// `n_blocks` is optionally the total number of blocks to sync, for debugging/benchmark purposes.
pub async fn sync<C>(
mut sender_config: SenderConfig,
fetch_config: FetchConfig,
first_block: u64,
n_blocks: Option<usize>,
client: Arc<C>,
) where
C: HeaderBackend<DBlockT> + 'static,
{
let SenderConfig { block_sender, state_update_sender, class_sender, command_sink, overrides } = &mut sender_config;
Expand Down Expand Up @@ -166,7 +172,7 @@ where
}
});
// Have 10 fetches in parallel at once, using futures Buffered
let fetch_stream = stream::iter(fetch_stream).buffered(10);
let fetch_stream = stream::iter(fetch_stream.take(n_blocks.unwrap_or(usize::MAX))).buffered(10);
let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(10);

tokio::select!(
Expand All @@ -182,9 +188,15 @@ where
}
} => {},
// fetch blocks and updates in parallel
_ = fetch_stream.for_each(|val| async {
fetch_stream_sender.send(val).await.expect("receiver is closed");
}) => {},
_ = async {
fetch_stream.for_each(|val| async {
fetch_stream_sender.send(val).await.expect("receiver is closed");
}).await;

drop(fetch_stream_sender); // dropping the channel makes the recieving task stop once the queue is empty.

std::future::pending().await
} => {},
// apply blocks and updates sequentially
_ = async {
let mut block_n = first_block;
Expand Down
2 changes: 1 addition & 1 deletion crates/client/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod starknet_sync_worker {

let _ = tokio::join!(
l1::sync(l1_url.clone()),
l2::sync(sender_config, fetch_config.clone(), starting_block.into(), client)
l2::sync(sender_config, fetch_config.clone(), starting_block.into(), None, client)
);
}
}
Loading