Skip to content

Commit

Permalink
feat: judge stream end
Browse files Browse the repository at this point in the history
  • Loading branch information
tetter27 committed Dec 13, 2024
1 parent a56a2d9 commit 9b7c671
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ impl ObjectStreamSubgroup {
pub fn object_id(&self) -> u64 {
self.object_id
}

pub fn object_status(&self) -> Option<ObjectStatus> {
self.object_status

Check warning on line 52 in moqt-core/src/modules/messages/data_streams/object_stream_subgroup.rs

View check run for this annotation

Codecov / codecov/patch

moqt-core/src/modules/messages/data_streams/object_stream_subgroup.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
}
}

impl DataStreams for ObjectStreamSubgroup {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ impl ObjectStreamTrack {
pub fn object_id(&self) -> u64 {
self.object_id
}

pub fn object_status(&self) -> Option<ObjectStatus> {
self.object_status

Check warning on line 59 in moqt-core/src/modules/messages/data_streams/object_stream_track.rs

View check run for this annotation

Codecov / codecov/patch

moqt-core/src/modules/messages/data_streams/object_stream_track.rs#L58-L59

Added lines #L58 - L59 were not covered by tests
}
}

impl DataStreams for ObjectStreamTrack {
Expand Down
5 changes: 5 additions & 0 deletions moqt-core/src/modules/pubsub_relation_manager_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub trait PubSubRelationManagerRepository: Send + Sync {
track_namespace: Vec<String>,
track_name: String,
) -> Result<Option<Subscription>>;
async fn get_upstream_subscription_by_ids(
&self,
upstream_session_id: usize,
upstream_subscribe_id: u64,
) -> Result<Option<Subscription>>;
async fn get_downstream_subscription_by_ids(
&self,
downstream_session_id: usize,
Expand Down
32 changes: 16 additions & 16 deletions moqt-server/src/modules/message_handlers/object_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::io::Cursor;

#[derive(Debug, PartialEq)]
pub enum ObjectStreamProcessResult {
Success,
Success(CacheObject),
IncompleteMessage,
Failure(TerminationErrorCode, String),
}
Expand Down Expand Up @@ -61,16 +61,18 @@ pub async fn object_stream_handler(
Ok(object) => {
read_buf.advance(read_cur.position() as usize);

let cache_object = CacheObject::Track(object);
let received_object = CacheObject::Track(object);

Check warning on line 64 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L64

Added line #L64 was not covered by tests
object_cache_storage
.set_object(client.id(), subscribe_id, cache_object, duration)
.set_object(client.id(), subscribe_id, received_object.clone(), duration)

Check warning on line 66 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L66

Added line #L66 was not covered by tests
.await
.unwrap();

ObjectStreamProcessResult::Success(received_object)

Check warning on line 70 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L70

Added line #L70 was not covered by tests
}
Err(err) => {
tracing::warn!("{:#?}", err);
read_cur.set_position(0);
return ObjectStreamProcessResult::IncompleteMessage;
ObjectStreamProcessResult::IncompleteMessage

Check warning on line 75 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L75

Added line #L75 was not covered by tests
}
}
}
Expand All @@ -80,26 +82,24 @@ pub async fn object_stream_handler(
Ok(object) => {
read_buf.advance(read_cur.position() as usize);

let cache_object = CacheObject::Subgroup(object);
let received_object = CacheObject::Subgroup(object);

Check warning on line 85 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L85

Added line #L85 was not covered by tests
object_cache_storage
.set_object(client.id(), subscribe_id, cache_object, duration)
.set_object(client.id(), subscribe_id, received_object.clone(), duration)

Check warning on line 87 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L87

Added line #L87 was not covered by tests
.await
.unwrap();

ObjectStreamProcessResult::Success(received_object)

Check warning on line 91 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L91

Added line #L91 was not covered by tests
}
Err(err) => {
tracing::warn!("{:#?}", err);
read_cur.set_position(0);
return ObjectStreamProcessResult::IncompleteMessage;
ObjectStreamProcessResult::IncompleteMessage

Check warning on line 96 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L96

Added line #L96 was not covered by tests
}
}
}
unknown => {
return ObjectStreamProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
);
}
};

ObjectStreamProcessResult::Success
unknown => ObjectStreamProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),

