Skip to content

Commit

Permalink
reduce nest (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
tetter27 authored Jan 14, 2025
1 parent edfdf42 commit 23e66f9
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 117 deletions.
55 changes: 22 additions & 33 deletions moqt-server/src/modules/message_handlers/object_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use moqt_core::{
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum ObjectDatagramProcessResult {
Success(ObjectDatagram),
Expand All @@ -25,18 +26,8 @@ fn read_data_stream_type(read_cur: &mut std::io::Cursor<&[u8]>) -> Result<DataSt
bail!(err.to_string());
}
};

let data_stream_type: DataStreamType = match DataStreamType::try_from(type_value) {
Ok(v) => {
if v == DataStreamType::StreamHeaderTrack || v == DataStreamType::StreamHeaderSubgroup {
bail!("{:?} is not data stream type", v);
}
v
}
Err(err) => {
bail!(err.to_string());
}
};
let data_stream_type = DataStreamType::try_from(type_value)
.map_err(|err| anyhow::anyhow!("Failed to convert value: {}", err))?;
Ok(data_stream_type)
}

Expand Down Expand Up @@ -71,35 +62,33 @@ pub(crate) async fn try_read_object(
Err(err) => {
buf.advance(read_cur.position() as usize);

tracing::error!("data_stream_type is wrong: {:?}", err);
return ObjectDatagramProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
err.to_string(),
);
}
};

match data_stream_type {
DataStreamType::ObjectDatagram => {
let result = ObjectDatagram::depacketize(&mut read_cur);
match result {
Ok(object) => {
buf.advance(read_cur.position() as usize);

ObjectDatagramProcessResult::Success(object)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);
ObjectDatagramProcessResult::Continue
}
}
let result = match data_stream_type {
DataStreamType::ObjectDatagram => ObjectDatagram::depacketize(&mut read_cur),
_ => {
return ObjectDatagramProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Invalid message type: {:?}", data_stream_type),
);
}
};
match result {
Ok(object) => {
buf.advance(read_cur.position() as usize);
ObjectDatagramProcessResult::Success(object)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);
ObjectDatagramProcessResult::Continue
}
_ => ObjectDatagramProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Invalid message type: {:?}", data_stream_type),
),
}
}

Expand Down
60 changes: 22 additions & 38 deletions moqt-server/src/modules/message_handlers/object_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,48 +35,32 @@ pub async fn try_read_object(
}

let mut read_cur = Cursor::new(&buf[..]);

match data_stream_type {
let result = match data_stream_type {
DataStreamType::StreamHeaderTrack => {
let result = ObjectStreamTrack::depacketize(&mut read_cur);
match result {
Ok(object) => {
buf.advance(read_cur.position() as usize);

let object = StreamObject::Track(object);
ObjectStreamProcessResult::Success(object)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);

ObjectStreamProcessResult::Continue
}
}
ObjectStreamTrack::depacketize(&mut read_cur).map(StreamObject::Track)
}
DataStreamType::StreamHeaderSubgroup => {
let result = ObjectStreamSubgroup::depacketize(&mut read_cur);
match result {
Ok(object) => {
buf.advance(read_cur.position() as usize);

let object = StreamObject::Subgroup(object);
ObjectStreamProcessResult::Success(object)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// // Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);

ObjectStreamProcessResult::Continue
}
}
ObjectStreamSubgroup::depacketize(&mut read_cur).map(StreamObject::Subgroup)
}
unknown => {
return ObjectStreamProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
);
}
};

match result {
Ok(stream_object) => {
buf.advance(read_cur.position() as usize);
ObjectStreamProcessResult::Success(stream_object)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);
ObjectStreamProcessResult::Continue
}
unknown => ObjectStreamProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
),
}
}

Expand Down
70 changes: 24 additions & 46 deletions moqt-server/src/modules/message_handlers/stream_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,8 @@ fn read_data_stream_type(read_cur: &mut std::io::Cursor<&[u8]>) -> Result<DataSt
}
};

let data_stream_type: DataStreamType = match DataStreamType::try_from(type_value) {
Ok(v) => {
if v == DataStreamType::ObjectDatagram {
bail!("{:?} is not data stream type", v);
}
v
}
Err(err) => {
bail!(err.to_string());
}
};
let data_stream_type = DataStreamType::try_from(type_value)
.map_err(|err| anyhow::anyhow!("Failed to convert value: {}", err))?;
Ok(data_stream_type)
}

Expand Down Expand Up @@ -90,45 +81,32 @@ pub async fn try_read_header(
};
tracing::info!("Received data stream type: {:?}", data_stream_type);

match data_stream_type {
let result = match data_stream_type {
DataStreamType::StreamHeaderTrack => {
let result = StreamHeaderTrack::depacketize(&mut read_cur);
match result {
Ok(header) => {
buf.advance(read_cur.position() as usize);

let header = StreamHeader::Track(header);
StreamHeaderProcessResult::Success(header)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for the header has not yet arrived
buf.advance(read_cur.position() as usize);
StreamHeaderProcessResult::Continue
}
}
StreamHeaderTrack::depacketize(&mut read_cur).map(StreamHeader::Track)
}
DataStreamType::StreamHeaderSubgroup => {
let result = StreamHeaderSubgroup::depacketize(&mut read_cur);
match result {
Ok(header) => {
buf.advance(read_cur.position() as usize);

let header = StreamHeader::Subgroup(header);
StreamHeaderProcessResult::Success(header)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for the header has not yet arrived
buf.advance(read_cur.position() as usize);
StreamHeaderProcessResult::Continue
}
}
StreamHeaderSubgroup::depacketize(&mut read_cur).map(StreamHeader::Subgroup)
}
unknown => {
return StreamHeaderProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
);
}
};

match result {
Ok(stream_header) => {
buf.advance(read_cur.position() as usize);
StreamHeaderProcessResult::Success(stream_header)
}
Err(err) => {
tracing::warn!("{:#?}", err);
// Reset the cursor position because data for an object has not yet arrived
read_cur.set_position(0);
StreamHeaderProcessResult::Continue
}
unknown => StreamHeaderProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
),
}
}

Expand Down

0 comments on commit 23e66f9

Please sign in to comment.