Skip to content

Commit

Permalink
fix: working on tests
Browse files Browse the repository at this point in the history
  • Loading branch information
veeso committed Jul 16, 2024
1 parent b16082b commit 4630de4
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 32 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ version = "0.1.0"

[dependencies]
chrono = "^0.4"
futures-util = "0.3"
k8s-openapi = { version = "0.22", features = ["v1_30"] }
kube = { version = "0.92", features = ["client", "config", "ws"] }
lazy-regex = "3"
Expand All @@ -23,9 +24,11 @@ tar = "0.4"
tempfile = "3"
thiserror = "^1"
tokio = { version = "1", features = ["fs", "rt"] }
tokio-util = "0.7"

[dev-dependencies]
env_logger = "^0.11"
kube = { version = "0.92", features = ["client", "config", "runtime", "ws"] }
pretty_assertions = "1"
rand = "^0.8.4"
serde_json = "1"
Expand Down
18 changes: 18 additions & 0 deletions scripts/delete-all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

MINIKUBE=$(which minikube)
KUBECTL="$MINIKUBE kubectl"
if [ -z "$MINIKUBE" ]; then
echo "Minikube is not installed. Please install it first."
echo '$ curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64'
echo '$ sudo install minikube-linux-amd64 /usr/local/bin/minikube && rm minikube-linux-amd64'

exit 1
fi

# start minikube
minikube start
$KUBECTL delete pods --all

# stop
minikube stop
2 changes: 0 additions & 2 deletions scripts/integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ export MINIKUBE_IP="$(minikube ip)"
echo "minikube IP: $MINIKUBE_IP"

$KUBECTL create namespace default
$KUBECTL config set-context --current --namespace=your-namespace
$KUBECTL config set-cluster minikube --server=https://$MINIKUBE_IP:8443

cargo test --features integration-tests $@
RC=$?
Expand Down
140 changes: 110 additions & 30 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use futures_util::StreamExt as _;
use k8s_openapi::api::core::v1::Pod;
use kube::api::AttachParams;
use kube::{Api, Client, Config};
Expand All @@ -16,7 +17,7 @@ use remotefs::fs::{
UnixPexClass, Welcome, WriteStream,
};
use remotefs::File;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::io::AsyncWriteExt as _;
use tokio::runtime::Runtime;

use crate::utils::{fmt as fmt_utils, parser as parser_utils, path as path_utils};
Expand Down Expand Up @@ -199,36 +200,83 @@ impl KubeFs {
cmd: impl std::fmt::Display,
path: &Path,
) -> RemoteResult<(u32, String)> {
let shell_cmd = format!(r#"sh -c "cd {}; {}; exit $?""#, path.display(), cmd);
const STDOUT_SIZE: usize = 2048;

let shell_cmd = format!(r#"cd {} && {}; echo -n ";$?""#, path.display(), cmd);
debug!("Executing shell command: {}", shell_cmd);

self.runtime.block_on(async {
let args = shell_cmd.split_ascii_whitespace().collect::<Vec<&str>>();
let attach_params = AttachParams::default().stdout(true);
let attach_params = AttachParams::default()
.stdout(true)
.stdin(false)
.stderr(true)
.max_stdout_buf_size(STDOUT_SIZE);

let mut process = self
.pods
.as_ref()
.unwrap()
.exec(&self.pod_name, args, &attach_params)
.exec(
&self.pod_name,
vec!["/bin/sh", "-c", &shell_cmd],
&attach_params,
)
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;

let status = process.take_status().unwrap().await;
let stdout_reader =
tokio_util::io::ReaderStream::new(process.stdout().ok_or_else(|| {
RemoteError::new_ex(RemoteErrorType::ProtocolError, "failed to read stdout")
})?);

let mut stdout_reader = process.stdout().ok_or_else(|| {
RemoteError::new_ex(RemoteErrorType::ProtocolError, "failed to read stdout")
})?;
let mut buf = [0u8; 1024];
stdout_reader
.read_exact(&mut buf)
let stdout = stdout_reader
.filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
.collect::<Vec<_>>()
.await
.map_err(|err| RemoteError::new_ex(RemoteErrorType::ProtocolError, err))?;
.join("");

// if level is debug print stderr
if log::log_enabled!(log::Level::Debug) {
let stderr_reader =
tokio_util::io::ReaderStream::new(process.stderr().ok_or_else(|| {
RemoteError::new_ex(RemoteErrorType::ProtocolError, "failed to read stderr")
})?);

let stderr = stderr_reader
.filter_map(|r| async {
r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok())
})
.collect::<Vec<_>>()
.await
.join("");
debug!("Shell command stderr: {stderr}",);
}

let stdout = String::from_utf8_lossy(&buf).to_string();
let status = status.and_then(|status| status.code).unwrap_or_default() as u32;
process.join().await.map_err(|err| {
RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
})?;

// collect rc from stdout
// count the number of tokens
let token_count = stdout.chars().filter(|c| *c == ';').count();
let mut tokens = stdout.split(';');
// stdout is all tokens, except the last one
let stdout = tokens
.by_ref()
.take(token_count)
.collect::<Vec<&str>>()
.join(";");
// last token is the return code
let rc = tokens
.next()
.ok_or_else(|| RemoteError::new(RemoteErrorType::ProtocolError))?
.parse::<u32>()
.map_err(|_| RemoteError::new(RemoteErrorType::ProtocolError))?;

Ok((status, stdout))
debug!("Shell command exit code: {rc}",);
debug!("Shell command output: {stdout}");

Ok((rc, stdout))
})
}

Expand Down Expand Up @@ -291,13 +339,18 @@ impl RemoteFs for KubeFs {
})?;