Check warning on line 102 in moqt-server/src/modules/message_handlers/object_stream.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/object_stream.rs#L101-L102

Added lines #L101 - L102 were not covered by tests
),
}
}
38 changes: 18 additions & 20 deletions moqt-server/src/modules/message_handlers/stream_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
stream_track_subgroup::process_stream_header_subgroup,
},
moqt_client::{MOQTClient, MOQTClientStatus},
object_cache_storage::ObjectCacheStorageWrapper,
object_cache_storage::{CacheHeader, ObjectCacheStorageWrapper},
},
};
use anyhow::{bail, Result};
Expand All @@ -23,7 +23,7 @@ use std::io::Cursor;

#[derive(Debug, PartialEq)]
pub enum StreamHeaderProcessResult {
Success((u64, DataStreamType)),
Success(CacheHeader),
IncompleteMessage,
Failure(TerminationErrorCode, String),
}
Expand Down Expand Up @@ -92,7 +92,7 @@ pub async fn stream_header_handler(
);
}

let subscribe_id = match header_type {
match header_type {

Check warning on line 95 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L95

Added line #L95 was not covered by tests
DataStreamType::StreamHeaderTrack => {
match process_stream_header_track(
&mut read_cur,
Expand All @@ -102,16 +102,17 @@ pub async fn stream_header_handler(
)
.await
{
Ok(subscribe_id) => {
Ok(received_header) => {

Check warning on line 105 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L105

Added line #L105 was not covered by tests
read_buf.advance(read_cur.position() as usize);
subscribe_id

StreamHeaderProcessResult::Success(received_header)

Check warning on line 108 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L108

Added line #L108 was not covered by tests
}
Err(err) => {
read_buf.advance(read_cur.position() as usize);
return StreamHeaderProcessResult::Failure(
StreamHeaderProcessResult::Failure(
TerminationErrorCode::InternalError,
err.to_string(),
);
)
}
}
}
Expand All @@ -124,26 +125,23 @@ pub async fn stream_header_handler(
)
.await
{
Ok(subscribe_id) => {
Ok(received_header) => {

Check warning on line 128 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L128

Added line #L128 was not covered by tests
read_buf.advance(read_cur.position() as usize);
subscribe_id

StreamHeaderProcessResult::Success(received_header)

Check warning on line 131 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L131

Added line #L131 was not covered by tests
}
Err(err) => {
read_buf.advance(read_cur.position() as usize);
return StreamHeaderProcessResult::Failure(
StreamHeaderProcessResult::Failure(
TerminationErrorCode::InternalError,
err.to_string(),
);
)
}
}
}
unknown => {
return StreamHeaderProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),
);
}
};

StreamHeaderProcessResult::Success((subscribe_id, header_type))
unknown => StreamHeaderProcessResult::Failure(
TerminationErrorCode::ProtocolViolation,
format!("Unknown message type: {:?}", unknown),

Check warning on line 144 in moqt-server/src/modules/message_handlers/stream_header.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header.rs#L143-L144

Added lines #L143 - L144 were not covered by tests
),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn stream_header_subgroup_handler(
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
object_cache_storage: &mut ObjectCacheStorageWrapper,
client: &MOQTClient,
) -> Result<u64> {
) -> Result<CacheHeader> {
tracing::trace!("stream_header_subgroup_handler start.");

tracing::debug!(
Expand All @@ -37,8 +37,12 @@ pub(crate) async fn stream_header_subgroup_handler(

let cache_header = CacheHeader::Subgroup(stream_header_subgroup_message);
object_cache_storage
.set_subscription(upstream_session_id, upstream_subscribe_id, cache_header)
.set_subscription(
upstream_session_id,
upstream_subscribe_id,
cache_header.clone(),

Check warning on line 43 in moqt-server/src/modules/message_handlers/stream_header/handlers/stream_subgroup_header_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header/handlers/stream_subgroup_header_handler.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
)
.await?;

Ok(upstream_subscribe_id)
Ok(cache_header)

Check warning on line 47 in moqt-server/src/modules/message_handlers/stream_header/handlers/stream_subgroup_header_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header/handlers/stream_subgroup_header_handler.rs#L47

Added line #L47 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn stream_header_track_handler(
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
object_cache_storage: &mut ObjectCacheStorageWrapper,
client: &MOQTClient,
) -> Result<u64> {
) -> Result<CacheHeader> {
tracing::trace!("stream_header_track_handler start.");

tracing::debug!(
Expand All @@ -37,8 +37,12 @@ pub(crate) async fn stream_header_track_handler(

let cache_header = CacheHeader::Track(stream_header_track_message);
object_cache_storage
.set_subscription(upstream_session_id, upstream_subscribe_id, cache_header)
.set_subscription(
upstream_session_id,
upstream_subscribe_id,
cache_header.clone(),

Check warning on line 43 in moqt-server/src/modules/message_handlers/stream_header/handlers/stream_track_header_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header/handlers/stream_track_header_handler.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
)
.await?;

Ok(upstream_subscribe_id)
Ok(cache_header)

Check warning on line 47 in moqt-server/src/modules/message_handlers/stream_header/handlers/stream_track_header_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/message_handlers/stream_header/handlers/stream_track_header_handler.rs#L47

Added line #L47 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use moqt_core::{

use crate::modules::{
message_handlers::stream_header::handlers::stream_track_header_handler::stream_header_track_handler,
moqt_client::MOQTClient, object_cache_storage::ObjectCacheStorageWrapper,
moqt_client::MOQTClient,
object_cache_storage::{CacheHeader, ObjectCacheStorageWrapper},
};

pub(crate) async fn process_stream_header_track(
read_cur: &mut std::io::Cursor<&[u8]>,
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
object_cache_storage: &mut ObjectCacheStorageWrapper,
client: &MOQTClient,
) -> Result<u64> {
) -> Result<CacheHeader> {
let stream_header_track = match StreamHeaderTrack::depacketize(read_cur) {
Ok(stream_header_track) => stream_header_track,
Err(err) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use moqt_core::{

use crate::modules::{
message_handlers::stream_header::handlers::stream_subgroup_header_handler::stream_header_subgroup_handler,
moqt_client::MOQTClient, object_cache_storage::ObjectCacheStorageWrapper,
moqt_client::MOQTClient,
object_cache_storage::{CacheHeader, ObjectCacheStorageWrapper},
};

pub(crate) async fn process_stream_header_subgroup(
read_cur: &mut std::io::Cursor<&[u8]>,
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
object_cache_storage: &mut ObjectCacheStorageWrapper,
client: &MOQTClient,
) -> Result<u64> {
) -> Result<CacheHeader> {
let stream_header_subgroup = match StreamHeaderSubgroup::depacketize(read_cur) {
Ok(stream_header_subgroup) => stream_header_subgroup,
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion moqt-server/src/modules/object_cache_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) enum CacheHeader {
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum CacheObject {
Datagram(ObjectDatagram),
Track(ObjectStreamTrack),
Expand Down
5 changes: 5 additions & 0 deletions moqt-server/src/modules/pubsub_relation_manager/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ pub(crate) enum PubSubRelationCommand {
track_name: String,
resp: oneshot::Sender<Result<Option<Subscription>>>,
},
GetUpstreamSubscriptionBySessionIdAndSubscribeId {
upstream_session_id: usize,
upstream_subscribe_id: u64,
resp: oneshot::Sender<Result<Option<Subscription>>>,
},
GetDownstreamSubscriptionBySessionIdAndSubscribeId {
downstream_session_id: usize,
downstream_subscribe_id: u64,
Expand Down
10 changes: 10 additions & 0 deletions moqt-server/src/modules/pubsub_relation_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ pub(crate) async fn pubsub_relation_manager(rx: &mut mpsc::Receiver<PubSubRelati

resp.send(result).unwrap();
}
GetUpstreamSubscriptionBySessionIdAndSubscribeId {
upstream_session_id,
upstream_subscribe_id,
resp,
} => {
let consumer = consumers.get(&upstream_session_id).unwrap();
let result = consumer.get_subscription(upstream_subscribe_id);

resp.send(result).unwrap();
}
GetDownstreamSubscriptionBySessionIdAndSubscribeId {
downstream_session_id,
downstream_subscribe_id,
Expand Down
Loading

0 comments on commit 9b7c671

Please sign in to comment.