Skip to content

Commit

Permalink
Optimize commit_path_blame_compact by making the core part a stream
Browse files Browse the repository at this point in the history
Summary:
## This stack

Track and down methods that are holding up the reactor and optimize them.

## This diff

The `commit_path_blame` method often shows up with a max poll time above a second; make the core logic streaming, add implicit yields.

Reviewed By: markbt

Differential Revision: D66820663

fbshipit-source-id: c9b67ab264f4bf3f55385a603b82a647c203161b
  • Loading branch information
andreacampi authored and facebook-github-bot committed Dec 6, 2024
1 parent 0a17ac4 commit 5ccdbce
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions eden/mononoke/scs/scs_methods/src/methods/commit_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ use bytes::Bytes;
use context::CoreContext;
use dedupmap::DedupMap;
use futures::future;
use futures::stream;
use futures::stream::TryStreamExt;
use futures::try_join;
use futures::StreamExt;
use futures_ext::FbStreamExt;
use futures_watchdog::WatchdogExt;
use maplit::btreeset;
use mononoke_api::ChangesetPathHistoryOptions;
Expand Down Expand Up @@ -183,7 +186,8 @@ impl SourceControlServiceImpl {
) -> Result<thrift::CommitPathBlameResponse, scs_errors::ServiceError> {
match params.format {
thrift::BlameFormat::COMPACT => {
self.commit_path_blame_compact(ctx, commit_path, params)
self.commit_path_blame_compact(ctx.clone(), commit_path, params)
.watched(ctx.logger())
.await
}
other_format => Err(scs_errors::invalid_request(format!(
Expand All @@ -200,9 +204,15 @@ impl SourceControlServiceImpl {
commit_path: thrift::CommitPathSpecifier,
params: thrift::CommitPathBlameParams,
) -> Result<thrift::CommitPathBlameResponse, scs_errors::ServiceError> {
let (repo, changeset) = self.repo_changeset(ctx, &commit_path.commit).await?;
let (repo, changeset) = self
.repo_changeset(ctx.clone(), &commit_path.commit)
.watched(ctx.logger())
.await?;
borrowed!(repo);
let path = changeset.path_with_history(&commit_path.path).await?;
let path = changeset
.path_with_history(&commit_path.path)
.watched(ctx.logger())
.await?;

let options = params.format_options.unwrap_or_else(|| {
btreeset! { thrift::BlameFormatOption::INCLUDE_CONTENTS }
Expand Down Expand Up @@ -247,9 +257,16 @@ impl SourceControlServiceImpl {

// Fetch the blame, and optionally its associated content.
let (blame, content) = if option_include_contents {
path.blame_with_content(follow_mutable_file_history).await?
path.blame_with_content(follow_mutable_file_history)
.watched(ctx.logger())
.await?
} else {
(path.blame(follow_mutable_file_history).await?, Bytes::new())
(
path.blame(follow_mutable_file_history)
.watched(ctx.logger())
.await?,
Bytes::new(),
)
};

// Map all the changeset IDs into the requested identity schemes. Keep a mapping of
Expand All @@ -264,7 +281,9 @@ impl SourceControlServiceImpl {
.map(|(csid, _)| *csid)
.collect::<Vec<_>>();
let mut mapped_commit_ids =
map_commit_identities(repo, csids.clone(), &params.identity_schemes).await?;
map_commit_identities(repo, csids.clone(), &params.identity_schemes)
.watched(ctx.logger())
.await?;
for (id, num) in csids_and_nums {
if let Some(mapped_ids) = mapped_commit_ids.remove(&id) {
let index = commit_ids.len();
Expand All @@ -291,13 +310,17 @@ impl SourceControlServiceImpl {

Ok::<_, MononokeError>((*csid, (author, date, message, title)))
}))
.watched(ctx.logger())
.await?
.into_iter()
.collect();

// Collect parent information for each changeset if requested.
let parent_commit_ids = if option_include_parent {
let changeset_parents = repo.many_changeset_parents(csids.clone()).await?;
let changeset_parents = repo
.many_changeset_parents(csids.clone())
.watched(ctx.logger())
.await?;
let all_parent_csids = changeset_parents
.iter()
.flat_map(|(_, parents)| parents)
Expand All @@ -306,7 +329,9 @@ impl SourceControlServiceImpl {
.copied()
.collect::<Vec<_>>();
let parent_commit_ids_map =
map_commit_identities(repo, all_parent_csids, &params.identity_schemes).await?;
map_commit_identities(repo, all_parent_csids, &params.identity_schemes)
.watched(ctx.logger())
.await?;
let mut parent_commit_ids = Vec::with_capacity(indexed_csids.len());
for csid in indexed_csids {
let parents = changeset_parents.get(&csid).ok_or_else(|| {
Expand Down Expand Up @@ -337,8 +362,9 @@ impl SourceControlServiceImpl {

let lines = blame
.lines()
.map_err(|e| MononokeError::InvalidRequest(e.to_string()))?
.enumerate()
.map_err(|e| MononokeError::InvalidRequest(e.to_string()))?;
let lines = stream::iter(lines.enumerate())
.yield_periodically()
.map(|(line, blame_line)| -> Result<_, thrift::RequestError> {
let commit_id_index =
commit_id_indexes
Expand Down Expand Up @@ -392,7 +418,8 @@ impl SourceControlServiceImpl {
}
Ok(thrift_blame_line)
})
.collect::<Result<Vec<_>, _>>()?;
.try_collect::<Vec<_>>()
.await?;

let paths = paths.into_items();
let authors = authors.into_items();
Expand Down

0 comments on commit 5ccdbce

Please sign in to comment.