Skip to content

Commit

Permalink
use delta generation mechanism from git
Browse files Browse the repository at this point in the history
Summary: In this diff I switch our code to use git's method of generating deltas.

Reviewed By: RajivTS

Differential Revision: D54847076

fbshipit-source-id: a27b5a89329401054832ececffb1088a23a92a60
  • Loading branch information
mitrandir77 authored and facebook-github-bot committed Mar 20, 2024
1 parent 8328964 commit 975cc68
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 25 deletions.
1 change: 1 addition & 0 deletions eden/mononoke/git/git_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ filestore = { version = "0.1.0", path = "../../filestore" }
flate2 = { version = "1.0.26", features = ["rust_backend"], default-features = false }
futures = { version = "0.3.28", features = ["async-await", "compat"] }
futures-util = "0.3.7"
git_delta = { version = "0.1.0", path = "../../third_party/git_delta" }
git_types_thrift = { version = "0.1.0", path = "if" }
gix-actor = "0.24"
gix-diff = "0.33"
Expand Down
1 change: 1 addition & 0 deletions eden/mononoke/git/git_types/TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ rust_library(
"//eden/mononoke/mononoke_types:mononoke_types",
"//eden/mononoke/mononoke_types/serialization:mononoke_types_serialization-rust",
"//eden/mononoke/server/context:context",
"//eden/mononoke/third_party/git_delta:git_delta",
"//thrift/lib/rust:fbthrift",
],
)
125 changes: 103 additions & 22 deletions eden/mononoke/git/git_types/src/derive_delta_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert;
use std::io::Write;
use std::str::from_utf8;
use std::sync::Arc;

Expand All @@ -32,10 +33,13 @@ use derived_data_manager::BonsaiDerivable;
use derived_data_manager::DerivableType;
use derived_data_manager::DerivationContext;
use derived_data_service_if as thrift;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use futures_util::future::try_join_all;
use futures_util::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use git_delta::git_delta;
use gix_diff::blob::Algorithm;
use gix_hash::ObjectId;
use manifest::ManifestOps;
Expand All @@ -46,7 +50,6 @@ use multimap::MultiMap;
use unodes::RootUnodeManifestId;

use crate::delta::DeltaInstructionChunkIdPrefix;
use crate::delta::DeltaInstructions;
use crate::delta_manifest::GitDeltaManifest;
use crate::delta_manifest::GitDeltaManifestEntry;
use crate::delta_manifest::GitDeltaManifestId;
Expand All @@ -55,7 +58,9 @@ use crate::delta_manifest::ObjectEntry;
use crate::fetch_git_object_bytes;
use crate::mode;
use crate::store::store_delta_instructions;
use crate::store::store_raw_delta;
use crate::store::HeaderState;
use crate::DeltaInstructions;
use crate::DeltaObjectKind;
use crate::MappedGitCommitId;
use crate::TreeHandle;
Expand All @@ -71,6 +76,14 @@ const DELTA_THRESHOLD: u64 = 262_144_000; // 250 MB
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
pub struct RootGitDeltaManifestId(GitDeltaManifestId);

#[derive(Debug, Copy, Clone)]
pub enum DeltaCreationMethod {
#[allow(dead_code)]
Internal,
#[allow(dead_code)]
Git,
}