debug!("Connection established with pod {}", self.pod_name);
// Get working directory
debug!("Getting working directory...");
self.wrkdir = self
.shell_cmd("pwd")
.map(|x| PathBuf::from(x.as_str().trim()))?;
// Set pods
self.pods = Some(api);
debug!("Getting working directory...");
// Get working directory
let wrkdir = self.shell_cmd("pwd")?;
if !wrkdir.starts_with('/') {
return Err(RemoteError::new_ex(
RemoteErrorType::ConnectionError,
format!("bad pwd response: {wrkdir}"),
));
}
self.wrkdir = PathBuf::from(wrkdir.trim());
info!(
"Connection established; working directory: {}",
self.wrkdir.display()
Expand Down Expand Up @@ -1629,15 +1682,27 @@ mod test {
// setup pod
debug!("setting up pod");
// config for minikube
let mut auth_info = AuthInfo::default();
auth_info.username = Some("minikube".to_string());
// get home
let home = std::env::var("HOME").unwrap();
auth_info.client_certificate =
Some(format!("{home}/.minikube/profiles/minikube/client.crt"));
auth_info.client_key = Some(format!("{home}/.minikube/profiles/minikube/client.key"));

//let root_cert = format!("{home}/.minikube/ca.crt");

debug!("Auth info: {auth_info:?}");

let config = Config {
cluster_url: format!("https://{minikube_ip}:8843").parse().unwrap(),
cluster_url: format!("https://{minikube_ip}:8443").parse().unwrap(),
default_namespace: "default".to_string(),
read_timeout: None,
root_cert: None,
connect_timeout: None,
write_timeout: None,
accept_invalid_certs: true,
auth_info: AuthInfo::default(),
auth_info,
proxy_url: None,
tls_server_name: None,
};
Expand All @@ -1652,8 +1717,9 @@ mod test {
"metadata": { "name": pod_name },
"spec": {
"containers": [{
"name": pod_name,
"image": "clux/blog:0.1.0"
"name": "alpine",
"image": "alpine" ,
"command": ["tail", "-f", "/dev/null"],
}],
}
}))
Expand All @@ -1672,18 +1738,31 @@ mod test {

debug!("Pod created");

let establish = kube::runtime::wait::await_condition(
pods.clone(),
&pod_name,
kube::runtime::conditions::is_pod_running(),
);

info!("Waiting for pod to be running...");
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), establish)
.await
.expect("pod timeout");

pods
});

let mut client = KubeFs::new(&pod_name, &runtime).config(config.clone());
assert!(client.connect().is_ok());
client.connect().expect("connection failed");
// Create wrkdir
let tempdir = PathBuf::from(generate_tempdir());
assert!(client
client
.create_dir(tempdir.as_path(), UnixPex::from(0o775))
.is_ok());
.expect("failed to create tempdir");
// Change directory
assert!(client.change_dir(tempdir.as_path()).is_ok());
client
.change_dir(tempdir.as_path())
.expect("failed to enter tempdir");
(pods, client)
}

Expand All @@ -1703,8 +1782,8 @@ mod test {
client.runtime.block_on(async {
let dp = DeleteParams::default();
pods.delete(&pod_name, &dp).await.unwrap().map_left(|pdel| {
assert_eq!(pdel.name_any(), "blog");
info!("Deleting {pod_name} pod started: {:?}", pdel);
assert_eq!(pdel.name_any(), pod_name);
});
})
}
Expand All @@ -1717,6 +1796,7 @@ mod test {
.sample_string(&mut thread_rng(), 8)
.chars()
.filter(|c| c.is_alphabetic())
.map(|c| c.to_ascii_lowercase())
.take(8)
.collect();

Expand Down

0 comments on commit 4630de4

Please sign in to comment.