Skip to content

Commit

Permalink
add docker subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Jul 12, 2022
1 parent 041120e commit 276548d
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nautirust"
version = "0.1.1"
version = "0.1.2"
edition = "2021"

[dependencies]
Expand Down
4 changes: 4 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Channel {
#[serde(rename = "requiredFields")]
pub required_fields: Vec<String>,
pub start: Option<String>,
pub docker: Option<String>,
pub stop: Option<String>,
pub options: Vec<Value>,
#[serde(skip_serializing)]
Expand All @@ -30,6 +31,7 @@ impl<'de> Deserialize<'de> for Channel {
struct Ch {
id: String,
start: Option<String>,
pub docker: Option<String>,
stop: Option<String>,
#[serde(rename = "requiredFields")]
required_fields: Vec<String>,
Expand All @@ -39,6 +41,7 @@ impl<'de> Deserialize<'de> for Channel {
required_fields,
id,
start,
docker,
stop,
options,
} = <Ch as Deserialize>::deserialize(deserializer)?;
Expand All @@ -64,6 +67,7 @@ impl<'de> Deserialize<'de> for Channel {
start,
stop,
location: None,
docker,
options,
schema,
required_fields,
Expand Down
134 changes: 134 additions & 0 deletions src/commands/docker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::HashSet;
use std::env;
use std::path::Path;
use std::process::Child;

use async_std::fs::{self, read_to_string, write};
use tempdir::TempDir;

use super::run::Steps;
use crate::channel::Channel;
use crate::runner::Runner;

/// Create a docker-compose file from a nautirust pipeline
#[derive(clap::Args, Debug)]
pub struct Command {
/// Config file
file: String,
#[clap(short, long)]
output: bool,
/// temporary directory to put step configuration files
#[clap(short, long)]
tmp_dir: Option<String>,
}

impl Command {
pub(crate) async fn execute(
self,
channels: Vec<Channel>,
runners: Vec<Runner>,
) {
let mut tmp_dir = None;

if let Some(l) = &self.tmp_dir {
fs::create_dir_all(l).await.unwrap();
}

let path = self
.tmp_dir
.as_ref()
.map(|l| Path::new(l).to_owned())
.unwrap_or_else(|| {
let tmp = TempDir::new("orchestrator").unwrap();
let out = tmp.path().to_owned();
tmp_dir = Some(tmp);
out
});

let content = read_to_string(self.file).await.unwrap();
let values: Steps = serde_json::from_str(&content).unwrap();

// Check if each runner can docker
let find_runner = |id| runners.iter().find(|r| &r.id == id).unwrap();
let runners_without_docker = values
.steps
.iter()
.map(|s| find_runner(&s.processor_config.runner_id))
.filter(|runner| runner.docker.is_none())
.map(|r| &r.id)
.collect::<HashSet<_>>();

if !runners_without_docker.is_empty() {
eprintln!(
"Not all runners support dockerization ({:?})",
runners_without_docker
);
return;
}

let mut procs: Vec<Child> = Vec::new();

let used_channels = super::get_used_channels(&content, &channels);
used_channels.for_each(
|Channel {
docker, location, ..
}| {
super::add_add_subproc(docker, location.as_ref(), &mut procs)
},
);

for value in &values.steps {
let file = path.join(format!("{}.json", value.processor_config.id));
let config = serde_json::to_vec_pretty(&value).unwrap();

write(file.clone(), config).await.unwrap();

let runner = find_runner(&value.processor_config.runner_id);

let config_path = format!(
"'{}'",
file.canonicalize().expect("canonicalize path").display()
);
let current_dir = format!(
"'{}'",
env::current_dir()
.unwrap()
.canonicalize()
.expect("canonicalize path")
.display()
);

let script = runner
.docker
.clone()
.unwrap()
.replace("{config}", &config_path)
.replace("{cwd}", &current_dir);

super::add_add_subproc(
&script.into(),
runner.location.as_ref(),
&mut procs,
);
}

let docker_header = "services:\n";
let docker_content: String = [docker_header.to_string()]
.into_iter()
.chain(
procs
.into_iter()
.map(|p| p.wait_with_output().expect("finish process"))
.map(|output| {
String::from_utf8(output.stdout).expect("invalid utf-8")
}),
)
.collect();

if self.output {
write("docker-compose.yml", docker_content).await.unwrap();
} else {
println!("{}", docker_content);
}
}
}
3 changes: 2 additions & 1 deletion src/commands/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ pub struct Command {
automatic: bool,
}

pub type Args = HashMap<String, Value>;
#[derive(Serialize, Deserialize, Debug)]
pub struct RunConfig {
#[serde(rename = "processorConfig")]
processor: Step,
args: HashMap<String, Value>,
args: Args,
}

#[derive(Debug)]
Expand Down
6 changes: 5 additions & 1 deletion src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::{Path, PathBuf};
use std::process::Child;
use std::process::{Child, Stdio};

use clap::Subcommand;
use jsonpath_rust::JsonPathQuery;
Expand All @@ -8,6 +8,7 @@ use serde_json::Value;
use crate::channel::Channel;
use crate::runner::Runner;

pub mod docker;
pub mod generate;
pub mod prepare;
pub mod run;
Expand All @@ -18,6 +19,7 @@ pub mod validate;
pub enum Command {
Generate(generate::Command),
Run(run::Command),
Docker(docker::Command),
Prepare(prepare::Command),
Validate(validate::Command),
Stop(stop::Command),
Expand All @@ -28,6 +30,7 @@ impl Command {
match self {
Command::Generate(gen) => gen.execute(channels, runners).await,
Command::Run(run) => run.execute(channels, runners).await,
Command::Docker(docker) => docker.execute(channels, runners).await,
Command::Validate(validate) => {
validate.execute(channels, runners).await
}
Expand Down Expand Up @@ -87,6 +90,7 @@ fn start_subproc<Str: AsRef<str>, S: AsRef<Path>>(
let location = location.and_then(expand_tilde);

let mut proc = std::process::Command::new("sh");
proc.stdout(Stdio::piped());
proc.args(["-c", script.as_ref()]);

if let Some(location) = location {
Expand Down
23 changes: 18 additions & 5 deletions src/commands/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::process::Child;

use async_std::fs::read_to_string;

use super::run::Values;
use super::run::{RunThing, Steps};
use crate::channel::Channel;
use crate::runner::Runner;
use crate::step::Step;

/// Prepares the execution pipeline by starting the required channels/runner
#[derive(clap::Args, Debug)]
Expand All @@ -14,12 +15,12 @@ pub struct Command {
}

impl Command {
pub async fn execute(self, _channels: Vec<Channel>, runners: Vec<Runner>) {
pub async fn execute(self, channels: Vec<Channel>, runners: Vec<Runner>) {
let content = read_to_string(self.file).await.unwrap();
let values: Values = serde_json::from_str(&content).unwrap();
let values: Steps = serde_json::from_str(&content).unwrap();

let mut procs: Vec<Child> = Vec::new();
let used_channels = super::get_used_channels(&content, &_channels);
let used_channels = super::get_used_channels(&content, &channels);
used_channels.for_each(
|Channel {
start, location, ..
Expand All @@ -30,7 +31,7 @@ impl Command {

let used_runners = runners.iter().filter(|runner| {
values
.values
.steps
.iter()
.any(|v| v.processor_config.runner_id == runner.id)
});
Expand All @@ -45,6 +46,18 @@ impl Command {
},
);

values.steps.iter().for_each(
|RunThing {
processor_config:
Step {
build, location, ..
},
..
}| {
super::add_add_subproc(build, location.as_ref(), &mut procs);
},
);

// Stops the processors in the reverse order
while !procs.is_empty() {
procs.pop().unwrap().wait().unwrap();
Expand Down
26 changes: 10 additions & 16 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,23 @@ use serde_json::Value;
use tempdir::TempDir;

use crate::channel::Channel;
use crate::commands::generate::Args;
use crate::runner::Runner;

#[derive(Serialize, Deserialize, Debug)]
pub struct ProcConfig {
pub id: String,
#[serde(rename = "runnerId")]
pub runner_id: String,
#[serde(flatten)]
other: Value,
}
use crate::step::Step;

#[derive(Serialize, Deserialize, Debug)]
pub struct RunThing {
#[serde(rename = "processorConfig")]
pub processor_config: ProcConfig,

pub processor_config: Step,
args: Args,
#[serde(flatten)]
other: Value,
rest: Value,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Values {
pub values: Vec<RunThing>,
pub struct Steps {
#[serde(rename = "values")]
pub steps: Vec<RunThing>,
}

/// Run a configured pipeline
Expand Down Expand Up @@ -67,11 +61,11 @@ impl Command {
});

let content = read_to_string(self.file).await.unwrap();
let values: Values = serde_json::from_str(&content).unwrap();
let values: Steps = serde_json::from_str(&content).unwrap();

let mut procs: Vec<Child> = Vec::new();

for value in values.values {
for value in values.steps {
let file = path.join(format!("{}.json", value.processor_config.id));
let config = serde_json::to_vec_pretty(&value).unwrap();

Expand Down
6 changes: 3 additions & 3 deletions src/commands/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::process::Child;

use async_std::fs::read_to_string;

use super::run::Values;
use super::run::Steps;
use crate::channel::Channel;
use crate::runner::Runner;

Expand All @@ -16,7 +16,7 @@ pub struct Command {
impl Command {
pub async fn execute(self, _channels: Vec<Channel>, runners: Vec<Runner>) {
let content = read_to_string(self.file).await.unwrap();
let values: Values = serde_json::from_str(&content).unwrap();
let values: Steps = serde_json::from_str(&content).unwrap();

let mut procs: Vec<Child> = Vec::new();
let used_channels = super::get_used_channels(&content, &_channels);
Expand All @@ -27,7 +27,7 @@ impl Command {

let used_runners = runners.iter().filter(|runner| {
values
.values
.steps
.iter()
.any(|v| v.processor_config.runner_id == runner.id)
});
Expand Down
6 changes: 2 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod commands;
mod runner;
mod step;

const TOML_LOCATION: &'static str = "orchestrator.toml";
const TOML_LOCATION: &str = "orchestrator.toml";

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand Down Expand Up @@ -58,7 +58,6 @@ enum Commands {

#[derive(Serialize, Deserialize, Debug)]
struct AppConfig {
tmp_dir: String,
/// Glob to indicate channel locations
channels: String,
/// Glob to indicate runner locations
Expand All @@ -74,13 +73,12 @@ async fn load_cfg(args: Args) -> Result<(AppConfig, Command), Box<dyn Error>> {
// First set some default value
let mut builder = ConfigBuilder::<DefaultState>::default()
.set_default("channels", "channels")?
.set_default("tmp_dir", "tmp")?
.set_default("runners", "runners")?;

// Try to override with config things
for toml in &tomls {
if Path::new(&toml).exists().await {
builder = builder.add_source(config::File::with_name(&toml));
builder = builder.add_source(config::File::with_name(toml));
} else {
eprintln!("config file not found '{}'", toml);
}
Expand Down
Loading

0 comments on commit 276548d

Please sign in to comment.