diff --git a/moqt-server/src/modules/message_handlers/object_datagram.rs b/moqt-server/src/modules/message_handlers/object_datagram.rs index 70cc1e0..0a3614d 100644 --- a/moqt-server/src/modules/message_handlers/object_datagram.rs +++ b/moqt-server/src/modules/message_handlers/object_datagram.rs @@ -102,3 +102,107 @@ pub(crate) async fn try_read_object( ), } } + +#[cfg(test)] +mod tests { + mod success { + use crate::modules::{ + message_handlers::object_datagram::{try_read_object, ObjectDatagramProcessResult}, + moqt_client::{MOQTClient, MOQTClientStatus}, + server_processes::senders, + }; + use bytes::BytesMut; + use moqt_core::{ + data_stream_type::DataStreamType, + messages::data_streams::{object_datagram::ObjectDatagram, DataStreams}, + variable_integer::write_variable_integer, + }; + use std::{io::Cursor, sync::Arc}; + use tokio::sync::Mutex; + + #[tokio::test] + async fn datagram_object_success() { + let data_stream_type = DataStreamType::ObjectDatagram; + let bytes_array = [ + 0, // Subscribe ID (i) + 1, // Track Alias (i) + 2, // Group ID (i) + 3, // Object ID (i) + 4, // Subscriber Priority (8) + 3, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_object(&mut buf, client).await; + + let mut buf_without_type = BytesMut::with_capacity(bytes_array.len()); + buf_without_type.extend_from_slice(&bytes_array); + let mut read_cur = Cursor::new(&buf_without_type[..]); + let object = ObjectDatagram::depacketize(&mut read_cur).unwrap(); + + assert_eq!(result, ObjectDatagramProcessResult::Success(object)); + } + + #[tokio::test] + async fn datagram_object_continue_insufficient_payload() { + let data_stream_type = DataStreamType::ObjectDatagram; + let bytes_array = [ + 0, // Subscribe ID (i) + 1, // Track Alias (i) + 2, // Group ID (i) + 3, // Object ID (i) + 4, // Subscriber Priority (8) + 50, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_object(&mut buf, client).await; + + assert_eq!(result, ObjectDatagramProcessResult::Continue); + } + + #[tokio::test] + async fn datagram_object_continue_incomplete_message() { + let data_stream_type = DataStreamType::ObjectDatagram; + let bytes_array = [ + 0, // Subscribe ID (i) + 1, // Track Alias (i) + 2, // Group ID (i) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_object(&mut buf, client).await; + + assert_eq!(result, ObjectDatagramProcessResult::Continue); + } + } +} diff --git a/moqt-server/src/modules/message_handlers/object_stream.rs b/moqt-server/src/modules/message_handlers/object_stream.rs index b976f6a..5f5fa0c 100644 --- a/moqt-server/src/modules/message_handlers/object_stream.rs +++ b/moqt-server/src/modules/message_handlers/object_stream.rs @@ -79,3 +79,130 @@ pub async fn try_read_object( ), } } + +#[cfg(test)] +mod tests { + mod success { + use crate::modules::message_handlers::object_stream::{ + try_read_object, ObjectStreamProcessResult, StreamObject, + }; + use bytes::BytesMut; + use moqt_core::{ + data_stream_type::DataStreamType, + messages::data_streams::{ + object_stream_subgroup::ObjectStreamSubgroup, + object_stream_track::ObjectStreamTrack, DataStreams, + }, + }; + use std::io::Cursor; + + #[tokio::test] + async fn stream_object_track_success() { + let data_stream_type = DataStreamType::StreamHeaderTrack; + let bytes_array = [ + 0, // Group ID (i) + 1, // Object ID (i) + 3, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + let buf_clone = buf.clone(); + + let result = try_read_object(&mut buf, data_stream_type).await; + + let mut read_cur = Cursor::new(&buf_clone[..]); + let object = ObjectStreamTrack::depacketize(&mut read_cur).unwrap(); + + assert_eq!( + result, + ObjectStreamProcessResult::Success(StreamObject::Track(object)) + ); + } + + #[tokio::test] + async fn stream_object_subgroup_success() { + let data_stream_type = DataStreamType::StreamHeaderSubgroup; + let bytes_array = [ + 0, // Object ID (i) + 3, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + let buf_clone = buf.clone(); + + let result = try_read_object(&mut buf, data_stream_type).await; + + let mut read_cur = Cursor::new(&buf_clone[..]); + let object = ObjectStreamSubgroup::depacketize(&mut read_cur).unwrap(); + + assert_eq!( + result, + ObjectStreamProcessResult::Success(StreamObject::Subgroup(object)) + ); + } + + #[tokio::test] + async fn stream_object_track_continue_insufficient_payload() { + let data_stream_type = DataStreamType::StreamHeaderTrack; + let bytes_array = [ + 0, // Group ID (i) + 1, // Object ID (i) + 50, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + + let result = try_read_object(&mut buf, data_stream_type).await; + + assert_eq!(result, ObjectStreamProcessResult::Continue); + } + + #[tokio::test] + async fn stream_object_subgroup_continue_insufficient_payload() { + let data_stream_type = DataStreamType::StreamHeaderSubgroup; + let bytes_array = [ + 0, // Object ID (i) + 50, // Object Payload Length (i) + 0, 1, 2, // Object Payload (..) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + + let result = try_read_object(&mut buf, data_stream_type).await; + + assert_eq!(result, ObjectStreamProcessResult::Continue); + } + + #[tokio::test] + async fn stream_object_track_continue_incomplete_message() { + let data_stream_type = DataStreamType::StreamHeaderTrack; + let bytes_array = [ + 0, // Group ID (i) + 1, // Object ID (i) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + + let result = try_read_object(&mut buf, data_stream_type).await; + + assert_eq!(result, ObjectStreamProcessResult::Continue); + } + + #[tokio::test] + async fn stream_object_subgroup_continue_incomplete_message() { + let data_stream_type = DataStreamType::StreamHeaderSubgroup; + let bytes_array = [ + 0, // Object ID (i) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len()); + buf.extend_from_slice(&bytes_array); + + let result = try_read_object(&mut buf, data_stream_type).await; + + assert_eq!(result, ObjectStreamProcessResult::Continue); + } + } +} diff --git a/moqt-server/src/modules/message_handlers/stream_header.rs b/moqt-server/src/modules/message_handlers/stream_header.rs index fde4661..849e8db 100644 --- a/moqt-server/src/modules/message_handlers/stream_header.rs +++ b/moqt-server/src/modules/message_handlers/stream_header.rs @@ -104,10 +104,7 @@ pub async fn try_read_header( tracing::warn!("{:#?}", err); // Reset the cursor position because data for the header has not yet arrived buf.advance(read_cur.position() as usize); - StreamHeaderProcessResult::Failure( - TerminationErrorCode::InternalError, - err.to_string(), - ) + StreamHeaderProcessResult::Continue } } } @@ -124,10 +121,7 @@ pub async fn try_read_header( tracing::warn!("{:#?}", err); // Reset the cursor position because data for the header has not yet arrived buf.advance(read_cur.position() as usize); - StreamHeaderProcessResult::Failure( - TerminationErrorCode::InternalError, - err.to_string(), - ) + StreamHeaderProcessResult::Continue } } } @@ -137,3 +131,138 @@ pub async fn try_read_header( ), } } + +#[cfg(test)] +mod tests { + mod success { + use crate::modules::{ + message_handlers::stream_header::{ + try_read_header, StreamHeader, StreamHeaderProcessResult, + }, + moqt_client::{MOQTClient, MOQTClientStatus}, + server_processes::senders, + }; + use bytes::BytesMut; + use moqt_core::{ + data_stream_type::DataStreamType, + messages::data_streams::{ + stream_header_subgroup::StreamHeaderSubgroup, + stream_header_track::StreamHeaderTrack, DataStreams, + }, + variable_integer::write_variable_integer, + }; + use std::{io::Cursor, sync::Arc}; + use tokio::sync::Mutex; + + #[tokio::test] + async fn stream_header_track_success() { + let data_stream_type = DataStreamType::StreamHeaderTrack; + let bytes_array = [ + 0, // Subscribe ID (i) + 1, // Track Alias (i) + 2, // Subscriber Priority (8) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_header(&mut buf, client).await; + + let mut buf_without_type = BytesMut::with_capacity(bytes_array.len()); + buf_without_type.extend_from_slice(&bytes_array); + let mut read_cur = Cursor::new(&buf_without_type[..]); + let header = StreamHeaderTrack::depacketize(&mut read_cur).unwrap(); + + assert_eq!( + result, + StreamHeaderProcessResult::Success(StreamHeader::Track(header)) + ); + } + + #[tokio::test] + async fn stream_header_subgroup_success() { + let data_stream_type = DataStreamType::StreamHeaderSubgroup; + let bytes_array = [ + 0, // Subscribe ID (i) + 1, // Track Alias (i) + 2, // Group ID (i) + 3, // Subgroup ID (i) + 4, // Subscriber Priority (8) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_header(&mut buf, client).await; + + let mut buf_without_type = BytesMut::with_capacity(bytes_array.len()); + buf_without_type.extend_from_slice(&bytes_array); + let mut read_cur = Cursor::new(&buf_without_type[..]); + let header = StreamHeaderSubgroup::depacketize(&mut read_cur).unwrap(); + + assert_eq!( + result, + StreamHeaderProcessResult::Success(StreamHeader::Subgroup(header)) + ); + } + + #[tokio::test] + async fn stream_header_track_continue_incomplete_message() { + let data_stream_type = DataStreamType::StreamHeaderTrack; + let bytes_array = [ + 0, // Group ID (i) + 1, // Object ID (i) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_header(&mut buf, client).await; + + assert_eq!(result, StreamHeaderProcessResult::Continue); + } + + #[tokio::test] + async fn stream_header_subgroup_continue_incomplete_message() { + let data_stream_type = DataStreamType::StreamHeaderSubgroup; + let bytes_array = [ + 0, // Object ID (i) + ]; + let mut buf = BytesMut::with_capacity(bytes_array.len() + 8); + buf.extend(write_variable_integer(data_stream_type as u64)); + buf.extend_from_slice(&bytes_array); + + let senders_mock = senders::test_helper_fn::create_senders_mock(); + let upstream_session_id = 0; + + let mut client = MOQTClient::new(upstream_session_id, senders_mock); + client.update_status(MOQTClientStatus::SetUp); + let client = Arc::new(Mutex::new(client)); + + let result = try_read_header(&mut buf, client).await; + + assert_eq!(result, StreamHeaderProcessResult::Continue); + } + } +}