diff --git a/eden/mononoke/mononoke_api/src/sparse_profile.rs b/eden/mononoke/mononoke_api/src/sparse_profile.rs index c9af39817562c..5f601b484a830 100644 --- a/eden/mononoke/mononoke_api/src/sparse_profile.rs +++ b/eden/mononoke/mononoke_api/src/sparse_profile.rs @@ -21,6 +21,8 @@ use futures::try_join; use futures::FutureExt; use futures::StreamExt; use futures::TryStreamExt; +use futures_ext::FbStreamExt; +use futures_watchdog::WatchdogExt; use maplit::btreeset; use metaconfig_types::SparseProfilesConfig; use mononoke_types::fsnode::FsnodeEntry; @@ -234,19 +236,24 @@ impl SparseProfileMonitoring { } pub(crate) async fn fetch( - _ctx: &CoreContext, + ctx: &CoreContext, path: String, changeset: &ChangesetContext, ) -> Result>> { let path: &str = &path; let path = NonRootMPath::try_from(path)?; - let path_with_content = changeset.path_with_content(path.clone()).await?; + let path_with_content = changeset + .path_with_content(path.clone()) + .watched(ctx.logger()) + .await?; let file_ctx = path_with_content .file() + .watched(ctx.logger()) .await? .ok_or_else(|| anyhow!("Sparse profile {} not found", path))?; file_ctx .content_concat() + .watched(ctx.logger()) .await .with_context(|| format!("Couldn't fetch content of {path}")) .map(|b| Some(b.to_vec())) @@ -258,6 +265,8 @@ async fn create_matchers( paths: Vec, ) -> Result>> { stream::iter(paths) + .yield_periodically() + .with_logger(ctx.logger()) .map(|path| async move { let content = format!("%include {path}"); let dummy_source = "repo_root".to_string(); @@ -265,6 +274,7 @@ async fn create_matchers( .with_context(|| format!("while constructing Profile for source {path}"))?; let matcher = profile .matcher(|path| fetch(ctx, path, changeset)) + .watched(ctx.logger()) .await .with_context(|| format!("While constructing matcher for source {path}"))?; anyhow::Ok(( @@ -274,6 +284,7 @@ async fn create_matchers( }) .buffer_unordered(100) .try_collect() + .watched(ctx.logger()) .await } @@ -450,8 +461,12 @@ pub async fn get_profile_delta_size( other: &ChangesetContext, paths: Vec, ) -> Result, MononokeError> { - let matchers = create_matchers(ctx, current, paths).await?; - calculate_delta_size(ctx, monitor, current, other, matchers).await + let matchers = create_matchers(ctx, current, paths) + .watched(ctx.logger()) + .await?; + calculate_delta_size(ctx, monitor, current, other, matchers) + .watched(ctx.logger()) + .await } pub async fn calculate_delta_size<'a, R: MononokeRepo>(