impl RootGitDeltaManifestId {
pub fn new(id: GitDeltaManifestId) -> Self {
Self(id)
Expand Down Expand Up @@ -135,6 +148,7 @@ async fn metadata_to_manifest_entry(
metadata: DeltaEntryMetadata,
blobstore: Arc<dyn Blobstore>,
ctx: &CoreContext,
delta_creation_method: DeltaCreationMethod,
) -> Result<GitDeltaManifestEntry> {
let full_object_entry = tree_member_to_object_entry(&metadata.actual, path.clone())
.with_context(|| {
Expand All @@ -157,36 +171,103 @@ async fn metadata_to_manifest_entry(
)
})?;
let origin = delta_metadata.origin;
let actual_object = fetch_git_object_bytes(&ctx, blobstore.clone(),metadata.actual.oid(), HeaderState::Excluded).await?;
let base_object = fetch_git_object_bytes(&ctx, blobstore.clone(), delta_metadata.object.oid(), HeaderState::Excluded).await?;
let actual_object = fetch_git_object_bytes(
&ctx,
blobstore.clone(),
metadata.actual.oid(),
HeaderState::Excluded,
)
.await?;
let base_object = fetch_git_object_bytes(
&ctx,
blobstore.clone(),
delta_metadata.object.oid(),
HeaderState::Excluded,
)
.await?;
// Objects are only valid for deltas when they are trees OR UTF-8 encoded blobs
let actual_object_valid = full_object_entry.kind == DeltaObjectKind::Tree || from_utf8(&actual_object).is_ok();
let base_object_valid = base.kind == DeltaObjectKind::Tree || from_utf8(&base_object).is_ok();
let actual_object_valid = full_object_entry.kind == DeltaObjectKind::Tree
|| from_utf8(&actual_object).is_ok();
let base_object_valid =
base.kind == DeltaObjectKind::Tree || from_utf8(&base_object).is_ok();
// Only generate delta when both the base and the target object are valid
if actual_object_valid && base_object_valid {
let instructions = DeltaInstructions::generate(
base_object,actual_object,Algorithm::Myers,
)
.with_context(|| {
format!(
"Error while computing delta between base object {:?} and actual object {:?}",
base.oid, full_object_entry.oid
// Let's not delta against empty objects
if actual_object.is_empty() || base_object.is_empty() {
return anyhow::Ok(None);
}
let stored_instructions_metadata = match delta_creation_method {
DeltaCreationMethod::Internal => {
let instructions = DeltaInstructions::generate(
base_object,actual_object,Algorithm::Myers,
)
.with_context(|| {
format!(
"Error while computing delta between base object {:?} and actual object {:?}",
base.oid, full_object_entry.oid
)
})?;
// The base path and actual path are the same for now but can vary in the future when we support
// files copied from one location to the other
let chunk_prefix =
DeltaInstructionChunkIdPrefix::new(commit, path.clone(), origin, path.clone());
let chunk_size = Some(CHUNK_SIZE);
store_delta_instructions(&ctx, &blobstore, instructions, chunk_prefix, chunk_size)
.await
.with_context(|| {
format!(
"Error while storing delta instructions for path {} in commit {}",
path, commit
)
})?
}
DeltaCreationMethod::Git => {
// zlib compress actual object to see how big of a delta makes sense
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(&actual_object)
.context("Failure in writing raw delta instruction bytes to ZLib buffer")?;
let actual_object_compressed_len = encoder
.finish()
.context("Failure in ZLib encoding delta instruction bytes")?
.len();

let raw_delta = if let core::result::Result::Ok(raw_delta) =
git_delta(&base_object, &actual_object, actual_object_compressed_len)
{
raw_delta
} else {
// if the delta is larger than max_delta above will fail and we'll fail back to
// serving the full object
return anyhow::Ok(None);
};
let chunk_prefix = DeltaInstructionChunkIdPrefix::new(
commit,
path.clone(),
origin,
path.clone(),
);
let chunk_size = Some(CHUNK_SIZE);
store_raw_delta(
&ctx,
&blobstore,
raw_delta,
chunk_prefix,
chunk_size,
)
})?;
// The base path and actual path are the same for now but can vary in the future when we support
// files copied from one location to the other
let chunk_prefix =
DeltaInstructionChunkIdPrefix::new(commit, path.clone(), origin, path.clone());
let chunk_size = Some(CHUNK_SIZE);
let stored_instructions_metadata = store_delta_instructions(&ctx, &blobstore, instructions, chunk_prefix, chunk_size)
.await
.with_context(|| {
format!(
"Error while storing delta instructions for path {} in commit {}",
path, commit
)
})?;
anyhow::Ok(Some(ObjectDelta::new(origin, base, stored_instructions_metadata)))
})?
}};
anyhow::Ok(Some(ObjectDelta::new(
origin,
base,
stored_instructions_metadata,
)))
} else {
anyhow::Ok(None)
}
Expand Down Expand Up @@ -457,7 +538,7 @@ async fn derive_git_delta_manifest(
entry.deltas = deltas_with_correct_origin;
}
// Use the metadata of the delta entry to construct GitDeltaManifestEntry
let manifest_entry = metadata_to_manifest_entry(&commit, path.clone(), entry, blobstore, ctx)
let manifest_entry = metadata_to_manifest_entry(&commit, path.clone(), entry, blobstore, ctx, DeltaCreationMethod::Internal)
.await.with_context(|| format!("Error in generating git delta manifest entry for path {}", path))?;
anyhow::Ok((path, manifest_entry))
}
Expand Down
28 changes: 25 additions & 3 deletions eden/mononoke/git/git_types/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ pub async fn fetch_git_object_bytes(

/// Free function for fetching stored git objects. Applies to all git
/// objects.
#[allow(dead_code)]
pub async fn fetch_git_object(
ctx: &CoreContext,
blobstore: Arc<dyn Blobstore>,
Expand Down Expand Up @@ -306,7 +305,6 @@ pub struct StoredInstructionsMetadata {
/// the written delta instructions stored as chunks in the blobstore. This method can partially fail
/// and store a subset of the chunks. However, it is perfectly safe to retry until all the chunks are stored
/// successfully
#[allow(dead_code)]
pub async fn store_delta_instructions<B>(
ctx: &CoreContext,
blobstore: &B,
Expand All @@ -322,6 +320,31 @@ where
.write(&mut raw_instruction_bytes)
.await
.context("Error in converting DeltaInstructions to raw bytes")?;
store_raw_delta(
ctx,
blobstore,
raw_instruction_bytes,
chunk_prefix,
chunk_size,
)
.await
}

/// Store raw git delta in blobstore by chunking the incoming byte stream and returning the metadata
/// of the written delta instructions stored as chunks in the blobstore. This method can partially
/// fail and store a subset of the chunks. However, it is perfectly safe to retry until all the
/// chunks are stored successfully
pub async fn store_raw_delta<B>(
ctx: &CoreContext,
blobstore: &B,
delta: Vec<u8>,
chunk_prefix: DeltaInstructionChunkIdPrefix,
chunk_size: Option<u64>,
) -> anyhow::Result<StoredInstructionsMetadata>
where
B: Blobstore + Clone,
{
let raw_instruction_bytes = delta;
let uncompressed_bytes = raw_instruction_bytes.len() as u64;
// Zlib encode the instructions before writing to the store
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
Expand Down Expand Up @@ -384,7 +407,6 @@ where

/// Fetch all the delta instruction chunks corresponding to the given prefix and return the result
/// as a boxed stream of bytes in order
#[allow(dead_code)]
pub fn fetch_delta_instructions<'a, B>(
ctx: &'a CoreContext,
blobstore: &'a B,
Expand Down

0 comments on commit 975cc68

Please sign in to comment.