Skip to content

Commit

Permalink
Run more sync operations in parallel
Browse files Browse the repository at this point in the history
Summary:
## This stack

Track and down methods that are holding up the reactor and optimize them.

## This diff

This code is loading both sides of a diff from blobstore concurrently is the right thing. In practice, most of the cost comes from the Thrift deserialization, which is synchronous. Worse, `try_join` does not yield back to Tokio. The combination means blocking the reactor for 150-200ms or even longer.

With this diff we'll actually run them in parallel, saving both max poll time and quite likely wall clock time too.

D66761433 optimized the same logic for unordered diffs. This has shown large benefits: https://fburl.com/scuba/mononoke_scs_server/veyjnzzp

Reviewed By: singhsrb

Differential Revision: D66961595

fbshipit-source-id: 6247ffedc1eff6e266bc365475aeff00fa8495a8
  • Loading branch information
andreacampi authored and facebook-github-bot committed Dec 9, 2024
1 parent 7e39201 commit eed01da
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
35 changes: 21 additions & 14 deletions eden/mononoke/manifest/src/ordered_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::iter::Peekable;
use anyhow::Error;
use borrowed::borrowed;
use bounded_traversal::OrderedTraversal;
use cloned::cloned;
use context::CoreContext;
use futures::future;
use futures::future::FutureExt;
Expand All @@ -19,6 +20,7 @@ use futures::stream;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use futures_watchdog::WatchdogExt;
use mononoke_types::path::MPath;
use mononoke_types::MPathElement;
use nonzero_ext::nonzero;
Expand Down Expand Up @@ -354,11 +356,16 @@ where

match input {
Diff::Changed(path, left, right) => {
let (left_mf, right_mf) = future::try_join(
left.load(ctx, store),
right.load(ctx, other_store),
)
.await?;
let l = tokio::spawn({
cloned!(ctx, left, store);
async move { left.load(&ctx, &store).watched(ctx.logger()).await }
});
let r = tokio::spawn({
cloned!(ctx, right, other_store);
async move { right.load(&ctx, &other_store).watched(ctx.logger()).await }
});
let (left_mf, right_mf) = future::try_join(l, r).watched(ctx.logger()).await?;
let (left_mf, right_mf) = (left_mf?, right_mf?);

if after.include_self() {
push_output(
Expand All @@ -372,8 +379,8 @@ where
}

let iter = EntryDiffIterator::new(
left_mf.list_weighted(ctx, store).await?.try_collect::<Vec<_>>().await?.into_iter(),
right_mf.list_weighted(ctx, other_store).await?.try_collect::<Vec<_>>().await?.into_iter(),
left_mf.list_weighted(ctx, store).watched(ctx.logger()).await?.try_collect::<Vec<_>>().watched(ctx.logger()).await?.into_iter(),
right_mf.list_weighted(ctx, other_store).watched(ctx.logger()).await?.try_collect::<Vec<_>>().watched(ctx.logger()).await?.into_iter(),
);
for (name, left, right) in iter {
if after.skip(&name) || left == right {
Expand Down Expand Up @@ -492,9 +499,9 @@ where
Diff::Added(path.clone(), Entry::Tree(tree.clone())),
);
}
let manifest = tree.load(ctx, other_store).await?;
let mut stream = manifest.list_weighted(ctx, store).await?;
while let Some((name, entry)) = stream.try_next().await? {
let manifest = tree.load(ctx, other_store).watched(ctx.logger()).await?;
let mut stream = manifest.list_weighted(ctx, store).watched(ctx.logger()).await?;
while let Some((name, entry)) = stream.try_next().watched(ctx.logger()).await? {
if after.skip(&name) {
continue;
}
Expand Down Expand Up @@ -526,9 +533,9 @@ where
Diff::Removed(path.clone(), Entry::Tree(tree.clone())),
);
}
let manifest = tree.load(ctx, store).await?;
let mut stream = manifest.list_weighted(ctx, store).await?;
while let Some((name, entry)) = stream.try_next().await? {
let manifest = tree.load(ctx, store).watched(ctx.logger()).await?;
let mut stream = manifest.list_weighted(ctx, store).watched(ctx.logger()).await?;
while let Some((name, entry)) = stream.try_next().watched(ctx.logger()).await? {
if after.skip(&name) {
continue;
}
Expand Down Expand Up @@ -561,7 +568,7 @@ where
);

pin_mut!(s);
while let Some(value) = s.next().await {
while let Some(value) = s.next().watched(ctx.logger()).await {
yield value;
}
})
Expand Down
5 changes: 5 additions & 0 deletions eden/mononoke/mononoke_api/src/changeset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use futures::stream::BoxStream;
use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use futures_ext::FbStreamExt;
use futures_lazy_shared::LazyShared;
use futures_watchdog::WatchdogExt;
use git_types::MappedGitCommitId;
Expand Down Expand Up @@ -1350,7 +1351,10 @@ impl<R: MononokeRepo> ChangesetContext<R> {
let diff_trees = diff_items.contains(&ChangesetDiffItem::TREES);

self.find_entries(to_vec1(path_restrictions), ordering)
.watched(self.ctx().logger())
.await?
.yield_periodically()
.with_logger(self.ctx().logger())
.try_filter_map(|(path, entry)| async move {
match (path.into_optional_non_root_path(), entry) {
(Some(mpath), ManifestEntry::Leaf(_)) if diff_files => Ok(Some(mpath)),
Expand All @@ -1367,6 +1371,7 @@ impl<R: MononokeRepo> ChangesetContext<R> {
))
})
.try_collect::<Vec<_>>()
.watched(self.ctx().logger())
.await
}

Expand Down

0 comments on commit eed01da

Please sign in to comment.