From a7aa73dfb1920b7a7058ce8be814a0d29e2d85e8 Mon Sep 17 00:00:00 2001 From: pv42 Date: Fri, 27 Dec 2024 21:59:55 +0100 Subject: [PATCH 1/2] fix: unused_variables warning when compiled without signing feature --- mavlink-core/src/lib.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 2b5626a263..2532b55808 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -157,14 +157,6 @@ pub struct MavFrame { } impl MavFrame { - /// Create a new frame with given message - // pub fn new(msg: MavMessage) -> MavFrame { - // MavFrame { - // header: MavHeader::get_default_header(), - // msg - // } - // } - /// Serialize MavFrame into a vector, so it can be sent over a socket, for example. /// The resulting buffer will start with the sequence field of the Mavlink frame /// and will not include the initial packet marker, length field, and flags. @@ -908,6 +900,7 @@ pub fn read_v2_raw_message_signed( read_v2_raw_message_inner::(reader, signing_data) } +#[allow(unused_variables)] fn read_v2_raw_message_inner( reader: &mut PeekReader, signing_data: Option<&SigningData>, @@ -969,6 +962,7 @@ pub async fn read_v2_raw_message_async` #[cfg(feature = "tokio-1")] +#[allow(unused_variables)] async fn read_v2_raw_message_async_inner( reader: &mut AsyncPeekReader, signing_data: Option<&SigningData>, From 23aef75b13f88102a9c916907f608b079c75452a Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 5 Jan 2025 14:21:45 +0100 Subject: [PATCH 2/2] feat: add ConnectionAddress type and Connectable traits --- .../src/async_connection/direct_serial.rs | 62 ++++--- mavlink-core/src/async_connection/file.rs | 14 +- mavlink-core/src/async_connection/mod.rs | 65 ++++---- mavlink-core/src/async_connection/tcp.rs | 36 ++--- mavlink-core/src/async_connection/udp.rs | 71 +++----- mavlink-core/src/connectable.rs | 152 ++++++++++++++++++ mavlink-core/src/connection/direct_serial.rs | 65 +++----- mavlink-core/src/connection/file.rs | 9 ++ mavlink-core/src/connection/mod.rs | 65 +++----- mavlink-core/src/connection/tcp.rs | 35 ++-- mavlink-core/src/connection/udp.rs | 62 ++----- mavlink-core/src/lib.rs | 11 +- mavlink/tests/parse_test.rs | 30 ++++ 13 files changed, 394 insertions(+), 283 deletions(-) create mode 100644 mavlink-core/src/connectable.rs create mode 100644 mavlink/tests/parse_test.rs diff --git a/mavlink-core/src/async_connection/direct_serial.rs b/mavlink-core/src/async_connection/direct_serial.rs index d9b2fe1de2..221edb0dd3 100644 --- a/mavlink-core/src/async_connection/direct_serial.rs +++ b/mavlink-core/src/async_connection/direct_serial.rs @@ -3,10 +3,15 @@ use core::ops::DerefMut; use std::io; +use async_trait::async_trait; use tokio::sync::Mutex; use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; -use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use super::AsyncConnectable; +use crate::{ + async_peek_reader::AsyncPeekReader, connectable::SerialConnectable, MavHeader, MavlinkVersion, + Message, +}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg_async, write_versioned_msg_async}; @@ -17,38 +22,6 @@ use crate::{ use super::AsyncMavConnection; -pub fn open(settings: &str) -> io::Result { - let settings_toks: Vec<&str> = settings.split(':').collect(); - if settings_toks.len() < 2 { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Incomplete port settings", - )); - } - - let Ok(baud) = settings_toks[1].parse::() else { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Invalid baud rate", - )); - }; - - let port_name = settings_toks[0]; - let mut port = tokio_serial::new(port_name, baud).open_native_async()?; - port.set_data_bits(tokio_serial::DataBits::Eight)?; - port.set_parity(tokio_serial::Parity::None)?; - port.set_stop_bits(tokio_serial::StopBits::One)?; - port.set_flow_control(tokio_serial::FlowControl::None)?; - - Ok(AsyncSerialConnection { - port: Mutex::new(AsyncPeekReader::new(port)), - sequence: Mutex::new(0), - protocol_version: MavlinkVersion::V2, - #[cfg(feature = "signing")] - signing_data: None, - }) -} - pub struct AsyncSerialConnection { port: Mutex>, sequence: Mutex, @@ -118,3 +91,26 @@ impl AsyncMavConnection for AsyncSerialConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for SerialConnectable { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send, + { + let mut port = + tokio_serial::new(&self.port_name, self.baud_rate as u32).open_native_async()?; + port.set_data_bits(tokio_serial::DataBits::Eight)?; + port.set_parity(tokio_serial::Parity::None)?; + port.set_stop_bits(tokio_serial::StopBits::One)?; + port.set_flow_control(tokio_serial::FlowControl::None)?; + + Ok(Box::new(AsyncSerialConnection { + port: Mutex::new(AsyncPeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + })) + } +} diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs index 5dae56d99e..60b589d334 100644 --- a/mavlink-core/src/async_connection/file.rs +++ b/mavlink-core/src/async_connection/file.rs @@ -2,11 +2,13 @@ use core::ops::DerefMut; -use super::AsyncMavConnection; +use super::{AsyncConnectable, AsyncMavConnection}; +use crate::connectable::FileConnectable; use crate::error::{MessageReadError, MessageWriteError}; use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use async_trait::async_trait; use tokio::fs::File; use tokio::io; use tokio::sync::Mutex; @@ -81,3 +83,13 @@ impl AsyncMavConnection for AsyncFileConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for FileConnectable { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send, + { + Ok(Box::new(open(&self.address).await?)) + } +} diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index b2eb5c6757..281e255b9a 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -1,6 +1,7 @@ +use async_trait::async_trait; use tokio::io; -use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; +use crate::{connectable::ConnectionAddress, MavFrame, MavHeader, MavlinkVersion, Message}; #[cfg(feature = "tcp")] mod tcp; @@ -81,43 +82,9 @@ pub trait AsyncMavConnection { pub async fn connect_async( address: &str, ) -> io::Result + Sync + Send>> { - let protocol_err = Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )); - - if cfg!(feature = "tcp") && address.starts_with("tcp") { - #[cfg(feature = "tcp")] - { - tcp::select_protocol(address).await - } - #[cfg(not(feature = "tcp"))] - { - protocol_err - } - } else if cfg!(feature = "udp") && address.starts_with("udp") { - #[cfg(feature = "udp")] - { - udp::select_protocol(address).await - } - #[cfg(not(feature = "udp"))] - { - protocol_err - } - } else if cfg!(feature = "direct-serial") && address.starts_with("serial") { - #[cfg(feature = "direct-serial")] - { - Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) - } - #[cfg(not(feature = "direct-serial"))] - { - protocol_err - } - } else if address.starts_with("file") { - Ok(Box::new(file::open(&address["file:".len()..]).await?)) - } else { - protocol_err - } + ConnectionAddress::parse_address(address)? + .connect_async::() + .await } /// Returns the socket address for the given address. @@ -135,3 +102,25 @@ pub(crate) fn get_socket_addr( }; Ok(addr) } + +#[async_trait] +pub trait AsyncConnectable { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send; +} + +#[async_trait] +impl AsyncConnectable for ConnectionAddress { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send, + { + match self { + Self::Tcp(connectable) => connectable.connect_async::().await, + Self::Udp(connectable) => connectable.connect_async::().await, + Self::Serial(connectable) => connectable.connect_async::().await, + Self::File(connectable) => connectable.connect_async::().await, + } + } +} diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index 4cd1f9ac9a..721785e6e0 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -1,9 +1,11 @@ //! Async TCP MAVLink connection -use super::{get_socket_addr, AsyncMavConnection}; +use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection}; use crate::async_peek_reader::AsyncPeekReader; +use crate::connectable::TcpConnectable; use crate::{MavHeader, MavlinkVersion, Message}; +use async_trait::async_trait; use core::ops::DerefMut; use tokio::io; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; @@ -17,23 +19,6 @@ use crate::{ read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, }; -pub async fn select_protocol( - address: &str, -) -> io::Result + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("tcpout:") { - tcpout(address).await - } else if let Some(address) = address.strip_prefix("tcpin:") { - tcpin(address).await - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - pub async fn tcpout(address: T) -> io::Result { let addr = get_socket_addr(address)?; @@ -154,3 +139,18 @@ impl AsyncMavConnection for AsyncTcpConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for TcpConnectable { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send, + { + let conn = if self.is_out { + tcpout(&self.address).await + } else { + tcpin(&self.address).await + }; + Ok(Box::new(conn?)) + } +} diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs index 3f06b83e1d..fa249a1ad2 100644 --- a/mavlink-core/src/async_connection/udp.rs +++ b/mavlink-core/src/async_connection/udp.rs @@ -3,15 +3,20 @@ use core::{ops::DerefMut, task::Poll}; use std::{collections::VecDeque, io::Read, sync::Arc}; +use async_trait::async_trait; use tokio::{ io::{self, AsyncRead, ReadBuf}, net::UdpSocket, sync::Mutex, }; -use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use crate::{ + async_peek_reader::AsyncPeekReader, + connectable::{UdpConnectable, UdpMode}, + MavHeader, MavlinkVersion, Message, +}; -use super::{get_socket_addr, AsyncMavConnection}; +use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg_async, write_versioned_msg_async}; @@ -20,50 +25,6 @@ use crate::{ read_versioned_msg_async_signed, write_versioned_msg_signed, SigningConfig, SigningData, }; -pub async fn select_protocol( - address: &str, -) -> io::Result + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("udpin:") { - udpin(address).await - } else if let Some(address) = address.strip_prefix("udpout:") { - udpout(address).await - } else if let Some(address) = address.strip_prefix("udpbcast:") { - udpbcast(address).await - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - -pub async fn udpbcast(address: T) -> io::Result { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").await?; - socket - .set_broadcast(true) - .expect("Couldn't bind to broadcast address."); - AsyncUdpConnection::new(socket, false, Some(addr)) -} - -pub async fn udpout(address: T) -> io::Result { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").await?; - AsyncUdpConnection::new(socket, false, Some(addr)) -} - -pub async fn udpin(address: T) -> io::Result { - let addr = address - .to_socket_addrs() - .unwrap() - .next() - .expect("Invalid address"); - let socket = UdpSocket::bind(addr).await?; - AsyncUdpConnection::new(socket, true, None) -} - struct UdpRead { socket: Arc, buffer: VecDeque, @@ -235,6 +196,24 @@ impl AsyncMavConnection for AsyncUdpConnection { } } +#[async_trait] +impl AsyncConnectable for UdpConnectable { + async fn connect_async(&self) -> io::Result + Sync + Send>> + where + M: Message + Sync + Send, + { + let (addr, server, dest): (&str, _, _) = match self.mode { + UdpMode::Udpin => (&self.address, true, None), + _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)), + }; + let socket = UdpSocket::bind(addr).await?; + if matches!(self.mode, UdpMode::Udpcast) { + socket.set_broadcast(true)?; + } + Ok(Box::new(AsyncUdpConnection::new(socket, server, dest)?)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mavlink-core/src/connectable.rs b/mavlink-core/src/connectable.rs new file mode 100644 index 0000000000..620139b6f7 --- /dev/null +++ b/mavlink-core/src/connectable.rs @@ -0,0 +1,152 @@ +use core::fmt::Display; +use std::io; + +#[derive(Debug, Clone, Copy)] +pub enum UdpMode { + Udpin, + Udpout, + Udpcast, +} + +#[derive(Debug, Clone)] +pub struct UdpConnectable { + pub(crate) address: String, + pub(crate) mode: UdpMode, +} + +impl UdpConnectable { + pub fn new(address: String, mode: UdpMode) -> Self { + Self { address, mode } + } +} +impl Display for UdpConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mode = match self.mode { + UdpMode::Udpin => "udpin", + UdpMode::Udpout => "udpout", + UdpMode::Udpcast => "udpcast", + }; + write!(f, "{mode}:{}", self.address) + } +} + +#[derive(Debug, Clone)] +pub struct SerialConnectable { + pub(crate) port_name: String, + pub(crate) baud_rate: usize, +} + +impl SerialConnectable { + pub fn new(port_name: String, baud_rate: usize) -> Self { + Self { + port_name, + baud_rate, + } + } +} +impl Display for SerialConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "serial:{}:{}", self.port_name, self.baud_rate) + } +} + +#[derive(Debug, Clone)] +pub struct TcpConnectable { + pub(crate) address: String, + pub(crate) is_out: bool, +} + +impl TcpConnectable { + pub fn new(address: String, is_out: bool) -> Self { + Self { address, is_out } + } +} +impl Display for TcpConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if self.is_out { + write!(f, "tcpout:{}", self.address) + } else { + write!(f, "tcpin:{}", self.address) + } + } +} + +#[derive(Debug, Clone)] +pub struct FileConnectable { + pub(crate) address: String, +} + +impl FileConnectable { + pub fn new(address: String) -> Self { + Self { address } + } +} +impl Display for FileConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "file:{}", self.address) + } +} +pub enum ConnectionAddress { + Tcp(TcpConnectable), + Udp(UdpConnectable), + Serial(SerialConnectable), + File(FileConnectable), +} + +impl Display for ConnectionAddress { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Tcp(connectable) => write!(f, "{connectable}"), + Self::Udp(connectable) => write!(f, "{connectable}"), + Self::Serial(connectable) => write!(f, "{connectable}"), + Self::File(connectable) => write!(f, "{connectable}"), + } + } +} + +impl ConnectionAddress { + pub fn parse_address(address: &str) -> Result { + let (protocol, address) = address.split_once(':').ok_or(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + ))?; + let conn = match protocol { + #[cfg(feature = "direct-serial")] + "serial" => { + let (port_name, baud) = address.split_once(':').ok_or(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Incomplete port settings", + ))?; + Self::Serial(SerialConnectable::new( + port_name.to_string(), + baud.parse().map_err(|_| { + io::Error::new(io::ErrorKind::AddrNotAvailable, "Invalid baud rate") + })?, + )) + } + #[cfg(feature = "tcp")] + "tcpin" | "tcpout" => Self::Tcp(TcpConnectable::new( + address.to_string(), + protocol == "tcpout", + )), + #[cfg(feature = "udp")] + "udpin" | "udpout" | "udpcast" => Self::Udp(UdpConnectable::new( + address.to_string(), + match protocol { + "udpin" => UdpMode::Udpin, + "udpout" => UdpMode::Udpout, + "udpcast" => UdpMode::Udpcast, + _ => unreachable!(), + }, + )), + "file" => Self::File(FileConnectable::new(address.to_string())), + _ => { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + )) + } + }; + Ok(conn) + } +} diff --git a/mavlink-core/src/connection/direct_serial.rs b/mavlink-core/src/connection/direct_serial.rs index 2954b98107..85c771b22d 100644 --- a/mavlink-core/src/connection/direct_serial.rs +++ b/mavlink-core/src/connection/direct_serial.rs @@ -1,5 +1,6 @@ //! Serial MAVLINK connection +use crate::connectable::SerialConnectable; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; @@ -15,45 +16,7 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn open(settings: &str) -> io::Result { - let settings_toks: Vec<&str> = settings.split(':').collect(); - if settings_toks.len() < 2 { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Incomplete port settings", - )); - } - - let Ok(baud) = settings_toks[1] - .parse::() - .map(serial::core::BaudRate::from_speed) - else { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Invalid baud rate", - )); - }; - - let settings = serial::core::PortSettings { - baud_rate: baud, - char_size: serial::Bits8, - parity: serial::ParityNone, - stop_bits: serial::Stop1, - flow_control: serial::FlowNone, - }; - - let port_name = settings_toks[0]; - let mut port = serial::open(port_name)?; - port.configure(&settings)?; - - Ok(SerialConnection { - port: Mutex::new(PeekReader::new(port)), - sequence: Mutex::new(0), - protocol_version: MavlinkVersion::V2, - #[cfg(feature = "signing")] - signing_data: None, - }) -} +use super::Connectable; pub struct SerialConnection { port: Mutex>, @@ -127,3 +90,27 @@ impl MavConnection for SerialConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for SerialConnectable { + fn connect(&self) -> io::Result + Sync + Send>> { + let baud_rate = serial::core::BaudRate::from_speed(self.baud_rate); + let settings = serial::core::PortSettings { + baud_rate, + char_size: serial::Bits8, + parity: serial::ParityNone, + stop_bits: serial::Stop1, + flow_control: serial::FlowNone, + }; + + let mut port = serial::open(&self.port_name)?; + port.configure(&settings)?; + + Ok(Box::new(SerialConnection { + port: Mutex::new(PeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + })) + } +} diff --git a/mavlink-core/src/connection/file.rs b/mavlink-core/src/connection/file.rs index 7258259347..176a4a8be9 100644 --- a/mavlink-core/src/connection/file.rs +++ b/mavlink-core/src/connection/file.rs @@ -1,5 +1,6 @@ //! File MAVLINK connection +use crate::connectable::FileConnectable; use crate::connection::MavConnection; use crate::error::{MessageReadError, MessageWriteError}; use crate::peek_reader::PeekReader; @@ -14,6 +15,8 @@ use crate::read_versioned_msg; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, SigningConfig, SigningData}; +use super::Connectable; + pub fn open(file_path: &str) -> io::Result { let file = File::open(file_path)?; @@ -78,3 +81,9 @@ impl MavConnection for FileConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for FileConnectable { + fn connect(&self) -> io::Result + Sync + Send>> { + Ok(Box::new(open(&self.address)?)) + } +} diff --git a/mavlink-core/src/connection/mod.rs b/mavlink-core/src/connection/mod.rs index e60386a6aa..5fff34bc70 100644 --- a/mavlink-core/src/connection/mod.rs +++ b/mavlink-core/src/connection/mod.rs @@ -1,5 +1,6 @@ -use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; +use crate::{connectable::ConnectionAddress, MavFrame, MavHeader, MavlinkVersion, Message}; +use core::fmt::Display; use std::io::{self}; #[cfg(feature = "tcp")] @@ -70,52 +71,36 @@ pub trait MavConnection { /// /// The type of the connection is determined at runtime based on the address type, so the /// connection is returned as a trait object. -pub fn connect(address: &str) -> io::Result + Sync + Send>> { - let protocol_err = Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )); - - if cfg!(feature = "tcp") && address.starts_with("tcp") { - #[cfg(feature = "tcp")] - { - tcp::select_protocol(address) - } - #[cfg(not(feature = "tcp"))] - { - protocol_err - } - } else if cfg!(feature = "udp") && address.starts_with("udp") { - #[cfg(feature = "udp")] - { - udp::select_protocol(address) - } - #[cfg(not(feature = "udp"))] - { - protocol_err - } - } else if cfg!(feature = "direct-serial") && address.starts_with("serial:") { - #[cfg(feature = "direct-serial")] - { - Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) - } - #[cfg(not(feature = "direct-serial"))] - { - protocol_err - } - } else if address.starts_with("file") { - Ok(Box::new(file::open(&address["file:".len()..])?)) - } else { - protocol_err - } +pub fn connect( + address: &str, +) -> io::Result + Sync + Send>> { + ConnectionAddress::parse_address(address)?.connect::() } /// Returns the socket address for the given address. pub(crate) fn get_socket_addr( - address: T, + address: &T, ) -> Result { address.to_socket_addrs()?.next().ok_or(io::Error::new( io::ErrorKind::Other, "Host address lookup failed", )) } + +pub trait Connectable: Display { + fn connect(&self) -> io::Result + Sync + Send>>; +} + +impl Connectable for ConnectionAddress { + fn connect(&self) -> std::io::Result + Sync + Send>> + where + M: Message, + { + match self { + Self::Tcp(connectable) => connectable.connect::(), + Self::Udp(connectable) => connectable.connect::(), + Self::Serial(connectable) => connectable.connect::(), + Self::File(connectable) => connectable.connect::(), + } + } +} diff --git a/mavlink-core/src/connection/tcp.rs b/mavlink-core/src/connection/tcp.rs index d362a5664a..3f83c2dc62 100644 --- a/mavlink-core/src/connection/tcp.rs +++ b/mavlink-core/src/connection/tcp.rs @@ -1,5 +1,6 @@ //! TCP MAVLink connection +use crate::connectable::TcpConnectable; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; @@ -10,7 +11,7 @@ use std::net::{TcpListener, TcpStream}; use std::sync::Mutex; use std::time::Duration; -use super::get_socket_addr; +use super::{get_socket_addr, Connectable}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg, write_versioned_msg}; @@ -18,25 +19,8 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn select_protocol( - address: &str, -) -> io::Result + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("tcpout:") { - tcpout(address) - } else if let Some(address) = address.strip_prefix("tcpin:") { - tcpin(address) - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - pub fn tcpout(address: T) -> io::Result { - let addr = get_socket_addr(address)?; + let addr = get_socket_addr(&address)?; let socket = TcpStream::connect(addr)?; socket.set_read_timeout(Some(Duration::from_millis(100)))?; @@ -54,7 +38,7 @@ pub fn tcpout(address: T) -> io::Result { } pub fn tcpin(address: T) -> io::Result { - let addr = get_socket_addr(address)?; + let addr = get_socket_addr(&address)?; let listener = TcpListener::bind(addr)?; //For now we only accept one incoming stream: this blocks until we get one @@ -147,3 +131,14 @@ impl MavConnection for TcpConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for TcpConnectable { + fn connect(&self) -> io::Result + Sync + Send>> { + let conn = if self.is_out { + tcpout(&self.address) + } else { + tcpin(&self.address) + }; + Ok(Box::new(conn?)) + } +} diff --git a/mavlink-core/src/connection/udp.rs b/mavlink-core/src/connection/udp.rs index 75e5be3c48..25f5662277 100644 --- a/mavlink-core/src/connection/udp.rs +++ b/mavlink-core/src/connection/udp.rs @@ -2,16 +2,16 @@ use std::collections::VecDeque; +use crate::connectable::{UdpConnectable, UdpMode}; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; use core::ops::DerefMut; use std::io::{self, Read}; -use std::net::ToSocketAddrs; use std::net::{SocketAddr, UdpSocket}; use std::sync::Mutex; -use super::get_socket_addr; +use super::{get_socket_addr, Connectable}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg, write_versioned_msg}; @@ -19,50 +19,6 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn select_protocol( - address: &str, -) -> io::Result + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("udpin:") { - udpin(address) - } else if let Some(address) = address.strip_prefix("udpout:") { - udpout(address) - } else if let Some(address) = address.strip_prefix("udpbcast:") { - udpbcast(address) - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - -pub fn udpbcast(address: T) -> io::Result { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket - .set_broadcast(true) - .expect("Couldn't bind to broadcast address."); - UdpConnection::new(socket, false, Some(addr)) -} - -pub fn udpout(address: T) -> io::Result { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0")?; - UdpConnection::new(socket, false, Some(addr)) -} - -pub fn udpin(address: T) -> io::Result { - let addr = address - .to_socket_addrs() - .unwrap() - .next() - .expect("Invalid address"); - let socket = UdpSocket::bind(addr)?; - UdpConnection::new(socket, true, None) -} - struct UdpRead { socket: UdpSocket, buffer: VecDeque, @@ -192,6 +148,20 @@ impl MavConnection for UdpConnection { } } +impl Connectable for UdpConnectable { + fn connect(&self) -> io::Result + Sync + Send>> { + let (addr, server, dest): (&str, _, _) = match self.mode { + UdpMode::Udpin => (&self.address, true, None), + _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)), + }; + let socket = UdpSocket::bind(addr)?; + if matches!(self.mode, UdpMode::Udpcast) { + socket.set_broadcast(true)?; + } + Ok(Box::new(UdpConnection::new(socket, server, dest)?)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 2532b55808..76cb13ec92 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -48,12 +48,12 @@ pub mod bytes_mut; mod connection; pub mod error; #[cfg(feature = "std")] -pub use self::connection::{connect, MavConnection}; +pub use self::connection::{connect, Connectable, MavConnection}; #[cfg(feature = "tokio-1")] mod async_connection; #[cfg(feature = "tokio-1")] -pub use self::async_connection::{connect_async, AsyncMavConnection}; +pub use self::async_connection::{connect_async, AsyncConnectable, AsyncMavConnection}; #[cfg(feature = "tokio-1")] pub mod async_peek_reader; @@ -74,6 +74,13 @@ pub use self::signing::{SigningConfig, SigningData}; #[cfg(feature = "signing")] use sha2::{Digest, Sha256}; +#[cfg(any(feature = "std", feature = "tokio-1"))] +mod connectable; +#[cfg(any(feature = "std", feature = "tokio-1"))] +pub use connectable::{ + ConnectionAddress, FileConnectable, SerialConnectable, TcpConnectable, UdpConnectable, +}; + pub const MAX_FRAME_SIZE: usize = 280; pub trait Message diff --git a/mavlink/tests/parse_test.rs b/mavlink/tests/parse_test.rs new file mode 100644 index 0000000000..cfa8a3054c --- /dev/null +++ b/mavlink/tests/parse_test.rs @@ -0,0 +1,30 @@ +mod parse_tests { + use mavlink::ConnectionAddress; + + fn assert_parse(addr: &str) { + assert_eq!( + format!("{}", ConnectionAddress::parse_address(addr).unwrap()), + addr + ); + } + + #[test] + fn test_parse() { + assert_parse("tcpin:example.com:99"); + assert_parse("tcpout:127.0.0.1:14549"); + assert_parse("file:/mnt/12_44-mav.bin"); + assert_parse("file:C:\\mav_logs\\test.bin"); + assert_parse("udpcast:[::1]:4567"); + assert_parse("udpin:[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443"); + assert_parse("udpout:1.1.1.1:1"); + assert_parse("serial:/dev/ttyUSB0:9600"); + assert_parse("serial:COM0:115200"); + + assert!(ConnectionAddress::parse_address("serial:/dev/ttyUSB0").is_err()); + assert!(ConnectionAddress::parse_address("updout:1.1.1.1:1").is_err()); + assert!(ConnectionAddress::parse_address("tcp:127.0.0.1:14540").is_err()); + assert!(ConnectionAddress::parse_address("tcpin127.0.0.1:14540").is_err()); + assert!(ConnectionAddress::parse_address(" udpout:1.1.1.1:1 ").is_err()); + assert!(ConnectionAddress::parse_address(":udpcast:[::1]:4567").is_err()); + } +}