Skip to content

Commit

Permalink
Update input for packfiles and bundles
Browse files Browse the repository at this point in the history
Summary: Currently, `PackfileWriter` and `BundleWriter` take a stream of futures that resolve to `Result<PackfileItem>` as input, i.e. `impl Stream<Item = impl Future<Output = Result<PackfileItem>>>`. This diff changes that to be `impl Stream<Item = Result<PackfileItem>>` where the future is already resolved at the caller in whatever manner the caller deems appropriate.

Reviewed By: gustavoavena

Differential Revision: D49961711

fbshipit-source-id: 36b484e799dd716cffbb10d88bde38e4aebf7fac
  • Loading branch information
RajivTS authored and facebook-github-bot committed Oct 6, 2023
1 parent 0ad24f2 commit 61813bb
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 37 deletions.
2 changes: 1 addition & 1 deletion eden/mononoke/git/git_types/src/derive_delta_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl RootGitDeltaManifestId {
Self(id)
}

pub fn manifest_unode_id(&self) -> &GitDeltaManifestId {
pub fn manifest_id(&self) -> &GitDeltaManifestId {
&self.0
}
}
Expand Down
1 change: 1 addition & 0 deletions eden/mononoke/git/git_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use crate::blob::BlobHandle;
pub use crate::commit::MappedGitCommitId;
pub use crate::delta::DeltaInstructionChunkIdPrefix;
pub use crate::delta::DeltaInstructions;
pub use crate::delta_manifest::GitDeltaManifestEntry;
pub use crate::derive_delta_manifest::get_object_bytes;
pub use crate::derive_delta_manifest::HeaderState;
pub use crate::derive_delta_manifest::RootGitDeltaManifestId;
Expand Down
3 changes: 1 addition & 2 deletions eden/mononoke/git/packfile/src/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use std::fmt::Display;

use anyhow::Result;
use futures::Future;
use futures::Stream;
use gix_hash::ObjectId;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -95,7 +94,7 @@ impl<T: AsyncWrite + Unpin> BundleWriter<T> {
/// Write the stream of input items to the bundle
pub async fn write(
&mut self,
objects_stream: impl Stream<Item = impl Future<Output = Result<PackfileItem>>>,
objects_stream: impl Stream<Item = Result<PackfileItem>>,
) -> Result<()> {
self.pack_writer.write(objects_stream).await
}
Expand Down
7 changes: 2 additions & 5 deletions eden/mononoke/git/packfile/src/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::io::Write;

use anyhow::Context;
use anyhow::Result;
use futures::Future;
use futures::Stream;
use futures::StreamExt;
use gix_hash::ObjectId;
Expand Down Expand Up @@ -96,15 +95,13 @@ impl<T: AsyncWrite + Unpin> PackfileWriter<T> {
/// Write the stream of objects to the packfile
pub async fn write(
&mut self,
entries_stream: impl Stream<Item = impl Future<Output = Result<PackfileItem>>>,
entries_stream: impl Stream<Item = Result<PackfileItem>>,
) -> Result<()> {
// Write the packfile header if applicable
self.write_header().await?;
let mut entries_stream = Box::pin(entries_stream);
while let Some(entry) = entries_stream.next().await {
let entry = entry
.await
.context("Failure in getting packfile item entry")?;
let entry = entry.context("Failure in getting packfile item entry")?;
let mut entry: Entry = entry
.try_into()
.context("Failure in converting PackfileItem to Entry")?;
Expand Down
36 changes: 11 additions & 25 deletions eden/mononoke/git/packfile/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use flate2::write::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use futures::stream;
use futures::Future;
use git_types::DeltaInstructions;
use gix_diff::blob::Algorithm;
use gix_hash::ObjectId;
Expand All @@ -34,8 +33,7 @@ use crate::types::PackfileItem;

async fn get_objects_stream(
with_delta: bool,
) -> anyhow::Result<impl stream::Stream<Item = impl Future<Output = anyhow::Result<PackfileItem>>>>
{
) -> anyhow::Result<impl stream::Stream<Item = anyhow::Result<PackfileItem>>> {
// Create a few Git objects
let tag_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Tag(Tag {
target: ObjectId::empty_tree(gix_hash::Kind::Sha1),
Expand All @@ -56,9 +54,9 @@ async fn get_objects_stream(
}],
}))?);
let mut pack_items = vec![
futures::future::ready(PackfileItem::new_base(tag_bytes.clone())),
futures::future::ready(PackfileItem::new_base(blob_bytes)),
futures::future::ready(PackfileItem::new_base(tree_bytes)),
PackfileItem::new_base(tag_bytes.clone()),
PackfileItem::new_base(blob_bytes),
PackfileItem::new_base(tree_bytes),
];
if with_delta {
let another_tag_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Tag(Tag {
Expand Down Expand Up @@ -87,7 +85,7 @@ async fn get_objects_stream(
decompressed_size,
compressed_instruction_bytes,
);
pack_items.push(futures::future::ready(anyhow::Ok(pack_item)));
pack_items.push(anyhow::Ok(pack_item));
}
let objects_stream = stream::iter(pack_items);
Ok(objects_stream)
Expand Down Expand Up @@ -213,19 +211,15 @@ async fn validate_staggered_packfile_generation() -> anyhow::Result<()> {
}))?);
// Validate we are able to write the object to the packfile without errors
packfile_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(tag_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(tag_bytes)]))
.await
.expect("Expected successful write of object to packfile");
let blob_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Blob(gix_object::Blob {
data: "Some file content".as_bytes().to_vec(),
}))?);
// Validate we are able to write the object to the packfile without errors
packfile_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(blob_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(blob_bytes)]))
.await
.expect("Expected successful write of object to packfile");
let tree_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Tree(gix_object::Tree {
Expand All @@ -237,9 +231,7 @@ async fn validate_staggered_packfile_generation() -> anyhow::Result<()> {
}))?);
// Validate we are able to write the object to the packfile without errors
packfile_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(tree_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(tree_bytes)]))
.await
.expect("Expected successful write of object to packfile");

