edenapi: translate segment ids in a stream
Summary:
Currently we do all the work of building the segment graph and translating the segments into Mercurial hashes up-front on the server, before sending the whole set of segments back to the client at once. This makes it appear to the client that the server has hung, until the entire response suddenly arrives.

We can't currently build the whole graph in a streaming fashion, as early segments may rely on much later segments in order to determine their parents' locations (consider, for example, a merge commit near the head of a large repo that merges in a small disjoint subrepo: we won't be able to yield that merge commit's segment until we get round to processing the small disjoint subrepo, which happens much later on).

However, we can at least begin streaming at the point where we start translating commit ids from Bonsai to Mercurial, as sketched below.
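
As a rough sketch of that pattern (illustrative only: `BonsaiId`, `HgId`, and `batch_translate` are simplified stand-ins for Mononoke's `ChangesetId`, `HgChangesetId`, and the batched `get_hg_bonsai_mapping` call), the translation stage batches ids off a stream, translates each batch in a single lookup, and flattens the translated batches back into one output stream:

use std::collections::HashMap;

use anyhow::Result;
use futures::stream::{self, Stream, StreamExt, TryStreamExt};

// Simplified stand-ins for ChangesetId and HgChangesetId.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct BonsaiId(u64);
#[derive(Clone, Copy)]
struct HgId(u64);

// Hypothetical batched lookup standing in for get_hg_bonsai_mapping.
async fn batch_translate(ids: Vec<BonsaiId>) -> Result<HashMap<BonsaiId, HgId>> {
    Ok(ids.into_iter().map(|id| (id, HgId(id.0))).collect())
}

// Batch incoming ids, translate each batch with one lookup, and flatten the
// results back out, so output starts as soon as the first batch resolves.
fn translate(ids: impl Stream<Item = BonsaiId>) -> impl Stream<Item = Result<HgId>> {
    ids.chunks(25) // batch size matching the server-side change
        .map(|chunk| async move {
            let mapping = batch_translate(chunk.clone()).await?;
            Ok(stream::iter(chunk.into_iter().map(move |id| {
                mapping
                    .get(&id)
                    .copied()
                    .ok_or_else(|| anyhow::anyhow!("no hg equivalent"))
            })))
        })
        .buffered(10) // at most 10 lookups in flight, order preserved
        .try_flatten()
}

Here `buffered(10)` bounds memory and concurrency while keeping results in order; the actual change applies the same shape to whole segments rather than bare ids.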

A more general solution would be to change the streaming API to allow parent locations to be deferred and generated later on (when the parent's segment is generated); however, this would involve larger changes, and is more effort than is warranted just to change how streaming works. If the server-side memory requirements for buffering make it necessary, we can implement a `graphsegments2` method that works this way.
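
Purely as an illustration of what deferred parent locations might mean for such a hypothetical `graphsegments2` wire format (none of these names exist in the codebase), a parent entry could distinguish locations that are already resolved from ones promised by a later entry:

// Hypothetical sketch only, not an existing type.
enum ParentLocation<Id> {
    // Already expressible relative to a previously emitted segment.
    Resolved { descendant: Id, distance: u64 },
    // The parent's segment hasn't been generated yet; a later entry
    // would supply its location.
    Deferred,
}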

Differential Revision: D51749060

fbshipit-source-id: 9f7af439fe121f259cd52dd145a2b3767ceaf8b3
markbt authored and facebook-github-bot committed Dec 6, 2023
1 parent 325dd13 commit b44f679
Showing 4 changed files with 86 additions and 91 deletions.
1 change: 1 addition & 0 deletions eden/mononoke/edenapi_service/Cargo.toml
@@ -9,6 +9,7 @@ license = "GPLv2+"
 
 [dependencies]
 anyhow = "1.0.75"
+async-stream = "0.3"
 async-trait = "0.1.71"
 base64 = "0.13"
 blobstore = { version = "0.1.0", path = "../blobstore" }
1 change: 1 addition & 0 deletions eden/mononoke/edenapi_service/TARGETS
@@ -7,6 +7,7 @@ rust_library(
     srcs = glob(["src/**/*.rs"]),
     deps = [
         "fbsource//third-party/rust:anyhow",
+        "fbsource//third-party/rust:async-stream",
         "fbsource//third-party/rust:async-trait",
         "fbsource//third-party/rust:base64",
         "fbsource//third-party/rust:bytes",
50 changes: 26 additions & 24 deletions eden/mononoke/edenapi_service/src/handlers/commit.rs
@@ -12,6 +12,7 @@ use std::time::Duration;
 use anyhow::anyhow;
 use anyhow::Context;
 use anyhow::Error;
+use async_stream::try_stream;
 use async_trait::async_trait;
 use blobstore::Loadable;
 use edenapi_types::wire::WireCommitHashToLocationRequestBatch;
@@ -701,30 +702,31 @@ impl EdenApiHandler for GraphSegmentsHandler {
             .map(|hg_id| HgChangesetId::new(HgNodeHash::from(hg_id)))
             .collect();
 
-        let graph_segment_entries =
-            repo.graph_segments(common, heads)
-                .await?
-                .into_iter()
-                .map(|segment| {
-                    Ok(CommitGraphSegmentsEntry {
-                        head: HgId::from(segment.head.into_nodehash()),
-                        base: HgId::from(segment.base.into_nodehash()),
-                        length: segment.length,
-                        parents: segment
-                            .parents
-                            .into_iter()
-                            .map(|parent| CommitGraphSegmentParent {
-                                hgid: HgId::from(parent.hgid.into_nodehash()),
-                                location: parent.location.map(|location| {
-                                    location.map_descendant(|descendant| {
-                                        HgId::from(descendant.into_nodehash())
-                                    })
-                                }),
-                            })
-                            .collect(),
-                    })
-                });
-        Ok(stream::iter(graph_segment_entries).boxed())
+        Ok(try_stream! {
+            let graph_segments = repo.graph_segments(common, heads).await?;
+
+            for await segment in graph_segments {
+                let segment = segment?;
+                yield CommitGraphSegmentsEntry {
+                    head: HgId::from(segment.head.into_nodehash()),
+                    base: HgId::from(segment.base.into_nodehash()),
+                    length: segment.length,
+                    parents: segment
+                        .parents
+                        .into_iter()
+                        .map(|parent| CommitGraphSegmentParent {
+                            hgid: HgId::from(parent.hgid.into_nodehash()),
+                            location: parent.location.map(|location| {
+                                location.map_descendant(|descendant| {
+                                    HgId::from(descendant.into_nodehash())
+                                })
+                            }),
+                        })
+                        .collect(),
+                }
+            }
+        }
+        .boxed())
     }
 }
 
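Note: `for await` is not built-in Rust syntax; it comes from the `async-stream` crate's `stream!`/`try_stream!` procedural macros, which is why the crate is added as a dependency above. Inside `try_stream!`, `?` terminates the stream with an `Err` item and `yield` emits `Ok` items, so the handler still returns a stream of `Result`s but can now emit each entry as it is produced.
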
125 changes: 58 additions & 67 deletions eden/mononoke/mononoke_api_hg/src/repo.rs
@@ -838,7 +838,8 @@ impl HgRepoContext {
         &self,
         common: Vec<HgChangesetId>,
         heads: Vec<HgChangesetId>,
-    ) -> Result<Vec<HgChangesetSegment>, MononokeError> {
+    ) -> Result<impl Stream<Item = Result<HgChangesetSegment, MononokeError>> + '_, MononokeError>
+    {
         let bonsai_common = self.convert_changeset_ids(common).await?;
         let bonsai_heads = self.convert_changeset_ids(heads).await?;
 
@@ -849,77 +850,67 @@ impl HgRepoContext {
             .ancestors_difference_segments(self.ctx(), bonsai_heads, bonsai_common)
             .await?;
 
-        let bonsai_hg_mapping = stream::iter(segments.clone())
-            .flat_map(|segment| {
-                stream::iter([segment.head, segment.base])
-                    .chain(stream::iter(segment.parents).map(|parent| parent.cs_id))
-            })
-            .chunks(100)
-            .then(move |chunk| async move {
-                let mapping = self
-                    .blob_repo()
-                    .get_hg_bonsai_mapping(self.ctx().clone(), chunk.to_vec())
-                    .await
-                    .context("error fetching hg bonsai mapping")?;
-                Ok::<_, Error>(mapping)
-            })
-            .try_collect::<Vec<Vec<(HgChangesetId, ChangesetId)>>>()
-            .await?
-            .into_iter()
-            .flatten()
-            .map(|(hgid, csid)| (csid, hgid))
-            .collect::<HashMap<_, _>>();
-
-        segments
-            .into_iter()
-            .map(|segment| {
-                Ok(HgChangesetSegment {
-                    head: *bonsai_hg_mapping.get(&segment.head).ok_or_else(|| {
-                        MononokeError::InvalidRequest(format!(
-                            "failed to find hg equivalent for segment head {}",
-                            segment.head
-                        ))
-                    })?,
-                    base: *bonsai_hg_mapping.get(&segment.base).ok_or_else(|| {
-                        MononokeError::InvalidRequest(format!(
-                            "failed to find hg equivalent for segment base {}",
-                            segment.base
-                        ))
-                    })?,
-                    length: segment.length,
-                    parents: segment
-                        .parents
-                        .into_iter()
-                        .map(|parent| {
-                            Ok(HgChangesetSegmentParent {
-                                hgid: *bonsai_hg_mapping.get(&parent.cs_id).ok_or_else(|| {
-                                    MononokeError::InvalidRequest(format!(
-                                        "failed to find hg equivalent for segment parent {}",
-                                        parent
-                                    ))
-                                })?,
-                                location: parent
-                                    .location
-                                    .map(|location| {
-                                        Ok::<_, Error>(Location {
-                                            descendant: *bonsai_hg_mapping.get(&location.head).ok_or_else(
-                                                || {
-                                                    MononokeError::InvalidRequest(format!(
-                                                        "failed to find hg equivalent for location head {}",
-                                                        location.head
-                                                    ))
-                                                },
-                                            )?,
-                                            distance: location.distance,
-                                        })
-                                    })
-                                    .transpose()?,
-                            })
-                        })
-                        .collect::<Result<_, MononokeError>>()?,
-                })
-            })
-            .collect::<Result<_, MononokeError>>()
+        Ok(stream::iter(segments.into_iter())
+            .chunks(25)
+            .map(move |segments| async move {
+                let mut ids = HashSet::with_capacity(segments.len() * 4);
+                for segment in segments.iter() {
+                    ids.insert(segment.head);
+                    ids.insert(segment.base);
+                    for parent in segment.parents.iter() {
+                        ids.insert(parent.cs_id);
+                        if let Some(location) = &parent.location {
+                            ids.insert(location.head);
+                        }
+                    }
+                }
+                let mapping: HashMap<ChangesetId, HgChangesetId> = self
+                    .blob_repo()
+                    .get_hg_bonsai_mapping(self.ctx().clone(), ids.into_iter().collect::<Vec<_>>())
+                    .await
+                    .context("error fetching hg bonsai mapping")?
+                    .into_iter()
+                    .map(|(hgid, csid)| (csid, hgid))
+                    .collect();
+                let map_id = move |name, csid| {
+                    mapping
+                        .get(&csid)
+                        .ok_or_else(|| {
+                            MononokeError::InvalidRequest(format!(
+                                "failed to find hg equivalent for {} {}",
+                                name, csid,
+                            ))
+                        })
+                        .copied()
+                };
+                anyhow::Ok(stream::iter(segments.into_iter().map(move |segment| {
+                    Ok(HgChangesetSegment {
+                        head: map_id("segment head", segment.head)?,
+                        base: map_id("segment base", segment.base)?,
+                        length: segment.length,
+                        parents: segment
+                            .parents
+                            .into_iter()
+                            .map(|parent| {
+                                Ok(HgChangesetSegmentParent {
+                                    hgid: map_id("segment parent", parent.cs_id)?,
+                                    location: parent
+                                        .location
+                                        .map(|location| {
+                                            anyhow::Ok(Location::new(
+                                                map_id("location head", location.head)?,
+                                                location.distance,
+                                            ))
+                                        })
+                                        .transpose()?,
+                                })
+                            })
+                            .collect::<Result<_, MononokeError>>()?,
+                    })
+                })))
+            })
+            .buffered(10)
+            .try_flatten())
     }
 
     /// Return a mapping of commits to their parents that are in the segment of
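
One detail worth noting in the new chunked lookup: each chunk's id set now explicitly includes every parent's `location.head`. The old code never collected location heads, yet looked them up in its single global mapping, presumably because each location head coincided with some segment's head or base elsewhere in the full set; once the lookups are per-chunk, that segment may fall in a different chunk, so the location head has to be fetched as part of the current chunk's batch.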
