diff --git a/src/connections/handlers.rs b/src/connections/handlers.rs index ca5a2c3..494eb80 100644 --- a/src/connections/handlers.rs +++ b/src/connections/handlers.rs @@ -1,14 +1,12 @@ -use std::sync::Arc; - use crate::errors_internal::{Error, InternalChannelError, InternalStreamError}; use crate::protobufs; use crate::types::EncodedToRadioPacketWithHeader; +use crate::utils::format_data_packet; use log::{debug, error, trace}; use prost::Message; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::spawn; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -90,7 +88,7 @@ where pub fn spawn_write_handler( cancellation_token: CancellationToken, - write_stream: Arc>, + write_stream: W, write_input_rx: tokio::sync::mpsc::UnboundedReceiver, ) -> JoinHandle> where @@ -116,7 +114,7 @@ where async fn start_write_handler( _cancellation_token: CancellationToken, - write_stream: Arc>, + mut write_stream: W, mut write_input_rx: tokio::sync::mpsc::UnboundedReceiver, ) -> Result<(), Error> where @@ -127,8 +125,6 @@ where while let Some(message) = write_input_rx.recv().await { trace!("Writing packet data: {:?}", message); - let mut write_stream = write_stream.lock().await; - if let Err(e) = write_stream.write(message.data()).await { error!("Error writing to stream: {:?}", e); return Err(Error::InternalStreamError( @@ -181,14 +177,11 @@ async fn start_processing_handler( trace!("Processing read_output_rx channel closed"); } -pub fn spawn_heartbeat_handler( +pub fn spawn_heartbeat_handler( cancellation_token: CancellationToken, - write_stream: Arc>, -) -> JoinHandle> -where - W: AsyncWriteExt + Send + Unpin + 'static, -{ - let handle = start_heartbeat_handler(cancellation_token.clone(), write_stream); + write_input_tx: UnboundedSender, +) -> JoinHandle> { + let handle = start_heartbeat_handler(cancellation_token.clone(), write_input_tx); spawn(async move { tokio::select! { @@ -206,21 +199,20 @@ where }) } -async fn start_heartbeat_handler( +async fn start_heartbeat_handler( _cancellation_token: CancellationToken, - write_stream: Arc>, -) -> Result<(), Error> -where - W: AsyncWriteExt + Send + Unpin + 'static, -{ + write_input_tx: UnboundedSender, +) -> Result<(), Error> { debug!("Started heartbeat handler"); loop { tokio::time::sleep(std::time::Duration::from_secs(CLIENT_HEARTBEAT_INTERVAL)).await; - let mut write_stream = write_stream.lock().await; - - let heartbeat_packet = protobufs::Heartbeat::default(); + let heartbeat_packet = protobufs::ToRadio { + payload_variant: Some(protobufs::to_radio::PayloadVariant::Heartbeat( + protobufs::Heartbeat::default(), + )), + }; let mut buffer = Vec::new(); match heartbeat_packet.encode(&mut buffer) { @@ -231,7 +223,17 @@ where } }; - if let Err(e) = write_stream.write(&buffer).await { + let packet_with_header = match format_data_packet(buffer.into()) { + Ok(p) => p, + Err(e) => { + error!("Error formatting heartbeat packet: {:?}", e); + continue; + } + }; + + trace!("Sending heartbeat packet"); + + if let Err(e) = write_input_tx.send(packet_with_header) { error!("Error writing heartbeat packet to stream: {:?}", e); return Err(Error::InternalStreamError( InternalStreamError::StreamWriteError { diff --git a/src/connections/stream_api.rs b/src/connections/stream_api.rs index 8f84481..6a5f79c 100644 --- a/src/connections/stream_api.rs +++ b/src/connections/stream_api.rs @@ -1,10 +1,10 @@ use futures_util::future::join3; use log::trace; use prost::Message; -use std::{fmt::Display, marker::PhantomData, sync::Arc}; +use std::{fmt::Display, marker::PhantomData}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::{mpsc::UnboundedSender, Mutex}, + sync::mpsc::UnboundedSender, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -438,16 +438,11 @@ impl StreamApi { let (read_stream, write_stream) = tokio::io::split(stream_handle.stream); let cancellation_token = CancellationToken::new(); - let write_stream_mutex = Arc::new(Mutex::new(write_stream)); - let read_handle = handlers::spawn_read_handler(cancellation_token.clone(), read_stream, read_output_tx); - let write_handle = handlers::spawn_write_handler( - cancellation_token.clone(), - write_stream_mutex.clone(), - write_input_rx, - ); + let write_handle = + handlers::spawn_write_handler(cancellation_token.clone(), write_stream, write_input_rx); let processing_handle = handlers::spawn_processing_handler( cancellation_token.clone(), @@ -456,7 +451,7 @@ impl StreamApi { ); let heartbeat_handle = - handlers::spawn_heartbeat_handler(cancellation_token.clone(), write_stream_mutex); + handlers::spawn_heartbeat_handler(cancellation_token.clone(), write_input_tx.clone()); // Persist channels and kill switch to struct diff --git a/src/protobufs b/src/protobufs index 6942175..b2b145e 160000 --- a/src/protobufs +++ b/src/protobufs @@ -1 +1 @@ -Subproject commit 69421753948f2cb0e3eedabaef3a3ca9a286ce77 +Subproject commit b2b145e3321beab1441fa59290137ab42eb38dc8