From 276548dad44d6f91e4f3f628ce9f1235b5978f54 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 12 Jul 2022 16:59:18 +0200 Subject: [PATCH] add docker subcommand --- Cargo.toml | 2 +- src/channel.rs | 4 ++ src/commands/docker.rs | 134 +++++++++++++++++++++++++++++++++++++++ src/commands/generate.rs | 3 +- src/commands/mod.rs | 6 +- src/commands/prepare.rs | 23 +++++-- src/commands/run.rs | 26 +++----- src/commands/stop.rs | 6 +- src/main.rs | 6 +- src/runner.rs | 12 ++-- src/step.rs | 19 +++--- 11 files changed, 196 insertions(+), 45 deletions(-) create mode 100644 src/commands/docker.rs diff --git a/Cargo.toml b/Cargo.toml index 100e501..061af94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nautirust" -version = "0.1.1" +version = "0.1.2" edition = "2021" [dependencies] diff --git a/src/channel.rs b/src/channel.rs index 415c259..fe0e671 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -13,6 +13,7 @@ pub struct Channel { #[serde(rename = "requiredFields")] pub required_fields: Vec, pub start: Option, + pub docker: Option, pub stop: Option, pub options: Vec, #[serde(skip_serializing)] @@ -30,6 +31,7 @@ impl<'de> Deserialize<'de> for Channel { struct Ch { id: String, start: Option, + pub docker: Option, stop: Option, #[serde(rename = "requiredFields")] required_fields: Vec, @@ -39,6 +41,7 @@ impl<'de> Deserialize<'de> for Channel { required_fields, id, start, + docker, stop, options, } = ::deserialize(deserializer)?; @@ -64,6 +67,7 @@ impl<'de> Deserialize<'de> for Channel { start, stop, location: None, + docker, options, schema, required_fields, diff --git a/src/commands/docker.rs b/src/commands/docker.rs new file mode 100644 index 0000000..6489d71 --- /dev/null +++ b/src/commands/docker.rs @@ -0,0 +1,134 @@ +use std::collections::HashSet; +use std::env; +use std::path::Path; +use std::process::Child; + +use async_std::fs::{self, read_to_string, write}; +use tempdir::TempDir; + +use super::run::Steps; +use crate::channel::Channel; +use crate::runner::Runner; + +/// Create a docker-compose file from a nautirust pipeline +#[derive(clap::Args, Debug)] +pub struct Command { + /// Config file + file: String, + #[clap(short, long)] + output: bool, + /// temporary directory to put step configuration files + #[clap(short, long)] + tmp_dir: Option, +} + +impl Command { + pub(crate) async fn execute( + self, + channels: Vec, + runners: Vec, + ) { + let mut tmp_dir = None; + + if let Some(l) = &self.tmp_dir { + fs::create_dir_all(l).await.unwrap(); + } + + let path = self + .tmp_dir + .as_ref() + .map(|l| Path::new(l).to_owned()) + .unwrap_or_else(|| { + let tmp = TempDir::new("orchestrator").unwrap(); + let out = tmp.path().to_owned(); + tmp_dir = Some(tmp); + out + }); + + let content = read_to_string(self.file).await.unwrap(); + let values: Steps = serde_json::from_str(&content).unwrap(); + + // Check if each runner can docker + let find_runner = |id| runners.iter().find(|r| &r.id == id).unwrap(); + let runners_without_docker = values + .steps + .iter() + .map(|s| find_runner(&s.processor_config.runner_id)) + .filter(|runner| runner.docker.is_none()) + .map(|r| &r.id) + .collect::>(); + + if !runners_without_docker.is_empty() { + eprintln!( + "Not all runners support dockerization ({:?})", + runners_without_docker + ); + return; + } + + let mut procs: Vec = Vec::new(); + + let used_channels = super::get_used_channels(&content, &channels); + used_channels.for_each( + |Channel { + docker, location, .. + }| { + super::add_add_subproc(docker, location.as_ref(), &mut procs) + }, + ); + + for value in &values.steps { + let file = path.join(format!("{}.json", value.processor_config.id)); + let config = serde_json::to_vec_pretty(&value).unwrap(); + + write(file.clone(), config).await.unwrap(); + + let runner = find_runner(&value.processor_config.runner_id); + + let config_path = format!( + "'{}'", + file.canonicalize().expect("canonicalize path").display() + ); + let current_dir = format!( + "'{}'", + env::current_dir() + .unwrap() + .canonicalize() + .expect("canonicalize path") + .display() + ); + + let script = runner + .docker + .clone() + .unwrap() + .replace("{config}", &config_path) + .replace("{cwd}", ¤t_dir); + + super::add_add_subproc( + &script.into(), + runner.location.as_ref(), + &mut procs, + ); + } + + let docker_header = "services:\n"; + let docker_content: String = [docker_header.to_string()] + .into_iter() + .chain( + procs + .into_iter() + .map(|p| p.wait_with_output().expect("finish process")) + .map(|output| { + String::from_utf8(output.stdout).expect("invalid utf-8") + }), + ) + .collect(); + + if self.output { + write("docker-compose.yml", docker_content).await.unwrap(); + } else { + println!("{}", docker_content); + } + } +} diff --git a/src/commands/generate.rs b/src/commands/generate.rs index aaf9b67..45e7eb9 100644 --- a/src/commands/generate.rs +++ b/src/commands/generate.rs @@ -27,11 +27,12 @@ pub struct Command { automatic: bool, } +pub type Args = HashMap; #[derive(Serialize, Deserialize, Debug)] pub struct RunConfig { #[serde(rename = "processorConfig")] processor: Step, - args: HashMap, + args: Args, } #[derive(Debug)] diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 5159671..8748a70 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,5 +1,5 @@ use std::path::{Path, PathBuf}; -use std::process::Child; +use std::process::{Child, Stdio}; use clap::Subcommand; use jsonpath_rust::JsonPathQuery; @@ -8,6 +8,7 @@ use serde_json::Value; use crate::channel::Channel; use crate::runner::Runner; +pub mod docker; pub mod generate; pub mod prepare; pub mod run; @@ -18,6 +19,7 @@ pub mod validate; pub enum Command { Generate(generate::Command), Run(run::Command), + Docker(docker::Command), Prepare(prepare::Command), Validate(validate::Command), Stop(stop::Command), @@ -28,6 +30,7 @@ impl Command { match self { Command::Generate(gen) => gen.execute(channels, runners).await, Command::Run(run) => run.execute(channels, runners).await, + Command::Docker(docker) => docker.execute(channels, runners).await, Command::Validate(validate) => { validate.execute(channels, runners).await } @@ -87,6 +90,7 @@ fn start_subproc, S: AsRef>( let location = location.and_then(expand_tilde); let mut proc = std::process::Command::new("sh"); + proc.stdout(Stdio::piped()); proc.args(["-c", script.as_ref()]); if let Some(location) = location { diff --git a/src/commands/prepare.rs b/src/commands/prepare.rs index 3be6b42..ad6cc3a 100644 --- a/src/commands/prepare.rs +++ b/src/commands/prepare.rs @@ -2,9 +2,10 @@ use std::process::Child; use async_std::fs::read_to_string; -use super::run::Values; +use super::run::{RunThing, Steps}; use crate::channel::Channel; use crate::runner::Runner; +use crate::step::Step; /// Prepares the execution pipeline by starting the required channels/runner #[derive(clap::Args, Debug)] @@ -14,12 +15,12 @@ pub struct Command { } impl Command { - pub async fn execute(self, _channels: Vec, runners: Vec) { + pub async fn execute(self, channels: Vec, runners: Vec) { let content = read_to_string(self.file).await.unwrap(); - let values: Values = serde_json::from_str(&content).unwrap(); + let values: Steps = serde_json::from_str(&content).unwrap(); let mut procs: Vec = Vec::new(); - let used_channels = super::get_used_channels(&content, &_channels); + let used_channels = super::get_used_channels(&content, &channels); used_channels.for_each( |Channel { start, location, .. @@ -30,7 +31,7 @@ impl Command { let used_runners = runners.iter().filter(|runner| { values - .values + .steps .iter() .any(|v| v.processor_config.runner_id == runner.id) }); @@ -45,6 +46,18 @@ impl Command { }, ); + values.steps.iter().for_each( + |RunThing { + processor_config: + Step { + build, location, .. + }, + .. + }| { + super::add_add_subproc(build, location.as_ref(), &mut procs); + }, + ); + // Stops the processors in the reverse order while !procs.is_empty() { procs.pop().unwrap().wait().unwrap(); diff --git a/src/commands/run.rs b/src/commands/run.rs index a4b48df..8191045 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -8,29 +8,23 @@ use serde_json::Value; use tempdir::TempDir; use crate::channel::Channel; +use crate::commands::generate::Args; use crate::runner::Runner; - -#[derive(Serialize, Deserialize, Debug)] -pub struct ProcConfig { - pub id: String, - #[serde(rename = "runnerId")] - pub runner_id: String, - #[serde(flatten)] - other: Value, -} +use crate::step::Step; #[derive(Serialize, Deserialize, Debug)] pub struct RunThing { #[serde(rename = "processorConfig")] - pub processor_config: ProcConfig, - + pub processor_config: Step, + args: Args, #[serde(flatten)] - other: Value, + rest: Value, } #[derive(Serialize, Deserialize, Debug)] -pub struct Values { - pub values: Vec, +pub struct Steps { + #[serde(rename = "values")] + pub steps: Vec, } /// Run a configured pipeline @@ -67,11 +61,11 @@ impl Command { }); let content = read_to_string(self.file).await.unwrap(); - let values: Values = serde_json::from_str(&content).unwrap(); + let values: Steps = serde_json::from_str(&content).unwrap(); let mut procs: Vec = Vec::new(); - for value in values.values { + for value in values.steps { let file = path.join(format!("{}.json", value.processor_config.id)); let config = serde_json::to_vec_pretty(&value).unwrap(); diff --git a/src/commands/stop.rs b/src/commands/stop.rs index 646af5d..715d53e 100644 --- a/src/commands/stop.rs +++ b/src/commands/stop.rs @@ -2,7 +2,7 @@ use std::process::Child; use async_std::fs::read_to_string; -use super::run::Values; +use super::run::Steps; use crate::channel::Channel; use crate::runner::Runner; @@ -16,7 +16,7 @@ pub struct Command { impl Command { pub async fn execute(self, _channels: Vec, runners: Vec) { let content = read_to_string(self.file).await.unwrap(); - let values: Values = serde_json::from_str(&content).unwrap(); + let values: Steps = serde_json::from_str(&content).unwrap(); let mut procs: Vec = Vec::new(); let used_channels = super::get_used_channels(&content, &_channels); @@ -27,7 +27,7 @@ impl Command { let used_runners = runners.iter().filter(|runner| { values - .values + .steps .iter() .any(|v| v.processor_config.runner_id == runner.id) }); diff --git a/src/main.rs b/src/main.rs index 1693f97..cd33d1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ mod commands; mod runner; mod step; -const TOML_LOCATION: &'static str = "orchestrator.toml"; +const TOML_LOCATION: &str = "orchestrator.toml"; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -58,7 +58,6 @@ enum Commands { #[derive(Serialize, Deserialize, Debug)] struct AppConfig { - tmp_dir: String, /// Glob to indicate channel locations channels: String, /// Glob to indicate runner locations @@ -74,13 +73,12 @@ async fn load_cfg(args: Args) -> Result<(AppConfig, Command), Box> { // First set some default value let mut builder = ConfigBuilder::::default() .set_default("channels", "channels")? - .set_default("tmp_dir", "tmp")? .set_default("runners", "runners")?; // Try to override with config things for toml in &tomls { if Path::new(&toml).exists().await { - builder = builder.add_source(config::File::with_name(&toml)); + builder = builder.add_source(config::File::with_name(toml)); } else { eprintln!("config file not found '{}'", toml); } diff --git a/src/runner.rs b/src/runner.rs index 020091b..c011610 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -12,6 +12,7 @@ use crate::channel::Channel; pub struct Runner { pub id: String, pub start: Option, + pub docker: Option, pub stop: Option, #[serde(rename = "runnerScript")] pub script: String, @@ -35,6 +36,7 @@ impl<'de> Deserialize<'de> for Runner { #[derive(Deserialize)] struct R { pub id: String, + pub docker: Option, pub start: Option, pub stop: Option, #[serde(rename = "runnerScript")] @@ -51,6 +53,7 @@ impl<'de> Deserialize<'de> for Runner { start, can_use_channel, can_use_serialization, + docker, stop, script, id, @@ -69,6 +72,7 @@ impl<'de> Deserialize<'de> for Runner { start, schema, required_fields, + docker, can_use_channel, can_use_serialization, stop, @@ -78,16 +82,16 @@ impl<'de> Deserialize<'de> for Runner { } } -pub async fn parse_runners(path: &str, channels: &Vec) -> Vec { +pub async fn parse_runners(path: &str, channels: &[Channel]) -> Vec { let mut runners = Vec::new(); - let mut iterator = glob(path) + let iterator = glob(path) .expect("Failed to read channels glob pattern") .flatten() .map(parse_runner); - let channel_exists = |id: &str| channels.iter().any(|c| &c.id == id); + let channel_exists = |id: &str| channels.iter().any(|c| c.id == id); - while let Some(item) = iterator.next() { + for item in iterator { match item.await { Ok(runner) => { if runner.can_use_channel.iter().fold( diff --git a/src/step.rs b/src/step.rs index b2f2762..43ed886 100644 --- a/src/step.rs +++ b/src/step.rs @@ -25,6 +25,7 @@ pub struct Step { #[serde(rename = "runnerId")] pub runner_id: String, pub config: Value, + pub build: Option, pub args: Vec, pub location: Option, } @@ -36,25 +37,23 @@ fn config_is_valid(schema: &JSONSchema, config: &Value) -> bool { }); return false; } - return true; + + true } -pub async fn parse_steps<'a, S, I>( - paths: I, - runners: &'a Vec, -) -> Vec +pub async fn parse_steps<'a, S, I>(paths: I, runners: &'a [Runner]) -> Vec where S: AsRef + 'a, I: IntoIterator, { let mut steps = Vec::new(); - let mut iterator = paths.into_iter().map(parse_step); + let iterator = paths.into_iter().map(parse_step); - while let Some(item) = iterator.next() { + for item in iterator { match item.await { Ok(step) => { if let Some(runner) = - runners.iter().find(|runner| &runner.id == &step.runner_id) + runners.iter().find(|runner| runner.id == step.runner_id) { if config_is_valid(&runner.schema, &step.config) { steps.push(step); @@ -70,8 +69,8 @@ where steps } -pub async fn parse_step<'a, S: AsRef>( - path: &'a S, +pub async fn parse_step>( + path: &'_ S, ) -> Result> { let p = path.as_ref(); let loc = p