From fc6553817e009e42c1600e4f9b17ad110b74cb3f Mon Sep 17 00:00:00 2001 From: Cyril Fougeray Date: Thu, 2 May 2024 14:51:18 +0200 Subject: [PATCH] mcu-util: refactor blocking call to can-rs in async and some tiny refacto --- mcu-util/src/messaging/can/canfd.rs | 56 +++++++++++++--------------- mcu-util/src/messaging/can/isotp.rs | 58 +++++++++++++++-------------- 2 files changed, 56 insertions(+), 58 deletions(-) diff --git a/mcu-util/src/messaging/can/canfd.rs b/mcu-util/src/messaging/can/canfd.rs index eda2aa89..cc8a3c9b 100644 --- a/mcu-util/src/messaging/can/canfd.rs +++ b/mcu-util/src/messaging/can/canfd.rs @@ -1,18 +1,16 @@ -use std::process; -use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::mpsc; - use async_trait::async_trait; +use can_rs::filter::Filter; +use can_rs::stream::FrameStream; +use can_rs::{Frame, Id, CANFD_DATA_LEN}; use eyre::{eyre, Context, Result}; use orb_messages::CommonAckError; use prost::Message; +use std::process; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::{mpsc, Arc}; use tokio::time::Duration; use tracing::debug; -use can_rs::filter::Filter; -use can_rs::stream::FrameStream; -use can_rs::{Frame, Id}; - use crate::messaging::Device::{JetsonFromMain, JetsonFromSecurity, Main, Security}; use crate::messaging::{ handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload, @@ -20,7 +18,7 @@ use crate::messaging::{ }; pub struct CanRawMessaging { - stream: FrameStream<64>, + stream: FrameStream, ack_num_lsb: AtomicU16, ack_queue: mpsc::Receiver<(CommonAckError, u32)>, can_node: Device, @@ -35,7 +33,7 @@ impl CanRawMessaging { new_message_queue: mpsc::Sender, ) -> Result { // open socket - let stream = FrameStream::<64>::build() + let stream = FrameStream::::build() .nonblocking(false) .filters(vec![ Filter { @@ -47,13 +45,14 @@ impl CanRawMessaging { mask: 0xff, }, ]) - .bind(bus.as_str().parse().unwrap())?; + .bind(bus.as_str().parse().unwrap()) + .wrap_err("Failed to bind CAN stream")?; let (ack_tx, ack_rx) = mpsc::channel(); - - // spawn CAN receiver let stream_copy = stream.try_clone()?; - tokio::task::spawn(can_rx(stream_copy, can_node, ack_tx, new_message_queue)); + tokio::task::spawn_blocking(move || { + can_rx(stream_copy, can_node, ack_tx, new_message_queue) + }); Ok(Self { stream, @@ -78,9 +77,12 @@ impl CanRawMessaging { } } - #[allow(dead_code)] - async fn send_wait_ack(&mut self, frame: &Frame<64>) -> Result { - self.stream.send(frame, 0)?; + async fn send_wait_ack( + &mut self, + frame: Arc>, + ) -> Result { + let stream = self.stream.try_clone()?; + tokio::task::spawn_blocking(move || stream.send(&frame, 0)).await??; // put some randomness into ack number to prevent collision with other processes let expected_ack_number = @@ -94,15 +96,14 @@ impl CanRawMessaging { /// Receive CAN frames /// - relay acks to `ack_tx` /// - relay new McuMessage to `new_message_queue` -#[allow(dead_code)] -async fn can_rx( - stream: FrameStream<64>, +fn can_rx( + stream: FrameStream, remote_node: Device, ack_tx: mpsc::Sender<(CommonAckError, u32)>, new_message_queue: mpsc::Sender, ) -> Result<()> { loop { - let mut frame: Frame<64> = Frame::empty(); + let mut frame: Frame = Frame::empty(); loop { match stream.recv(&mut frame, 0) { Ok(_) => { @@ -146,7 +147,6 @@ async fn can_rx( #[async_trait] impl MessagingInterface for CanRawMessaging { /// Send payload into McuMessage - #[allow(dead_code)] async fn send(&mut self, payload: McuPayload) -> Result { // snowflake ack ID to avoid collisions: // prefix ack number with process ID @@ -201,26 +201,20 @@ impl MessagingInterface for CanRawMessaging { }; if let Some(bytes) = bytes { - let mut buf: [u8; 64] = [0u8; 64]; + let mut buf: [u8; CANFD_DATA_LEN] = [0u8; CANFD_DATA_LEN]; buf[..bytes.len()].copy_from_slice(bytes.as_slice()); let node_addr = self.can_node as u32; let frame = Frame { id: Id::Extended(node_addr), - len: 64, + len: CANFD_DATA_LEN as u8, flags: 0x0F, data: buf, }; - self.send_wait_ack(&frame).await + self.send_wait_ack(Arc::new(frame)).await } else { Err(eyre!("Failed to encode payload")) } } } - -impl Drop for CanRawMessaging { - fn drop(&mut self) { - // TODO - } -} diff --git a/mcu-util/src/messaging/can/isotp.rs b/mcu-util/src/messaging/can/isotp.rs index 0eb3bc86..44ce265e 100644 --- a/mcu-util/src/messaging/can/isotp.rs +++ b/mcu-util/src/messaging/can/isotp.rs @@ -1,18 +1,17 @@ -use std::io::{Read, Write}; -use std::process; -use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::mpsc; - use async_trait::async_trait; use eyre::{eyre, Context, Result}; use orb_messages::CommonAckError; use prost::Message; +use std::io::{Read, Write}; +use std::process; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::{mpsc, Arc}; use tokio::time::Duration; use tracing::{debug, error}; use can_rs::isotp::addr::CanIsotpAddr; use can_rs::isotp::stream::IsotpStream; -use can_rs::Id; +use can_rs::{Id, CAN_DATA_LEN}; use crate::messaging::{ handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface, @@ -69,7 +68,7 @@ impl From for IsoTpNodeIdentifier { } pub struct CanIsoTpMessaging { - tx_stream: IsotpStream<8>, + stream: IsotpStream, ack_num_lsb: AtomicU16, ack_queue: mpsc::Receiver<(CommonAckError, u32)>, } @@ -103,23 +102,27 @@ impl CanIsoTpMessaging { let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?; debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst); - let (ack_tx, ack_rx) = mpsc::channel(); - // open TX stream - let tx_isotp_stream = IsotpStream::<8>::build().bind( - CanIsotpAddr::new( - bus.as_str(), - Id::Standard(tx_stdid_dst), - Id::Standard(tx_stdid_src), + let tx_isotp_stream = IsotpStream::::build() + .bind( + CanIsotpAddr::new( + bus.as_str(), + Id::Standard(tx_stdid_dst), + Id::Standard(tx_stdid_src), + ) + .expect("Unable to build IsoTpStream"), ) - .expect("Unable to build IsoTpStream"), - )?; + .wrap_err("Failed to bind CAN ISO-TP stream")?; + + let (ack_tx, ack_rx) = mpsc::channel(); // spawn CAN receiver - tokio::task::spawn(can_rx(bus, remote, local, ack_tx, new_message_queue)); + tokio::task::spawn_blocking(move || { + can_rx(bus, remote, local, ack_tx, new_message_queue) + }); Ok(CanIsoTpMessaging { - tx_stream: tx_isotp_stream, + stream: tx_isotp_stream, ack_num_lsb: AtomicU16::new(0), ack_queue: ack_rx, }) @@ -140,13 +143,14 @@ impl CanIsoTpMessaging { } } - async fn send_wait_ack(&mut self, frame: &[u8]) -> Result { - match self.tx_stream.write(frame) { - Ok(_) => {} - Err(err) => { - error!("Error writing stream: {}", err); + async fn send_wait_ack(&mut self, frame: Arc>) -> Result { + let mut stream = self.stream.try_clone()?; + tokio::task::spawn_blocking(move || { + if let Err(e) = stream.write(frame.as_slice()) { + error!("Error writing stream: {}", e); } - } + }) + .await?; let expected_ack_number = process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32; @@ -159,7 +163,7 @@ impl CanIsoTpMessaging { /// Receive CAN frames /// - relay acks to `ack_tx` /// - relay new McuMessage to `new_message_queue` -async fn can_rx( +fn can_rx( bus: String, remote: IsoTpNodeIdentifier, local: IsoTpNodeIdentifier, @@ -170,7 +174,7 @@ async fn can_rx( let (rx_stdid_src, rx_stdid_dest) = create_pair(remote, local)?; debug!("Listening on 0x{:x}->0x{:x}", rx_stdid_src, rx_stdid_dest); - let mut rx_isotp_stream = IsotpStream::<8>::build().bind( + let mut rx_isotp_stream = IsotpStream::::build().bind( CanIsotpAddr::new( bus.as_str(), Id::Standard(rx_stdid_src), @@ -256,6 +260,6 @@ impl MessagingInterface for CanIsoTpMessaging { _ => return Err(eyre!("Invalid payload")), }; - self.send_wait_ack(bytes.as_slice()).await + self.send_wait_ack(Arc::new(bytes)).await } }