Skip to content

Commit

Permalink
update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Jun 23, 2022
1 parent bb2f74e commit 5cfe4a9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 53 deletions.
43 changes: 34 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,55 @@ An orchestrator for the connector architecture.
This architecture defines processors that take input.
Input can be many things, mostly configuration stuff, streamReaders and streamWriters.

Readers and writers are connectors that use a channel to forward messages (currently all is json).
What channel is used, usually doesn't really matter, the orchestrator asks the user what channel can be used.
Readers and writers are connectors that use a channel to forward messages in some serialization (json, turtle, xml, plain).
What channel is used, usually doesn't really matter, they all have the same interface.

Processors are executed with runners, the task of a runner is to start up or configure the content around the processor
that the processor will use the specified connector.
Nautirust is primariy used to configure pipelines that consist of steps.
Each step corresponds to a processor and is configured with a step configuration file denoting the expected runner, runner specific arguments (source file, source function) and arguments to be configured.
Nautirust configures the parameters used for the processors per step. To do this it mostly asks the user the actual _implementations_ of the arguments.
Nautirust understands that readers and writers have to be linked up to function (the same channel configuration), this way it can guide the user in creating the correct pipeline.

After configuring the steps of a pipeline a pipeline config is generated. This config contains all the actual arguments (including channel configuration).

When Nautirust executes a configured pipeline, it executes a specific _runner_ for a specific step.
For example, if a step consists of a JS function, then a JSRunner is used to actually execute this function.
Ideally the runner starts up the channel and provides the step with a instance of a reader or writer and the configured arguments.

## Usage

Generate a plan:
Example step file (for more details see later)
```json
{
"id": "helloWorld",
"runnerId": "JsRunner",
"config": {
"jsFile": "main.js",
"methodName": "sayHello"
},
"args": [
{
"id": "to",
"type": "string"
}
]
}
```

Generate a plan and save it to plan.json:
```
cargo run -- generate -o plan.json [...steps]
```


Execute the plan:
```
cargo run -- run plan.json
```


## Configuration

Channels and runners have to be defined, this can be done with command line arguments or a config file.

