From 2235eb9a5947674400934a0c40b4179f8a6d2478 Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Thu, 17 Oct 2024 15:48:37 +0900 Subject: [PATCH 1/6] Experiment implementation of FOP-1 --- Cargo.lock | 5 +- Cargo.toml | 6 +- gaia-stub/proto/broker.proto | 34 ++++ gaia-tmtc/src/broker.rs | 105 ++++++++++- tmtc-c2a/Cargo.toml | 1 + tmtc-c2a/src/fop1.rs | 333 +++++++++++++++++++++++++++++++++++ tmtc-c2a/src/lib.rs | 1 + tmtc-c2a/src/main.rs | 5 +- tmtc-c2a/src/satellite.rs | 179 ++++++++++++++++++- 9 files changed, 653 insertions(+), 16 deletions(-) create mode 100644 tmtc-c2a/src/fop1.rs diff --git a/Cargo.lock b/Cargo.lock index 3cb76bb5..d9169041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2157,6 +2157,7 @@ dependencies = [ "structpack", "tlmcmddb", "tokio", + "tokio-stream", "tokio-tungstenite", "tonic", "tonic-build", @@ -2222,9 +2223,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 09780be5..deac9c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ license = "MPL-2.0" [workspace.dependencies] structpack = "1.0" -gaia-stub = "1.0" -gaia-ccsds-c2a = "1.0" -gaia-tmtc = "1.0" +gaia-stub = { path = "gaia-stub" } +gaia-ccsds-c2a = { path = "gaia-ccsds-c2a" } +gaia-tmtc = { path = "gaia-tmtc" } c2a-devtools-frontend = "1.0" diff --git a/gaia-stub/proto/broker.proto b/gaia-stub/proto/broker.proto index 6991bd04..298c4700 100644 --- a/gaia-stub/proto/broker.proto +++ b/gaia-stub/proto/broker.proto @@ -11,6 +11,10 @@ service Broker { rpc OpenCommandStream(stream CommandStreamRequest) returns (stream CommandStreamResponse); rpc PostTelemetry(PostTelemetryRequest) returns (PostTelemetryResponse); + + rpc PostSetVR(PostSetVRRequest) returns (PostSetVRResponse); + rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse); + rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent); } message PostCommandRequest { @@ -52,3 +56,33 @@ message GetLastReceivedTelemetryRequest { message GetLastReceivedTelemetryResponse { tco_tmiv.Tmiv tmiv = 1; } + +message PostSetVRRequest { + uint32 vr = 1; +} + +message PostSetVRResponse { +} + +message PostADCommandRequest { + tco_tmiv.Tco tco = 1; +} + +message PostADCommandResponse { + bool success = 1; + uint64 frame_id = 2; +} + +message SubscribeFopFrameEventsRequest { +} + +message FopFrameEvent { + uint64 frame_id = 1; + enum EventType { + TRANSMIT = 0; + ACKNOWLEDGED = 1; + RETRANSMIT = 2; + CANCEL = 3; + }; + EventType event_type = 2; +} diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index d9f429bc..58c405db 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -3,6 +3,7 @@ use std::{fmt::Debug, sync::Arc}; use anyhow::Result; use futures::prelude::*; use gaia_stub::tco_tmiv::Tco; +use std::pin::Pin; use tokio::sync::Mutex; use tokio_stream::wrappers::BroadcastStream; use tonic::{Request, Response, Status, Streaming}; @@ -11,36 +12,61 @@ use super::telemetry::{self, LastTmivStore}; pub use gaia_stub::broker::*; -pub struct BrokerService { +pub struct BrokerService { cmd_handler: Mutex, + fop_command_service: Mutex, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, } -impl BrokerService { +impl BrokerService { pub fn new( cmd_service: C, + fop_command_service: F, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, ) -> Self { Self { cmd_handler: Mutex::new(cmd_service), + fop_command_service: Mutex::new(fop_command_service), tlm_bus, last_tmiv_store, } } } +use async_trait::async_trait; +pub enum FopFrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} + +#[async_trait] +pub trait FopCommandService { + async fn send_set_vr(&mut self, value: u8); + + async fn send_ad_command(&mut self, tco: Tco) -> Result; + + async fn subscribe_frame_events( + &mut self, + ) -> Result + Send>>>; +} + #[tonic::async_trait] -impl broker_server::Broker for BrokerService +impl broker_server::Broker for BrokerService where C: super::Handle> + Send + Sync + 'static, C::Response: Send + 'static, + F: FopCommandService + Send + Sync + 'static, { type OpenCommandStreamStream = stream::BoxStream<'static, Result>; type OpenTelemetryStreamStream = stream::BoxStream<'static, Result>; + type SubscribeFopFrameEventsStream = + stream::BoxStream<'static, Result>; #[tracing::instrument(skip(self))] async fn post_command( @@ -66,6 +92,50 @@ where Ok(Response::new(PostCommandResponse {})) } + #[tracing::instrument(skip(self))] + async fn post_set_vr( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + let value = message.vr; + self.fop_command_service + .lock() + .await + .send_set_vr(value as _) + .await; + Ok(Response::new(PostSetVrResponse {})) + } + + #[tracing::instrument(skip(self))] + async fn post_ad_command( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + + let tco = message + .tco + .ok_or_else(|| Status::invalid_argument("tco is required"))?; + + fn internal_error(e: E) -> Status { + Status::internal(format!("{:?}", e)) + } + let id = self + .fop_command_service + .lock() + .await + .send_ad_command(tco) + .await + .map_err(internal_error)?; + + tracing::info!("AD command sent"); + Ok(Response::new(PostAdCommandResponse { + success: true, + frame_id: id, + })) + } + #[tracing::instrument(skip(self))] async fn open_telemetry_stream( &self, @@ -115,4 +185,33 @@ where Err(Status::not_found("not received yet")) } } + + #[tracing::instrument(skip(self))] + async fn subscribe_fop_frame_events( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + use futures::StreamExt; + let stream = self + .fop_command_service + .lock() + .await + .subscribe_frame_events() + .await + .map_err(|_| Status::internal("failed to subscribe frame events"))?; + use gaia_stub::broker::fop_frame_event::EventType; + let stream = stream.map(|e| { + let (frame_id, event_type) = match e { + FopFrameEvent::Transmit(id) => (id, EventType::Transmit), + FopFrameEvent::Acknowledged(id) => (id, EventType::Acknowledged), + FopFrameEvent::Retransmit(id) => (id, EventType::Retransmit), + FopFrameEvent::Cancel(id) => (id, EventType::Cancel), + }; + Ok(gaia_stub::broker::FopFrameEvent { + frame_id, + event_type: event_type.into(), + }) + }); + Ok(Response::new(stream.boxed())) + } } diff --git a/tmtc-c2a/Cargo.toml b/tmtc-c2a/Cargo.toml index 3797be01..e9f65737 100644 --- a/tmtc-c2a/Cargo.toml +++ b/tmtc-c2a/Cargo.toml @@ -43,6 +43,7 @@ tokio-tungstenite = "0.20.1" itertools = "0.12.1" notalawyer = "0.1.0" notalawyer-clap = "0.1.0" +tokio-stream = { version = "0.1.16", features = ["sync"] } [build-dependencies] tonic-build = "0.11" diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs new file mode 100644 index 00000000..c8354832 --- /dev/null +++ b/tmtc-c2a/src/fop1.rs @@ -0,0 +1,333 @@ +use anyhow::Result; +use gaia_ccsds_c2a::ccsds::tc::{self, clcw::CLCW}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::broadcast; + +fn wrapping_le(a: u8, b: u8) -> bool { + let diff = b.wrapping_sub(a); + diff < 128 +} + +fn wrapping_lt(a: u8, b: u8) -> bool { + a != b && wrapping_le(a, b) +} + +fn remove_acknowledged_frames( + queue: &mut VecDeque, + acknowledged_fsn: u8, + on_acknowledge: impl Fn(u64), +) -> usize { + let mut ack_count = 0; + while !queue.is_empty() { + let front = queue.front().unwrap(); + if wrapping_lt(front.sequence_number, acknowledged_fsn) { + ack_count += 1; + let frame = queue.pop_front().unwrap().frame; + on_acknowledge(frame.id); + } else { + break; + } + } + ack_count +} + +#[derive(Clone, Copy)] +struct FarmState { + next_expected_fsn: u8, + _lockout: bool, + _wait: bool, + retransmit: bool, +} + +enum FopState { + Initial, + Active(ActiveState), + Retransmit(RetransmitState), + Initializing { expected_nr: u8 }, +} + +struct SentFrame { + frame: Arc, + sent_at: std::time::Instant, + sequence_number: u8, +} + +struct ActiveState { + next_fsn: u8, + sent_queue: VecDeque, +} + +struct RetransmitState { + next_fsn: u8, + retransmit_count: usize, + retransmit_sent_queue: VecDeque, + retransmit_wait_queue: VecDeque, +} + +impl ActiveState { + fn acknowledge(&mut self, acknowledged_fsn: u8, on_acknowledge: impl Fn(u64)) { + remove_acknowledged_frames(&mut self.sent_queue, acknowledged_fsn, on_acknowledge); + } + + fn send( + &mut self, + next_frame_id: &mut u64, + data_field: Vec, + on_transmit: impl Fn(u64), + ) -> Option> { + let fsn = self.next_fsn; + self.next_fsn = self.next_fsn.wrapping_add(1); + let frame = Frame { + id: *next_frame_id, + frame_type: tc::sync_and_channel_coding::FrameType::TypeAD, + sequence_number: fsn, + data_field, + }; + *next_frame_id += 1; + let frame = Arc::new(frame); + on_transmit(frame.id); + self.sent_queue.push_back(SentFrame { + frame: frame.clone(), + sent_at: std::time::Instant::now(), + sequence_number: fsn, + }); + + Some(frame) + } + + fn timeout(&self) -> bool { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + if let Some(head) = self.sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + return true; + } + } + false + } +} + +impl RetransmitState { + fn acknowledge( + &mut self, + acknowledged_fsn: u8, + retransmit: bool, + on_acknowledge: impl Fn(u64), + ) -> bool { + let ack_count = remove_acknowledged_frames( + &mut self.retransmit_wait_queue, + acknowledged_fsn, + &on_acknowledge, + ) + remove_acknowledged_frames( + &mut self.retransmit_sent_queue, + acknowledged_fsn, + &on_acknowledge, + ); + if ack_count > 0 { + self.retransmit_count = 0; + } + + if !retransmit { + return self.retransmit_wait_queue.is_empty() && self.retransmit_sent_queue.is_empty(); + } + + if ack_count > 0 { + self.redo_retransmit(); + } + false + } + + fn redo_retransmit(&mut self) { + self.retransmit_count += 1; + // prepend sent_queue to wait_queue + // but the library doesn't provide "prepend" method... + self.retransmit_sent_queue + .append(&mut self.retransmit_wait_queue); + std::mem::swap( + &mut self.retransmit_sent_queue, + &mut self.retransmit_wait_queue, + ); + } + + fn update(&mut self) -> Option> { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + if let Some(head) = self.retransmit_sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + self.redo_retransmit(); + } + } + + let mut next_retransmit = self.retransmit_wait_queue.pop_front()?; + let frame = next_retransmit.frame.clone(); + next_retransmit.sent_at = std::time::Instant::now(); + self.retransmit_sent_queue.push_back(next_retransmit); + Some(frame) + } +} + +pub(crate) struct Fop { + next_frame_id: u64, + state: FopState, + last_received_farm_state: Option, + event_sender: broadcast::Sender, +} + +impl Fop { + pub(crate) fn new() -> Self { + let (event_sender, _) = broadcast::channel(16); + Self { + next_frame_id: 0, + state: FopState::Initial, + last_received_farm_state: None, + event_sender, + } + } + + pub(crate) fn subscribe_frame_events(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + pub(crate) async fn handle_clcw(&mut self, clcw: CLCW) -> Result<()> { + tracing::debug!("Received CLCW: {:?}", clcw); + let farm_state = FarmState { + next_expected_fsn: clcw.report_value(), + _lockout: clcw.lockout() != 0, + _wait: clcw.wait() != 0, + retransmit: clcw.retransmit() != 0, + }; + self.last_received_farm_state = Some(farm_state); + + let on_acknowledge = |frame_id| { + self.event_sender + .send(FrameEvent::Acknowledged(frame_id)) + .ok(); + }; + + match &mut self.state { + FopState::Initial => { + // do nothing + } + FopState::Initializing { expected_nr } => { + if farm_state.next_expected_fsn == *expected_nr { + tracing::info!("FOP initialized"); + self.state = FopState::Active(ActiveState { + next_fsn: *expected_nr, + sent_queue: VecDeque::new(), + }); + } + } + FopState::Active(state) => { + state.acknowledge(farm_state.next_expected_fsn, on_acknowledge); + if farm_state.retransmit { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + FopState::Retransmit(state) => { + let completed = state.acknowledge( + farm_state.next_expected_fsn, + farm_state.retransmit, + on_acknowledge, + ); + if completed { + self.state = FopState::Active(ActiveState { + next_fsn: state.next_fsn, + sent_queue: VecDeque::new(), + }); + } + } + } + Ok(()) + } + + pub(crate) fn set_vr(&mut self, vr: u8) -> Option { + tracing::info!("Setting VR to {}", vr); + let mut canceled_frames = VecDeque::new(); + match &mut self.state { + FopState::Initializing { .. } => { + return None; + } + FopState::Initial => { + // do nothing + } + FopState::Active(state) => { + canceled_frames.append(&mut state.sent_queue); + } + FopState::Retransmit(state) => { + canceled_frames.append(&mut state.retransmit_sent_queue); + canceled_frames.append(&mut state.retransmit_wait_queue); + } + } + + for frame in canceled_frames { + self.event_sender + .send(FrameEvent::Cancel(frame.frame.id)) + .ok(); + } + + self.state = FopState::Initializing { expected_nr: vr }; + let frame = Frame { + //TODO: manage BC retransmission and frame id for setvr command + //id: self.next_frame_id, + id: 0, + frame_type: tc::sync_and_channel_coding::FrameType::TypeBC, + // TODO: frame number of setvr command??? + sequence_number: 0, + data_field: vec![0x82, 0x00, vr], + }; + Some(frame) + } + + pub(crate) fn send_ad(&mut self, data_field: Vec) -> Option> { + let state = match &mut self.state { + FopState::Active(state) => state, + _ => return None, + }; + + state.send(&mut self.next_frame_id, data_field, |frame_id| { + self.event_sender.send(FrameEvent::Transmit(frame_id)).ok(); + }) + } + + pub(crate) fn update(&mut self) -> Option> { + if let FopState::Active(state) = &mut self.state { + if state.timeout() { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + + let frame = match &mut self.state { + FopState::Retransmit(state) => state.update(), + _ => None, + }; + let frame = frame?; + self.event_sender + .send(FrameEvent::Retransmit(frame.id)) + .ok(); + Some(frame) + } +} + +pub struct Frame { + pub id: u64, + pub frame_type: tc::sync_and_channel_coding::FrameType, + pub sequence_number: u8, + pub data_field: Vec, +} + +#[derive(Debug, Clone)] +pub enum FrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} diff --git a/tmtc-c2a/src/lib.rs b/tmtc-c2a/src/lib.rs index 0093a2c3..d68aca11 100644 --- a/tmtc-c2a/src/lib.rs +++ b/tmtc-c2a/src/lib.rs @@ -1,5 +1,6 @@ mod satconfig; pub use satconfig::Satconfig; +mod fop1; pub mod kble_gs; pub mod proto; pub mod registry; diff --git a/tmtc-c2a/src/main.rs b/tmtc-c2a/src/main.rs index 51638393..e66c66e6 100644 --- a/tmtc-c2a/src/main.rs +++ b/tmtc-c2a/src/main.rs @@ -128,7 +128,7 @@ async fn main() -> Result<()> { let (link, socket) = kble_gs::new(); let kble_socket_fut = socket.serve((args.kble_addr, args.kble_port)); - let (satellite_svc, sat_tlm_reporter) = satellite::new( + let (satellite_svc, fop_cmd_service, sat_tlm_reporter) = satellite::new( satconfig.aos_scid, satconfig.tc_scid, tlm_registry, @@ -144,7 +144,8 @@ async fn main() -> Result<()> { // Constructing gRPC services let server_task = { - let broker_service = BrokerService::new(cmd_handler, tlm_bus, last_tmiv_store); + let broker_service = + BrokerService::new(cmd_handler, fop_cmd_service, tlm_bus, last_tmiv_store); let broker_server = BrokerServer::new(broker_service); let tmtc_generic_c2a_server = TmtcGenericC2aServer::new(tmtc_generic_c2a_service); diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index 35cd17a9..c00a580e 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -1,4 +1,5 @@ -use std::{sync::Arc, time}; +use std::{pin::Pin, sync::Arc, time}; +use tokio::sync::Mutex; use crate::{ registry::{CommandRegistry, FatCommandSchema, TelemetryRegistry}, @@ -120,7 +121,7 @@ impl<'a> CommandContext<'a> { #[derive(Clone)] pub struct Service { - sync_and_channel_coding: T, + sync_and_channel_coding: Arc>, registry: Arc, tc_scid: u16, } @@ -138,7 +139,8 @@ where fat_schema, tco, }; - ctx.transmit_to(&mut self.sync_and_channel_coding).await?; + ctx.transmit_to(&mut *self.sync_and_channel_coding.lock().await) + .await?; Ok(true) } } @@ -151,21 +153,29 @@ pub fn new( cmd_registry: impl Into>, receiver: R, transmitter: T, -) -> (Service, TelemetryReporter) +) -> (Service, FopCommandService, TelemetryReporter) where - T: tc::SyncAndChannelCoding, + T: tc::SyncAndChannelCoding + Send + 'static, R: aos::SyncAndChannelCoding, { + let registry = cmd_registry.into(); + let transmitter = Arc::new(Mutex::new(transmitter)); + let fop = crate::fop1::Fop::new(); + let fop = Arc::new(Mutex::new(fop)); + let fop_command_service = + FopCommandService::start(fop.clone(), tc_scid, transmitter.clone(), registry.clone()); ( Service { tc_scid, sync_and_channel_coding: transmitter, - registry: cmd_registry.into(), + registry, }, + fop_command_service, TelemetryReporter { aos_scid, receiver, tmiv_builder: TmivBuilder { tlm_registry }, + fop, }, ) } @@ -187,6 +197,7 @@ pub struct TelemetryReporter { aos_scid: u8, tmiv_builder: TmivBuilder, receiver: R, + fop: Arc>, } impl TelemetryReporter @@ -211,6 +222,15 @@ where ); continue; }; + + { + let clcw = tf.trailer.into_ref().clone(); + let mut fop = self.fop.lock().await; + if let Err(e) = fop.handle_clcw(clcw).await { + error!("failed to handle CLCW: {:?}", e); + } + } + let incoming_scid = tf.primary_header.scid(); if incoming_scid != self.aos_scid { warn!("unknown SCID: {incoming_scid}"); @@ -265,3 +285,150 @@ where } } } + +pub struct FopCommandService { + transmitter: Arc>, + tc_scid: u16, + fop: Arc>, + registry: Arc, +} + +impl FopCommandService { + pub(crate) fn start( + fop: Arc>, + tc_scid: u16, + transmitter: Arc>, + registry: Arc, + ) -> Self { + let service = Self { + transmitter, + tc_scid, + fop, + registry, + }; + + tokio::spawn(Self::run_update( + tc_scid, + service.transmitter.clone(), + service.fop.clone(), + )); + service + } + + async fn run_update( + tc_scid: u16, + transmitter: Arc>, + fop: Arc>, + ) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + //tracing::debug!("FopCommandService: update"); + while let Some(frame) = fop.lock().await.update() { + tracing::debug!( + "FopCommandService: retransmitting {}", + frame.sequence_number + ); + let mut transmitter = transmitter.lock().await; + let vcid = 0; + let _ = transmitter + .transmit( + tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + } + } + } +} + +#[async_trait] +impl gaia_tmtc::broker::FopCommandService + for FopCommandService +{ + async fn send_set_vr(&mut self, vr: u8) { + let frame = { + let mut fop = self.fop.lock().await; + let frame = fop.set_vr(vr); + match frame { + Some(frame) => frame, + None => { + //TODO: return error? + return; + } + } + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + //transmitter. + } + + async fn send_ad_command(&mut self, tco: Tco) -> Result { + let Some(fat_schema) = self.registry.lookup(&tco.name) else { + return Err(anyhow!("unknown command: {}", tco.name)); + }; + let ctx = CommandContext { + tc_scid: 0, // dummy + fat_schema, + tco: &tco, + }; + let mut buf = vec![0u8; 1017]; // FIXME: hard-coded max size + let len = ctx.build_tc_segment(&mut buf)?; + buf.truncate(len); + + let mut fop = self.fop.lock().await; + let frame = match fop.send_ad(buf) { + None => { + tracing::warn!("FOP is not ready"); + return Err(anyhow!("FOP is not ready")); + } + Some(frame) => frame, + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + + Ok(frame.id) + } + + async fn subscribe_frame_events( + &mut self, + ) -> Result + Send>>> { + use futures::StreamExt; + let rx = self.fop.lock().await.subscribe_frame_events(); + let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|e| async { + use crate::fop1::FrameEvent; + use gaia_tmtc::broker::FopFrameEvent; + let e = e.ok()?; + let e = match e { + FrameEvent::Transmit(id) => FopFrameEvent::Transmit(id), + FrameEvent::Acknowledged(id) => FopFrameEvent::Acknowledged(id), + FrameEvent::Retransmit(id) => FopFrameEvent::Retransmit(id), + FrameEvent::Cancel(id) => FopFrameEvent::Cancel(id), + }; + Some(e) + }); + Ok(stream.boxed()) + } +} From ba9145cddbb1af52d41aa690e124c6551cc5222d Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 18 Nov 2024 16:00:52 +0900 Subject: [PATCH 2/6] set fop TIMEOUT to 5s --- tmtc-c2a/src/fop1.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs index c8354832..3b197405 100644 --- a/tmtc-c2a/src/fop1.rs +++ b/tmtc-c2a/src/fop1.rs @@ -97,7 +97,7 @@ impl ActiveState { } fn timeout(&self) -> bool { - const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); if let Some(head) = self.sent_queue.front() { if head.sent_at.elapsed() > TIMEOUT { return true; @@ -150,7 +150,7 @@ impl RetransmitState { } fn update(&mut self) -> Option> { - const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); if let Some(head) = self.retransmit_sent_queue.front() { if head.sent_at.elapsed() > TIMEOUT { self.redo_retransmit(); From af10ff15f9ba01732523b73e83c7343f1c92f18c Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Tue, 19 Nov 2024 09:43:44 +0900 Subject: [PATCH 3/6] forget ongoing Set V(R) command on new one's request --- tmtc-c2a/src/fop1.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs index 3b197405..040aa9b2 100644 --- a/tmtc-c2a/src/fop1.rs +++ b/tmtc-c2a/src/fop1.rs @@ -249,7 +249,8 @@ impl Fop { let mut canceled_frames = VecDeque::new(); match &mut self.state { FopState::Initializing { .. } => { - return None; + // forget the previous setvr command + // do nothing } FopState::Initial => { // do nothing From 7f53f5b97dc2ec9ec673614d435444c0f88f6221 Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Tue, 19 Nov 2024 20:32:55 +0900 Subject: [PATCH 4/6] unlock command --- gaia-stub/proto/broker.proto | 5 +++ gaia-tmtc/src/broker.rs | 11 ++++++ tmtc-c2a/src/fop1.rs | 72 ++++++++++++++++++++++++++++-------- tmtc-c2a/src/satellite.rs | 27 ++++++++++++++ 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/gaia-stub/proto/broker.proto b/gaia-stub/proto/broker.proto index 298c4700..b26d734c 100644 --- a/gaia-stub/proto/broker.proto +++ b/gaia-stub/proto/broker.proto @@ -13,6 +13,7 @@ service Broker { rpc PostTelemetry(PostTelemetryRequest) returns (PostTelemetryResponse); rpc PostSetVR(PostSetVRRequest) returns (PostSetVRResponse); + rpc PostUnlock(PostUnlockRequest) returns (PostUnlockResponse); rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse); rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent); } @@ -64,6 +65,10 @@ message PostSetVRRequest { message PostSetVRResponse { } +message PostUnlockRequest { } + +message PostUnlockResponse { } + message PostADCommandRequest { tco_tmiv.Tco tco = 1; } diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index 58c405db..3d83a7a4 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -47,6 +47,8 @@ pub enum FopFrameEvent { pub trait FopCommandService { async fn send_set_vr(&mut self, value: u8); + async fn send_unlock(&mut self); + async fn send_ad_command(&mut self, tco: Tco) -> Result; async fn subscribe_frame_events( @@ -107,6 +109,15 @@ where Ok(Response::new(PostSetVrResponse {})) } + #[tracing::instrument(skip(self))] + async fn post_unlock( + &self, + _request: Request, + ) -> Result, tonic::Status> { + self.fop_command_service.lock().await.send_unlock().await; + Ok(Response::new(PostUnlockResponse {})) + } + #[tracing::instrument(skip(self))] async fn post_ad_command( &self, diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs index 040aa9b2..22cf2569 100644 --- a/tmtc-c2a/src/fop1.rs +++ b/tmtc-c2a/src/fop1.rs @@ -35,16 +35,15 @@ fn remove_acknowledged_frames( #[derive(Clone, Copy)] struct FarmState { next_expected_fsn: u8, - _lockout: bool, + lockout: bool, _wait: bool, retransmit: bool, } enum FopState { - Initial, Active(ActiveState), Retransmit(RetransmitState), - Initializing { expected_nr: u8 }, + Initial { expected_nr: Option }, } struct SentFrame { @@ -177,7 +176,7 @@ impl Fop { let (event_sender, _) = broadcast::channel(16); Self { next_frame_id: 0, - state: FopState::Initial, + state: FopState::Initial { expected_nr: None }, last_received_farm_state: None, event_sender, } @@ -191,7 +190,7 @@ impl Fop { tracing::debug!("Received CLCW: {:?}", clcw); let farm_state = FarmState { next_expected_fsn: clcw.report_value(), - _lockout: clcw.lockout() != 0, + lockout: clcw.lockout() != 0, _wait: clcw.wait() != 0, retransmit: clcw.retransmit() != 0, }; @@ -204,14 +203,11 @@ impl Fop { }; match &mut self.state { - FopState::Initial => { - // do nothing - } - FopState::Initializing { expected_nr } => { - if farm_state.next_expected_fsn == *expected_nr { + FopState::Initial { expected_nr } => { + if Some(farm_state.next_expected_fsn) == *expected_nr && !farm_state.lockout { tracing::info!("FOP initialized"); self.state = FopState::Active(ActiveState { - next_fsn: *expected_nr, + next_fsn: farm_state.next_expected_fsn, sent_queue: VecDeque::new(), }); } @@ -241,6 +237,38 @@ impl Fop { } } } + + if !farm_state.lockout { + return Ok(()); + } + + //lockout + let mut canceled_frames = VecDeque::new(); + match &mut self.state { + FopState::Initial { .. } => { + // do nothing + } + FopState::Active(state) => { + canceled_frames.append(&mut state.sent_queue); + self.state = FopState::Initial { + expected_nr: Some(state.next_fsn), + }; + } + FopState::Retransmit(state) => { + canceled_frames.append(&mut state.retransmit_sent_queue); + canceled_frames.append(&mut state.retransmit_wait_queue); + self.state = FopState::Initial { + expected_nr: Some(state.next_fsn), + }; + } + } + + for frame in canceled_frames { + self.event_sender + .send(FrameEvent::Cancel(frame.frame.id)) + .ok(); + } + Ok(()) } @@ -248,13 +276,10 @@ impl Fop { tracing::info!("Setting VR to {}", vr); let mut canceled_frames = VecDeque::new(); match &mut self.state { - FopState::Initializing { .. } => { + FopState::Initial { .. } => { // forget the previous setvr command // do nothing } - FopState::Initial => { - // do nothing - } FopState::Active(state) => { canceled_frames.append(&mut state.sent_queue); } @@ -270,7 +295,9 @@ impl Fop { .ok(); } - self.state = FopState::Initializing { expected_nr: vr }; + self.state = FopState::Initial { + expected_nr: Some(vr), + }; let frame = Frame { //TODO: manage BC retransmission and frame id for setvr command //id: self.next_frame_id, @@ -283,6 +310,19 @@ impl Fop { Some(frame) } + pub(crate) fn unlock(&mut self) -> Option { + let frame = Frame { + //TODO: manage BC retransmission and frame id for setvr command + //id: self.next_frame_id, + id: 0, + frame_type: tc::sync_and_channel_coding::FrameType::TypeBC, + // TODO: frame number of setvr command??? + sequence_number: 0, + data_field: vec![0x00], + }; + Some(frame) + } + pub(crate) fn send_ad(&mut self, data_field: Vec) -> Option> { let state = match &mut self.state { FopState::Active(state) => state, diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index c00a580e..8247409e 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -375,6 +375,33 @@ impl gaia_tmtc::broker::FopCommandService //transmitter. } + async fn send_unlock(&mut self) { + let frame = { + let mut fop = self.fop.lock().await; + let frame = fop.unlock(); + match frame { + Some(frame) => frame, + None => { + //TODO: return error? + return; + } + } + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + //transmitter. + } + async fn send_ad_command(&mut self, tco: Tco) -> Result { let Some(fat_schema) = self.registry.lookup(&tco.name) else { return Err(anyhow!("unknown command: {}", tco.name)); From f5ddbc04047b33c2455211b4e232aa1fe35e9b81 Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Wed, 20 Nov 2024 09:44:13 +0900 Subject: [PATCH 5/6] broker: get fop state --- gaia-stub/proto/broker.proto | 14 ++++++++++++ gaia-tmtc/src/broker.rs | 44 ++++++++++++++++++++++++++++++++++++ tmtc-c2a/src/fop1.rs | 24 +++++++++++++++----- tmtc-c2a/src/satellite.rs | 17 ++++++++++++++ 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/gaia-stub/proto/broker.proto b/gaia-stub/proto/broker.proto index b26d734c..f18d39b9 100644 --- a/gaia-stub/proto/broker.proto +++ b/gaia-stub/proto/broker.proto @@ -16,6 +16,7 @@ service Broker { rpc PostUnlock(PostUnlockRequest) returns (PostUnlockResponse); rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse); rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent); + rpc GetFopState(GetFopStateRequest) returns (GetFopStateResponse); } message PostCommandRequest { @@ -91,3 +92,16 @@ message FopFrameEvent { }; EventType event_type = 2; } + +message GetFopStateRequest {} + +message GetFopStateResponse { + bool received_clcw = 1; + bool lockout_flag = 2; + bool wait_flag = 3; + bool retransmit_flag = 4; + uint32 next_expected_sequence_number = 5; + + bool has_next_sequence_number = 11; + uint32 next_sequence_number = 12; +} diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index 3d83a7a4..7d97c184 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -43,6 +43,19 @@ pub enum FopFrameEvent { Cancel(u64), } +#[derive(Default)] +pub struct ClcwInfo { + pub lockout: bool, + pub wait: bool, + pub retransmit: bool, + pub next_expected_fsn: u8, +} + +pub struct FopState { + pub last_clcw: Option, + pub next_fsn: Option, +} + #[async_trait] pub trait FopCommandService { async fn send_set_vr(&mut self, value: u8); @@ -54,6 +67,8 @@ pub trait FopCommandService { async fn subscribe_frame_events( &mut self, ) -> Result + Send>>>; + + async fn get_fop_state(&mut self) -> Result; } #[tonic::async_trait] @@ -225,4 +240,33 @@ where }); Ok(Response::new(stream.boxed())) } + + #[tracing::instrument(skip(self))] + async fn get_fop_state( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + let state = self + .fop_command_service + .lock() + .await + .get_fop_state() + .await + .map_err(|_| Status::internal("failed to get fop state"))?; + let received_clcw = state.last_clcw.is_some(); + let clcw = state.last_clcw.unwrap_or_default(); + + let resp = GetFopStateResponse { + received_clcw, + lockout_flag: clcw.lockout, + wait_flag: clcw.wait, + retransmit_flag: clcw.retransmit, + next_expected_sequence_number: clcw.next_expected_fsn as _, + + has_next_sequence_number: state.next_fsn.is_some(), + next_sequence_number: state.next_fsn.unwrap_or_default() as _, + }; + + Ok(Response::new(resp)) + } } diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs index 22cf2569..21df3b13 100644 --- a/tmtc-c2a/src/fop1.rs +++ b/tmtc-c2a/src/fop1.rs @@ -33,11 +33,11 @@ fn remove_acknowledged_frames( } #[derive(Clone, Copy)] -struct FarmState { - next_expected_fsn: u8, - lockout: bool, - _wait: bool, - retransmit: bool, +pub(crate) struct FarmState { + pub(crate) next_expected_fsn: u8, + pub(crate) lockout: bool, + pub(crate) wait: bool, + pub(crate) retransmit: bool, } enum FopState { @@ -182,6 +182,18 @@ impl Fop { } } + pub(crate) fn last_received_farm_state(&self) -> Option<&FarmState> { + self.last_received_farm_state.as_ref() + } + + pub(crate) fn next_fsn(&self) -> Option { + match &self.state { + FopState::Initial { expected_nr } => *expected_nr, + FopState::Active(state) => Some(state.next_fsn), + FopState::Retransmit(state) => Some(state.next_fsn), + } + } + pub(crate) fn subscribe_frame_events(&self) -> broadcast::Receiver { self.event_sender.subscribe() } @@ -191,7 +203,7 @@ impl Fop { let farm_state = FarmState { next_expected_fsn: clcw.report_value(), lockout: clcw.lockout() != 0, - _wait: clcw.wait() != 0, + wait: clcw.wait() != 0, retransmit: clcw.retransmit() != 0, }; self.last_received_farm_state = Some(farm_state); diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index 8247409e..3a4583c2 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -458,4 +458,21 @@ impl gaia_tmtc::broker::FopCommandService }); Ok(stream.boxed()) } + + async fn get_fop_state(&mut self) -> Result { + let fop = self.fop.lock().await; + let last_clcw = fop + .last_received_farm_state() + .map(|s| gaia_tmtc::broker::ClcwInfo { + lockout: s.lockout, + wait: s.wait, + retransmit: s.retransmit, + next_expected_fsn: s.next_expected_fsn as _, + }); + let next_fsn = fop.next_fsn(); + Ok(gaia_tmtc::broker::FopState { + last_clcw, + next_fsn, + }) + } } From 78e291df2a1a8d41b6437da678b5143289fb66fa Mon Sep 17 00:00:00 2001 From: KOBAYASHI Kazuhiro Date: Mon, 20 Jan 2025 14:21:31 +0900 Subject: [PATCH 6/6] change signature of FopCommandService --- gaia-tmtc/src/broker.rs | 12 ++++++------ tmtc-c2a/src/satellite.rs | 19 ++++++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index 7d97c184..8b3a53b8 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -58,17 +58,17 @@ pub struct FopState { #[async_trait] pub trait FopCommandService { - async fn send_set_vr(&mut self, value: u8); + async fn send_set_vr(&self, value: u8); - async fn send_unlock(&mut self); + async fn send_unlock(&self); - async fn send_ad_command(&mut self, tco: Tco) -> Result; + async fn send_ad_command(&self, tco: Tco) -> Result; async fn subscribe_frame_events( - &mut self, - ) -> Result + Send>>>; + &self, + ) -> Result + Send + Sync>>>; - async fn get_fop_state(&mut self) -> Result; + async fn get_fop_state(&self) -> Result; } #[tonic::async_trait] diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index 3a4583c2..c25e7504 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -348,7 +348,7 @@ impl FopCommandService { impl gaia_tmtc::broker::FopCommandService for FopCommandService { - async fn send_set_vr(&mut self, vr: u8) { + async fn send_set_vr(&self, vr: u8) { let frame = { let mut fop = self.fop.lock().await; let frame = fop.set_vr(vr); @@ -375,7 +375,7 @@ impl gaia_tmtc::broker::FopCommandService //transmitter. } - async fn send_unlock(&mut self) { + async fn send_unlock(&self) { let frame = { let mut fop = self.fop.lock().await; let frame = fop.unlock(); @@ -402,7 +402,7 @@ impl gaia_tmtc::broker::FopCommandService //transmitter. } - async fn send_ad_command(&mut self, tco: Tco) -> Result { + async fn send_ad_command(&self, tco: Tco) -> Result { let Some(fat_schema) = self.registry.lookup(&tco.name) else { return Err(anyhow!("unknown command: {}", tco.name)); }; @@ -440,11 +440,12 @@ impl gaia_tmtc::broker::FopCommandService } async fn subscribe_frame_events( - &mut self, - ) -> Result + Send>>> { - use futures::StreamExt; + &self, + ) -> Result + Send + Sync>>> + { + use tokio_stream::StreamExt; let rx = self.fop.lock().await.subscribe_frame_events(); - let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|e| async { + let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|e| { use crate::fop1::FrameEvent; use gaia_tmtc::broker::FopFrameEvent; let e = e.ok()?; @@ -456,10 +457,10 @@ impl gaia_tmtc::broker::FopCommandService }; Some(e) }); - Ok(stream.boxed()) + Ok(Box::pin(stream)) } - async fn get_fop_state(&mut self) -> Result { + async fn get_fop_state(&self) -> Result { let fop = self.fop.lock().await; let last_clcw = fop .last_received_farm_state()