From b44f6790fcb2ff8c71752e35cf85e086b19f2710 Mon Sep 17 00:00:00 2001
From: Mark Juggurnauth-Thomas
Date: Wed, 6 Dec 2023 06:44:45 -0800
Subject: [PATCH] edenapi: translate segment ids in a stream

Summary:
Currently we do all the work of building the segment graph and translating
the segments into Mercurial hashes up-front on the server, before sending the
whole set of segments back to the client at once. This makes it appear to the
client that the server has hung, until the entire response suddenly arrives.

We can't currently build the whole graph in a streaming fashion, as early
segments may rely on much later segments in order to determine their parents'
locations (consider, for example, a merge commit near the head of a large
repo that merges in a small disjoint subrepo: we won't be able to yield that
merge commit's segment until we get round to processing the small disjoint
subrepo, which will be much later on).

However, we can at least stream from the point where we start translating
commit ids from Bonsai to Mercurial.

A more general solution would be to change the streaming API to allow parent
locations to be deferred and generated later (when the parent segment is
generated); however, that would involve larger changes, and is more effort
than is necessary just to change how streaming works. If the server-side
memory required for buffering makes it necessary, we can implement a
`graphsegments2` method that works this way.

Differential Revision: D51749060

fbshipit-source-id: 9f7af439fe121f259cd52dd145a2b3767ceaf8b3
---
 eden/mononoke/edenapi_service/Cargo.toml   |   1 +
 eden/mononoke/edenapi_service/TARGETS      |   1 +
 .../edenapi_service/src/handlers/commit.rs |  50 +++----
 eden/mononoke/mononoke_api_hg/src/repo.rs  | 125 ++++++++----------
 4 files changed, 86 insertions(+), 91 deletions(-)

diff --git a/eden/mononoke/edenapi_service/Cargo.toml b/eden/mononoke/edenapi_service/Cargo.toml
index 8d164558a449a..81e8fad68c35e 100644
--- a/eden/mononoke/edenapi_service/Cargo.toml
+++ b/eden/mononoke/edenapi_service/Cargo.toml
@@ -9,6 +9,7 @@ license = "GPLv2+"
 
 [dependencies]
 anyhow = "1.0.75"
+async-stream = "0.3"
 async-trait = "0.1.71"
 base64 = "0.13"
 blobstore = { version = "0.1.0", path = "../blobstore" }
diff --git a/eden/mononoke/edenapi_service/TARGETS b/eden/mononoke/edenapi_service/TARGETS
index c18913a011e1d..20a3c58b029fe 100644
--- a/eden/mononoke/edenapi_service/TARGETS
+++ b/eden/mononoke/edenapi_service/TARGETS
@@ -7,6 +7,7 @@ rust_library(
     srcs = glob(["src/**/*.rs"]),
     deps = [
         "fbsource//third-party/rust:anyhow",
+        "fbsource//third-party/rust:async-stream",
         "fbsource//third-party/rust:async-trait",
         "fbsource//third-party/rust:base64",
         "fbsource//third-party/rust:bytes",
diff --git a/eden/mononoke/edenapi_service/src/handlers/commit.rs b/eden/mononoke/edenapi_service/src/handlers/commit.rs
index f5e6237f9a593..0c3679e5ddadd 100644
--- a/eden/mononoke/edenapi_service/src/handlers/commit.rs
+++ b/eden/mononoke/edenapi_service/src/handlers/commit.rs
@@ -12,6 +12,7 @@ use std::time::Duration;
 use anyhow::anyhow;
 use anyhow::Context;
 use anyhow::Error;
+use async_stream::try_stream;
 use async_trait::async_trait;
 use blobstore::Loadable;
 use edenapi_types::wire::WireCommitHashToLocationRequestBatch;
@@ -701,30 +702,31 @@ impl EdenApiHandler for GraphSegmentsHandler {
             .map(|hg_id| HgChangesetId::new(HgNodeHash::from(hg_id)))
             .collect();
 
-        let graph_segment_entries =
-            repo.graph_segments(common, heads)
-                .await?
-                .into_iter()
-                .map(|segment| {
-                    Ok(CommitGraphSegmentsEntry {
-                        head: HgId::from(segment.head.into_nodehash()),
-                        base: HgId::from(segment.base.into_nodehash()),
-                        length: segment.length,
-                        parents: segment
-                            .parents
-                            .into_iter()
-                            .map(|parent| CommitGraphSegmentParent {
-                                hgid: HgId::from(parent.hgid.into_nodehash()),
-                                location: parent.location.map(|location| {
-                                    location.map_descendant(|descendant| {
-                                        HgId::from(descendant.into_nodehash())
-                                    })
-                                }),
-                            })
-                            .collect(),
-                    })
-                });
-        Ok(stream::iter(graph_segment_entries).boxed())
+        Ok(try_stream! {
+            let graph_segments = repo.graph_segments(common, heads).await?;
+
+            for await segment in graph_segments {
+                let segment = segment?;
+                yield CommitGraphSegmentsEntry {
+                    head: HgId::from(segment.head.into_nodehash()),
+                    base: HgId::from(segment.base.into_nodehash()),
+                    length: segment.length,
+                    parents: segment
+                        .parents
+                        .into_iter()
+                        .map(|parent| CommitGraphSegmentParent {
+                            hgid: HgId::from(parent.hgid.into_nodehash()),
+                            location: parent.location.map(|location| {
+                                location.map_descendant(|descendant| {
+                                    HgId::from(descendant.into_nodehash())
+                                })
+                            }),
+                        })
+                        .collect(),
+                }
+            }
+        }
+        .boxed())
     }
 }
 
diff --git a/eden/mononoke/mononoke_api_hg/src/repo.rs b/eden/mononoke/mononoke_api_hg/src/repo.rs
index 3b20c06d50ff6..4c411eda13d19 100644
--- a/eden/mononoke/mononoke_api_hg/src/repo.rs
+++ b/eden/mononoke/mononoke_api_hg/src/repo.rs
@@ -838,7 +838,8 @@ impl HgRepoContext {
         &self,
         common: Vec<HgChangesetId>,
         heads: Vec<HgChangesetId>,
-    ) -> Result<Vec<HgChangesetSegment>, MononokeError> {
+    ) -> Result<impl Stream<Item = Result<HgChangesetSegment, MononokeError>> + '_, MononokeError>
+    {
         let bonsai_common = self.convert_changeset_ids(common).await?;
         let bonsai_heads = self.convert_changeset_ids(heads).await?;
 
@@ -849,77 +850,67 @@ impl HgRepoContext {
             .ancestors_difference_segments(self.ctx(), bonsai_heads, bonsai_common)
             .await?;
 
-        let bonsai_hg_mapping = stream::iter(segments.clone())
-            .flat_map(|segment| {
-                stream::iter([segment.head, segment.base])
-                    .chain(stream::iter(segment.parents).map(|parent| parent.cs_id))
-            })
-            .chunks(100)
-            .then(move |chunk| async move {
-                let mapping = self
+        Ok(stream::iter(segments.into_iter())
+            .chunks(25)
+            .map(move |segments| async move {
+                let mut ids = HashSet::with_capacity(segments.len() * 4);
+                for segment in segments.iter() {
+                    ids.insert(segment.head);
+                    ids.insert(segment.base);
+                    for parent in segment.parents.iter() {
+                        ids.insert(parent.cs_id);
+                        if let Some(location) = &parent.location {
+                            ids.insert(location.head);
+                        }
+                    }
+                }
+                let mapping: HashMap<ChangesetId, HgChangesetId> = self
                     .blob_repo()
-                    .get_hg_bonsai_mapping(self.ctx().clone(), chunk.to_vec())
+                    .get_hg_bonsai_mapping(self.ctx().clone(), ids.into_iter().collect::<Vec<_>>())
                     .await
-                    .context("error fetching hg bonsai mapping")?;
-                Ok::<_, Error>(mapping)
-            })
-            .try_collect::<Vec<Vec<_>>>()
-            .await?
-            .into_iter()
-            .flatten()
-            .map(|(hgid, csid)| (csid, hgid))
-            .collect::<HashMap<_, _>>();
-
-        segments
-            .into_iter()
-            .map(|segment| {
-                Ok(HgChangesetSegment {
-                    head: *bonsai_hg_mapping.get(&segment.head).ok_or_else(|| {
-                        MononokeError::InvalidRequest(format!(
-                            "failed to find hg equivalent for segment head {}",
-                            segment.head
-                        ))
-                    })?,
-                    base: *bonsai_hg_mapping.get(&segment.base).ok_or_else(|| {
-                        MononokeError::InvalidRequest(format!(
-                            "failed to find hg equivalent for segment base {}",
-                            segment.base
-                        ))
-                    })?,
-                    length: segment.length,
-                    parents: segment
-                        .parents
-                        .into_iter()
-                        .map(|parent| {
-                            Ok(HgChangesetSegmentParent {
-                                hgid: *bonsai_hg_mapping.get(&parent.cs_id).ok_or_else(|| {
-                                    MononokeError::InvalidRequest(format!(
-                                        "failed to find hg equivalent for segment parent {}",
-                                        parent
-                                    ))
-                                })?,
-                                location: parent
-                                    .location
-                                    .map(|location| {
-                                        Ok::<_, Error>(Location {
-                                            descendant: *bonsai_hg_mapping.get(&location.head).ok_or_else(
-                                                || {
-                                                    MononokeError::InvalidRequest(format!(
-                                                        "failed to find hg equivalent for location head {}",
-                                                        location.head
-                                                    ))
-                                                },
-                                            )?,
-                                            distance: location.distance,
+                    .context("error fetching hg bonsai mapping")?
+                    .into_iter()
+                    .map(|(hgid, csid)| (csid, hgid))
+                    .collect();
+                let map_id = move |name, csid| {
+                    mapping
+                        .get(&csid)
+                        .ok_or_else(|| {
+                            MononokeError::InvalidRequest(format!(
+                                "failed to find hg equivalent for {} {}",
+                                name, csid,
+                            ))
+                        })
+                        .copied()
+                };
+                anyhow::Ok(stream::iter(segments.into_iter().map(move |segment| {
+                    Ok(HgChangesetSegment {
+                        head: map_id("segment head", segment.head)?,
+                        base: map_id("segment base", segment.base)?,
+                        length: segment.length,
+                        parents: segment
+                            .parents
+                            .into_iter()
+                            .map(|parent| {
+                                Ok(HgChangesetSegmentParent {
+                                    hgid: map_id("segment parent", parent.cs_id)?,
+                                    location: parent
+                                        .location
+                                        .map(|location| {
+                                            anyhow::Ok(Location::new(
+                                                map_id("location head", location.head)?,
+                                                location.distance,
+                                            ))
                                         })
-                                    })
-                                    .transpose()?,
+                                        .transpose()?,
+                                })
                             })
-                        })
-                        .collect::<Result<_, Error>>()?,
-                })
+                            .collect::<Result<_, Error>>()?,
+                    })
+                })))
             })
-            .collect::<Result<_, MononokeError>>()
+            .buffered(10)
+            .try_flatten())
     }
 
     /// Return a mapping of commits to their parents that are in the segment of
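
Two patterns in this change benefit from a standalone illustration. On the handler
side (commit.rs), the response is now produced with async-stream's `try_stream!`
macro: the segment stream is awaited once, and each translated entry is yielded as
soon as it is built instead of after the whole collection is ready. The following
is a minimal sketch of that shape with toy types standing in for the Mononoke ones
(it assumes only the `async-stream`, `futures`, and `anyhow` crates, and is an
illustration of the pattern rather than the production handler):

// Sketch: yield translated items as an upstream stream produces them.
use async_stream::try_stream;
use futures::stream::{Stream, StreamExt};

fn translated(
    upstream: impl Stream<Item = Result<u64, anyhow::Error>>,
) -> impl Stream<Item = Result<String, anyhow::Error>> {
    try_stream! {
        // `for await` consumes the inner stream item by item; each item is
        // yielded to the caller as soon as it is ready, so the caller never
        // waits for the whole set to be translated.
        for await item in upstream {
            let item = item?;
            yield format!("entry-{item}");
        }
    }
}

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(async {
        let upstream = futures::stream::iter((0..3u64).map(anyhow::Ok));
        let out = translated(upstream);
        futures::pin_mut!(out);
        while let Some(entry) = out.next().await {
            println!("{}", entry?);
        }
        Ok(())
    })
}

Any error from the upstream stream (or from the translation step) simply becomes
the next item of the output stream, which matches how the handler surfaces
failures mid-response.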
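On the repository side (repo.rs), the Bonsai-to-Mercurial translation becomes a
chunked pipeline: segments are batched with `chunks(25)`, each batch performs one
mapping lookup, `buffered(10)` keeps up to ten batch lookups in flight while
preserving their order, and `try_flatten` stitches the per-batch result streams
back into a single stream. Below is a hedged sketch of the same combinator shape,
with a toy `lookup_batch` standing in for `get_hg_bonsai_mapping`; the names and
sizes are illustrative only:

// Sketch: batch inputs, run bounded concurrent lookups, flatten the results.
use futures::stream::{self, Stream, StreamExt, TryStreamExt};

async fn lookup_batch(ids: Vec<u64>) -> anyhow::Result<Vec<String>> {
    // Stand-in for one batched round trip that translates a whole chunk of ids.
    Ok(ids.into_iter().map(|id| format!("hg-{id}")).collect())
}

fn translate_in_chunks(ids: Vec<u64>) -> impl Stream<Item = anyhow::Result<String>> {
    stream::iter(ids)
        // Group the inputs so each lookup amortises its round trip.
        .chunks(25)
        // Turn each chunk into a future that resolves to a stream of results.
        .map(|chunk| async move {
            let translated = lookup_batch(chunk).await?;
            anyhow::Ok(stream::iter(translated.into_iter().map(anyhow::Ok)))
        })
        // Keep up to 10 chunk lookups in flight at once, preserving order.
        .buffered(10)
        // Flatten the per-chunk streams into one continuous stream of results.
        .try_flatten()
}

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(async {
        let out = translate_in_chunks((0..100).collect());
        futures::pin_mut!(out);
        while let Some(item) = out.next().await {
            println!("{}", item?);
        }
        Ok(())
    })
}

Using `buffered` rather than `buffer_unordered` is what keeps the output order
deterministic, which matters here because the segments must reach the client in
the order the commit graph produced them; the chunk size and concurrency (25 and
10, as in the patch) trade server-side memory against the number of mapping
round trips.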