Skip to content

Commit

Permalink
yield periodically in a tight and expensive loop
Browse files Browse the repository at this point in the history
Summary:
## This stack

We noticed that poll times are quite high on some async methods, and that long polls block the executor and slow down other calls as well. Let's try to optimize that.

## This diff

This diff adds `yield_periodically()` to the stream in `create_matchers`, so the tight and expensive matcher-construction loop periodically yields control back to the executor instead of hogging the poll, and adds `watched(ctx.logger())` watchdog instrumentation to the surrounding async calls so slow polls are logged. This ends up mattering a lot for some requests.

Reviewed By: singhsrb

Differential Revision: D67138693

fbshipit-source-id: f27e2a4fdf55d6f76a894362c8f94ac6c4f7cbd7
  • Loading branch information
andreacampi authored and facebook-github-bot committed Dec 13, 2024
1 parent a95b41e commit b59af8c
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions eden/mononoke/mononoke_api/src/sparse_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use futures::try_join;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures_ext::FbStreamExt;
use futures_watchdog::WatchdogExt;
use maplit::btreeset;
use metaconfig_types::SparseProfilesConfig;
use mononoke_types::fsnode::FsnodeEntry;
Expand Down Expand Up @@ -234,19 +236,24 @@ impl SparseProfileMonitoring {
}

pub(crate) async fn fetch<R: MononokeRepo>(
_ctx: &CoreContext,
ctx: &CoreContext,
path: String,
changeset: &ChangesetContext<R>,
) -> Result<Option<Vec<u8>>> {
let path: &str = &path;
let path = NonRootMPath::try_from(path)?;
let path_with_content = changeset.path_with_content(path.clone()).await?;
let path_with_content = changeset
.path_with_content(path.clone())
.watched(ctx.logger())
.await?;
let file_ctx = path_with_content
.file()
.watched(ctx.logger())
.await?
.ok_or_else(|| anyhow!("Sparse profile {} not found", path))?;
file_ctx
.content_concat()
.watched(ctx.logger())
.await
.with_context(|| format!("Couldn't fetch content of {path}"))
.map(|b| Some(b.to_vec()))
Expand All @@ -258,13 +265,16 @@ async fn create_matchers<R: MononokeRepo>(
paths: Vec<NonRootMPath>,
) -> Result<HashMap<String, Arc<dyn Matcher + Send + Sync>>> {
stream::iter(paths)
.yield_periodically()
.with_logger(ctx.logger())
.map(|path| async move {
let content = format!("%include {path}");
let dummy_source = "repo_root".to_string();
let profile = sparse::Root::from_bytes(content.as_bytes(), dummy_source)
.with_context(|| format!("while constructing Profile for source {path}"))?;
let matcher = profile
.matcher(|path| fetch(ctx, path, changeset))
.watched(ctx.logger())
.await
.with_context(|| format!("While constructing matcher for source {path}"))?;
anyhow::Ok((
Expand All @@ -274,6 +284,7 @@ async fn create_matchers<R: MononokeRepo>(
})
.buffer_unordered(100)
.try_collect()
.watched(ctx.logger())
.await
}

Expand Down Expand Up @@ -450,8 +461,12 @@ pub async fn get_profile_delta_size<R: MononokeRepo>(
other: &ChangesetContext<R>,
paths: Vec<NonRootMPath>,
) -> Result<HashMap<String, ProfileSizeChange>, MononokeError> {
let matchers = create_matchers(ctx, current, paths).await?;
calculate_delta_size(ctx, monitor, current, other, matchers).await
let matchers = create_matchers(ctx, current, paths)
.watched(ctx.logger())
.await?;
calculate_delta_size(ctx, monitor, current, other, matchers)
.watched(ctx.logger())
.await
}

pub async fn calculate_delta_size<'a, R: MononokeRepo>(
Expand Down

0 comments on commit b59af8c

Please sign in to comment.