Expand Down Expand Up @@ -395,29 +387,23 @@ async fn validate_staggered_bundle_generation() -> anyhow::Result<()> {
}))?);
// Validate we are able to write the object to the bundle without errors
bundle_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(tag_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(tag_bytes)]))
.await
.expect("Expected successful write of object to bundle");
let blob_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Blob(gix_object::Blob {
data: "Some file content".as_bytes().to_vec(),
}))?);
// Validate we are able to write the object to the bundle without errors
bundle_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(blob_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(blob_bytes)]))
.await
.expect("Expected successful write of object to bundle");
let tree_bytes = Bytes::from(to_vec_bytes(&gix_object::Object::Tree(gix_object::Tree {
entries: vec![],
}))?);
// Validate we are able to write the object to the bundle without errors
bundle_writer
.write(stream::iter(vec![futures::future::ready(
PackfileItem::new_base(tree_bytes),
)]))
.write(stream::iter(vec![PackfileItem::new_base(tree_bytes)]))
.await
.expect("Expected successful write of object to bundle");
// Validate we are able to finish writing to the bundle
Expand Down
7 changes: 3 additions & 4 deletions eden/mononoke/tools/admin/src/commands/git_bundle/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use clap::Args;
use context::CoreContext;
use flate2::write::ZlibDecoder;
use futures::stream;
use futures::Future;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
Expand Down Expand Up @@ -205,8 +204,8 @@ async fn get_refs(refs_path: PathBuf) -> Result<HashMap<String, ObjectId>> {
/// their content as a stream
async fn get_objects_stream(
object_paths: Vec<PathBuf>,
) -> impl Stream<Item = impl Future<Output = Result<PackfileItem>>> {
stream::iter(object_paths.into_iter().map(|path| {
) -> impl Stream<Item = Result<PackfileItem>> {
stream::iter(object_paths.into_iter().map(anyhow::Ok)).and_then(move |path| {
async move {
// Fetch the Zlib encoded content of the Git object
let encoded_data = tokio::fs::read(path.as_path())
Expand All @@ -219,7 +218,7 @@ async fn get_objects_stream(
decoded_data = decoder.finish()?;
PackfileItem::new_base(Bytes::from(decoded_data))
}
}))
})
}

fn get_files_in_dir_recursive<P>(path: PathBuf, predicate: P) -> Result<Vec<PathBuf>>
Expand Down

0 comments on commit 61813bb

Please sign in to comment.