diff --git a/eden/mononoke/scs/scs_methods/src/methods/commit_path.rs b/eden/mononoke/scs/scs_methods/src/methods/commit_path.rs index cd0ec8155a1e0..d37a8be8debd6 100644 --- a/eden/mononoke/scs/scs_methods/src/methods/commit_path.rs +++ b/eden/mononoke/scs/scs_methods/src/methods/commit_path.rs @@ -15,8 +15,11 @@ use bytes::Bytes; use context::CoreContext; use dedupmap::DedupMap; use futures::future; +use futures::stream; use futures::stream::TryStreamExt; use futures::try_join; +use futures::StreamExt; +use futures_ext::FbStreamExt; use futures_watchdog::WatchdogExt; use maplit::btreeset; use mononoke_api::ChangesetPathHistoryOptions; @@ -183,7 +186,8 @@ impl SourceControlServiceImpl { ) -> Result { match params.format { thrift::BlameFormat::COMPACT => { - self.commit_path_blame_compact(ctx, commit_path, params) + self.commit_path_blame_compact(ctx.clone(), commit_path, params) + .watched(ctx.logger()) .await } other_format => Err(scs_errors::invalid_request(format!( @@ -200,9 +204,15 @@ impl SourceControlServiceImpl { commit_path: thrift::CommitPathSpecifier, params: thrift::CommitPathBlameParams, ) -> Result { - let (repo, changeset) = self.repo_changeset(ctx, &commit_path.commit).await?; + let (repo, changeset) = self + .repo_changeset(ctx.clone(), &commit_path.commit) + .watched(ctx.logger()) + .await?; borrowed!(repo); - let path = changeset.path_with_history(&commit_path.path).await?; + let path = changeset + .path_with_history(&commit_path.path) + .watched(ctx.logger()) + .await?; let options = params.format_options.unwrap_or_else(|| { btreeset! { thrift::BlameFormatOption::INCLUDE_CONTENTS } @@ -247,9 +257,16 @@ impl SourceControlServiceImpl { // Fetch the blame, and optionally its associated content. let (blame, content) = if option_include_contents { - path.blame_with_content(follow_mutable_file_history).await? + path.blame_with_content(follow_mutable_file_history) + .watched(ctx.logger()) + .await? } else { - (path.blame(follow_mutable_file_history).await?, Bytes::new()) + ( + path.blame(follow_mutable_file_history) + .watched(ctx.logger()) + .await?, + Bytes::new(), + ) }; // Map all the changeset IDs into the requested identity schemes. Keep a mapping of @@ -264,7 +281,9 @@ impl SourceControlServiceImpl { .map(|(csid, _)| *csid) .collect::>(); let mut mapped_commit_ids = - map_commit_identities(repo, csids.clone(), ¶ms.identity_schemes).await?; + map_commit_identities(repo, csids.clone(), ¶ms.identity_schemes) + .watched(ctx.logger()) + .await?; for (id, num) in csids_and_nums { if let Some(mapped_ids) = mapped_commit_ids.remove(&id) { let index = commit_ids.len(); @@ -291,13 +310,17 @@ impl SourceControlServiceImpl { Ok::<_, MononokeError>((*csid, (author, date, message, title))) })) + .watched(ctx.logger()) .await? .into_iter() .collect(); // Collect parent information for each changeset if requested. let parent_commit_ids = if option_include_parent { - let changeset_parents = repo.many_changeset_parents(csids.clone()).await?; + let changeset_parents = repo + .many_changeset_parents(csids.clone()) + .watched(ctx.logger()) + .await?; let all_parent_csids = changeset_parents .iter() .flat_map(|(_, parents)| parents) @@ -306,7 +329,9 @@ impl SourceControlServiceImpl { .copied() .collect::>(); let parent_commit_ids_map = - map_commit_identities(repo, all_parent_csids, ¶ms.identity_schemes).await?; + map_commit_identities(repo, all_parent_csids, ¶ms.identity_schemes) + .watched(ctx.logger()) + .await?; let mut parent_commit_ids = Vec::with_capacity(indexed_csids.len()); for csid in indexed_csids { let parents = changeset_parents.get(&csid).ok_or_else(|| { @@ -337,8 +362,9 @@ impl SourceControlServiceImpl { let lines = blame .lines() - .map_err(|e| MononokeError::InvalidRequest(e.to_string()))? - .enumerate() + .map_err(|e| MononokeError::InvalidRequest(e.to_string()))?; + let lines = stream::iter(lines.enumerate()) + .yield_periodically() .map(|(line, blame_line)| -> Result<_, thrift::RequestError> { let commit_id_index = commit_id_indexes @@ -392,7 +418,8 @@ impl SourceControlServiceImpl { } Ok(thrift_blame_line) }) - .collect::, _>>()?; + .try_collect::>() + .await?; let paths = paths.into_items(); let authors = authors.into_items();