Skip to content

Commit

Permalink
modern sync: properly track mutable counters
Browse files Browse the repository at this point in the history
Summary:
I wanted to have a stream of continuous change sets but turns out it was quite tricky to add a "break" in between streams to make the bookmark update log update, so switching to the old boring iteration.

I also hid the mutable counters update behind an arg given that I can see a scenario where we run the sync locally in case prod is stuck and I don't want conflicting updates between those two runs and for prod to be the only SoT, if we need to update we can use monad

Reviewed By: markbt

Differential Revision: D67092014

fbshipit-source-id: 759600997e9b065b55a2e4fff51a2d166a25250f
  • Loading branch information
lmvasquezg authored and facebook-github-bot committed Dec 12, 2024
1 parent df91c7f commit 23ae842
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 44 deletions.
23 changes: 0 additions & 23 deletions eden/mononoke/modern_sync/src/bul_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ use bookmarks::BookmarkUpdateLogEntry;
use bookmarks::BookmarkUpdateLogId;
use bookmarks::Freshness;
use cloned::cloned;
use commit_graph::CommitGraph;
use context::CoreContext;
use futures::stream;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use mononoke_types::ChangesetId;

use crate::sync::ExecutionType;

Expand Down Expand Up @@ -74,24 +72,3 @@ pub async fn get_one_entry(

stream::iter(entries)
}

/// Takes a vec of BookmarkUpdateLogEntry and returns a stream of all the ChangesetIds in these movements
pub async fn get_commit_stream(
entries: Vec<BookmarkUpdateLogEntry>,
commit_graph: Arc<CommitGraph>,
ctx: &CoreContext,
) -> impl stream::Stream<Item = Result<ChangesetId, Error>> + '_ {
let entries_stream = stream::iter(entries);
entries_stream
.then(move |entry| {
cloned!(ctx, commit_graph);
async move {
let from = entry.from_changeset_id.map_or(vec![], |val| vec![val]);
let to = entry.to_changeset_id.map_or(vec![], |val| vec![val]);
commit_graph
.ancestors_difference_stream(&ctx, to, from)
.await
}
})
.try_flatten()
}
10 changes: 6 additions & 4 deletions eden/mononoke/modern_sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ struct ModernSyncArgs {
#[clap(flatten)]
tls_params: Option<TLSArgs>,

#[clap(
long,
help = "Dest repo name (in case it's different from source repo name)"
)]
#[clap(long)]
/// "Dest repo name (in case it's different from source repo name)"
dest_repo_name: Option<String>,

#[clap(long)]
/// Update counters in the source repo (prod and tests only)
update_counters: bool,
}

#[facet::container]
Expand Down
45 changes: 28 additions & 17 deletions eden/mononoke/modern_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use changeset_info::ChangesetInfo;
use clientinfo::ClientEntryPoint;
use clientinfo::ClientInfo;
use cloned::cloned;
use commit_graph::CommitGraphArc;
use commit_graph::CommitGraphRef;
use context::CoreContext;
use context::SessionContainer;
use futures::StreamExt;
Expand Down Expand Up @@ -121,11 +121,10 @@ pub async fn sync(
)
})?
};

let app_args = app.args::<ModernSyncArgs>()?;
let sender: Arc<dyn ModernSyncSender + Send + Sync> = if dry_run {
Arc::new(DummySender::new(logger.clone()))
} else {
let app_args = app.args::<ModernSyncArgs>()?;
let url = if let Some(socket) = app_args.dest_socket {
// Only for integration tests
format!("{}:{}/edenapi/", &config.url, socket)
Expand Down Expand Up @@ -178,23 +177,35 @@ pub async fn sync(
Err(e)
}
Ok(entries) => {
// TODO: We probably want to get these in inverse order so once we derive the top parent
// the children will already be derived.
bul_util::get_commit_stream(entries, repo.commit_graph_arc(), ctx)
.await
.fuse()
.try_next_step(move |cs_id| {
cloned!(ctx, repo, logger, sender);
async move {
process_one_changeset(&cs_id, &ctx, repo, &logger, sender, false)
for entry in entries {
let from = entry.from_changeset_id.into_iter().collect();
let to = entry.to_changeset_id.into_iter().collect();

repo.commit_graph()
.ancestors_difference_stream(ctx, to, from)
.await?
.fuse()
.try_next_step(|cs_id| {
cloned!(ctx, repo, logger, sender);
async move {
process_one_changeset(
&cs_id, &ctx, repo, &logger, sender, false,
)
.await
}
})
.try_collect::<()>()
.await
}
})
.try_collect::<()>()
.await?;

if app_args.update_counters {
repo.mutable_counters()
.set_counter(ctx, MODERN_SYNC_COUNTER_NAME, entry.id.0 as i64, None)
.await?;
}
}
Ok(())
}
}
// TODO Update counter after processing one entry
}
})
.try_collect::<()>()
Expand Down
1 change: 1 addition & 0 deletions eden/mononoke/tests/integration/library.sh
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ function mononoke_modern_sync {
"${COMMON_ARGS[@]}" \
--repo-name "$ORIG_REPO" \
--dest-repo-name "$DEST_REPO" \
--update-counters \
--mononoke-config-path "$TESTTMP/mononoke-config" \
--dest-socket $MONONOKE_SOCKET \
--tls-ca "$TEST_CERTDIR/root-ca.crt" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ Sync all bookmarks moves
File HgFileNodeId(HgNodeHash(Sha1(35e7525ce3a48913275d7061dd9a867ffef1e34d)))
File HgFileNodeId(HgNodeHash(Sha1(778675f9ec8d35ff2fce23a34f68edd15d783853)))

$ mononoke_admin mutable-counters --repo-name orig get modern_sync
Some(2)
$ cat $TESTTMP/modern_sync_scuba_logs | jq | rg "start_id|dry_run|repo"
"start_id": 0,
"dry_run": "false",
Expand Down

0 comments on commit 23ae842

Please sign in to comment.