From b59af8cac6a2c3e5f8b1404eec94d523198264f8 Mon Sep 17 00:00:00 2001 From: Andrea Campi Date: Fri, 13 Dec 2024 03:37:00 -0800 Subject: [PATCH] yield periodically in a tight and expensive loop Summary: ## This stack We noticed that the poll time is pretty high on some async methods, and that is affecting other calls as well. Let's try to optimize that. ## This diff This ends up mattering a lot for some requests. Reviewed By: singhsrb Differential Revision: D67138693 fbshipit-source-id: f27e2a4fdf55d6f76a894362c8f94ac6c4f7cbd7 --- .../mononoke_api/src/sparse_profile.rs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/eden/mononoke/mononoke_api/src/sparse_profile.rs b/eden/mononoke/mononoke_api/src/sparse_profile.rs index c9af39817562c..5f601b484a830 100644 --- a/eden/mononoke/mononoke_api/src/sparse_profile.rs +++ b/eden/mononoke/mononoke_api/src/sparse_profile.rs @@ -21,6 +21,8 @@ use futures::try_join; use futures::FutureExt; use futures::StreamExt; use futures::TryStreamExt; +use futures_ext::FbStreamExt; +use futures_watchdog::WatchdogExt; use maplit::btreeset; use metaconfig_types::SparseProfilesConfig; use mononoke_types::fsnode::FsnodeEntry; @@ -234,19 +236,24 @@ impl SparseProfileMonitoring { } pub(crate) async fn fetch( - _ctx: &CoreContext, + ctx: &CoreContext, path: String, changeset: &ChangesetContext, ) -> Result>> { let path: &str = &path; let path = NonRootMPath::try_from(path)?; - let path_with_content = changeset.path_with_content(path.clone()).await?; + let path_with_content = changeset + .path_with_content(path.clone()) + .watched(ctx.logger()) + .await?; let file_ctx = path_with_content .file() + .watched(ctx.logger()) .await? .ok_or_else(|| anyhow!("Sparse profile {} not found", path))?; file_ctx .content_concat() + .watched(ctx.logger()) .await .with_context(|| format!("Couldn't fetch content of {path}")) .map(|b| Some(b.to_vec())) @@ -258,6 +265,8 @@ async fn create_matchers( paths: Vec, ) -> Result>> { stream::iter(paths) + .yield_periodically() + .with_logger(ctx.logger()) .map(|path| async move { let content = format!("%include {path}"); let dummy_source = "repo_root".to_string(); @@ -265,6 +274,7 @@ async fn create_matchers( .with_context(|| format!("while constructing Profile for source {path}"))?; let matcher = profile .matcher(|path| fetch(ctx, path, changeset)) + .watched(ctx.logger()) .await .with_context(|| format!("While constructing matcher for source {path}"))?; anyhow::Ok(( @@ -274,6 +284,7 @@ async fn create_matchers( }) .buffer_unordered(100) .try_collect() + .watched(ctx.logger()) .await } @@ -450,8 +461,12 @@ pub async fn get_profile_delta_size( other: &ChangesetContext, paths: Vec, ) -> Result, MononokeError> { - let matchers = create_matchers(ctx, current, paths).await?; - calculate_delta_size(ctx, monitor, current, other, matchers).await + let matchers = create_matchers(ctx, current, paths) + .watched(ctx.logger()) + .await?; + calculate_delta_size(ctx, monitor, current, other, matchers) + .watched(ctx.logger()) + .await } pub async fn calculate_delta_size<'a, R: MononokeRepo>(