From 23ae84230f73fddbd8674b2a01f9fc14ccc015a3 Mon Sep 17 00:00:00 2001 From: Luisa Vasquez Gomez Date: Thu, 12 Dec 2024 07:10:03 -0800 Subject: [PATCH] modern sync: properly track mutable counters Summary: I wanted to have a stream of continuous change sets but turns out it was quite tricky to add a "break" in between streams to make the bookmark update log update, so switching to the old boring iteration. I also hid the mutable counters update behind an arg given that I can see a scenario where we run the sync locally in case prod is stuck and I don't want conflicting updates between those two runs and for prod to be the only SoT, if we need to update we can use monad Reviewed By: markbt Differential Revision: D67092014 fbshipit-source-id: 759600997e9b065b55a2e4fff51a2d166a25250f --- eden/mononoke/modern_sync/src/bul_util.rs | 23 ---------- eden/mononoke/modern_sync/src/main.rs | 10 +++-- eden/mononoke/modern_sync/src/sync.rs | 45 ++++++++++++------- eden/mononoke/tests/integration/library.sh | 1 + .../modern_sync/test-modern-sync.t | 2 + 5 files changed, 37 insertions(+), 44 deletions(-) diff --git a/eden/mononoke/modern_sync/src/bul_util.rs b/eden/mononoke/modern_sync/src/bul_util.rs index ba6d5b47c4d9c..79457e0f6dcfd 100644 --- a/eden/mononoke/modern_sync/src/bul_util.rs +++ b/eden/mononoke/modern_sync/src/bul_util.rs @@ -15,12 +15,10 @@ use bookmarks::BookmarkUpdateLogEntry; use bookmarks::BookmarkUpdateLogId; use bookmarks::Freshness; use cloned::cloned; -use commit_graph::CommitGraph; use context::CoreContext; use futures::stream; use futures::stream::StreamExt; use futures::stream::TryStreamExt; -use mononoke_types::ChangesetId; use crate::sync::ExecutionType; @@ -74,24 +72,3 @@ pub async fn get_one_entry( stream::iter(entries) } - -/// Takes a vec of BookmarkUpdateLogEntry and returns a stream of all the ChangesetIds in these movements -pub async fn get_commit_stream( - entries: Vec, - commit_graph: Arc, - ctx: &CoreContext, -) -> impl stream::Stream> + '_ { - let entries_stream = stream::iter(entries); - entries_stream - .then(move |entry| { - cloned!(ctx, commit_graph); - async move { - let from = entry.from_changeset_id.map_or(vec![], |val| vec![val]); - let to = entry.to_changeset_id.map_or(vec![], |val| vec![val]); - commit_graph - .ancestors_difference_stream(&ctx, to, from) - .await - } - }) - .try_flatten() -} diff --git a/eden/mononoke/modern_sync/src/main.rs b/eden/mononoke/modern_sync/src/main.rs index a4a1f4609311c..f39173e48fd03 100644 --- a/eden/mononoke/modern_sync/src/main.rs +++ b/eden/mononoke/modern_sync/src/main.rs @@ -47,11 +47,13 @@ struct ModernSyncArgs { #[clap(flatten)] tls_params: Option, - #[clap( - long, - help = "Dest repo name (in case it's different from source repo name)" - )] + #[clap(long)] + /// "Dest repo name (in case it's different from source repo name)" dest_repo_name: Option, + + #[clap(long)] + /// Update counters in the source repo (prod and tests only) + update_counters: bool, } #[facet::container] diff --git a/eden/mononoke/modern_sync/src/sync.rs b/eden/mononoke/modern_sync/src/sync.rs index 9ad3cac77b6e0..1d8ee79415ece 100644 --- a/eden/mononoke/modern_sync/src/sync.rs +++ b/eden/mononoke/modern_sync/src/sync.rs @@ -18,7 +18,7 @@ use changeset_info::ChangesetInfo; use clientinfo::ClientEntryPoint; use clientinfo::ClientInfo; use cloned::cloned; -use commit_graph::CommitGraphArc; +use commit_graph::CommitGraphRef; use context::CoreContext; use context::SessionContainer; use futures::StreamExt; @@ -121,11 +121,10 @@ pub async fn sync( ) })? }; - + let app_args = app.args::()?; let sender: Arc = if dry_run { Arc::new(DummySender::new(logger.clone())) } else { - let app_args = app.args::()?; let url = if let Some(socket) = app_args.dest_socket { // Only for integration tests format!("{}:{}/edenapi/", &config.url, socket) @@ -178,23 +177,35 @@ pub async fn sync( Err(e) } Ok(entries) => { - // TODO: We probably want to get these in inverse order so once we derive the top parent - // the children will already be derived. - bul_util::get_commit_stream(entries, repo.commit_graph_arc(), ctx) - .await - .fuse() - .try_next_step(move |cs_id| { - cloned!(ctx, repo, logger, sender); - async move { - process_one_changeset(&cs_id, &ctx, repo, &logger, sender, false) + for entry in entries { + let from = entry.from_changeset_id.into_iter().collect(); + let to = entry.to_changeset_id.into_iter().collect(); + + repo.commit_graph() + .ancestors_difference_stream(ctx, to, from) + .await? + .fuse() + .try_next_step(|cs_id| { + cloned!(ctx, repo, logger, sender); + async move { + process_one_changeset( + &cs_id, &ctx, repo, &logger, sender, false, + ) .await - } - }) - .try_collect::<()>() - .await + } + }) + .try_collect::<()>() + .await?; + + if app_args.update_counters { + repo.mutable_counters() + .set_counter(ctx, MODERN_SYNC_COUNTER_NAME, entry.id.0 as i64, None) + .await?; + } + } + Ok(()) } } - // TODO Update counter after processing one entry } }) .try_collect::<()>() diff --git a/eden/mononoke/tests/integration/library.sh b/eden/mononoke/tests/integration/library.sh index a121d3391b8ae..b0e43a29f8e58 100755 --- a/eden/mononoke/tests/integration/library.sh +++ b/eden/mononoke/tests/integration/library.sh @@ -350,6 +350,7 @@ function mononoke_modern_sync { "${COMMON_ARGS[@]}" \ --repo-name "$ORIG_REPO" \ --dest-repo-name "$DEST_REPO" \ + --update-counters \ --mononoke-config-path "$TESTTMP/mononoke-config" \ --dest-socket $MONONOKE_SOCKET \ --tls-ca "$TEST_CERTDIR/root-ca.crt" \ diff --git a/eden/mononoke/tests/integration/modern_sync/test-modern-sync.t b/eden/mononoke/tests/integration/modern_sync/test-modern-sync.t index 18092a53db94e..d5345544bcd6c 100644 --- a/eden/mononoke/tests/integration/modern_sync/test-modern-sync.t +++ b/eden/mononoke/tests/integration/modern_sync/test-modern-sync.t @@ -139,6 +139,8 @@ Sync all bookmarks moves File HgFileNodeId(HgNodeHash(Sha1(35e7525ce3a48913275d7061dd9a867ffef1e34d))) File HgFileNodeId(HgNodeHash(Sha1(778675f9ec8d35ff2fce23a34f68edd15d783853))) + $ mononoke_admin mutable-counters --repo-name orig get modern_sync + Some(2) $ cat $TESTTMP/modern_sync_scuba_logs | jq | rg "start_id|dry_run|repo" "start_id": 0, "dry_run": "false",