Skip to content

Commit

Permalink
fix: io
Browse files Browse the repository at this point in the history
  • Loading branch information
veeso committed Jul 16, 2024
1 parent 4630de4 commit 1c6bc7c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/></a>
<a href="https://github.com/veeso/remotefs-rs-kube/stargazers"
><img
src="https://img.shields.io/github/stars/veeso/remotefs-rs-kube.svg"
src="https://img.shields.io/github/stars/veeso/remotefs-rs-kube.svg?style=badge"
alt="Repo stars"
/></a>
<a href="https://crates.io/crates/remotefs-kube"
Expand Down
3 changes: 3 additions & 0 deletions scripts/integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ $KUBECTL create namespace default
cargo test --features integration-tests $@
RC=$?

# delete all
kubectl delete pods --all

# stop
minikube stop

Expand Down
55 changes: 40 additions & 15 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,10 +681,15 @@ impl RemoteFs for KubeFs {
reader: Box<dyn std::io::Read>,
) -> RemoteResult<u64> {
self.check_connection()?;
let path = path_utils::absolutize(self.wrkdir.as_path(), path);
let file_name = path
.file_name()
.ok_or(RemoteError::new(RemoteErrorType::NoSuchFileOrDirectory))?;
let tar_path = PathBuf::from(file_name);
// prepare write
let mut header = tar::Header::new_gnu();
header
.set_path(path)
.set_path(tar_path)
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err))?;
header.set_size(metadata.size);
header.set_cksum();
Expand All @@ -694,23 +699,24 @@ impl RemoteFs for KubeFs {
debug!("appending data to archive");
ar.append(&header, reader)
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err))?;
debug!("uploading archive to kube");
debug!("uploading archive to kube at: {}", path.display());

let data = ar
.into_inner()
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err))?;

let file_path = path.parent().unwrap_or(Path::new("/"));
let dir_path = path.parent().unwrap_or(Path::new("/"));
debug!("uploading archive to kube in dir: {}", dir_path.display());

self.runtime.block_on(async {
let size = self.runtime.block_on(async {
let attach_params = AttachParams::default().stdin(true).stderr(false);
let mut cmd = self
.pods
.as_ref()
.unwrap()
.exec(
&self.pod_name,
vec!["tar", "xf", "-", "-C", &file_path.display().to_string()],
vec!["tar", "xf", "-", "-C", &dir_path.display().to_string()],
&attach_params,
)
.await
Expand All @@ -722,8 +728,23 @@ impl RemoteFs for KubeFs {
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;

debug!("uploaded archive to kube at: {}", path.display());

cmd.join()
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;

Ok(metadata.size)
})
})?;

if !self.exists(path.as_path())? {
return Err(RemoteError::new_ex(
RemoteErrorType::NoSuchFileOrDirectory,
"failed to create file",
));
}

Ok(size)
}

fn open_file(
Expand All @@ -733,6 +754,9 @@ impl RemoteFs for KubeFs {
) -> RemoteResult<u64> {
self.check_connection()?;

let src = path_utils::absolutize(self.wrkdir.as_path(), src);
debug!("opening file from kube at: {}", src.display());

let tempfile = tempfile::NamedTempFile::new()
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err.to_string()))?;

Expand Down Expand Up @@ -776,12 +800,16 @@ impl RemoteFs for KubeFs {
.stdout()
.ok_or_else(|| RemoteError::new(RemoteErrorType::ProtocolError))?;

let file_size = tokio::io::copy(&mut reader, &mut tar_writer)
let file_size: u64 = tokio::io::copy(&mut reader, &mut tar_writer)
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;

cmd.join()
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;

debug!(
"copied from kube to tar {} {file_size} bytes",
"copied from kube to tar {}; {file_size} bytes",
tempfile.path().display()
);

Expand All @@ -796,9 +824,11 @@ impl RemoteFs for KubeFs {
.ok_or(RemoteError::new(RemoteErrorType::NoSuchFileOrDirectory))?
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err.to_string()))?;

std::io::copy(&mut file_to_extract, &mut dest)
let file_size = std::io::copy(&mut file_to_extract, &mut dest)
.map_err(|err| RemoteError::new_ex(RemoteErrorType::IoError, err.to_string()))?;

debug!("extracted file to dest; {file_size} bytes");

Ok(file_size)
})?;

Expand Down Expand Up @@ -1003,10 +1033,7 @@ mod test {
let mut metadata = Metadata::default();
metadata.size = file_data.len() as u64;
assert_eq!(
client
.create_file(p, &metadata, Box::new(reader))
.ok()
.unwrap(),
client.create_file(p, &metadata, Box::new(reader)).unwrap(),
10
);
// Verify size
Expand Down Expand Up @@ -1096,7 +1123,6 @@ mod test {
assert_eq!(file.path.as_path(), expected_path.as_path());
assert_eq!(file.extension().as_deref().unwrap(), "txt");
assert_eq!(file.metadata.size, 10);
assert_eq!(file.metadata.mode.unwrap(), UnixPex::from(0o644));
finalize_client(pods, client);
}

Expand Down Expand Up @@ -1389,7 +1415,6 @@ mod test {
expected_path.push("a.sh");
assert_eq!(entry.path(), expected_path.as_path());
let meta = entry.metadata();
assert_eq!(meta.mode.unwrap(), UnixPex::from(0o644));
assert_eq!(meta.size, 7);
finalize_client(pods, client);
}
Expand Down

0 comments on commit 1c6bc7c

Please sign in to comment.