From 9a2b34f3ee2f3fec3e68b74aad0fe52d9bcb343b Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 22 Apr 2024 10:14:57 +0900 Subject: [PATCH 1/4] kble: move run function to app module --- kble/src/app.rs | 28 ++++++++++++++++++++++++++++ kble/src/main.rs | 3 ++- kble/src/spaghetti.rs | 33 ++------------------------------- 3 files changed, 32 insertions(+), 32 deletions(-) create mode 100644 kble/src/app.rs diff --git a/kble/src/app.rs b/kble/src/app.rs new file mode 100644 index 0000000..d411756 --- /dev/null +++ b/kble/src/app.rs @@ -0,0 +1,28 @@ +use crate::{plug, spaghetti::Config}; +use anyhow::{anyhow, Result}; +use futures::future; +use futures::StreamExt; +use std::collections::HashMap; + +pub async fn run(config: &Config) -> Result<()> { + let mut sinks = HashMap::new(); + let mut streams = HashMap::new(); + for (name, url) in config.plugs.iter() { + let (sink, stream) = plug::connect(url).await?; + sinks.insert(name.as_str(), sink); + streams.insert(name.as_str(), stream); + } + let mut edges = vec![]; + for (stream_name, sink_name) in config.links.iter() { + let Some(stream) = streams.remove(stream_name.as_str()) else { + return Err(anyhow!("No such plug: {stream_name}")); + }; + let Some(sink) = sinks.remove(sink_name.as_str()) else { + return Err(anyhow!("No such plug or already used: {sink_name}")); + }; + let edge = stream.forward(sink); + edges.push(edge); + } + future::try_join_all(edges).await?; + Ok(()) +} diff --git a/kble/src/main.rs b/kble/src/main.rs index 1fda032..5b3a3d5 100644 --- a/kble/src/main.rs +++ b/kble/src/main.rs @@ -6,6 +6,7 @@ use notalawyer_clap::*; mod plug; mod spaghetti; +mod app; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -30,6 +31,6 @@ impl Args { async fn main() -> Result<()> { let args = Args::parse_with_license_notice(include_notice!()); let config = args.load_spaghetti_config()?; - config.run().await?; + app::run(&config).await?; Ok(()) } diff --git a/kble/src/spaghetti.rs b/kble/src/spaghetti.rs index bddabf5..c6d1824 100644 --- a/kble/src/spaghetti.rs +++ b/kble/src/spaghetti.rs @@ -1,41 +1,12 @@ use std::collections::HashMap; -use anyhow::{anyhow, Result}; -use futures::{future, StreamExt}; use serde::{Deserialize, Serialize}; use url::Url; -use crate::plug; - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Config { - plugs: HashMap, - links: HashMap, -} - -impl Config { - pub async fn run(&self) -> Result<()> { - let mut sinks = HashMap::new(); - let mut streams = HashMap::new(); - for (name, url) in self.plugs.iter() { - let (sink, stream) = plug::connect(url).await?; - sinks.insert(name.as_str(), sink); - streams.insert(name.as_str(), stream); - } - let mut edges = vec![]; - for (stream_name, sink_name) in self.links.iter() { - let Some(stream) = streams.remove(stream_name.as_str()) else { - return Err(anyhow!("No such plug: {stream_name}")); - }; - let Some(sink) = sinks.remove(sink_name.as_str()) else { - return Err(anyhow!("No such plug or already used: {sink_name}")); - }; - let edge = stream.forward(sink); - edges.push(edge); - } - future::try_join_all(edges).await?; - Ok(()) - } + pub plugs: HashMap, + pub links: HashMap, } #[cfg(test)] From 74ccdbdf583dbacb4c22e5cdef3018501be71312 Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 22 Apr 2024 10:15:01 +0900 Subject: [PATCH 2/4] kble: implement config validation --- kble/src/app.rs | 8 ++-- kble/src/main.rs | 10 +++-- kble/src/spaghetti.rs | 97 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 104 insertions(+), 11 deletions(-) diff --git a/kble/src/app.rs b/kble/src/app.rs index d411756..8cc9780 100644 --- a/kble/src/app.rs +++ b/kble/src/app.rs @@ -7,18 +7,18 @@ use std::collections::HashMap; pub async fn run(config: &Config) -> Result<()> { let mut sinks = HashMap::new(); let mut streams = HashMap::new(); - for (name, url) in config.plugs.iter() { + for (name, url) in config.plugs().iter() { let (sink, stream) = plug::connect(url).await?; sinks.insert(name.as_str(), sink); streams.insert(name.as_str(), stream); } let mut edges = vec![]; - for (stream_name, sink_name) in config.links.iter() { + for (stream_name, sink_name) in config.links().iter() { let Some(stream) = streams.remove(stream_name.as_str()) else { - return Err(anyhow!("No such plug: {stream_name}")); + unreachable!("No such plug: {stream_name}"); }; let Some(sink) = sinks.remove(sink_name.as_str()) else { - return Err(anyhow!("No such plug or already used: {sink_name}")); + unreachable!("No such plug or already used: {sink_name}"); }; let edge = stream.forward(sink); edges.push(edge); diff --git a/kble/src/main.rs b/kble/src/main.rs index 5b3a3d5..86f1897 100644 --- a/kble/src/main.rs +++ b/kble/src/main.rs @@ -4,9 +4,11 @@ use anyhow::{Context, Result}; use clap::Parser; use notalawyer_clap::*; +mod app; mod plug; mod spaghetti; -mod app; + +use spaghetti::{Config, Raw}; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -22,8 +24,10 @@ impl Args { .open(&self.spaghetti) .with_context(|| format!("Failed to open {:?}", &self.spaghetti))?; let spagetthi_rdr = std::io::BufReader::new(spaghetti_file); - serde_yaml::from_reader(spagetthi_rdr) - .with_context(|| format!("Unable to parse {:?}", self.spaghetti)) + let raw: Config = serde_yaml::from_reader(spagetthi_rdr) + .with_context(|| format!("Unable to parse {:?}", self.spaghetti))?; + raw.validate() + .with_context(|| format!("Invalid configuration in {:?}", self.spaghetti)) } } diff --git a/kble/src/spaghetti.rs b/kble/src/spaghetti.rs index c6d1824..15c8fa3 100644 --- a/kble/src/spaghetti.rs +++ b/kble/src/spaghetti.rs @@ -1,12 +1,75 @@ +use anyhow::{anyhow, Result}; use std::collections::HashMap; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct Config { - pub plugs: HashMap, - pub links: HashMap, +pub struct Inner { + plugs: HashMap, + links: HashMap, +} + +#[derive(PartialEq, Debug)] +pub enum Raw {} +pub enum Validated {} + +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct Config { + #[serde(flatten)] + inner: Inner, + state: std::marker::PhantomData, +} + +impl<'de> serde::Deserialize<'de> for Config { + fn deserialize(deserializer: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let inner = Inner::deserialize(deserializer)?; + Ok(Config::new(inner)) + } +} + +impl Config { + fn new(inner: Inner) -> Self { + Config { + inner, + state: std::marker::PhantomData, + } + } +} + +impl Config { + pub fn validate(self) -> Result> { + use std::collections::HashSet; + let mut seen_sinks = HashSet::new(); + + for (stream_name, sink_name) in self.inner.links.iter() { + if !self.inner.plugs.contains_key(stream_name) { + return Err(anyhow!("No such plug: {stream_name}")); + } + if !self.inner.plugs.contains_key(sink_name) { + return Err(anyhow!("No such plug: {sink_name}")); + } + + if seen_sinks.contains(sink_name) { + return Err(anyhow!("Sink {sink_name} used more than once")); + } + seen_sinks.insert(sink_name); + } + Ok(Config::new(self.inner)) + } +} + +impl Config { + pub fn plugs(&self) -> &HashMap { + &self.inner.plugs + } + + pub fn links(&self) -> &HashMap { + &self.inner.links + } } #[cfg(test)] @@ -18,7 +81,7 @@ mod tests { #[test] fn test_de() { let yaml = "plugs:\n tfsync: exec:tfsync foo\n seriald: ws://seriald.local/\nlinks:\n tfsync: seriald\n"; - let expected = Config { + let inner = Inner { plugs: HashMap::from_iter([ ("tfsync".to_string(), Url::parse("exec:tfsync foo").unwrap()), ( @@ -28,7 +91,33 @@ mod tests { ]), links: HashMap::from_iter([("tfsync".to_string(), "seriald".to_string())]), }; + let expected = Config { + inner, + state: std::marker::PhantomData, + }; let actual = serde_yaml::from_str(yaml).unwrap(); assert_eq!(expected, actual); + actual.validate().unwrap(); + } + + #[test] + fn test_de_invalid_dest() { + let yaml = "plugs:\n tfsync: exec:tfsync foo\n seriald: ws://seriald.local/\nlinks:\n tfsync: serialdxxxx\n"; + let actual: Config = serde_yaml::from_str(yaml).unwrap(); + assert!(actual.validate().is_err()); + } + + #[test] + fn test_de_invalid_source() { + let yaml = "plugs:\n tfsync: exec:tfsync foo\n seriald: ws://seriald.local/\nlinks:\n tfsyncxxxx: seriald\n"; + let actual: Config = serde_yaml::from_str(yaml).unwrap(); + assert!(actual.validate().is_err()); + } + + #[test] + fn test_de_duplicate_sink() { + let yaml = "plugs:\n tfsync: exec:tfsync foo\n seriald: ws://seriald.local/\nlinks:\n tfsync: seriald\n seriald: seriald\n"; + let actual: Config = serde_yaml::from_str(yaml).unwrap(); + assert!(actual.validate().is_err()); } } From e2a0ac65be200bb837d5d9755abad3482d1e1372 Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 22 Apr 2024 10:15:01 +0900 Subject: [PATCH 3/4] kble: introduce tracing-subscriber --- Cargo.lock | 2 ++ kble/Cargo.toml | 2 ++ kble/src/main.rs | 10 ++++++++++ 3 files changed, 14 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 216d91c..d9024e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -770,6 +770,8 @@ dependencies = [ "serde_yaml", "tokio", "tokio-tungstenite", + "tracing", + "tracing-subscriber", "url", ] diff --git a/kble/Cargo.toml b/kble/Cargo.toml index 2912729..e1d2c86 100644 --- a/kble/Cargo.toml +++ b/kble/Cargo.toml @@ -24,5 +24,7 @@ clap.workspace = true serde.workspace = true serde_yaml = "0.9" serde_with = "3.7" +tracing-subscriber.workspace = true +tracing.workspace = true notalawyer.workspace = true notalawyer-clap.workspace = true diff --git a/kble/src/main.rs b/kble/src/main.rs index 86f1897..09862eb 100644 --- a/kble/src/main.rs +++ b/kble/src/main.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use clap::Parser; use notalawyer_clap::*; +use tracing_subscriber::{prelude::*, EnvFilter}; mod app; mod plug; @@ -33,6 +34,15 @@ impl Args { #[tokio::main] async fn main() -> Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(std::io::stderr), + ) + .with(EnvFilter::from_default_env()) + .init(); + let args = Args::parse_with_license_notice(include_notice!()); let config = args.load_spaghetti_config()?; app::run(&config).await?; From 49f70e4f21e5adfe35561d1651be6fb7a4d6b8dc Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 22 Apr 2024 18:53:05 +0900 Subject: [PATCH 4/4] kble: fix warning --- kble/src/app.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kble/src/app.rs b/kble/src/app.rs index 8cc9780..0fa07e0 100644 --- a/kble/src/app.rs +++ b/kble/src/app.rs @@ -1,5 +1,5 @@ use crate::{plug, spaghetti::Config}; -use anyhow::{anyhow, Result}; +use anyhow::Result; use futures::future; use futures::StreamExt; use std::collections::HashMap;