Implement support for in-pack Ref Deltas during Mononoke Git push
Summary:
As highlighted in the previous diff, we currently do not support this. Every time the pack parser encounters a ref delta, it assumes that the base of the delta is an existing object already present on the server. In 99% of cases this is true. However, in certain scenarios Git can push a packfile containing ref deltas whose base objects are present in the same packfile.
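
For context, the entry header in a packfile distinguishes the two delta kinds: an ofs delta points at an earlier entry in the same pack by offset, while a ref delta names its base by object id, which usually lives outside the pack but, as described above, can also live inside it. A minimal sketch of collecting ref-delta base ids with gix_pack's header type (the helper name is ours, for illustration):

use gix_hash::ObjectId;
use gix_pack::data::entry::Header;

// A ref delta names its base by object id; the base may be outside the pack
// (the usual case) or inside the same pack (the case handled by this diff).
// Ofs deltas always reference an earlier in-pack entry, so they need no lookup.
fn ref_delta_base(header: &Header) -> Option<ObjectId> {
    match header {
        Header::RefDelta { base_id } => Some(*base_id),
        _ => None,
    }
}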

The strategy for supporting such deltas is to make the pack parser iterative instead of doing a single pass over the packfile and reporting the result. The steps are as follows, with a sketch of a single pass after the list:
1. Identify all ref deltas in the pack and collect the IDs of all their base objects
2. Fetch all the base objects that are already present in Mononoke storage
3. Try to unpack the packfile, using the fetched base objects to resolve any deltas we encounter
4. If we encounter a ref delta that points to a base object unknown to us, record it and continue instead of erroring out
5. At the end of the iteration, collect all the unpacked objects and add them to the set of base objects known to us
6. Repeat from step 3 with the updated set of base objects
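
A minimal sketch of a single pass (steps 3-5), with stand-in types in place of the real gix_pack machinery; the actual implementation is process_pack_entries in pack_processor.rs below:

use std::collections::HashMap;

struct Entry;   // stand-in for a pending packfile entry
struct Object;  // stand-in for a decoded git object
enum DecodeError {
    DeltaBaseUnresolved, // the entry's ref-delta base is not known yet
    Other(String),       // any other decode failure
}

// Stand-in for gix_pack's data::File::decode_entry.
fn decode(_entry: &Entry, _known: &HashMap<u64, Object>) -> Result<(u64, Object), DecodeError> {
    unimplemented!()
}

fn one_pass(pending: Vec<Entry>, known: &mut HashMap<u64, Object>) -> Result<Vec<Entry>, String> {
    let mut still_pending = Vec::new();
    for entry in pending {
        match decode(&entry, known) {
            // Decoded successfully: grow the set of known base objects (step 5).
            Ok((id, object)) => {
                known.insert(id, object);
            }
            // Base unknown so far: keep the entry for the next pass (step 4).
            Err(DecodeError::DeltaBaseUnresolved) => still_pending.push(entry),
            // Anything else is a real error.
            Err(DecodeError::Other(msg)) => return Err(msg),
        }
    }
    Ok(still_pending)
}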

Any valid packfile will eventually be parsed and unpacked with this strategy. However, an invalid packfile containing a ref delta that refers to a nonexistent base object would cause the above steps to loop forever. To avoid this, we add an upper limit on the number of iterations.
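
Reusing the stand-in types from the previous sketch, the bounded driver loop (step 6 plus the iteration cap; the limit value matches the MAX_ALLOWED_DEPTH constant added in this diff) could look like:

const MAX_ALLOWED_DEPTH: u8 = 30;

// Repeat passes until every entry is decoded. A valid pack shrinks the pending
// set on every pass; an invalid pack would stall, so we bail out after a fixed
// number of iterations instead of looping forever.
fn resolve_all(mut pending: Vec<Entry>, known: &mut HashMap<u64, Object>) -> Result<(), String> {
    let mut depth = 0u8;
    while !pending.is_empty() {
        if depth > MAX_ALLOWED_DEPTH {
            return Err("Maximum allowed depth reached while processing packfile entries".into());
        }
        depth += 1;
        pending = one_pass(pending, known)?;
    }
    Ok(())
}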

Differential Revision: D68025406

fbshipit-source-id: 1aa406fe9033490e985c042b3fe4c6c82dde1ae8
RajivTS authored and facebook-github-bot committed Jan 13, 2025
1 parent 26e54c4 commit 0024131
Showing 3 changed files with 170 additions and 63 deletions.
6 changes: 5 additions & 1 deletion eden/mononoke/git/git_types/src/object.rs
@@ -53,13 +53,17 @@ impl ObjectKind {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct ObjectContent {
pub parsed: Object,
pub raw: Bytes,
}

impl ObjectContent {
pub fn new(parsed: Object, raw: Bytes) -> Self {
Self { parsed, raw }
}

pub fn is_tree(&self) -> bool {
self.parsed.as_tree().is_some()
}
202 changes: 147 additions & 55 deletions eden/mononoke/git/protocol/src/pack_processor.rs
@@ -22,20 +22,20 @@ use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use futures_stats::TimedTryFutureExt;
use git_types::fetch_git_object_bytes;
use git_types::fetch_git_object;
use git_types::GitIdentifier;
use git_types::HeaderState;
use git_types::ObjectContent;
use gix_features::progress::Discard;
use gix_hash::Kind;
use gix_hash::ObjectId;
use gix_object::encode::loose_header;
use gix_object::ObjectRef;
use gix_object::WriteTo;
use gix_pack::cache::Never;
use gix_pack::data;
use gix_pack::data::decode::entry::ResolvedBase;
use gix_pack::data::input;
use gix_pack::data::decode::Error as PackError;
use gix_pack::data::input::Entry as InputEntry;
use gix_pack::data::input::Error as InputError;
use gix_pack::data::File;
use mononoke_types::hash::GitSha1;
use packfile::types::BaseObject;
@@ -46,9 +46,72 @@ use tempfile::Builder;

use crate::PACKFILE_SUFFIX;

type ObjectMap = HashMap<ObjectId, (Bytes, gix_object::Kind)>;
const MAX_ALLOWED_DEPTH: u8 = 30;
type ObjectMap = HashMap<ObjectId, ObjectContent>;

fn into_data_entry(pack_entry: input::Entry) -> data::Entry {
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum PackEntry {
Pending(InputEntry),
Processed((ObjectId, ObjectContent)),
}

#[derive(Debug, Clone, Default)]
struct PackEntries {
entries: HashSet<PackEntry>,
}

impl PackEntries {
fn from_pending_and_processed(pending: Vec<InputEntry>, processed: ObjectMap) -> Self {
let mut entries = HashSet::new();
for entry in pending {
entries.insert(PackEntry::Pending(entry));
}
for (id, object_content) in processed {
entries.insert(PackEntry::Processed((id, object_content)));
}
Self { entries }
}

fn from_entries(entries: HashSet<PackEntry>) -> Self {
Self { entries }
}

fn into_pending_and_processed(self) -> (Vec<InputEntry>, ObjectMap) {
let mut pending = Vec::new();
let mut processed = HashMap::new();
for entry in self.entries {
match entry {
PackEntry::Pending(entry) => pending.push(entry),
PackEntry::Processed((id, content)) => {
processed.insert(id, content);
}
}
}
(pending, processed)
}

fn into_processed(self) -> Result<FxHashMap<ObjectId, ObjectContent>> {
let mut object_map = FxHashMap::default();
for entry in self.entries {
match entry {
PackEntry::Processed((id, content)) => {
object_map.insert(id, content);
}
_ => anyhow::bail!("Packfile entries are not completely processed"),
}
}
Ok(object_map)
}

fn is_processed(&self) -> bool {
self.entries.iter().all(|entry| match entry {
PackEntry::Processed(_) => true,
_ => false,
})
}
}

fn into_data_entry(pack_entry: InputEntry) -> data::Entry {
data::Entry {
header: pack_entry.header,
decompressed_size: pack_entry.decompressed_size,
@@ -72,10 +72,10 @@ fn resolve_delta(
out: &mut Vec<u8>,
known_objects: &ObjectMap,
) -> Option<ResolvedBase> {
known_objects.get(oid).map(|(bytes, kind)| {
out.extend_from_slice(bytes);
known_objects.get(oid).map(|object_content| {
object_content.parsed.write_to(out.by_ref()).unwrap();
ResolvedBase::OutOfPack {
kind: kind.clone(),
kind: object_content.parsed.kind().clone(),
end: out.len(),
}
})
@@ -104,17 +104,15 @@ async fn fetch_prereq_objects(
async move {
let git_identifier =
GitIdentifier::Basic(GitSha1::from_object_id(object_id.as_ref())?);
let fallible_git_bytes =
fetch_git_object_bytes(&ctx, blobstore, &git_identifier, HeaderState::Included)
.await;
match fallible_git_bytes {
Ok(git_bytes) => {
let object = ObjectRef::from_loose(&git_bytes)
.context("Failure in converting bytes into git object")?;
let kind = object.kind();
let mut git_bytes = Vec::new();
let fallible_git_object = fetch_git_object(&ctx, blobstore, &git_identifier).await;
match fallible_git_object {
Ok(object) => {
let mut git_bytes = object.loose_header().into_vec();
object.write_to(git_bytes.by_ref())?;
anyhow::Ok(Some((object_id, (Bytes::from(git_bytes), kind))))
anyhow::Ok(Some((
object_id,
ObjectContent::new(object, Bytes::from(git_bytes)),
)))
}
// The object might not be present in the data store since it's an in-pack object
_ => anyhow::Ok(None),
@@ -148,6 +148,50 @@ pub async fn parse_pack(
response
}

fn process_pack_entries(pack_file: &data::File, entries: PackEntries) -> Result<PackEntries> {
let (pending_entries, prereq_objects) = entries.into_pending_and_processed();
let output_entries = pending_entries
.into_iter()
.map(|entry| {
let mut output = vec![];
let err_context = format!("Error in decoding packfile entry: {:?}", &entry.header);
let outcome = pack_file.decode_entry(
into_data_entry(entry.clone()),
&mut output,
&mut gix_features::zlib::Inflate::default(),
&|oid, out| resolve_delta(oid, out, &prereq_objects),
&mut Never,
);
match outcome {
Ok(outcome) => {
let object_bytes = Bytes::from(git_object_bytes(
output,
outcome.kind,
outcome.object_size as usize,
));
let base_object = BaseObject::new(object_bytes.clone())
.context("Error in converting bytes to git object")?;
let id = base_object.hash;
let object = ObjectContent::new(base_object.object, object_bytes);
let processed_entry = PackEntry::Processed((id, object));
anyhow::Ok(processed_entry)
}
Err(e) => match e {
PackError::DeltaBaseUnresolved(_) => anyhow::Ok(PackEntry::Pending(entry)),
_ => Err(e).context(err_context),
},
}
})
.collect::<Result<HashSet<PackEntry>>>()
.context("Failure in decoding packfile entries")?;
let output_entries = prereq_objects
.into_iter()
.map(|(id, object_content)| PackEntry::Processed((id, object_content)))
.chain(output_entries)
.collect();
Ok(PackEntries::from_entries(output_entries))
}

async fn parse_stored_pack(
pack_path: &Path,
ctx: &CoreContext,
@@ -177,45 +177,32 @@ async fn parse_stored_pack(
.try_timed()
.await?
.log_future_stats(ctx.scuba().clone(), "Fetched Prerequisite Objects", None);
// Fetch all the entries that need to be processed
let pending_entries = pack_file
.streaming_iter()
.context("Failure in iterating packfile")?
.collect::<Result<Vec<_>, InputError>>()?;

let parsed_objects = stream::iter(
pack_file
.streaming_iter()
.context("Failure in iterating packfile")?,
)
.map(move |fallible_entry| match fallible_entry {
Ok(entry) => {
let mut output = vec![];
let err_context = format!("Error in decoding packfile entry: {:?}", &entry.header);
let outcome = pack_file
.decode_entry(
into_data_entry(entry),
&mut output,
&mut gix_features::zlib::Inflate::default(),
&|oid, out| resolve_delta(oid, out, &prereq_objects),
&mut Never,
)
.context(err_context)?;
let object_bytes = Bytes::from(git_object_bytes(
output,
outcome.kind,
outcome.object_size as usize,
));
let base_object = BaseObject::new(object_bytes.clone())
.context("Error in converting bytes to git object")?;
let id = base_object.hash;
let object = ObjectContent {
parsed: base_object.object,
raw: object_bytes,
};
anyhow::Ok((id, object))
// Process all the entries
tokio::task::spawn_blocking({
let pack_file = pack_file.clone();
move || {
let mut pack_entries =
PackEntries::from_pending_and_processed(pending_entries, prereq_objects);
let mut counter = 0;
while !pack_entries.is_processed() {
if counter > MAX_ALLOWED_DEPTH {
anyhow::bail!(
"Maximum allowed depth reached while processing packfile entries"
);
}
counter += 1;
pack_entries = process_pack_entries(&pack_file, pack_entries)?;
}
pack_entries.into_processed()
}
Err(e) => anyhow::bail!("Failure in iterating packfile entry: {:?}", e),
})
.try_collect::<FxHashMap<_, _>>()
.try_timed()
.await?
.log_future_stats(ctx.scuba().clone(), "Decoded objects from Packfile", None);

Ok(parsed_objects)
.log_future_stats(ctx.scuba().clone(), "Decoded objects from Packfile", None)
}
@@ -24,25 +24,36 @@
$ echo "AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1." > file1
$ git add .
$ git commit -qam "Add file1"
$ git tag -a -m "new tag" first_tag
# File1 should be large enough that Git creates a delta for it
$ echo "AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile1.AddFile2." > file1
$ echo "This is file2" > file2.txt
$ git add .
$ git commit -qam "Modified file1, Added file2"
$ git tag -a empty_tag -m ""
# Get list of all objects for verification later
$ git rev-list --objects --all | git cat-file --batch-check='%(objectname) %(objecttype) %(rest)' | sort > $TESTTMP/object_list

Push Git repository to Mononoke
# Push Git repository to Mononoke
$ git pack-objects --all --stdout > packfile
$ last_commit=$(git rev-parse HEAD)
Create the capabilities string
# Create the capabilities string
$ capabilities="report-status quiet object-format=sha1"
$ printf "00980000000000000000000000000000000000000000 $last_commit refs/heads/master_bookmark\0 $capabilities" >> push_data
$ echo -n "0000" >> push_data
$ cat packfile >> push_data
Pipe the push data to CURL
# Pipe the push data to CURL
$ curl -X POST $MONONOKE_GIT_SERVICE_BASE_URL/$REPONAME.git/git-receive-pack -H 'Content-Type: application/x-git-receive-pack-request' -H 'Accept: application/x-git-receive-pack-result' -H 'Accept-Encoding: deflate, gzip, br' -k --cert "$TEST_CERTDIR/client0.crt" --key "$TEST_CERTDIR/client0.key" --data-binary "@push_data" -s -w "\nResponse code: %{http_code}\n"
Error in decoding packfile entry: RefDelta { base_id: Sha1(ccce194714ee3fae382fad2f4a67cd6c1a7320cd) }: A delta chain could not be followed as the ref base with id ccce194714ee3fae382fad2f4a67cd6c1a7320cd could not be found
Response code: 500
*unpack ok (glob)
*ok refs/heads/master_bookmark (glob)
* (glob)
Response code: 200

$ wait_for_git_bookmark_create refs/heads/master_bookmark

# Clone the repo from Mononoke and validate that the push worked
$ cd $TESTTMP
$ git_client clone -q $MONONOKE_GIT_SERVICE_BASE_URL/$REPONAME.git check_repo
$ cd check_repo
$ git rev-list --objects --all | git cat-file --batch-check='%(objectname) %(objecttype) %(rest)' | sort > $TESTTMP/new_object_list
$ diff -w $TESTTMP/new_object_list $TESTTMP/object_list

