Skip to content

Commit

Permalink
Back out "gitimport: Run derivation concurrently with import"
Browse files Browse the repository at this point in the history
Summary:
Original commit changeset: 07e1a5311001

Original Phabricator Diff: D55203295

Reason:
I tried it on `chromium/src` and while it went well for a couple of batches of 1000 commits, it eventually started hanging at the end of batches again, and for much longer than before. My guess is that trying to write to the bonsai git mapping dbs while we're also reading from them slows down the process overall as we're racing for the lock. I will backout of this diff for now. To re-introduce it, we will first need to batch the db reads.

Reviewed By: mzr

Differential Revision: D55245809

fbshipit-source-id: 4ff8e849fdcbb9f974a43a3336ce87d1c44ab650
  • Loading branch information
Pierre Chevalier authored and facebook-github-bot committed Mar 22, 2024
1 parent e6056cb commit 87b2cfe
Showing 1 changed file with 6 additions and 29 deletions.
35 changes: 6 additions & 29 deletions eden/mononoke/git/import_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,26 +283,8 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
let acc = RwLock::new(GitimportAccumulator::new());
let backfill_derivation = prefs.backfill_derivation.clone();

// Create a channel for sending batches to the finalize_batch task
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<_>>(1);
// Spawn a separate task for running finalize_batch
let finalize_batch_task = async {
tokio::spawn({
cloned!(backfill_derivation, ctx, uploader);
async move {
while let Some(batch) = rx.recv().await {
uploader
.finalize_batch(&ctx, dry_run, backfill_derivation.clone(), batch)
.await?;
}
Ok::<(), anyhow::Error>(())
}
})
.await?
};

// Kick off a stream that consumes the walk and prepared commits. Then, produce the Bonsais.
let import_task = target
target
.list_commits(&prefs.git_command_path, path)
.await?
.try_filter_map({
Expand Down Expand Up @@ -466,17 +448,12 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
.chunks(prefs.concurrency)
// Go from Vec<Result<X,Y>> -> Result<Vec<X>,Y>
.map(|v| v.into_iter().collect::<Result<Vec<_>, Error>>())
.try_fold(tx, |tx, v| {
async move {
tx.send(v).await?;
Ok(tx)
}
.try_for_each(|v| async {
cloned!(backfill_derivation, ctx, uploader);
task::spawn(async move { uploader.finalize_batch(&ctx, dry_run, backfill_derivation, v).await }).await?
})
.and_then(|tx| async move {
std::mem::drop(tx);
Ok(())
});
tokio::try_join!(import_task, finalize_batch_task)?;
.await?;

debug!(ctx.logger(), "Completed git import for repo {}.", repo_name);
Ok(acc.into_inner().expect("lock poisoned"))
}
Expand Down

0 comments on commit 87b2cfe

Please sign in to comment.