From d71b85790810419521d37026c08492558d7295a9 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 22 Nov 2022 13:38:52 +0100 Subject: [PATCH 01/10] feat: add support for WebOfThings and C8Y --- .../src/service/session/dialect/mod.rs | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/mqtt-endpoint/src/service/session/dialect/mod.rs b/mqtt-endpoint/src/service/session/dialect/mod.rs index 96cdfd2a..c45eb7d3 100644 --- a/mqtt-endpoint/src/service/session/dialect/mod.rs +++ b/mqtt-endpoint/src/service/session/dialect/mod.rs @@ -154,6 +154,28 @@ impl DefaultTopicParser for MqttDialect { }), } } + Self::WebOfThings { .. } => { + let topic = path.split_once('/'); + log::debug!("Topic: {:?}", topic); + + match topic { + // No topic at all + None if path.is_empty() => Err(ParseError::Empty), + None => Ok(ParsedPublishTopic { + channel: "", + device: Some(path), + }), + Some(("", _)) => Err(ParseError::Syntax), + Some((device, path)) => Ok(ParsedPublishTopic { + channel: path, + device: Some(device), + }), + } + } + Self::Cumulocity => { + log::debug!("c8y: {path}"); + Err(ParseError::Syntax) + } } } @@ -193,6 +215,22 @@ impl DefaultTopicParser for MqttDialect { _ => Err(ParseError::Syntax), } } + Self::WebOfThings { node_wot_bug } => match path.split_once('/') { + Some((device, filter)) => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::ProxiedDevice(device), + command: Some(filter), + }, + encoder: SubscriptionTopicEncoder::new(WoTCommandTopicEncoder { + node_wot_bug: *node_wot_bug, + }), + }), + _ => Err(ParseError::Syntax), + }, + Self::Cumulocity => { + log::debug!("c8y: {path}"); + Err(ParseError::Syntax) + } } } } From 18df305909b6b34571015085d9c5528d85d4e11d Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 12:30:36 +0100 Subject: [PATCH 02/10] feat: allow publishing commands via MQTT with slash Commands are derived from the MQTT topic the user publishes refactor on the MQTT integration. So far, the prevented the user from using a forward slash (/) as part of the command. This change allows to work around this limitation by taking the remainder of the topic as command name. --- mqtt-integration/src/service/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt-integration/src/service/session.rs b/mqtt-integration/src/service/session.rs index 077877db..641fed88 100644 --- a/mqtt-integration/src/service/session.rs +++ b/mqtt-integration/src/service/session.rs @@ -254,7 +254,7 @@ impl Session { #[async_trait(?Send)] impl mqtt::Session for Session { async fn publish(&self, publish: Publish<'_>) -> Result<(), PublishError> { - let topic = publish.topic().path().split('/').collect::>(); + let topic = publish.topic().path().splitn(4, '/').collect::>(); if topic.len() != 4 || !topic[0].eq_ignore_ascii_case("command") { log::info!("Invalid topic name {:?}", topic); From eca040d03cd5ac43db7fe69f06c05d2b6c63728b Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 12:31:05 +0100 Subject: [PATCH 03/10] refactor: add a bit of logging --- endpoint-common/src/command/commands.rs | 11 ++++++++--- endpoint-common/src/command/source.rs | 2 ++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/endpoint-common/src/command/commands.rs b/endpoint-common/src/command/commands.rs index a740c663..c9634b0c 100644 --- a/endpoint-common/src/command/commands.rs +++ b/endpoint-common/src/command/commands.rs @@ -111,6 +111,7 @@ pub struct Subscription { impl Commands { pub fn new() -> Self { + log::info!("New internal command broker"); Self { devices: Arc::new(Mutex::new(HashMap::new())), wildcards: Arc::new(Mutex::new(HashMap::new())), @@ -218,7 +219,7 @@ impl Commands { { log::debug!("Adding entry for: {key:?}"); - let map = match map.entry(key) { + let entry_map = match map.entry(key) { Entry::Occupied(entry) => entry.into_mut(), Entry::Vacant(entry) => entry.insert(HashMap::new()), }; @@ -226,13 +227,14 @@ impl Commands { loop { let id: usize = rand::random(); - match map.entry(id) { + match entry_map.entry(id) { Entry::Vacant(entry) => { // entry was free, we can insert entry.insert(value); break id; } Entry::Occupied(_) => { + log::debug!("ID clash, retrying"); // entry is occupied, we need to re-try } } @@ -263,9 +265,11 @@ impl CommandDispatcher for Commands { log::debug!("Dispatching command to {:?}", msg.address); + let mut possible: usize = 0; let mut num: usize = 0; if let Some(senders) = self.devices.lock().await.get(&msg.address) { + possible += senders.len(); log::debug!( "Sending command {:?} sent to device {:?}", msg.command, @@ -278,6 +282,7 @@ impl CommandDispatcher for Commands { msg.address.app_id.clone(), msg.address.gateway_id.clone(), )) { + possible += senders.len(); log::debug!( "Sending command {:?} sent to wildcard {:?}", msg.command, @@ -286,7 +291,7 @@ impl CommandDispatcher for Commands { num += dispatch_command(senders.values(), &msg).await; } - log::debug!("Sent to {} receivers", num); + log::debug!("Sent to {num} receivers of {possible}"); } } diff --git a/endpoint-common/src/command/source.rs b/endpoint-common/src/command/source.rs index 6e20e86f..4129b777 100644 --- a/endpoint-common/src/command/source.rs +++ b/endpoint-common/src/command/source.rs @@ -38,6 +38,8 @@ impl KafkaCommandSource { where D: CommandDispatcher + Send + Sync + 'static, { + log::info!("Starting Kafka command source"); + let mut source = EventStream::::new(EventStreamConfig { kafka: KafkaConfig { topic: config.topic, From 3776023b2d437fc9cd65a9e5fe702afbbcc43057 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 14:42:13 +0100 Subject: [PATCH 04/10] feat: implement an Azure dialect --- Cargo.lock | 3 +- Cargo.toml | 2 +- endpoint-common/src/auth.rs | 2 + mqtt-endpoint/Cargo.toml | 1 + mqtt-endpoint/src/service/app.rs | 5 +- .../src/service/session/dialect/az.rs | 19 +++ .../src/service/session/dialect/encoder.rs | 62 +++++++ .../src/service/session/dialect/mod.rs | 159 ++++++++++++------ mqtt-endpoint/src/service/session/mod.rs | 89 +++++++--- 9 files changed, 258 insertions(+), 84 deletions(-) create mode 100644 mqtt-endpoint/src/service/session/dialect/az.rs create mode 100644 mqtt-endpoint/src/service/session/dialect/encoder.rs diff --git a/Cargo.lock b/Cargo.lock index a02245c1..8f0ac2d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1465,7 +1465,7 @@ dependencies = [ [[package]] name = "drogue-client" version = "0.12.0" -source = "git+https://github.com/drogue-iot/drogue-client?rev=798c968f0a63a0debcff9965c66b361e85946458#798c968f0a63a0debcff9965c66b361e85946458" +source = "git+https://github.com/drogue-iot/drogue-client?rev=c3e5fc6e0ef1781f8362394a114b72738990d00d#c3e5fc6e0ef1781f8362394a114b72738990d00d" dependencies = [ "async-trait", "base64 0.13.1", @@ -2103,6 +2103,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "url", "uuid", "webpki", ] diff --git a/Cargo.toml b/Cargo.toml index 1f007049..70cd32c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs", drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "d19ad32f200938aeb5d7081ee3385ee40c5ae0ff" } # FIXME: awaiting release 0.4.0 #drogue-bazaar = { path = "../drogue-bazaar" } -drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "798c968f0a63a0debcff9965c66b361e85946458" } # FIXME: awaiting release 0.12.0 +drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "c3e5fc6e0ef1781f8362394a114b72738990d00d" } # FIXME: awaiting release 0.12.0 #drogue-client = { path = "../drogue-client" } operator-framework = { git = "https://github.com/ctron/operator-framework", rev = "8366506a3ed44b638f899dcce4a82ac32fcaff9e" } # FIXME: awaiting release 0.7.0 diff --git a/endpoint-common/src/auth.rs b/endpoint-common/src/auth.rs index baf1a937..508e6e12 100644 --- a/endpoint-common/src/auth.rs +++ b/endpoint-common/src/auth.rs @@ -303,6 +303,8 @@ impl DeviceAuthenticator { } // Client cert only (None, None, _, Some(certs), None) => self.authenticate_cert(certs.0).await, + // Client cert plus username + (Some(_username), None, _, Some(certs), None) => self.authenticate_cert(certs.0).await, // TLS-PSK verified identity (None, None, _, None, Some(verified_identity)) => { self.authenticate_verified_identity(verified_identity) diff --git a/mqtt-endpoint/Cargo.toml b/mqtt-endpoint/Cargo.toml index 8ada54ea..95c4766f 100644 --- a/mqtt-endpoint/Cargo.toml +++ b/mqtt-endpoint/Cargo.toml @@ -33,6 +33,7 @@ serde_json = "1" thiserror = "1" tokio = { version = "1", features = ["full"] } tracing = { version = "0.1", features = ["log-always"] } +url = "2" uuid = { version = "1", features = ["v4"] } webpki = "0.22" diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index fae01acd..faa89373 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -188,10 +188,6 @@ impl Service for App { ) -> Result, ServerError> { log::info!("new connection: {:?}", connect); - if !connect.clean_session() { - return Err(ServerError::UnsupportedOperation); - } - let certs = connect.io().client_certs(); let verified_identity = if self.disable_psk { None @@ -246,6 +242,7 @@ impl Service for App { wildcard_subscription_available: Some(true), shared_subscription_available: Some(false), subscription_identifiers_available: Some(false), + session_present: false, ..Default::default() }, }) diff --git a/mqtt-endpoint/src/service/session/dialect/az.rs b/mqtt-endpoint/src/service/session/dialect/az.rs new file mode 100644 index 00000000..14357fe6 --- /dev/null +++ b/mqtt-endpoint/src/service/session/dialect/az.rs @@ -0,0 +1,19 @@ +use std::borrow::Cow; + +/// Split an Azure topic, which might carry a "bag of properties" as the last topic segment +pub fn split_topic(path: &str) -> (&str, Vec<(Cow, Cow)>) { + if let Some((topic, last)) = path.rsplit_once('/') { + // at least two segments + if last.starts_with("?") { + // last one is a bag of properties + let query = url::form_urlencoded::parse(&last.as_bytes()[1..]); + (topic, query.collect()) + } else { + // last one is a regular one + (path, vec![]) + } + } else { + // single topic segment + (path, vec![]) + } +} diff --git a/mqtt-endpoint/src/service/session/dialect/encoder.rs b/mqtt-endpoint/src/service/session/dialect/encoder.rs new file mode 100644 index 00000000..f40ef9bd --- /dev/null +++ b/mqtt-endpoint/src/service/session/dialect/encoder.rs @@ -0,0 +1,62 @@ +use drogue_cloud_endpoint_common::command::Command; +use std::fmt::Debug; +use std::ops::Deref; + +/// A structure boxing a dynamic [`TopicEncoder`] instance. +#[derive(Debug)] +pub struct SubscriptionTopicEncoder(Box); + +impl Deref for SubscriptionTopicEncoder { + type Target = dyn TopicEncoder; + + fn deref(&self) -> &Self::Target { + self.0.as_ref() + } +} + +impl SubscriptionTopicEncoder { + pub fn new(encoder: T) -> Self + where + T: TopicEncoder + 'static, + { + Self(Box::new(encoder)) + } +} + +/// An encoder, from commands to topic names. +/// +/// This carries over the information from the subscription request, to the encoding of topic names +/// for received commands. +pub trait TopicEncoder: Debug { + /// Encode a topic from a command, requested originally by a SUB request + fn encode_command_topic(&self, command: &Command) -> String; +} + +/// The default (Drogue V1) encoder, which expects the command inbox pattern. +#[derive(Debug)] +pub struct DefaultCommandTopicEncoder(pub bool); + +impl TopicEncoder for DefaultCommandTopicEncoder { + fn encode_command_topic(&self, command: &Command) -> String { + // if we are forced to report the device part, or the device id is not equal to the + // connected device, then we need to add it. + if self.0 || command.address.gateway_id != command.address.device_id { + format!( + "command/inbox/{}/{}", + command.address.device_id, command.command + ) + } else { + format!("command/inbox//{}", command.command) + } + } +} + +/// An encoder which uses the plain command name as topic. +#[derive(Debug)] +pub struct PlainTopicEncoder; + +impl TopicEncoder for PlainTopicEncoder { + fn encode_command_topic(&self, command: &Command) -> String { + command.command.clone() + } +} diff --git a/mqtt-endpoint/src/service/session/dialect/mod.rs b/mqtt-endpoint/src/service/session/dialect/mod.rs index c45eb7d3..03732a24 100644 --- a/mqtt-endpoint/src/service/session/dialect/mod.rs +++ b/mqtt-endpoint/src/service/session/dialect/mod.rs @@ -1,12 +1,14 @@ +mod az; +mod encoder; mod wot; +pub use encoder::*; pub use wot::*; use drogue_client::registry::v1::MqttDialect; -use drogue_cloud_endpoint_common::command::{Command, CommandFilter}; +use drogue_cloud_endpoint_common::command::CommandFilter; use drogue_cloud_service_common::Id; -use std::fmt::Debug; -use std::ops::Deref; +use std::{borrow::Cow, fmt::Debug}; use thiserror::Error; /// A topic parser for the default session. @@ -18,31 +20,6 @@ pub trait DefaultTopicParser { fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError>; } -#[derive(Debug)] -pub struct SubscriptionTopicEncoder(Box); - -impl Deref for SubscriptionTopicEncoder { - type Target = dyn TopicEncoder; - - fn deref(&self) -> &Self::Target { - self.0.as_ref() - } -} - -impl SubscriptionTopicEncoder { - pub fn new(encoder: T) -> Self - where - T: TopicEncoder + 'static, - { - Self(Box::new(encoder)) - } -} - -pub trait TopicEncoder: Debug { - /// Encode a topic from a command, requested originally by a SUB request - fn encode_command_topic(&self, command: &Command) -> String; -} - #[derive(Clone, Debug, Error, PartialEq, Eq)] pub enum ParseError { #[error("Topic syntax error")] @@ -55,6 +32,7 @@ pub enum ParseError { pub struct ParsedPublishTopic<'a> { pub channel: &'a str, pub device: Option<&'a str>, + pub properties: Vec<(Cow<'a, str>, Cow<'a, str>)>, } #[derive(Debug)] @@ -96,26 +74,25 @@ pub enum DeviceFilter<'a> { ProxiedDevice(&'a str), } -#[derive(Debug)] -pub struct DefaultCommandTopicEncoder(bool); - impl DefaultTopicParser for MqttDialect { fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { match self { Self::DrogueV1 => { // This should mimic the behavior of the current parser let topic = path.split('/').collect::>(); - log::debug!("Topic: {:?}", topic); + log::debug!("Topic: {topic:?}",); match topic.as_slice() { [""] => Err(ParseError::Empty), [channel] => Ok(ParsedPublishTopic { channel, device: None, + properties: vec![], }), [channel, as_device] => Ok(ParsedPublishTopic { channel, device: Some(as_device), + properties: vec![], }), _ => Err(ParseError::Syntax), } @@ -129,6 +106,7 @@ impl DefaultTopicParser for MqttDialect { path => Ok(ParsedPublishTopic { channel: path, device: None, + properties: vec![], }), } } @@ -138,7 +116,7 @@ impl DefaultTopicParser for MqttDialect { // Plain topic (with device prefix). Strip the device, and then just take the complete path let topic = path.split_once('/'); - log::debug!("Topic: {:?}", topic); + log::debug!("Topic: {topic:?}",); match topic { // No topic at all @@ -147,16 +125,18 @@ impl DefaultTopicParser for MqttDialect { Some(("", path)) => Ok(ParsedPublishTopic { channel: path, device: None, + properties: vec![], }), Some((device, path)) => Ok(ParsedPublishTopic { channel: path, device: Some(device), + properties: vec![], }), } } Self::WebOfThings { .. } => { let topic = path.split_once('/'); - log::debug!("Topic: {:?}", topic); + log::debug!("Topic: {topic:?}",); match topic { // No topic at all @@ -164,17 +144,49 @@ impl DefaultTopicParser for MqttDialect { None => Ok(ParsedPublishTopic { channel: "", device: Some(path), + properties: vec![], }), Some(("", _)) => Err(ParseError::Syntax), Some((device, path)) => Ok(ParsedPublishTopic { channel: path, device: Some(device), + properties: vec![], }), } } Self::Cumulocity => { - log::debug!("c8y: {path}"); - Err(ParseError::Syntax) + let topic = path.split('/').collect::>(); + log::debug!("C8Y: {topic:?}",); + + match topic.as_slice() { + [""] => Err(ParseError::Empty), + ["s", "us"] => Ok(ParsedPublishTopic { + channel: "c8y", + device: None, + properties: vec![], + }), + ["s", "us", as_device] => Ok(ParsedPublishTopic { + channel: "c8y", + device: Some(as_device), + properties: vec![], + }), + _ => Err(ParseError::Syntax), + } + } + Self::Azure => { + let (channel, properties) = az::split_topic(path); + + if channel.is_empty() { + return Err(ParseError::Empty); + } + + log::debug!("Azure: {channel} - properties: {properties:?}"); + + Ok(ParsedPublishTopic { + channel, + device: None, + properties, + }) } } } @@ -229,23 +241,28 @@ impl DefaultTopicParser for MqttDialect { }, Self::Cumulocity => { log::debug!("c8y: {path}"); - Err(ParseError::Syntax) + match path.split('/').collect::>().as_slice() { + [] => Err(ParseError::Empty), + ["s", "e"] => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Device, + command: None, + }, + encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), + }), + _ => Err(ParseError::Syntax), + } + } + Self::Azure => { + log::debug!("Azure: {path}"); + Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Device, + command: Some(path), + }, + encoder: SubscriptionTopicEncoder::new(PlainTopicEncoder), + }) } - } - } -} - -impl TopicEncoder for DefaultCommandTopicEncoder { - fn encode_command_topic(&self, command: &Command) -> String { - // if we are forced to report the device part, or the device id is not equal to the - // connected device, then we need to add it. - if self.0 || command.address.gateway_id != command.address.device_id { - format!( - "command/inbox/{}/{}", - command.address.device_id, command.command - ) - } else { - format!("command/inbox//{}", command.command) } } } @@ -271,6 +288,7 @@ mod test { Ok(ParsedPublishTopic { channel: "foo", device: None, + properties: vec![], }), ); // channel for another device @@ -280,6 +298,7 @@ mod test { Ok(ParsedPublishTopic { channel: "foo", device: Some("device"), + properties: vec![], }), ); } @@ -301,6 +320,7 @@ mod test { Ok(ParsedPublishTopic { channel: "foo", device: None, + properties: vec![], }), ); assert_parse( @@ -309,6 +329,7 @@ mod test { Ok(ParsedPublishTopic { channel: "foo/bar", device: None, + properties: vec![], }), ); assert_parse( @@ -317,6 +338,7 @@ mod test { Ok(ParsedPublishTopic { channel: "/bar", device: None, + properties: vec![], }), ); } @@ -339,6 +361,7 @@ mod test { Ok(ParsedPublishTopic { channel: "bar", device: Some("foo"), + properties: vec![], }), ); // device may be empty though @@ -348,6 +371,7 @@ mod test { Ok(ParsedPublishTopic { channel: "bar", device: None, + properties: vec![], }), ); // longer topic @@ -357,6 +381,37 @@ mod test { Ok(ParsedPublishTopic { channel: "bar/baz//bam/bum", device: Some("foo"), + properties: vec![], + }), + ); + } + + #[test] + fn test_azure_prefix() { + let spec: MqttSpec = serde_json::from_value(json!({"dialect":{ + "type": "azure", + }})) + .unwrap(); + + assert_parse(&spec, "", Err(ParseError::Empty)); + // simple + assert_parse( + &spec, + "foo/bar/baz", + Ok(ParsedPublishTopic { + channel: "foo/bar/baz", + device: None, + properties: vec![], + }), + ); + // properties + assert_parse( + &spec, + "foo/bar/baz/?foo=bar&bar=baz", + Ok(ParsedPublishTopic { + channel: "foo/bar/baz", + device: None, + properties: vec![("foo".into(), "bar".into()), ("bar".into(), "baz".into())], }), ); } diff --git a/mqtt-endpoint/src/service/session/mod.rs b/mqtt-endpoint/src/service/session/mod.rs index 540c033c..739f4549 100644 --- a/mqtt-endpoint/src/service/session/mod.rs +++ b/mqtt-endpoint/src/service/session/mod.rs @@ -39,6 +39,7 @@ use ntex_mqtt::{ types::QoS, v5::codec::{self, DisconnectReasonCode}, }; +use std::borrow::Cow; use std::{ cell::Cell, collections::{hash_map::Entry, HashMap}, @@ -46,6 +47,12 @@ use std::{ }; use tracing::instrument; +pub struct PublishRequest { + pub channel: String, + pub device: Arc, + pub extensions: HashMap, +} + pub struct Session { sender: DownstreamSender, application: registry::v1::Application, @@ -152,38 +159,63 @@ impl Session { } #[instrument(level = "debug", skip(self), fields(self.id = ?self.id), err)] - async fn eval_device( - &self, - publish: &Publish<'_>, - ) -> Result<(String, Arc), PublishError> { + async fn eval_device(&self, publish: &Publish<'_>) -> Result { match self.dialect.parse_publish(publish.topic().path()) { - Ok(topic) => match topic.device { - None => Ok((topic.channel.to_string(), self.device.clone())), - Some(device) if device == self.id.device_id => { - Ok((topic.channel.to_string(), self.device.clone())) + Ok(topic) => { + let channel = topic.channel.to_string(); + let extensions = into_extensions(topic.properties); + + match topic.device { + // publish as device + None => Ok(PublishRequest { + channel, + device: self.device.clone(), + extensions, + }), + // also publish as device + Some(device) if device == self.id.device_id => Ok(PublishRequest { + channel, + device: self.device.clone(), + extensions, + }), + // publish as someone else + Some(device) => self + .device_cache + .fetch(device, |device| { + self.auth + .authorize_as( + &self.application.metadata.name, + &self.device.metadata.name, + device, + ) + .map_ok(|result| match result.outcome { + GatewayOutcome::Pass { r#as } => Some(r#as), + _ => None, + }) + }) + .await + .map(|r| PublishRequest { + channel, + device: r, + extensions, + }), } - Some(device) => self - .device_cache - .fetch(device, |device| { - self.auth - .authorize_as( - &self.application.metadata.name, - &self.device.metadata.name, - device, - ) - .map_ok(|result| match result.outcome { - GatewayOutcome::Pass { r#as } => Some(r#as), - _ => None, - }) - }) - .await - .map(|r| (topic.channel.to_string(), r)), - }, + } Err(_) => Err(PublishError::TopicNameInvalid), } } } +fn into_extensions(properties: Vec<(Cow, Cow)>) -> HashMap { + let mut result = HashMap::with_capacity(properties.len()); + + for (k, v) in properties { + result.insert(k.as_ref().to_string(), v.as_ref().to_string()); + } + + result +} + #[async_trait(? Send)] impl mqtt::Session for Session { #[instrument(level = "debug", skip(self), fields(self.id = ?self.id), err)] @@ -195,7 +227,11 @@ impl mqtt::Session for Session { .and_then(|p| p.content_type.as_ref()) .map(|s| s.to_string()); - let (channel, device) = self.eval_device(&publish).await?; + let PublishRequest { + channel, + device, + extensions, + } = self.eval_device(&publish).await?; log::debug!( "Publish as {} / {} ({}) to {}", @@ -215,6 +251,7 @@ impl mqtt::Session for Session { sender: self.device.metadata.to_id(), options: PublishOptions { content_type, + extensions, ..Default::default() }, }, From 9c906adad2c1a1bb7d23160f8dbe7e670dd5d554 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 14:53:50 +0100 Subject: [PATCH 05/10] refactor: tweak topic parser --- .../src/service/session/dialect/az.rs | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/mqtt-endpoint/src/service/session/dialect/az.rs b/mqtt-endpoint/src/service/session/dialect/az.rs index 14357fe6..6c1e54bd 100644 --- a/mqtt-endpoint/src/service/session/dialect/az.rs +++ b/mqtt-endpoint/src/service/session/dialect/az.rs @@ -10,10 +10,39 @@ pub fn split_topic(path: &str) -> (&str, Vec<(Cow, Cow)>) { (topic, query.collect()) } else { // last one is a regular one - (path, vec![]) + (path.trim_end_matches('/'), vec![]) } } else { // single topic segment (path, vec![]) } } + +#[cfg(test)] +mod test { + + use super::*; + + #[test] + fn test_plain() { + assert_eq!(split_topic("foo/bar"), ("foo/bar", vec![])); + } + + #[test] + fn test_plain_slash() { + assert_eq!(split_topic("foo/bar/"), ("foo/bar", vec![])); + } + + #[test] + fn test_plain_slash_q() { + assert_eq!(split_topic("foo/bar/?"), ("foo/bar", vec![])); + } + + #[test] + fn test_properties() { + assert_eq!( + split_topic("foo/bar/?baz=123"), + ("foo/bar", vec![("baz".into(), "123".into())]) + ); + } +} From 29e8c2b882b204ed74ebcd5632fa064b62d37c85 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 17:25:54 +0100 Subject: [PATCH 06/10] refactor: move away from having the logic in a big enum Instead of having a big enum, with all combinations, we now have the logic of the different dialects in different implementations --- mqtt-endpoint/src/service/app.rs | 34 +- .../src/service/session/dialect/az.rs | 42 +++ .../src/service/session/dialect/c8y.rs | 43 +++ .../src/service/session/dialect/drogue.rs | 81 +++++ .../src/service/session/dialect/encoder.rs | 19 -- .../src/service/session/dialect/mod.rs | 301 +++++++----------- .../src/service/session/dialect/plain.rs | 46 +++ .../src/service/session/dialect/wot.rs | 47 +++ mqtt-endpoint/src/service/session/mod.rs | 10 +- 9 files changed, 400 insertions(+), 223 deletions(-) create mode 100644 mqtt-endpoint/src/service/session/dialect/c8y.rs create mode 100644 mqtt-endpoint/src/service/session/dialect/drogue.rs create mode 100644 mqtt-endpoint/src/service/session/dialect/plain.rs diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index faa89373..dfac3f8e 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -1,4 +1,8 @@ -use crate::{auth::DeviceAuthenticator, config::EndpointConfig, service::session::Session}; +use crate::{ + auth::DeviceAuthenticator, + config::EndpointConfig, + service::session::{dialect::DialectBuilder, Session}, +}; use async_trait::async_trait; use drogue_client::{ registry::v1::{Application, Device, MqttSpec}, @@ -13,6 +17,7 @@ use drogue_cloud_endpoint_common::{ }; use drogue_cloud_mqtt_common::{ error::ServerError, + mqtt, mqtt::{AckOptions, Connect, ConnectAck, Service, Sink}, }; use drogue_cloud_service_api::{ @@ -71,7 +76,6 @@ impl App { fields( application = %application.metadata.name, device = %device.metadata.name, - lwt = ?lwt, ), err(Debug) )] @@ -79,8 +83,7 @@ impl App { &self, application: Application, device: Device, - sink: Sink, - lwt: Option, + connect: &Connect<'_>, ) -> Result { // eval dialect let dialect = match device @@ -100,6 +103,16 @@ impl App { log::debug!("MQTT dialect: {dialect:?}"); + let dialect = dialect.create(); + + // prepare + + let sink = connect.sink(); + let lwt = Self::make_lwt(&connect); + log::info!("LWT: {lwt:?}"); + + dialect.validate_connect(connect)?; + // acquire session let opts = CreateOptions { lwt }; @@ -227,14 +240,11 @@ impl Service for App { device, r#as: _, }) => { - let session = self - .create_session( - application, - device, - connect.sink(), - Self::make_lwt(&connect), - ) - .await?; + if !connect.clean_session() { + return Err(ServerError::UnsupportedOperation); + } + + let session = self.create_session(application, device, &connect).await?; Ok(ConnectAck { session, diff --git a/mqtt-endpoint/src/service/session/dialect/az.rs b/mqtt-endpoint/src/service/session/dialect/az.rs index 6c1e54bd..dcce1841 100644 --- a/mqtt-endpoint/src/service/session/dialect/az.rs +++ b/mqtt-endpoint/src/service/session/dialect/az.rs @@ -1,5 +1,47 @@ +use super::*; use std::borrow::Cow; +/// Azure IoT dialect. +pub struct Azure; + +impl ConnectValidator for Azure { + fn validate_connect(&self, _connect: &Connect) -> Result<(), ServerError> { + // we accept everything + Ok(()) + } +} + +impl PublishTopicParser for Azure { + fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + let (channel, properties) = split_topic(path); + + if channel.is_empty() { + return Err(ParseError::Empty); + } + + log::debug!("Azure: {channel} - properties: {properties:?}"); + + Ok(ParsedPublishTopic { + channel, + device: None, + properties, + }) + } +} + +impl SubscribeTopicParser for Azure { + fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError> { + log::debug!("Azure: {path}"); + Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Device, + command: Some(path), + }, + encoder: SubscriptionTopicEncoder::new(PlainTopicEncoder), + }) + } +} + /// Split an Azure topic, which might carry a "bag of properties" as the last topic segment pub fn split_topic(path: &str) -> (&str, Vec<(Cow, Cow)>) { if let Some((topic, last)) = path.rsplit_once('/') { diff --git a/mqtt-endpoint/src/service/session/dialect/c8y.rs b/mqtt-endpoint/src/service/session/dialect/c8y.rs new file mode 100644 index 00000000..9cd03916 --- /dev/null +++ b/mqtt-endpoint/src/service/session/dialect/c8y.rs @@ -0,0 +1,43 @@ +use super::*; + +/// Cumulocity dialect. +pub struct Cumulocity; + +impl PublishTopicParser for Cumulocity { + fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + let topic = path.split('/').collect::>(); + log::debug!("C8Y: {topic:?}",); + + match topic.as_slice() { + [""] => Err(ParseError::Empty), + ["s", "us"] => Ok(ParsedPublishTopic { + channel: "c8y", + device: None, + properties: vec![], + }), + ["s", "us", as_device] => Ok(ParsedPublishTopic { + channel: "c8y", + device: Some(as_device), + properties: vec![], + }), + _ => Err(ParseError::Syntax), + } + } +} + +impl SubscribeTopicParser for Cumulocity { + fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError> { + log::debug!("c8y: {path}"); + match path.split('/').collect::>().as_slice() { + [] => Err(ParseError::Empty), + ["s", "e"] => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Device, + command: None, + }, + encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), + }), + _ => Err(ParseError::Syntax), + } + } +} diff --git a/mqtt-endpoint/src/service/session/dialect/drogue.rs b/mqtt-endpoint/src/service/session/dialect/drogue.rs new file mode 100644 index 00000000..0d869d39 --- /dev/null +++ b/mqtt-endpoint/src/service/session/dialect/drogue.rs @@ -0,0 +1,81 @@ +use super::*; +use drogue_cloud_endpoint_common::command::Command; + +/// Drogue IoT v1 dialect. +pub struct DrogueV1; + +impl PublishTopicParser for DrogueV1 { + fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + // This should mimic the behavior of the current parser + let topic = path.split('/').collect::>(); + log::debug!("Topic: {topic:?}",); + + match topic.as_slice() { + [""] => Err(ParseError::Empty), + [channel] => Ok(ParsedPublishTopic { + channel, + device: None, + properties: vec![], + }), + [channel, as_device] => Ok(ParsedPublishTopic { + channel, + device: Some(as_device), + properties: vec![], + }), + _ => Err(ParseError::Syntax), + } + } +} + +impl SubscribeTopicParser for DrogueV1 { + fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError> { + match path.split('/').collect::>().as_slice() { + // subscribe to commands for all proxied devices and ourself + ["command", "inbox", "#"] | ["command", "inbox", "+", "#"] => { + Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Wildcard, + command: None, + }, + encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), + }) + } + // subscribe to commands directly for us + ["command", "inbox", "", "#"] => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::Device, + command: None, + }, + encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), + }), + // subscribe to commands for a specific device + ["command", "inbox", device, "#"] => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::ProxiedDevice(device), + command: None, + }, + encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(true)), + }), + _ => Err(ParseError::Syntax), + } + } +} + +/// The default (Drogue V1) encoder, which expects the command inbox pattern. +#[derive(Debug)] +pub struct DefaultCommandTopicEncoder(pub bool); + +impl TopicEncoder for DefaultCommandTopicEncoder { + fn encode_command_topic(&self, command: &Command) -> String { + // if we are forced to report the device part, or the device id is not equal to the + // connected device, then we need to add it. + if self.0 || command.address.gateway_id != command.address.device_id { + format!( + "command/inbox/{}/{}", + command.address.device_id, command.command + ) + } else { + format!("command/inbox//{}", command.command) + } + } +} diff --git a/mqtt-endpoint/src/service/session/dialect/encoder.rs b/mqtt-endpoint/src/service/session/dialect/encoder.rs index f40ef9bd..67a9d7f5 100644 --- a/mqtt-endpoint/src/service/session/dialect/encoder.rs +++ b/mqtt-endpoint/src/service/session/dialect/encoder.rs @@ -32,25 +32,6 @@ pub trait TopicEncoder: Debug { fn encode_command_topic(&self, command: &Command) -> String; } -/// The default (Drogue V1) encoder, which expects the command inbox pattern. -#[derive(Debug)] -pub struct DefaultCommandTopicEncoder(pub bool); - -impl TopicEncoder for DefaultCommandTopicEncoder { - fn encode_command_topic(&self, command: &Command) -> String { - // if we are forced to report the device part, or the device id is not equal to the - // connected device, then we need to add it. - if self.0 || command.address.gateway_id != command.address.device_id { - format!( - "command/inbox/{}/{}", - command.address.device_id, command.command - ) - } else { - format!("command/inbox//{}", command.command) - } - } -} - /// An encoder which uses the plain command name as topic. #[derive(Debug)] pub struct PlainTopicEncoder; diff --git a/mqtt-endpoint/src/service/session/dialect/mod.rs b/mqtt-endpoint/src/service/session/dialect/mod.rs index 03732a24..772f2974 100644 --- a/mqtt-endpoint/src/service/session/dialect/mod.rs +++ b/mqtt-endpoint/src/service/session/dialect/mod.rs @@ -1,21 +1,47 @@ mod az; +mod c8y; +mod drogue; mod encoder; +mod plain; mod wot; +pub use az::*; +pub use c8y::*; +pub use drogue::*; pub use encoder::*; +pub use plain::*; pub use wot::*; use drogue_client::registry::v1::MqttDialect; use drogue_cloud_endpoint_common::command::CommandFilter; +use drogue_cloud_mqtt_common::error::ServerError; +use drogue_cloud_mqtt_common::mqtt::Connect; use drogue_cloud_service_common::Id; -use std::{borrow::Cow, fmt::Debug}; +use std::{borrow::Cow, fmt::Debug, sync::Arc}; use thiserror::Error; -/// A topic parser for the default session. -pub trait DefaultTopicParser { +pub trait ConnectValidator { + fn validate_connect(&self, connect: &Connect) -> Result<(), ServerError>; +} + +/// Reject cleanSession=false +pub struct RejectResumeSession; + +impl ConnectValidator for RejectResumeSession { + fn validate_connect(&self, connect: &Connect) -> Result<(), ServerError> { + match connect.clean_session() { + true => Ok(()), + false => Err(ServerError::UnsupportedOperation), + } + } +} + +pub trait PublishTopicParser { /// Parse a topic from a PUB request fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError>; +} +pub trait SubscribeTopicParser { /// Parse a topic from a SUB request fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError>; } @@ -74,195 +100,98 @@ pub enum DeviceFilter<'a> { ProxiedDevice(&'a str), } -impl DefaultTopicParser for MqttDialect { - fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { - match self { - Self::DrogueV1 => { - // This should mimic the behavior of the current parser - let topic = path.split('/').collect::>(); - log::debug!("Topic: {topic:?}",); - - match topic.as_slice() { - [""] => Err(ParseError::Empty), - [channel] => Ok(ParsedPublishTopic { - channel, - device: None, - properties: vec![], - }), - [channel, as_device] => Ok(ParsedPublishTopic { - channel, - device: Some(as_device), - properties: vec![], - }), - _ => Err(ParseError::Syntax), - } - } - Self::PlainTopic { - device_prefix: false, - } => { - // Plain topic, just take the complete path - match path { - "" => Err(ParseError::Empty), - path => Ok(ParsedPublishTopic { - channel: path, - device: None, - properties: vec![], - }), - } - } - Self::PlainTopic { - device_prefix: true, - } => { - // Plain topic (with device prefix). Strip the device, and then just take the complete path - - let topic = path.split_once('/'); - log::debug!("Topic: {topic:?}",); - - match topic { - // No topic at all - None if path.is_empty() => Err(ParseError::Empty), - None => Err(ParseError::Syntax), - Some(("", path)) => Ok(ParsedPublishTopic { - channel: path, - device: None, - properties: vec![], - }), - Some((device, path)) => Ok(ParsedPublishTopic { - channel: path, - device: Some(device), - properties: vec![], - }), - } - } - Self::WebOfThings { .. } => { - let topic = path.split_once('/'); - log::debug!("Topic: {topic:?}",); - - match topic { - // No topic at all - None if path.is_empty() => Err(ParseError::Empty), - None => Ok(ParsedPublishTopic { - channel: "", - device: Some(path), - properties: vec![], - }), - Some(("", _)) => Err(ParseError::Syntax), - Some((device, path)) => Ok(ParsedPublishTopic { - channel: path, - device: Some(device), - properties: vec![], - }), - } - } - Self::Cumulocity => { - let topic = path.split('/').collect::>(); - log::debug!("C8Y: {topic:?}",); - - match topic.as_slice() { - [""] => Err(ParseError::Empty), - ["s", "us"] => Ok(ParsedPublishTopic { - channel: "c8y", - device: None, - properties: vec![], - }), - ["s", "us", as_device] => Ok(ParsedPublishTopic { - channel: "c8y", - device: Some(as_device), - properties: vec![], - }), - _ => Err(ParseError::Syntax), - } - } - Self::Azure => { - let (channel, properties) = az::split_topic(path); +pub trait DialectBuilder { + fn create(&self) -> Dialect; +} - if channel.is_empty() { - return Err(ParseError::Empty); - } +#[derive(Clone)] +pub struct Dialect { + connect: Arc, + publish: Arc, + subscribe: Arc, +} - log::debug!("Azure: {channel} - properties: {properties:?}"); +impl Dialect { + pub fn new(connect: C, publish: P, subscribe: S) -> Self + where + C: ConnectValidator + 'static, + P: PublishTopicParser + 'static, + S: SubscribeTopicParser + 'static, + { + Self { + connect: Arc::new(connect), + publish: Arc::new(publish), + subscribe: Arc::new(subscribe), + } + } - Ok(ParsedPublishTopic { - channel, - device: None, - properties, - }) - } + /// Parse a topic from a PUB request + pub fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + self.publish.parse_publish(path) + } + + /// Parse a topic from a SUB request + pub fn parse_subscribe<'a>( + &self, + path: &'a str, + ) -> Result, ParseError> { + self.subscribe.parse_subscribe(path) + } + + pub fn validate_connect(&self, connect: &Connect) -> Result<(), ServerError> { + self.connect.validate_connect(connect) + } +} + +impl From for Dialect +where + CSP: ConnectValidator + PublishTopicParser + SubscribeTopicParser + 'static, +{ + fn from(composite: CSP) -> Self { + let subscribe = Arc::new(composite); + Self { + connect: subscribe.clone(), + publish: subscribe.clone(), + subscribe, + } + } +} + +impl From<(C, SP)> for Dialect +where + C: ConnectValidator + 'static, + SP: PublishTopicParser + SubscribeTopicParser + 'static, +{ + fn from((connect, composite): (C, SP)) -> Self { + let subscribe = Arc::new(composite); + Self { + connect: Arc::new(connect), + publish: subscribe.clone(), + subscribe, } } +} - fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError> { - // TODO: replace .collect() with .as_slice() after "split_as_slice" #96137 +impl DialectBuilder for MqttDialect { + fn create(&self) -> Dialect { match self { - Self::DrogueV1 | Self::PlainTopic { .. } => { - match path.split('/').collect::>().as_slice() { - // subscribe to commands for all proxied devices and ourself - ["command", "inbox", "#"] | ["command", "inbox", "+", "#"] => { - Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::Wildcard, - command: None, - }, - encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder( - false, - )), - }) - } - // subscribe to commands directly for us - ["command", "inbox", "", "#"] => Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::Device, - command: None, - }, - encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), - }), - // subscribe to commands for a specific device - ["command", "inbox", device, "#"] => Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::ProxiedDevice(device), - command: None, - }, - encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(true)), - }), - _ => Err(ParseError::Syntax), - } - } - Self::WebOfThings { node_wot_bug } => match path.split_once('/') { - Some((device, filter)) => Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::ProxiedDevice(device), - command: Some(filter), - }, - encoder: SubscriptionTopicEncoder::new(WoTCommandTopicEncoder { - node_wot_bug: *node_wot_bug, - }), - }), - _ => Err(ParseError::Syntax), - }, - Self::Cumulocity => { - log::debug!("c8y: {path}"); - match path.split('/').collect::>().as_slice() { - [] => Err(ParseError::Empty), - ["s", "e"] => Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::Device, - command: None, - }, - encoder: SubscriptionTopicEncoder::new(DefaultCommandTopicEncoder(false)), - }), - _ => Err(ParseError::Syntax), - } - } - Self::Azure => { - log::debug!("Azure: {path}"); - Ok(ParsedSubscribeTopic { - filter: SubscribeFilter { - device: DeviceFilter::Device, - command: Some(path), - }, - encoder: SubscriptionTopicEncoder::new(PlainTopicEncoder), - }) - } + Self::DrogueV1 => (RejectResumeSession, DrogueV1).into(), + Self::PlainTopic { device_prefix } => Dialect::new( + RejectResumeSession, + PlainTopic { + device_prefix: *device_prefix, + }, + DrogueV1, + ), + Self::WebOfThings { node_wot_bug } => ( + RejectResumeSession, + WebOfThings { + node_wot_bug: *node_wot_bug, + }, + ) + .into(), + Self::Cumulocity => (RejectResumeSession, Cumulocity).into(), + Self::Azure => Azure.into(), } } } @@ -417,6 +346,6 @@ mod test { } fn assert_parse(spec: &MqttSpec, path: &str, expected: Result) { - assert_eq!(spec.dialect.parse_publish(path), expected); + assert_eq!(spec.dialect.create().parse_publish(path), expected); } } diff --git a/mqtt-endpoint/src/service/session/dialect/plain.rs b/mqtt-endpoint/src/service/session/dialect/plain.rs new file mode 100644 index 00000000..ddd1b137 --- /dev/null +++ b/mqtt-endpoint/src/service/session/dialect/plain.rs @@ -0,0 +1,46 @@ +use super::*; + +/// Plain topic dialect. +pub struct PlainTopic { + pub device_prefix: bool, +} + +impl PublishTopicParser for PlainTopic { + fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + match self.device_prefix { + true => { + // Plain topic (with device prefix). Strip the device, and then just take the complete path + + let topic = path.split_once('/'); + log::debug!("Topic: {topic:?}",); + + match topic { + // No topic at all + None if path.is_empty() => Err(ParseError::Empty), + None => Err(ParseError::Syntax), + Some(("", path)) => Ok(ParsedPublishTopic { + channel: path, + device: None, + properties: vec![], + }), + Some((device, path)) => Ok(ParsedPublishTopic { + channel: path, + device: Some(device), + properties: vec![], + }), + } + } + false => { + // Plain topic, just take the complete path + match path { + "" => Err(ParseError::Empty), + path => Ok(ParsedPublishTopic { + channel: path, + device: None, + properties: vec![], + }), + } + } + } + } +} diff --git a/mqtt-endpoint/src/service/session/dialect/wot.rs b/mqtt-endpoint/src/service/session/dialect/wot.rs index 9d3f5b43..8a89fda5 100644 --- a/mqtt-endpoint/src/service/session/dialect/wot.rs +++ b/mqtt-endpoint/src/service/session/dialect/wot.rs @@ -1,6 +1,53 @@ +use super::*; + use crate::service::session::dialect::TopicEncoder; use drogue_cloud_endpoint_common::command::Command; +/// Web of Things dialect. +pub struct WebOfThings { + pub node_wot_bug: bool, +} + +impl PublishTopicParser for WebOfThings { + fn parse_publish<'a>(&self, path: &'a str) -> Result, ParseError> { + let topic = path.split_once('/'); + log::debug!("Topic: {topic:?}",); + + match topic { + // No topic at all + None if path.is_empty() => Err(ParseError::Empty), + None => Ok(ParsedPublishTopic { + channel: "", + device: Some(path), + properties: vec![], + }), + Some(("", _)) => Err(ParseError::Syntax), + Some((device, path)) => Ok(ParsedPublishTopic { + channel: path, + device: Some(device), + properties: vec![], + }), + } + } +} + +impl SubscribeTopicParser for WebOfThings { + fn parse_subscribe<'a>(&self, path: &'a str) -> Result, ParseError> { + match path.split_once('/') { + Some((device, filter)) => Ok(ParsedSubscribeTopic { + filter: SubscribeFilter { + device: DeviceFilter::ProxiedDevice(device), + command: Some(filter), + }, + encoder: SubscriptionTopicEncoder::new(WoTCommandTopicEncoder { + node_wot_bug: self.node_wot_bug, + }), + }), + _ => Err(ParseError::Syntax), + } + } +} + #[derive(Debug)] pub struct WoTCommandTopicEncoder { pub node_wot_bug: bool, diff --git a/mqtt-endpoint/src/service/session/mod.rs b/mqtt-endpoint/src/service/session/mod.rs index 739f4549..4cf2c63e 100644 --- a/mqtt-endpoint/src/service/session/mod.rs +++ b/mqtt-endpoint/src/service/session/mod.rs @@ -1,5 +1,5 @@ mod cache; -mod dialect; +pub mod dialect; mod disconnect; mod inbox; @@ -7,9 +7,7 @@ use self::disconnect::*; use crate::{ auth::DeviceAuthenticator, config::EndpointConfig, - service::session::dialect::{ - DefaultTopicParser, ParsedSubscribeTopic, SubscriptionTopicEncoder, - }, + service::session::dialect::{Dialect, ParsedSubscribeTopic, SubscriptionTopicEncoder}, CONNECTIONS_COUNTER, }; use async_trait::async_trait; @@ -57,7 +55,7 @@ pub struct Session { sender: DownstreamSender, application: registry::v1::Application, device: Arc, - dialect: registry::v1::MqttDialect, + dialect: Dialect, commands: Commands, auth: DeviceAuthenticator, sink: Sink, @@ -75,7 +73,7 @@ impl Session { sender: DownstreamSender, sink: Sink, application: registry::v1::Application, - dialect: registry::v1::MqttDialect, + dialect: Dialect, device: registry::v1::Device, commands: Commands, state: State, From 5eb7fab2dd590c5b284171b90e76512cdeab9c1c Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 17:27:53 +0100 Subject: [PATCH 07/10] chore: cleanup imports --- mqtt-endpoint/src/service/app.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index dfac3f8e..74d2c4ba 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -17,8 +17,7 @@ use drogue_cloud_endpoint_common::{ }; use drogue_cloud_mqtt_common::{ error::ServerError, - mqtt, - mqtt::{AckOptions, Connect, ConnectAck, Service, Sink}, + mqtt::{AckOptions, Connect, ConnectAck, Service}, }; use drogue_cloud_service_api::{ auth::device::authn::Outcome as AuthOutcome, From 259145a64a342325192119a2a9b6ebe1b3c76026 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 17:47:21 +0100 Subject: [PATCH 08/10] refactor: remove "clean session" check as this is done by dialects now --- endpoint-common/src/auth.rs | 6 +----- mqtt-endpoint/src/service/app.rs | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/endpoint-common/src/auth.rs b/endpoint-common/src/auth.rs index 508e6e12..1f8ac909 100644 --- a/endpoint-common/src/auth.rs +++ b/endpoint-common/src/auth.rs @@ -263,11 +263,7 @@ impl DeviceAuthenticator { C: AsRef + Debug, { log::debug!( - "Authenticate MQTT - username: {:?}, password: {:?}, client_id: {:?}, certs: {:?}", - username, - password, - client_id, - certs + "Authenticate MQTT - username: {username:?}, password: {password:?}, client_id: {client_id:?}, certs: {certs:?}, verified_identity: {verified_identity:?}", ); match ( diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index 74d2c4ba..466478d4 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -40,7 +40,13 @@ pub struct App { impl App { /// authenticate a client - #[instrument] + #[instrument(skip_all, fields( + username, + has_password = password.is_some(), + client_id, + has_certs = certs.is_some(), + has_verified_identity = verified_identity.is_some(), + ), err)] pub async fn authenticate( &self, username: Option<&str>, @@ -102,16 +108,18 @@ impl App { log::debug!("MQTT dialect: {dialect:?}"); + // validate + let dialect = dialect.create(); + dialect.validate_connect(connect)?; + // prepare let sink = connect.sink(); let lwt = Self::make_lwt(&connect); log::info!("LWT: {lwt:?}"); - dialect.validate_connect(connect)?; - // acquire session let opts = CreateOptions { lwt }; @@ -168,6 +176,7 @@ impl App { } } + #[instrument(skip(self))] async fn lookup_identity(&self, identity: &Identity) -> Option { if let Ok(PreSharedKeyResponse { outcome: @@ -239,10 +248,6 @@ impl Service for App { device, r#as: _, }) => { - if !connect.clean_session() { - return Err(ServerError::UnsupportedOperation); - } - let session = self.create_session(application, device, &connect).await?; Ok(ConnectAck { From 2120e7a797f32cc2575d3e8b36b4869d971a38c8 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 17:48:18 +0100 Subject: [PATCH 09/10] docs: add a note about the state --- mqtt-endpoint/src/service/session/dialect/az.rs | 2 ++ mqtt-endpoint/src/service/session/dialect/c8y.rs | 2 ++ mqtt-endpoint/src/service/session/dialect/wot.rs | 2 ++ 3 files changed, 6 insertions(+) diff --git a/mqtt-endpoint/src/service/session/dialect/az.rs b/mqtt-endpoint/src/service/session/dialect/az.rs index dcce1841..7458348b 100644 --- a/mqtt-endpoint/src/service/session/dialect/az.rs +++ b/mqtt-endpoint/src/service/session/dialect/az.rs @@ -2,6 +2,8 @@ use super::*; use std::borrow::Cow; /// Azure IoT dialect. +/// +/// NOTE: This is experimental. pub struct Azure; impl ConnectValidator for Azure { diff --git a/mqtt-endpoint/src/service/session/dialect/c8y.rs b/mqtt-endpoint/src/service/session/dialect/c8y.rs index 9cd03916..0d11f4f1 100644 --- a/mqtt-endpoint/src/service/session/dialect/c8y.rs +++ b/mqtt-endpoint/src/service/session/dialect/c8y.rs @@ -1,6 +1,8 @@ use super::*; /// Cumulocity dialect. +/// +/// NOTE: This is experimental. pub struct Cumulocity; impl PublishTopicParser for Cumulocity { diff --git a/mqtt-endpoint/src/service/session/dialect/wot.rs b/mqtt-endpoint/src/service/session/dialect/wot.rs index 8a89fda5..5f9e00a3 100644 --- a/mqtt-endpoint/src/service/session/dialect/wot.rs +++ b/mqtt-endpoint/src/service/session/dialect/wot.rs @@ -4,6 +4,8 @@ use crate::service::session::dialect::TopicEncoder; use drogue_cloud_endpoint_common::command::Command; /// Web of Things dialect. +/// +/// NOTE: This is experimental. pub struct WebOfThings { pub node_wot_bug: bool, } From 5be99f74d9f0f33a70a9a5f8edd405264e893759 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 24 Nov 2022 18:00:57 +0100 Subject: [PATCH 10/10] refactor: lower to debug level --- mqtt-endpoint/src/service/app.rs | 35 +++++++++++++++++++------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index 466478d4..eeb33bb3 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -117,8 +117,9 @@ impl App { // prepare let sink = connect.sink(); + let lwt = Self::make_lwt(&connect); - log::info!("LWT: {lwt:?}"); + log::debug!("LWT: {lwt:?}"); // acquire session @@ -155,6 +156,7 @@ impl App { )) } + /// Build a LWT from the connect request. fn make_lwt(connect: &Connect<'_>) -> Option { match connect { Connect::V3(handshake) => match &handshake.packet().last_will { @@ -198,19 +200,10 @@ impl App { None } } -} - -#[async_trait(?Send)] -impl Service for App { - #[instrument] - async fn connect<'a>( - &'a self, - mut connect: Connect<'a>, - ) -> Result, ServerError> { - log::info!("new connection: {:?}", connect); - let certs = connect.io().client_certs(); - let verified_identity = if self.disable_psk { + /// Find the (optional) TLS-PSK identity from the underlying I/O system. + async fn find_verified_identity(&self, connect: &mut Connect<'_>) -> Option { + if self.disable_psk { None } else { use ntex_tls::PskIdentity; @@ -230,7 +223,21 @@ impl Service for App { } else { None } - }; + } + } +} + +#[async_trait(?Send)] +impl Service for App { + #[instrument] + async fn connect<'a>( + &'a self, + mut connect: Connect<'a>, + ) -> Result, ServerError> { + log::info!("new connection: {:?}", connect); + + let certs = connect.io().client_certs(); + let verified_identity = self.find_verified_identity(&mut connect).await; let (username, password) = connect.credentials(); match self