Skip to content

Commit

Permalink
modern sync: upload hg changeset
Browse files Browse the repository at this point in the history
Summary:
This is not a final version, this is simply to make sure all elements of the filenodes are conserved through edenapi. This is missing two thing:
1. Correct Parents expression (mentioned by Mark in D67034907)
2. Stream/ pipe through channel so we don't keep everything in memory

But hey, it's a god POC, putting up for a review in case something is terribly wrong, otherwise I can address 1 and 2 in separate diff (will decide after chat about the parents issue)

This also adds a condition so we don't try to add hg mutations if none are sent.

With this we can now make sure all relevant data has been sent to the server and we can start optimising :)

Reviewed By: clara-9

Differential Revision: D67123774

fbshipit-source-id: dd042bdb45d49e5c4987a974a6d81c75042c2633
  • Loading branch information
lmvasquezg authored and facebook-github-bot committed Dec 12, 2024
1 parent 11ab191 commit 88a2bb5
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 17 deletions.
2 changes: 1 addition & 1 deletion eden/mononoke/edenapi_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod errors;
mod handlers;
mod middleware;
mod scuba;
mod utils;
pub mod utils;

use std::path::Path;
use std::sync::atomic::AtomicBool;
Expand Down
1 change: 1 addition & 0 deletions eden/mononoke/modern_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
commit_graph = { version = "0.1.0", path = "../repo_attributes/commit_graph/commit_graph" }
context = { version = "0.1.0", path = "../server/context" }
edenapi_service = { version = "0.1.0", path = "../edenapi_service" }
executor_lib = { version = "0.1.0", path = "../cmdlib/sharding" }
facet = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
Expand Down
5 changes: 3 additions & 2 deletions eden/mononoke/modern_sync/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use anyhow::Result;
use async_trait::async_trait;
use mercurial_types::blobs::HgBlobChangeset;
use mercurial_types::HgFileNodeId;
use mercurial_types::HgManifestId;
use mononoke_types::ContentId;
Expand All @@ -18,9 +19,9 @@ pub mod edenapi;
pub trait ModernSyncSender {
async fn upload_content(&self, content_id: ContentId, _blob: FileContents) -> Result<()>;

#[allow(unused)]
async fn upload_trees(&self, trees: Vec<HgManifestId>) -> Result<()>;

#[allow(unused)]
async fn upload_filenodes(&self, filenodes: Vec<HgFileNodeId>) -> Result<()>;

async fn upload_hg_changeset(&self, hg_css: Vec<HgBlobChangeset>) -> Result<()>;
}
12 changes: 12 additions & 0 deletions eden/mononoke/modern_sync/src/sender/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use anyhow::Result;
use async_trait::async_trait;
use mercurial_types::blobs::HgBlobChangeset;
use mercurial_types::HgFileNodeId;
use mercurial_types::HgManifestId;
use mononoke_types::ContentId;
Expand Down Expand Up @@ -47,4 +48,15 @@ impl ModernSyncSender for DummySender {
}
Ok(())
}

async fn upload_hg_changeset(&self, hg_css: Vec<HgBlobChangeset>) -> Result<()> {
for hg_cs in hg_css {
info!(
&self.logger,
"Uploading hg changeset with id {}",
hg_cs.get_changeset_id()
);
}
Ok(())
}
}
53 changes: 53 additions & 0 deletions eden/mononoke/modern_sync/src/sender/edenapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ use edenapi::HttpClientConfig;
use edenapi::SaplingRemoteApi;
use edenapi_types::AnyFileContentId;
use edenapi_types::AnyId;
use edenapi_types::Extra;
use edenapi_types::HgChangesetContent;
use edenapi_types::HgFilenodeData;
use edenapi_types::Parents;
use edenapi_types::RepoPathBuf;
use edenapi_types::UploadHgChangeset;
use edenapi_types::UploadToken;
use edenapi_types::UploadTreeEntry;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use mercurial_types::blobs::HgBlobChangeset;
use mercurial_types::fetch_manifest_envelope;
use mercurial_types::HgFileNodeId;
use mercurial_types::HgManifestId;
Expand Down Expand Up @@ -161,6 +166,21 @@ impl ModernSyncSender for EdenapiSender {
);
Ok(())
}

async fn upload_hg_changeset(&self, hg_css: Vec<HgBlobChangeset>) -> Result<()> {
let entries = stream::iter(hg_css)
.map(to_upload_hg_changeset)
.try_collect::<Vec<_>>()
.await?;

let res = self.client.upload_changesets(entries, vec![]).await?;
info!(
&self.logger,
"Upload hg changeset response: {:?}",
res.entries.try_collect::<Vec<_>>().await?
);
Ok(())
}
}

pub async fn from_tree_to_entry(
Expand Down Expand Up @@ -207,3 +227,36 @@ pub async fn from_id_to_filenode(
file_content_upload_token: token,
})
}

pub fn to_upload_hg_changeset(hg_cs: HgBlobChangeset) -> Result<UploadHgChangeset> {
let extra = hg_cs
.extra()
.iter()
.map(|(k, v)| Extra {
key: k.to_vec(),
value: v.to_vec(),
})
.collect();

let hg_files: Result<Vec<RepoPathBuf>> = hg_cs
.files()
.iter()
.map(edenapi_service::utils::to_hg_path)
.collect();

let hg_content = HgChangesetContent {
parents: hg_cs.parents().into(),
manifestid: hg_cs.manifestid().into_nodehash().into(),
user: hg_cs.user().to_vec(),
time: hg_cs.time().timestamp_secs(),
tz: hg_cs.time().tz_offset_secs(),
extras: extra,
files: hg_files?,
message: hg_cs.message().to_vec(),
};

Ok(UploadHgChangeset {
node_id: hg_cs.get_changeset_id().into_nodehash().into(),
changeset_content: hg_content,
})
}
5 changes: 3 additions & 2 deletions eden/mononoke/modern_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,12 @@ pub async fn process_one_changeset(
let hg_cs = hg_cs_id.load(ctx, repo.repo_blobstore()).await?;
let hg_mf_id = hg_cs.manifestid();

let (mf_ids, file_ids) =
let (mut mf_ids, file_ids) =
sort_manifest_changes(ctx, repo.repo_blobstore(), hg_mf_id, mf_ids_p).await?;

mf_ids.push(hg_mf_id);
sender.upload_trees(mf_ids).await?;
sender.upload_filenodes(file_ids).await?;
sender.upload_hg_changeset(vec![hg_cs]).await?;

if log_completion {
STATS::synced_commits.add_value(1, (repo.repo_identity().name().to_string(),));
Expand Down
13 changes: 8 additions & 5 deletions eden/mononoke/mononoke_api_hg/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,14 @@ impl<R: MononokeRepo> HgRepoContext<R> {
results.push(result);
}
log_new_commits(self.ctx(), self.repo_ctx().repo(), None, commits_to_log).await;
self.repo()
.hg_mutation_store()
.add_entries(self.ctx(), hg_changesets, mutations)
.await
.map_err(MononokeError::from)?;

if !mutations.is_empty() {
self.repo()
.hg_mutation_store()
.add_entries(self.ctx(), hg_changesets, mutations)
.await
.map_err(MononokeError::from)?;
}

Ok(results)
}
Expand Down
Loading

0 comments on commit 88a2bb5

Please sign in to comment.