Skip to content

Commit

Permalink
Add tests to object handlers (#150)
Browse files Browse the repository at this point in the history
* add test

* Revert "add test"

This reverts commit 8bc69f5.

* add tests

* fix process result value
  • Loading branch information
tetter27 authored Jan 14, 2025
1 parent afc0db6 commit edfdf42
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 8 deletions.
104 changes: 104 additions & 0 deletions moqt-server/src/modules/message_handlers/object_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,107 @@ pub(crate) async fn try_read_object(
),
}
}

#[cfg(test)]
mod tests {
mod success {
use crate::modules::{
message_handlers::object_datagram::{try_read_object, ObjectDatagramProcessResult},
moqt_client::{MOQTClient, MOQTClientStatus},
server_processes::senders,
};
use bytes::BytesMut;
use moqt_core::{
data_stream_type::DataStreamType,
messages::data_streams::{object_datagram::ObjectDatagram, DataStreams},
variable_integer::write_variable_integer,
};
use std::{io::Cursor, sync::Arc};
use tokio::sync::Mutex;

#[tokio::test]
async fn datagram_object_success() {
let data_stream_type = DataStreamType::ObjectDatagram;
let bytes_array = [
0, // Subscribe ID (i)
1, // Track Alias (i)
2, // Group ID (i)
3, // Object ID (i)
4, // Subscriber Priority (8)
3, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len() + 8);
buf.extend(write_variable_integer(data_stream_type as u64));
buf.extend_from_slice(&bytes_array);

let senders_mock = senders::test_helper_fn::create_senders_mock();
let upstream_session_id = 0;

let mut client = MOQTClient::new(upstream_session_id, senders_mock);
client.update_status(MOQTClientStatus::SetUp);
let client = Arc::new(Mutex::new(client));

let result = try_read_object(&mut buf, client).await;

let mut buf_without_type = BytesMut::with_capacity(bytes_array.len());
buf_without_type.extend_from_slice(&bytes_array);
let mut read_cur = Cursor::new(&buf_without_type[..]);
let object = ObjectDatagram::depacketize(&mut read_cur).unwrap();

assert_eq!(result, ObjectDatagramProcessResult::Success(object));
}

#[tokio::test]
async fn datagram_object_continue_insufficient_payload() {
let data_stream_type = DataStreamType::ObjectDatagram;
let bytes_array = [
0, // Subscribe ID (i)
1, // Track Alias (i)
2, // Group ID (i)
3, // Object ID (i)
4, // Subscriber Priority (8)
50, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len() + 8);
buf.extend(write_variable_integer(data_stream_type as u64));
buf.extend_from_slice(&bytes_array);

let senders_mock = senders::test_helper_fn::create_senders_mock();
let upstream_session_id = 0;

let mut client = MOQTClient::new(upstream_session_id, senders_mock);
client.update_status(MOQTClientStatus::SetUp);
let client = Arc::new(Mutex::new(client));

let result = try_read_object(&mut buf, client).await;

assert_eq!(result, ObjectDatagramProcessResult::Continue);
}

#[tokio::test]
async fn datagram_object_continue_incomplete_message() {
let data_stream_type = DataStreamType::ObjectDatagram;
let bytes_array = [
0, // Subscribe ID (i)
1, // Track Alias (i)
2, // Group ID (i)
];
let mut buf = BytesMut::with_capacity(bytes_array.len() + 8);
buf.extend(write_variable_integer(data_stream_type as u64));
buf.extend_from_slice(&bytes_array);

let senders_mock = senders::test_helper_fn::create_senders_mock();
let upstream_session_id = 0;

let mut client = MOQTClient::new(upstream_session_id, senders_mock);
client.update_status(MOQTClientStatus::SetUp);
let client = Arc::new(Mutex::new(client));

let result = try_read_object(&mut buf, client).await;

assert_eq!(result, ObjectDatagramProcessResult::Continue);
}
}
}
127 changes: 127 additions & 0 deletions moqt-server/src/modules/message_handlers/object_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,130 @@ pub async fn try_read_object(
),
}
}

#[cfg(test)]
mod tests {
mod success {
use crate::modules::message_handlers::object_stream::{
try_read_object, ObjectStreamProcessResult, StreamObject,
};
use bytes::BytesMut;
use moqt_core::{
data_stream_type::DataStreamType,
messages::data_streams::{
object_stream_subgroup::ObjectStreamSubgroup,
object_stream_track::ObjectStreamTrack, DataStreams,
},
};
use std::io::Cursor;

#[tokio::test]
async fn stream_object_track_success() {
let data_stream_type = DataStreamType::StreamHeaderTrack;
let bytes_array = [
0, // Group ID (i)
1, // Object ID (i)
3, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);
let buf_clone = buf.clone();

let result = try_read_object(&mut buf, data_stream_type).await;

let mut read_cur = Cursor::new(&buf_clone[..]);
let object = ObjectStreamTrack::depacketize(&mut read_cur).unwrap();

assert_eq!(
result,
ObjectStreamProcessResult::Success(StreamObject::Track(object))
);
}

#[tokio::test]
async fn stream_object_subgroup_success() {
let data_stream_type = DataStreamType::StreamHeaderSubgroup;
let bytes_array = [
0, // Object ID (i)
3, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);
let buf_clone = buf.clone();

let result = try_read_object(&mut buf, data_stream_type).await;

let mut read_cur = Cursor::new(&buf_clone[..]);
let object = ObjectStreamSubgroup::depacketize(&mut read_cur).unwrap();

assert_eq!(
result,
ObjectStreamProcessResult::Success(StreamObject::Subgroup(object))
);
}

#[tokio::test]
async fn stream_object_track_continue_insufficient_payload() {
let data_stream_type = DataStreamType::StreamHeaderTrack;
let bytes_array = [
0, // Group ID (i)
1, // Object ID (i)
50, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);

let result = try_read_object(&mut buf, data_stream_type).await;

assert_eq!(result, ObjectStreamProcessResult::Continue);
}

#[tokio::test]
async fn stream_object_subgroup_continue_insufficient_payload() {
let data_stream_type = DataStreamType::StreamHeaderSubgroup;
let bytes_array = [
0, // Object ID (i)
50, // Object Payload Length (i)
0, 1, 2, // Object Payload (..)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);

let result = try_read_object(&mut buf, data_stream_type).await;

assert_eq!(result, ObjectStreamProcessResult::Continue);
}

#[tokio::test]
async fn stream_object_track_continue_incomplete_message() {
let data_stream_type = DataStreamType::StreamHeaderTrack;
let bytes_array = [
0, // Group ID (i)
1, // Object ID (i)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);

let result = try_read_object(&mut buf, data_stream_type).await;

assert_eq!(result, ObjectStreamProcessResult::Continue);
}

#[tokio::test]
async fn stream_object_subgroup_continue_incomplete_message() {
let data_stream_type = DataStreamType::StreamHeaderSubgroup;
let bytes_array = [
0, // Object ID (i)
];
let mut buf = BytesMut::with_capacity(bytes_array.len());
buf.extend_from_slice(&bytes_array);

let result = try_read_object(&mut buf, data_stream_type).await;

assert_eq!(result, ObjectStreamProcessResult::Continue);
}
}
}
Loading

0 comments on commit edfdf42

Please sign in to comment.