```toml
channels = "configs/channels/*.json"
runners = "configs/runners/*/runner.json"
Expand Down Expand Up @@ -62,7 +89,6 @@ Example runner configuration:
{
"id": "JsRunner",
"runnerScript": "node ./lib/index.js {config} {cwd}",
"stopScript": "...",
"canUseChannel": [
"file", "ws"
],
Expand All @@ -75,10 +101,9 @@ Example runner configuration:

Here a runner called JsRunner is defined. Required fields are
- `runnerScript`: how is the runner started
- `stopScript`: how the runner is stopped
- `canUseChannel`: what channels can this runner provide to the processor

Runners can also be configured by a step. Here it requires a `jsFile` and a `methodName`.
When a runner is configured in a step `jsFile` and `methodName` have to be provided.


### Step configuration
Expand Down
62 changes: 32 additions & 30 deletions src/commands/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ use crate::channel::{Channel, ChannelConfig};
use crate::runner::Runner;
use crate::step::{self, Step, StepArguments};

/// Generate json that is ready to execute
/// Generate a pipeline of steps
#[derive(clap::Args, Debug)]
pub struct Command {
/// Steps to include in the pipeline (ordered)
steps: Vec<String>,

/// Output location of the generated pipeline file
#[clap(short, long)]
output: Option<String>,

/// Try infer basic configurations details
#[clap(short, long)]
automatic: bool,
}
Expand All @@ -28,15 +31,15 @@ pub struct Command {
pub struct RunConfig {
#[serde(rename = "processorConfig")]
processor: Step,
args: HashMap<String, Value>,
args: HashMap<String, Value>,
}

#[derive(Debug)]
struct TmpTarget<'a> {
step_id: &'a str,
writer_id: &'a str,
name: &'a str,
possible_channels: &'a Vec<String>,
step_id: &'a str,
writer_id: &'a str,
name: &'a str,
possible_channels: &'a Vec<String>,
possible_serializations: &'a Vec<String>,
}

Expand Down Expand Up @@ -86,7 +89,7 @@ impl Completion for Complete {
common = &common[..index];
}

if common.len() == 0 {
if common.is_empty() {
None
} else {
Some(common.to_string())
Expand Down Expand Up @@ -184,10 +187,10 @@ impl Command {

for id in ids {
let target = TmpTarget {
name: id,
writer_id: &arg.id,
step_id: &step.id,
possible_channels: channel_types,
name: id,
writer_id: &arg.id,
step_id: &step.id,
possible_channels: channel_types,
possible_serializations: serialization_types,
};
open_channels.push(target);
Expand Down Expand Up @@ -218,8 +221,9 @@ impl Command {
}
}

if let Some(_) =
all_step_args.insert(step.id.to_string(), step_args)
if all_step_args
.insert(step.id.to_string(), step_args)
.is_some()
{
panic!("Found multiple steps with the same id '{}'", step.id);
}
Expand All @@ -236,13 +240,13 @@ impl Command {
);

let (config, ty) = ask_user_for_channel(
&target.possible_channels,
target.possible_channels,
&mut channel_options,
self.automatic,
);

let ser =
ask_user_for_serialization(&target.possible_serializations);
ask_user_for_serialization(target.possible_serializations);

let ch_config = ChannelConfig::new(
target.name.to_string(),
Expand Down Expand Up @@ -278,8 +282,8 @@ impl Command {
}

fn create_valid_tmp_target_fn<'a>(
channel_types: &'a Vec<String>,
ser_types: &'a Vec<String>,
channel_types: &'a [String],
ser_types: &'a [String],
) -> impl for<'r, 's> Fn(&'r TmpTarget<'s>) -> bool + 'a {
|ch: &TmpTarget| {
ch.possible_channels
Expand All @@ -299,8 +303,8 @@ fn get_if_only_one<T, I: Iterator<Item = T>>(mut iter: I) -> Option<T> {

fn ask_channel_config<'a>(
id: &str,
channel_types: &Vec<String>,
ser_types: &Vec<String>,
channel_types: &[String],
ser_types: &[String],
open_channels: &mut Vec<TmpTarget<'a>>,
channel_options: &mut HashMap<String, Vec<Value>>,
automatic: bool,
Expand Down Expand Up @@ -341,7 +345,7 @@ fn ask_channel_config<'a>(
// serialization that are both possible for the current processor and that target
let (target, types, sers) = {
if n >= options.len() {
(None, channel_types.clone(), ser_types.clone())
(None, channel_types.to_owned(), ser_types.to_owned())
} else {
let target = open_channels.remove(n);

Expand Down Expand Up @@ -372,18 +376,18 @@ fn ask_channel_config<'a>(
))
}

fn ask_user_for_serialization(options: &Vec<String>) -> String {
let ser_index = ask_user_for("What serialization?", &options, false);
fn ask_user_for_serialization(options: &[String]) -> String {
let ser_index = ask_user_for("What serialization?", options, false);

options[ser_index].to_string()
}

fn ask_user_for_channel<'a>(
types: &'a Vec<String>,
types: &'a [String],
channel_options: &mut HashMap<String, Vec<Value>>,
automatic: bool,
) -> (Value, &'a String) {
let ty_index = ask_user_for("Choose channel type", &types, false);
let ty_index = ask_user_for("Choose channel type", types, false);
let ty = &types[ty_index];

let options = channel_options.get_mut(ty).unwrap();
Expand All @@ -400,9 +404,9 @@ fn ask_user_for_channel<'a>(
(options.remove(channel_index), ty)
}

fn ask_user_for<'a, T: std::fmt::Display>(
fn ask_user_for<T: std::fmt::Display>(
name: &str,
things: &'a Vec<T>,
things: &'_ [T],
allow_other: bool,
) -> usize {
let theme = ColorfulTheme::default();
Expand All @@ -414,11 +418,9 @@ fn ask_user_for<'a, T: std::fmt::Display>(
item.item("Other");
}

let index = loop {
loop {
if let Ok(output) = item.interact() {
break output;
}
};

index
}
}
13 changes: 2 additions & 11 deletions src/commands/prepare.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
use std::process::Child;

use async_std::fs::read_to_string;
use serde::{Deserialize, Serialize};

use crate::channel::Channel;
use crate::runner::Runner;

use super::run::Values;

#[derive(Serialize, Deserialize, Debug)]
struct PrepareConfig {
id: String,
#[serde(rename = "stopScript")]
script: String,
location: String,
}

// Prepares the execution pipeline by starting the required channels/services defined in the config file
/// Prepares the execution pipeline by starting the required channels/runner
#[derive(clap::Args, Debug)]
pub struct Command {
/// Config file
file: String,
}

Expand Down
5 changes: 3 additions & 2 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ pub struct Values {
pub values: Vec<RunThing>,
}

/// Run the actual configs
/// Run a configured pipeline
#[derive(clap::Args, Debug)]
pub struct Command {
/// Config file
file: String,
/// tmpdir to put temporary files
/// temporary directory to put step configuration files
#[clap(short, long)]
tmp_dir: Option<String>,
}
Expand Down
1 change: 1 addition & 0 deletions src/commands/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::runner::Runner;
/// Gracefully stop the runners and channels specified in the config
#[derive(clap::Args, Debug)]
pub struct Command {
/// Config file
file: String,
}

Expand Down
2 changes: 1 addition & 1 deletion src/commands/validate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::channel::Channel;
use crate::runner::Runner;

/// Validate configs for runners and channels
/// Validate configureations for runners and channels
#[derive(clap::Args, Debug)]
pub struct Command {}
impl Command {
Expand Down

0 comments on commit 5cfe4a9

Please sign in to comment.