diff --git a/Cargo.toml b/Cargo.toml index 0abd498..e09bad0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,14 +4,14 @@ name = "hollywood" description = "hollywood actor framework" edition = "2021" include = [ - "**/*.rs", - "Cargo.toml", + "**/*.rs", + "Cargo.toml", ] -license = "Apache-2.0" keywords = ["actor", "compute", "graph", "pipeline"] +license = "Apache-2.0" readme = "README.md" repository = "https://github.com/farm-ng/hollywood/" -version = "0.3.0" +version = "0.4.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -19,10 +19,13 @@ version = "0.3.0" async-trait = "0.1.51" drawille = "0.3.0" grid = "0.13.0" -hollywood_macros = { version = "0.3.0", path = "hollywood_macros" } +hollywood_macros = {version = "0.4.0", path = "hollywood_macros"} +# hollywood intends to use only very basic features of nalgebra, hence +# future versions of nalgebra before the major < 1.0 release are likely to work +nalgebra = ">= 0.32, <1.0" petgraph = "0.6.3" rand = "0.8.4" rand_distr = "0.4.3" # executor feature needed -tokio = { version = "1.28.0", features = ["full"] } +tokio = {version = "1.28.0", features = ["full"]} tokio-stream = "0.1.14" diff --git a/examples/moving_average.rs b/examples/moving_average.rs index bccd965..e614b5b 100644 --- a/examples/moving_average.rs +++ b/examples/moving_average.rs @@ -3,7 +3,9 @@ use hollywood::actors::{Periodic, Printer}; use hollywood::compute::Context; use hollywood::core::{FromPropState, NullState}; -use hollywood::example_actors::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState}; +use hollywood::example_actors::moving_average::{ + MovingAverage, MovingAverageProp, MovingAverageState, +}; /// pub async fn run_moving_average_example() { diff --git a/examples/nudge.rs b/examples/nudge.rs new file mode 100644 index 0000000..0d1eb83 --- /dev/null +++ b/examples/nudge.rs @@ -0,0 +1,34 @@ +use hollywood::actors::printer::PrinterProp; +use hollywood::actors::{Nudge, Printer}; +use hollywood::compute::Context; +use hollywood::core::*; + +pub async fn run_tick_print_example() { + let pipeline = Context::configure(&mut |context| { + let mut nudge = Nudge::::new(context, "nudge".to_owned()); + let mut nudge_printer = Printer::::from_prop_and_state( + context, + PrinterProp { + topic: "nudge: ".to_string(), + }, + NullState::default(), + ); + nudge + .outbound + .nudge + .connect(context, &mut nudge_printer.inbound.printable); + }); + + pipeline.print_flow_graph(); + pipeline.run().await; +} + +fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + run_tick_print_example().await; + }) +} diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs index 46aeaa6..68fed1a 100644 --- a/examples/one_dim_robot.rs +++ b/examples/one_dim_robot.rs @@ -1,9 +1,11 @@ use hollywood::actors::printer::PrinterProp; +use hollywood::actors::zip::ZipPair; use hollywood::actors::Periodic; use hollywood::actors::Printer; +use hollywood::actors::Zip3; use hollywood::compute::Context; use hollywood::core::*; -use hollywood::example_actors::one_dim_robot::draw::DrawState; + use hollywood::example_actors::one_dim_robot::filter::FilterState; use hollywood::example_actors::one_dim_robot::{ DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped, @@ -11,13 +13,14 @@ use hollywood::example_actors::one_dim_robot::{ async fn run_robot_example() { let pipeline = Context::configure(&mut |context| { - let mut timer = Periodic::new_with_period(context, 0.25); + let mut timer = Periodic::new_with_period(context, 0.1); let mut sim = Sim::from_prop_and_state( context, NullProp {}, SimState { - shutdown_time: 10.0, + shutdown_time: 15.0, time: 0.0, + seq: 0, true_robot: Robot { position: -2.0, velocity: 0.4, @@ -40,8 +43,9 @@ async fn run_robot_example() { NullState::default(), ); - let mut draw_actor = - DrawActor::from_prop_and_state(context, NullProp {}, DrawState::default()); + let mut zip = Zip3::new_default_init_state(context, NullProp {}); + + let mut draw = DrawActor::new_default_init_state(context, NullProp {}); timer .outbound @@ -54,28 +58,47 @@ async fn run_robot_example() { sim.outbound .noisy_range .connect(context, &mut filter.inbound.noisy_range); - sim.outbound - .true_robot - .connect(context, &mut draw_actor.inbound.true_pos); - sim.outbound - .true_range - .connect(context, &mut draw_actor.inbound.true_range); + sim.outbound.true_robot.connect_with_adapter( + context, + |x| ZipPair { + key: x.seq, + value: x, + }, + &mut zip.inbound.item0, + ); + sim.outbound.true_range.connect_with_adapter( + context, + |x| ZipPair { + key: x.seq, + value: x, + }, + &mut zip.inbound.item1, + ); sim.outbound .true_robot .connect(context, &mut truth_printer.inbound.printable); - - sim.request.ping_pong.connect(context, &mut filter.inbound.ping_pong_request); + sim.request + .ping_pong + .connect(context, &mut filter.inbound.ping_pong_request); context.register_cancel_requester(&mut sim.outbound.cancel_request); filter .outbound .updated_state .connect(context, &mut filter_state_printer.inbound.printable); - filter - .outbound - .updated_state - .connect(context, &mut draw_actor.inbound.filter_est); + filter.outbound.updated_state.connect_with_adapter( + context, + |x| ZipPair { + key: x.state.seq, + value: x, + }, + &mut zip.inbound.item2, + ); + + zip.outbound + .zipped + .connect(context, &mut draw.inbound.zipped); }); pipeline.print_flow_graph(); diff --git a/examples/print_ticks.rs b/examples/print_ticks.rs index 02f2164..3a2a62f 100644 --- a/examples/print_ticks.rs +++ b/examples/print_ticks.rs @@ -14,11 +14,11 @@ pub async fn run_tick_print_example() { }, NullState::default(), ); - timer - .outbound - .time_stamp - .connect(context, &mut time_printer.inbound.printable); - + timer.outbound.time_stamp.connect_with_adapter( + context, + |t| 10.0 * t, + &mut time_printer.inbound.printable, + ); }); pipeline.print_flow_graph(); diff --git a/examples/zip.rs b/examples/zip.rs new file mode 100644 index 0000000..a827b8b --- /dev/null +++ b/examples/zip.rs @@ -0,0 +1,55 @@ +use hollywood::actors::printer::PrinterProp; +use hollywood::actors::zip::{Tuple2, ZipPair}; +use hollywood::actors::{periodic, Printer, Zip2}; +use hollywood::compute::Context; +use hollywood::core::*; + +pub async fn run_tick_print_example() { + let pipeline = Context::configure(&mut |context| { + let mut periodic = periodic::Periodic::new_with_period(context, 1.0); + + let mut zip = + Zip2::::new_default_init_state(context, NullProp::default()); + let mut printer = Printer::>::from_prop_and_state( + context, + PrinterProp { + topic: "zipped".to_string(), + }, + NullState::default(), + ); + + periodic.outbound.time_stamp.connect_with_adapter( + context, + |t| ZipPair { + key: t as u64, + value: "hello".to_string(), + }, + &mut zip.inbound.item0, + ); + periodic.outbound.time_stamp.connect_with_adapter( + context, + |t| ZipPair { + key: 2 * t as u64, + value: "world".to_string(), + }, + &mut zip.inbound.item1, + ); + + zip.outbound + .zipped + .connect(context, &mut printer.inbound.printable); + }); + + pipeline.print_flow_graph(); + pipeline.run().await; +} + +fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + run_tick_print_example().await; + }) +} diff --git a/hollywood_macros/Cargo.toml b/hollywood_macros/Cargo.toml index dd76754..a8d0f68 100644 --- a/hollywood_macros/Cargo.toml +++ b/hollywood_macros/Cargo.toml @@ -4,14 +4,14 @@ name = "hollywood_macros" description = "Macros for the Hollywood actor framework" edition = "2021" include = [ - "**/*.rs", - "Cargo.toml", + "**/*.rs", + "Cargo.toml", ] -license = "Apache-2.0" keywords = ["actor", "compute", "graph", "pipeline"] +license = "Apache-2.0" readme = "../README.md" repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros" -version = "0.3.0" +version = "0.4.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] @@ -19,6 +19,6 @@ proc-macro = true [dependencies] convert_case = "0.6.0" +proc-macro2 = "1.0" quote = "1.0.9" -syn = { version = "2.0.18", features = ["full"] } - +syn = {version = "2.0.18", features = ["full"]} diff --git a/hollywood_macros/src/actors.rs b/hollywood_macros/src/actors.rs new file mode 100644 index 0000000..d8390f9 --- /dev/null +++ b/hollywood_macros/src/actors.rs @@ -0,0 +1 @@ +pub mod zip; \ No newline at end of file diff --git a/hollywood_macros/src/actors/zip.rs b/hollywood_macros/src/actors/zip.rs new file mode 100644 index 0000000..e46a423 --- /dev/null +++ b/hollywood_macros/src/actors/zip.rs @@ -0,0 +1,563 @@ +use proc_macro2::TokenStream; +use quote::{format_ident, quote}; +use syn::{parse::Parse, parse::ParseStream, parse2, LitInt, Result}; + +struct ZipInput { + num_fields: usize, +} + +impl Parse for ZipInput { + fn parse(input: ParseStream) -> Result { + let num_fields: LitInt = input.parse()?; + Ok(ZipInput { + num_fields: num_fields.base10_parse()?, + }) + } +} + +pub(crate) fn tuple_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let tuple_struct = format_ident!("Tuple{}", num_fields); + + let field_seq = (0..num_fields).map(|i| format_ident!("item{}", i)); + let field_seq2 = field_seq.clone(); + let field_seq3 = field_seq.clone(); + + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + let type_seq2 = type_seq.clone(); + let type_seq3 = type_seq.clone(); + let type_with_bounds_seq = (0..num_fields).map(|i| { + let ident = format_ident!("Item{}", i); + quote! { #ident: Default + Clone + Debug + Sync + Send + 'static + + std::marker::Sync+ std::marker::Send} + }); + + let expanded = quote! { + #[derive(Default, Clone, Debug)] + + /// A tuple struct X fields. + /// + /// Used to send merged items from X inbound channels to one outbound channel. + pub struct #tuple_struct { + /// Key to associate message from different inbound channels with. + pub key: Key, + #( + /// The value to be zipped. + pub #field_seq: #type_seq3 + ),* + } + impl< + Key: Default + + Debug + + Clone + + Display + + PartialEq + + Eq + + PartialOrd + + Ord + + Sync + + Send + + 'static, + #( #type_with_bounds_seq ),* + > Display for #tuple_struct + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + concat!( "key: {}", #( stringify!(, #field_seq2), ": {:?}" ),* ), + self.key, #( self.#field_seq3), *) + } + } + }; + + TokenStream::from(expanded) +} + +pub(crate) fn zip_outbound_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let outbound_struct = format_ident!("Zip{}Outbound", num_fields); + let tuple_struct = format_ident!("Tuple{}", num_fields); + + let params_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + let params_seq2 = params_seq.clone(); + let params_seq3 = params_seq.clone(); + let params_seq4 = params_seq.clone(); + let params_seq5 = params_seq.clone(); + + let params_with_bounds_seq: Vec<_> = (0..num_fields) + .map(|i| { + let ident = format_ident!("Item{}", i); + quote! { #ident: Default + Clone + Debug + Sync + Send + + 'static +std::marker::Sync+ std::marker::Send} + }) + .collect(); + let params_with_bounds_seq2 = params_with_bounds_seq.clone(); + let params_with_bounds_seq3 = params_with_bounds_seq.clone(); + + let expanded = quote! { + /// ZipX outbound hub + /// + /// Contains one outbound channel of the merged inbound channels. + pub struct #outbound_struct { + /// Outbound channel of the merged inbound channels. + pub zipped: OutboundChannel<#tuple_struct>, + } + + impl + Activate for #outbound_struct + { + fn extract(&mut self) -> Self { + Self { + zipped: self.zipped.extract(), + } + } + + fn activate(&mut self) { + self.zipped.activate(); + } + } + + impl + OutboundHub for #outbound_struct + { + fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { + Self { + zipped: OutboundChannel::<#tuple_struct>::new( + context, + "zipped".to_owned(), + actor_name, + ), + } + } + } + }; + + TokenStream::from(expanded) +} + +pub(crate) fn zip_state_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let state_struct = format_ident!("Zip{}State", num_fields); + + let heap_item_seq = (0..num_fields).map(|i| { + let heap_item = format_ident!("item{}_heap", i); + let item = format_ident!("Item{}", i); // Corrected this line + let pair = quote! { ZipPair<#i, Key, #item> }; + quote! { + /// Heap for the Xth inbound channel. + pub #heap_item: std::collections::BinaryHeap> + } + }); + + let item_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + + let expanded = quote! { + + /// State of the zip actor with X inbound channels. + #[derive(Clone, Debug, Default)] + pub struct #state_struct + { + #( #heap_item_seq ),* + } + }; + + TokenStream::from(expanded) +} + +pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + let state_struct = format_ident!("Zip{}State", num_fields); + let outbound_struct = format_ident!("Zip{}Outbound", num_fields); + + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + let type_seq2 = type_seq.clone(); + let type_seq3 = type_seq.clone(); + let type_seq4 = type_seq.clone(); + let type_seq5 = type_seq.clone(); + let type_seq6 = type_seq.clone(); + let type_with_bounds_seq: Vec<_> = (0..num_fields) + .map(|i| { + let ident = format_ident!("Item{}", i); + quote! { #ident: Default + Clone + Debug + Sync + Send + + 'static} + }) + .collect(); + + let i_seq: Vec<_> = (0..num_fields) + .map(|i| { + quote! { #i } + }) + .collect(); + + let msg_new_impl_seq = (0..num_fields).map(|i| { + let item = format_ident!("Item{}", i); + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + + quote! { + impl< + Key: Default + + Clone + + Debug + + PartialEq + + Eq + + PartialOrd + + Ord + + Debug + + Clone + + Sync + + Send + + 'static, + #( #type_with_bounds_seq ),* + > InboundMessageNew> + for #inbound_message_enum + { + fn new(_inbound_name: String, msg: ZipPair<#i, Key, #item>) -> Self { + #inbound_message_enum::#item(msg) + } + } + } + }); + + let expand = quote! { + + /// Inbound message for the zip actor. + #[derive(Clone,Debug)] + pub enum #inbound_message_enum< + Key: Ord + Clone + Debug + Sync + Send + 'static, + #( #type_with_bounds_seq ),* + > { + #( + /// Inbound message for the Xth inbound channel. + #type_seq(ZipPair<#i_seq, Key, #type_seq>) + ),* + } + + #(#msg_new_impl_seq)* + + impl< + Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + + Sync + Send + 'static, + #( #type_with_bounds_seq ),* + > InboundMessage for #inbound_message_enum + { + type Prop = NullProp; + type State = #state_struct; + type OutboundHub = #outbound_struct; + type RequestHub = NullRequest; + + fn inbound_channel(&self) -> String { + match self { + #( #inbound_message_enum::#type_seq5(_) => stringify!(#type_seq6).to_owned(), )* + } + } + } + }; + + TokenStream::from(expand) +} + +pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let zip_struct = format_ident!("Zip{}", num_fields); + let state_struct = format_ident!("Zip{}State", num_fields); + let inbound_struct = format_ident!("Zip{}Inbound", num_fields); + let outbound_struct = format_ident!("Zip{}Outbound", num_fields); + let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + let type_seq2 = type_seq.clone(); + let type_seq3 = type_seq.clone(); + let type_seq4 = type_seq.clone(); + let type_seq5 = type_seq.clone(); + let type_seq6 = type_seq.clone(); + let type_seq7 = type_seq.clone(); + let type_seq8 = type_seq.clone(); + let type_seq9 = type_seq.clone(); + let type_seq10 = type_seq.clone(); + let type_seq11 = type_seq.clone(); + let type_seq12 = type_seq.clone(); + + let type_with_bounds_seq: Vec<_> = (0..num_fields) + .map(|i| { + let item_type = format_ident!("Item{}", i); + quote! { #item_type: Default + Clone + Debug + Sync + Send + + 'static} + }) + .collect(); + + let expanded = quote! { + + /// ZipX actor, which zips X inbound channels into one outbound channel. + pub type #zip_struct = Actor< + NullProp, + #inbound_struct, + #state_struct, + #outbound_struct, + NullRequest, + >; + + impl< + Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + + Sync + Send + 'static, + #( #type_with_bounds_seq ),* + > + FromPropState< + NullProp, + #inbound_struct, + #state_struct, + #outbound_struct, + #inbound_message_enum, + NullRequest, + DefaultRunner< + NullProp, + #inbound_struct, + #state_struct, + #outbound_struct, + NullRequest, + >, + > for #zip_struct + { + fn name_hint(_prop: &NullProp) -> String { + stringify!(#zip_struct).to_owned() + } + } + + }; + + TokenStream::from(expanded) +} + +pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let inbound_struct = format_ident!("Zip{}Inbound", num_fields); + let state_struct = format_ident!("Zip{}State", num_fields); + let outbound_struct = format_ident!("Zip{}Outbound", num_fields); + let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + let type_seq4 = type_seq.clone(); + let type_seq5 = type_seq.clone(); + let type_seq6 = type_seq.clone(); + let type_seq7 = type_seq.clone(); + let type_seq8 = type_seq.clone(); + let type_seq9 = type_seq.clone(); + let type_seq10 = type_seq.clone(); + let type_seq12 = type_seq.clone(); + + let item_seq = (0..num_fields).map(|i| format_ident!("item{}", i)); + let item_seq2 = item_seq.clone(); + let item_seq3 = item_seq.clone(); + let item_seq4 = item_seq.clone(); + let item_seq5 = item_seq.clone(); + + let type_with_bounds_seq: Vec<_> = (0..num_fields) + .map(|i| { + let ident = format_ident!("Item{}", i); + quote! { #ident: Default + Clone + Debug + Sync + Send + + 'static} + }) + .collect(); + + let channel: Vec<_> = (0..num_fields) + .map(|i| { + let item_type = format_ident!("Item{}", i); + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + + quote! { + InboundChannel, + #inbound_message_enum> + } + }) + .collect(); + + let expanded = quote! { + + /// Inbound hub for the zip actor. + #[derive(Clone,Debug)] + pub struct #inbound_struct< + Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + + Sync + Send + 'static, + #( #type_with_bounds_seq ),* + > { + #( + /// Inbound channel for the Xth inbound channel. + pub #item_seq5: #channel + ),* + } + + impl< + Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + + Sync + Send + 'static, + #( #type_with_bounds_seq ),* + > + InboundHub< + NullProp, + #state_struct, + #outbound_struct, + NullRequest, + #inbound_message_enum, + > for #inbound_struct + { + fn from_builder( + builder: &mut crate::core::ActorBuilder< + NullProp, + #state_struct, + #outbound_struct, + NullRequest, + #inbound_message_enum, + >, + actor_name: &str, + ) -> Self { + #( + let #item_seq = InboundChannel::new( + builder.context, + actor_name, + &builder.sender, + stringify!(#type_seq12).to_owned(), + ); + builder + .forward + .insert(#item_seq2.name.clone(), Box::new(#item_seq3.clone())); + )* + + Self { #( #item_seq4 ),* } + } + } + }; + + TokenStream::from(expanded) +} + +pub(crate) fn zip_onmessage_n_impl(input: TokenStream) -> TokenStream { + let ZipInput { num_fields } = match parse2(input) { + Ok(input) => input, + Err(err) => return TokenStream::from(err.to_compile_error()), + }; + + let tuple_struct = format_ident!("Tuple{}", num_fields); + let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + + let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); + + let type_with_bounds_seq: Vec<_> = (0..num_fields) + .map(|i| { + let ident = format_ident!("Item{}", i); + quote! { + #ident: Default + Clone + Debug + Sync + Send + 'static + } + }) + .collect(); + + let front_seq = (0..num_fields).map(|i| format_ident!("front{}", i)); + + let item_seq = (0..num_fields).map(|i| format_ident!("item{}", i)); + let item_seq2 = item_seq.clone(); + let item_seq3 = item_seq.clone(); + + let item_heap = (0..num_fields).map(|i| format_ident!("item{}_heap", i)); + let item_heap2 = item_heap.clone(); + let item_heap3 = item_heap.clone(); + + let key_seq = (0..num_fields).map(|i| format_ident!("key{}", i)); + let key_seq2 = key_seq.clone(); + let key_seq3 = key_seq.clone(); + let key_seq4 = key_seq.clone(); + let key_seq5 = key_seq.clone(); + + let case: Vec<_> = (0..num_fields) + .map(|i| { + let item_type = format_ident!("Item{}", i); + + let item_heap = format_ident!("item{}_heap", i); + let item_heap_seq = (0..num_fields).map(|i| format_ident!("item{}_heap", i)); + + quote! { + #inbound_message_enum::#item_type(msg) => { + state.#item_heap.push(Reverse(msg)); + loop { + if #( state.#item_heap_seq.len() == 0 )||* + { + break; + } + check_and_send(state); + } + } + } + }) + .collect(); + + let expanded = quote! { + + impl OnMessage for #inbound_message_enum + { + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + _request: &Self::RequestHub) + { + let check_and_send = |s: &mut Self::State| { + #( + let #front_seq = s.#item_heap.peek().unwrap(); + let #key_seq2 = #front_seq.0.key.clone(); + )* + + let mut min = key0.clone(); + #( + min = std::cmp::min(#key_seq3.clone(), min.clone()); + )* + + if #(#key_seq4 == min) && * { + #(let #item_seq = s.#item_heap2.pop().unwrap();)* + + outbound.zipped.send(#tuple_struct { + key: min, + #(#item_seq2 : #item_seq3.0.value),* + }); + return; + } + #( + if #key_seq5 == min { + s.#item_heap3.pop(); + } + )* + }; + + match self { + #( #case )* + } + } + } + }; + + TokenStream::from(expanded) +} diff --git a/hollywood_macros/src/core.rs b/hollywood_macros/src/core.rs new file mode 100644 index 0000000..4600252 --- /dev/null +++ b/hollywood_macros/src/core.rs @@ -0,0 +1,492 @@ +use convert_case::{Case, Casing}; +use proc_macro2::TokenStream; +use quote::quote; +use syn::{ + parse::Parse, parse::ParseStream, parse2, Error, Fields, Generics, Ident, Item, ItemEnum, + ItemStruct, Path, PathArguments, Result, Token, Type, TypePath, +}; + +/// Documentation is in the hollywood crate. +pub(crate) fn actor_outputs_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let ast = match parse2::(item) { + Ok(ast) => ast, + Err(err) => return err.to_compile_error(), + }; + let struct_name = &ast.ident; + let generics = &ast.generics; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + + let fields = match &ast.fields { + Fields::Named(fields_named) => &fields_named.named, + _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"), + }; + + let output_assignments = fields.iter().map(|field| { + let field_name = &field.ident; + if let Some(inner_ty) = is_output_type(&field.ty) { + // if the field type is OutboundChannel, use OutboundChannel:: + quote! { + #field_name: OutboundChannel::<#inner_ty>::new( + context, + stringify!(#field_name).to_owned(), + actor_name, + ) + } + } else { + panic!("field type must be OutboundChannel."); + } + }); + + let output_extract = fields.iter().map(|field| { + let field_name = &field.ident; + + quote! { + #field_name: self.#field_name.extract() + } + }); + + let output_act = fields.iter().map(|field| { + let field_name = &field.ident; + + quote! { + self.#field_name.activate(); + } + }); + + let gen = quote! { + impl #impl_generics OutboundHub for #struct_name #ty_generics #where_clause { + fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { + Self { + #(#output_assignments),* + } + } + } + + impl #impl_generics Activate for #struct_name #ty_generics #where_clause { + fn extract(&mut self) -> Self { + Self { + #(#output_extract),* + } + } + + fn activate(&mut self) { + #(#output_act)* + } + } + + #ast + }; + + gen.into() +} + +// This function checks if the field's type is OutboundChannel and return T if it is +fn is_output_type(ty: &Type) -> Option<&Type> { + if let Type::Path(TypePath { + path: Path { segments, .. }, + .. + }) = ty + { + if segments.len() == 1 && segments[0].ident == "OutboundChannel" { + if let PathArguments::AngleBracketed(args) = &segments[0].arguments { + if args.args.len() == 1 { + if let syn::GenericArgument::Type(inner_ty) = args.args.first().unwrap() { + return Some(inner_ty); + } + } + } + } + } + None +} + +/// Documentation is in the hollywood crate. +pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let ast = match parse2::(item) { + Ok(ast) => ast, + Err(err) => return err.to_compile_error(), + }; + let struct_name = &ast.ident; + let generics = &ast.generics; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + + let fields = match &ast.fields { + Fields::Named(fields_named) => &fields_named.named, + _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"), + }; + + let request_assignments = fields.iter().map(|field| { + let field_name = &field.ident; + quote! { + #field_name: RequestChannel::new( + stringify!(#field_name).to_owned(), + actor_name, + sender, + ) + } + }); + + let request_extract = fields.iter().map(|field| { + let field_name = &field.ident; + + quote! { + #field_name: self.#field_name.extract() + } + }); + + let output_act = fields.iter().map(|field| { + let field_name = &field.ident; + + quote! { + self.#field_name.activate(); + } + }); + + let field0 = fields.first().expect("Request struct must have at least one field"); + let m_type = is_request_type(&field0.ty).unwrap()[2]; + + let gen = quote! { + impl #impl_generics RequestHub<#m_type> for #struct_name #ty_generics #where_clause { + fn from_parent_and_sender( + actor_name: &str, sender: &tokio::sync::mpsc::Sender<#m_type> + ) -> Self { + Self { + #(#request_assignments),* + } + } + } + + impl #impl_generics Activate for #struct_name #ty_generics #where_clause { + fn extract(&mut self) -> Self { + Self { + #(#request_extract),* + } + } + + fn activate(&mut self) { + #(#output_act)* + } + } + + #ast + }; + + gen.into() +} + +// This function checks if the field's type is RequestChannel +fn is_request_type(ty: &Type) -> Option<[&Type; 3]> { + if let Type::Path(TypePath { + path: Path { segments, .. }, + .. + }) = ty + { + if segments.len() == 1 && segments[0].ident == "RequestChannel" { + if let PathArguments::AngleBracketed(args) = &segments[0].arguments { + if args.args.len() == 3 { + let mut pop_iter = args.args.iter(); + if let syn::GenericArgument::Type(request_ty) = pop_iter.nth(0).unwrap() { + if let syn::GenericArgument::Type(reply_ty) = pop_iter.nth(0).unwrap() { + if let syn::GenericArgument::Type(m_ty) = pop_iter.nth(0).unwrap() { + return Some([request_ty, reply_ty, m_ty]); + } + } + } + } + } + } + } + None +} + +/// Documentation is in the hollywood crate. +pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream { + let ActorInbound { + struct_name, + prop_type, + state_type, + output_type, + request_type, + } = match parse2::(args) { + Ok(args) => args, + Err(err) => return err.to_compile_error(), + }; + let ast = match parse2::(inbound) { + Ok(ast) => ast, + Err(err) => return err.to_compile_error(), + }; + + let name = &ast.ident; + let generics = &ast.generics; + let fields = &ast.variants; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + + let inbound = fields.iter().map(|variant| { + let variant_name = &variant.ident; + let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake); + let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span()); + let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields { + &fields_unnamed.unnamed[0].ty + } else { + panic!("Enum variants must be tuples"); + }; + + let msg = format!( + "`{}` channel field - autogenerated by the [actor_inputs] macro.", + variant_name + ); + quote! { + #[doc = #msg] + pub #snake_case_variant_name: InboundChannel<#field_type, #name #ty_generics> + } + }); + + let match_arm = fields.iter().map(|variant| { + let variant_name = &variant.ident; + quote! { + #name::#variant_name(_) => { + stringify!(#variant_name).to_string() + } + } + }); + + let from_builder_inbounds = fields.iter().map(|variant| { + let variant_name = &variant.ident; + let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake); + let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span()); + + assert!( + generics.params.len() <= 1, + "Only zero or one generic parameter is supported, got {}", + generics.params.len() + ); + + let generic_ident = + if let Some(syn::GenericParam::Type(type_param)) = generics.params.first() { + // Extracts just the identifier of the type parameter (e.g., `T`) + Some(&type_param.ident) + } else { + None + }; + + let instantiation = if let Some(ident) = generic_ident { + // Use the extracted identifier directly + quote! { #name::#variant_name(#ident::default()) } + } else { + // When there are no generics + quote! { #name::#variant_name(Default::default()) } + }; + + quote! { + let #snake_case_variant_name = InboundChannel::new( + &mut builder.context, + actor_name.clone(), + &builder.sender, + #instantiation.inbound_channel(), + ); + builder.forward.insert( + #snake_case_variant_name.name.clone(), + Box::new(#snake_case_variant_name.clone()) + ); + } + }); + + let from_builder_init = fields.iter().map(|variant| { + let variant_name = &variant.ident; + let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake); + let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span()); + + quote! { + #snake_case_variant_name, + } + }); + + let gen = quote! { + #ast + + /// Auto-generated inbound hub for actor. + pub struct #struct_name #impl_generics #where_clause { + #(#inbound),* + } + + impl #impl_generics InboundMessage for #name #ty_generics #where_clause { + type Prop = #prop_type; + type State = #state_type; + type OutboundHub = #output_type; + type RequestHub = #request_type; + + fn inbound_channel(&self) -> String { + match self { + #(#match_arm),* + } + } + } + + impl #impl_generics InboundHub< + #prop_type, + #state_type, + #output_type, + #request_type, + #name #ty_generics> for #struct_name #ty_generics #where_clause + { + fn from_builder( + builder: &mut ActorBuilder< + #prop_type, + #state_type, + #output_type, + #request_type, + #name + #ty_generics + >, + actor_name: &str) -> Self + { + #(#from_builder_inbounds)* + + #struct_name { + #(#from_builder_init)* + } + } + } + + }; + + gen.into() +} + +struct ActorInbound { + struct_name: Ident, + prop_type: Ident, + state_type: Ident, + output_type: Ident, + request_type: Ident, +} + +impl Parse for ActorInbound { + fn parse(inbound: ParseStream) -> Result { + let struct_name: Ident = inbound.parse()?; + let _: Generics = inbound.parse()?; + let _: Token![,] = inbound.parse()?; + let content; + syn::braced!(content in inbound); + let prop_type: Ident = content.parse()?; + let _: Token![,] = content.parse()?; + let state_type: Ident = content.parse()?; + let _: Token![,] = content.parse()?; + let output_type: Ident = content.parse()?; + let _: Token![,] = content.parse()?; + let request_type: Ident = content.parse()?; + Ok(ActorInbound { + struct_name, + prop_type, + state_type, + output_type, + request_type, + }) + } +} + +struct ActorArgs { + message_type: Ident, +} + +impl Parse for ActorArgs { + fn parse(inbound_hub: ParseStream) -> Result { + let message_type: Ident = inbound_hub.parse()?; + Ok(ActorArgs { message_type }) + } +} + +/// Documentation is in the hollywood crate. +pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { + // parse inbound + let ActorArgs { message_type } = match parse2::(attr) { + Ok(args) => args, + Err(err) => return err.to_compile_error(), + }; + let inbound: Item = match parse2(item) { + Ok(inbound) => inbound, + Err(err) => return err.to_compile_error(), + }; + let inbound_clone = inbound.clone(); + + // Get actor name from the item + let actor_name = match inbound { + Item::Type(item) => item.ident, + _ => panic!("`actor` attribute can only be used with type aliases"), + }; + + let mut inbound_clone = inbound_clone.clone(); + let mut attrs = Vec::new(); + if let Item::Type(item_type) = &mut inbound_clone { + attrs.append(&mut item_type.attrs); + // ... + } + + let mut maybe_prop = None; + let mut maybe_inbounds = None; + let mut maybe_state = None; + let mut maybe_outputs = None; + let mut maybe_requests = None; + + if let Item::Type(item_type) = inbound_clone { + if let Type::Path(type_path) = *item_type.ty { + if type_path.path.segments.last().unwrap().ident != "Actor" { + return Error::new_spanned(&type_path, "Expected Actor<...>") + .to_compile_error() + .into(); + } + for segment in type_path.path.segments { + if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments { + if angle_bracketed_args.args.len() != 5 { + return Error::new_spanned( + &angle_bracketed_args, + concat!( + "Expected 5 type arguments:", + "Actor" + ), + ) + .to_compile_error() + .into(); + } + maybe_prop = Some(angle_bracketed_args.args[0].clone()); + maybe_inbounds = Some(angle_bracketed_args.args[1].clone()); + maybe_state = Some(angle_bracketed_args.args[2].clone()); + maybe_outputs = Some(angle_bracketed_args.args[3].clone()); + maybe_requests = Some(angle_bracketed_args.args[4].clone()); + } + } + } else { + return Error::new_spanned(&item_type.ty, "Expected a type path") + .to_compile_error() + .into(); + } + } else { + panic!("`actor` attribute can only be used with type aliases"); + } + + let prop = maybe_prop.unwrap(); + let inbound = maybe_inbounds.unwrap(); + let state_type = maybe_state.unwrap(); + let out = maybe_outputs.unwrap(); + let requests = maybe_requests.unwrap(); + + let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> }; + + let gen = quote! { + + /// + #( #attrs )* + pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>; + + impl FromPropState< + #prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type + > for #actor_name + { + fn name_hint(prop: &#prop) -> String { + stringify!(#actor_name).to_owned() + } + } + }; + + gen.into() +} diff --git a/hollywood_macros/src/lib.rs b/hollywood_macros/src/lib.rs index b32245a..d0f669e 100644 --- a/hollywood_macros/src/lib.rs +++ b/hollywood_macros/src/lib.rs @@ -2,352 +2,82 @@ //! Convenience macros for the Hollywood actor framework. +mod actors; +mod core; + extern crate proc_macro; -use convert_case::{Case, Casing}; use proc_macro::TokenStream; use quote::quote; -use syn::{ - parse::Parse, parse::ParseStream, parse_macro_input, Error, Fields, Ident, Item, ItemEnum, - ItemStruct, Path, PathArguments, Result, Token, Type, TypePath, -}; -/// Documentation is in the hollywood crate. +/// Documented in the root-level hollywood crate. #[proc_macro_attribute] -pub fn actor_outputs(_attr: TokenStream, item: TokenStream) -> TokenStream { - let ast = parse_macro_input!(item as ItemStruct); - - let struct_name = &ast.ident; - let fields = match &ast.fields { - Fields::Named(fields_named) => &fields_named.named, - _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"), - }; - - let output_assignments = fields.iter().map(|field| { - let field_name = &field.ident; - if let Some(inner_ty) = is_output_type(&field.ty) { - // if the field type is OutboundChannel, use OutboundChannel:: - quote! { - #field_name: OutboundChannel::<#inner_ty>::new( - context, - stringify!(#field_name).to_owned(), - actor_name, - ) - } - } else { - panic!("field type must be OutboundChannel."); - } - }); - - let output_extract = fields.iter().map(|field| { - let field_name = &field.ident; - - quote! { - #field_name: self.#field_name.extract() - } - }); - - let output_act = fields.iter().map(|field| { - let field_name = &field.ident; - - quote! { - self.#field_name.activate(); - } - }); - - let gen = quote! { - #ast - - impl OutboundHub for #struct_name { - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { - Self { - #(#output_assignments),* - } - } - } - - impl Morph for #struct_name { - fn extract(&mut self) -> Self { - Self { - #(#output_extract),* - } - } - - fn activate(&mut self) { - #(#output_act)* - } - } - - }; - - gen.into() +pub fn actor_outputs(attr: TokenStream, item: TokenStream) -> TokenStream { + core::actor_outputs_impl( + proc_macro2::TokenStream::from(attr), + proc_macro2::TokenStream::from(item), + ) + .into() } -// This function checks if the field's type is OutboundChannel and return T if it is -fn is_output_type(ty: &Type) -> Option<&Type> { - if let Type::Path(TypePath { - path: Path { segments, .. }, - .. - }) = ty - { - if segments.len() == 1 && segments[0].ident == "OutboundChannel" { - if let PathArguments::AngleBracketed(args) = &segments[0].arguments { - if args.args.len() == 1 { - if let syn::GenericArgument::Type(inner_ty) = args.args.first().unwrap() { - return Some(inner_ty); - } - } - } - } - } - None +/// Documented in the root-level hollywood crate. +#[proc_macro_attribute] +pub fn actor_requests(attr: TokenStream, item: TokenStream) -> TokenStream { + core::actor_requests_impl( + proc_macro2::TokenStream::from(attr), + proc_macro2::TokenStream::from(item), + ) + .into() } -/// Documentation is in the hollywood crate. +/// Documented in the root-level hollywood crate. #[proc_macro_attribute] pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { - let args = parse_macro_input!(args as ActorInbound); - let ast = parse_macro_input!(inbound as ItemEnum); - - let name = &ast.ident; - let fields = &ast.variants; - - let struct_name = &args.struct_name; - let prop_type = &args.prop_type; - let state_type = &args.state_type; - let output_type = &args.output_type; - let request_type = &args.request_type; - - let inbound = fields.iter().map(|variant| { - let variant_name = variant.ident.clone(); - let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake); - let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span()); - let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields { - &fields_unnamed.unnamed[0].ty - } else { - panic!("Enum variants must be tuples"); - }; - - let msg = format!( - "`{}` channel field - autogenerated by the [actor_inputs] macro.", - variant_name - ); - quote! { - #[doc = #msg] - pub #snake_case_variant_name: InboundChannel<#field_type, #name> - } - }); - - let match_arm = fields.iter().map(|variant| { - let variant_name = &variant.ident; - if let Fields::Unnamed(fields_unnamed) = &variant.fields { - &fields_unnamed.unnamed[0].ty - } else { - panic!("Enum variants must be tuples"); - }; - - quote! { - #name::#variant_name(msg) => { - stringify!(#variant_name).to_string() - } - } - }); - - let from_builder_inbounds = fields.iter().map(|variant| { - let variant_name = &variant.ident; - let snake_case_variant_name = variant.ident.clone().to_string().to_case(Case::Snake); - let snake_case_variant_name = Ident::new(&snake_case_variant_name, variant_name.span()); - let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields { - &fields_unnamed.unnamed[0].ty - } else { - panic!("Enum variants must be tuples"); - }; - - quote! { - let #snake_case_variant_name = InboundChannel::<#field_type, #name>::new( - &mut builder.context, - actor_name.clone(), - &builder.sender, - #name::#variant_name(Default::default()).inbound_channel(), - ); - builder - .forward - .insert(#snake_case_variant_name.name.clone(), Box::new(#snake_case_variant_name.clone())); - } - }); - - let from_builder_init = fields.iter().map(|variant| { - let variant_name = variant.ident.clone(); - let snake_case_variant_name = variant_name.to_string().to_case(Case::Snake); - let snake_case_variant_name = Ident::new(&snake_case_variant_name, variant_name.span()); - - quote! { - #snake_case_variant_name, - } - }); - - let gen = quote! { - #ast - - /// Auto-generated inbound hub for actor. - pub struct #struct_name { - #(#inbound),* - } - - impl InboundMessage for #name { - type Prop = #prop_type; - type State = #state_type; - type OutboundHub = #output_type; - type RequestHub = #request_type; - - fn inbound_channel(&self) -> String { - match self { - #(#match_arm),* - } - } - } - - impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name { - - fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>, - actor_name: &str) -> Self { - #(#from_builder_inbounds)* - - #struct_name { - #(#from_builder_init)* - } - } - } - - }; - - gen.into() -} - -struct ActorArgs { - message_type: Ident, + core::actor_inputs_impl( + proc_macro2::TokenStream::from(args), + proc_macro2::TokenStream::from(inbound), + ) + .into() } -impl Parse for ActorArgs { - fn parse(inbound_hub: ParseStream) -> Result { - let message_type: Ident = inbound_hub.parse()?; - Ok(ActorArgs { message_type }) - } -} - -/// Documentation is in the hollywood crate. +/// Documented in the root-level hollywood crate. #[proc_macro_attribute] pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { - // parse inbound - let ActorArgs { message_type } = parse_macro_input!(attr as ActorArgs); - let inbound: Item = parse_macro_input!(item); - let inbound_clone = inbound.clone(); - - // Get actor name from the item - let actor_name = match inbound { - Item::Type(item) => item.ident, - _ => panic!("`actor` attribute can only be used with type aliases"), - }; - - let mut inbound_clone = inbound_clone.clone(); - let mut attrs = Vec::new(); - if let Item::Type(item_type) = &mut inbound_clone { - attrs.append(&mut item_type.attrs); - // ... - } - - let mut maybe_prop = None; - let mut maybe_inbounds = None; - let mut maybe_state = None; - let mut maybe_outputs = None; - let mut maybe_requests = None; - - if let Item::Type(item_type) = inbound_clone { - if let Type::Path(type_path) = *item_type.ty { - if type_path.path.segments.last().unwrap().ident != "Actor" { - return Error::new_spanned(&type_path, "Expected Actor<...>") - .to_compile_error() - .into(); - } - for segment in type_path.path.segments { - if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments { - if angle_bracketed_args.args.len() != 5 { - return Error::new_spanned( - &angle_bracketed_args, - "Expected 5 type arguments: Actor", - ) - .to_compile_error() - .into(); - } - maybe_prop = Some(angle_bracketed_args.args[0].clone()); - maybe_inbounds = Some(angle_bracketed_args.args[1].clone()); - maybe_state = Some(angle_bracketed_args.args[2].clone()); - maybe_outputs = Some(angle_bracketed_args.args[3].clone()); - maybe_requests = Some(angle_bracketed_args.args[4].clone()); - } - } - } else { - return Error::new_spanned(&item_type.ty, "Expected a type path") - .to_compile_error() - .into(); - } - } else { - panic!("`actor` attribute can only be used with type aliases"); - } - - let prop = maybe_prop.unwrap(); - let inbound = maybe_inbounds.unwrap(); - let state_type = maybe_state.unwrap(); - let out = maybe_outputs.unwrap(); - let requests = maybe_requests.unwrap(); - - let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> }; - - let gen = quote! { - - /// - #( #attrs )* - pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>; - - impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type> - for #actor_name - { - fn name_hint(prop: &#prop) -> String { - stringify!(#actor_name).to_owned() - } - } - }; - - gen.into() + core::actor_impl( + proc_macro2::TokenStream::from(attr), + proc_macro2::TokenStream::from(item), + ) + .into() } -struct ActorInbound { - struct_name: Ident, - prop_type: Ident, - state_type: Ident, - output_type: Ident, - request_type: Ident, -} +/// Documented in the root-level hollywood crate. +#[proc_macro] +pub fn zip_n(input: TokenStream) -> TokenStream { + let parsed = proc_macro2::TokenStream::from(input.clone()); + let parsed2 = parsed.clone(); + let parsed3 = parsed.clone(); + let parsed4 = parsed.clone(); + let parsed5 = parsed.clone(); + let parsed6 = parsed.clone(); + let parsed7 = parsed.clone(); + + let output_tuple_n = actors::zip::tuple_n_impl(parsed); + let output_zip_outbound_n = actors::zip::zip_outbound_n_impl(parsed2); + let output_zip_state_n = actors::zip::zip_state_n_impl(parsed3); + let output_inbound_message_n = actors::zip::zip_inbound_message_n_impl(parsed4); + let output_zip_n = actors::zip::zip_n_impl(parsed5); + let output_zip_inbound_n = actors::zip::zip_inbound_n_impl(parsed6); + let output_zip_onmessage_n_new = actors::zip::zip_onmessage_n_impl(parsed7); + + let combined_output = quote! { + #output_tuple_n + #output_zip_outbound_n + #output_zip_state_n + #output_inbound_message_n + #output_zip_n + #output_zip_inbound_n + #output_zip_onmessage_n_new + }; -impl Parse for ActorInbound { - fn parse(inbound: ParseStream) -> Result { - let struct_name: Ident = inbound.parse()?; - let _: Token![,] = inbound.parse()?; - let content; - syn::braced!(content in inbound); - let prop_type: Ident = content.parse()?; - let _: Token![,] = content.parse()?; - let state_type: Ident = content.parse()?; - let _: Token![,] = content.parse()?; - let output_type: Ident = content.parse()?; - let _: Token![,] = content.parse()?; - let request_type: Ident = content.parse()?; - Ok(ActorInbound { - struct_name, - prop_type, - state_type, - output_type, - request_type - }) - } + TokenStream::from(combined_output) } diff --git a/src/actors.rs b/src/actors.rs index 6f737dc..a21f4e2 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -5,3 +5,25 @@ pub use periodic::Periodic; /// Generic printer actor. pub mod printer; pub use printer::{Printer, PrinterProp}; + +/// Nudge actor. +pub mod nudge; +pub use nudge::Nudge; + +/// Zip actor. +pub mod zip; +pub use zip::Zip2; +pub use zip::Zip3; +pub use zip::Zip4; +pub use zip::Zip5; +pub use zip::Zip6; +pub use zip::Zip7; +pub use zip::Zip8; +pub use zip::Zip9; +pub use zip::Zip10; +pub use zip::Zip11; +pub use zip::Zip12; + +/// Egui actor. +#[cfg(feature="egui")] +pub mod egui; diff --git a/src/actors/nudge.rs b/src/actors/nudge.rs new file mode 100644 index 0000000..188c003 --- /dev/null +++ b/src/actors/nudge.rs @@ -0,0 +1,153 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use hollywood_macros::actor_outputs; + +use crate::compute::context::Context; +use crate::core::connection::ConnectionEnum; +use crate::core::NullState; + +use crate::core::request::NullRequest; +use crate::core::{ + actor::{ActorNode, FromPropState, GenericActor}, + inbound::{ForwardMessage, NullInbound, NullMessage}, + outbound::{Activate, OutboundChannel, OutboundHub}, + runner::Runner, +}; + +/// Prop for the nudge actor. +#[derive(Clone, Debug, Default)] +pub struct NudgeProp { + /// The attached item. + pub item: Item, +} + +/// A nudge actor. +/// +/// All it does is to send a nudge message containing the item once. +pub type Nudge = GenericActor< + NudgeProp, + NullInbound, + NullState, + NudgeOutbound, + NullRequest, + NudgeRunner, +>; + +impl Nudge { + /// Create a new nudge actor + pub fn new(context: &mut Context, item: Item) -> Nudge { + Nudge::from_prop_and_state(context, NudgeProp:: { item }, NullState::default()) + } +} + +impl + FromPropState< + NudgeProp, + NullInbound, + NullState, + NudgeOutbound, + NullMessage, NullState, NudgeOutbound, NullRequest>, + NullRequest, + NudgeRunner, + > for Nudge +{ + fn name_hint(_prop: &NudgeProp) -> String { + "Nudge".to_owned() + } +} + +/// Nudge outbound hub +#[actor_outputs] +pub struct NudgeOutbound { + /// Nudge outbound channel. + pub nudge: OutboundChannel, +} + + +/// The custom nudge runner +pub struct NudgeRunner {} + +impl + Runner< + NudgeProp, + NullInbound, + NullState, + NudgeOutbound, + NullRequest, + NullMessage, NullState, NudgeOutbound, NullRequest>, + > for NudgeRunner +{ + /// Create a new actor node. + fn new_actor_node( + name: String, + prop: NudgeProp, + state: NullState, + _receiver: tokio::sync::mpsc::Receiver< + NullMessage, NullState, NudgeOutbound, NullRequest>, + >, + _forward: std::collections::HashMap< + String, + Box< + dyn ForwardMessage< + NudgeProp, + NullState, + NudgeOutbound, + NullRequest, + NullMessage, NullState, NudgeOutbound, NullRequest>, + > + Send + + Sync, + >, + >, + outbound: NudgeOutbound, + _request: NullRequest, + ) -> Box { + Box::new(NudgeActor:: { + name: name.clone(), + prop, + init_state: state.clone(), + state: None, + outbound: Some(outbound), + }) + } +} + +/// The nudge actor. +pub struct NudgeActor { + name: String, + prop: NudgeProp, + init_state: NullState, + state: Option, + outbound: Option>, +} + +#[async_trait] +impl ActorNode + for NudgeActor +{ + fn name(&self) -> &String { + &self.name + } + + fn reset(&mut self) { + self.state = Some(self.init_state.clone()); + } + + async fn run(&mut self, mut _kill: tokio::sync::broadcast::Receiver<()>) { + let mut outbound = self.outbound.take().unwrap(); + outbound.activate(); + self.reset(); + + match &outbound.nudge.connection_register { + ConnectionEnum::Config(_) => { + panic!("Cannot extract connection config") + } + ConnectionEnum::Active(active) => { + for i in active.maybe_registers.as_ref().unwrap().iter() { + println!("NudgeActor: sending"); + i.send_impl(self.prop.item.clone()); + } + } + } + } +} diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs index 4d9f5fc..64cd5d2 100644 --- a/src/actors/periodic.rs +++ b/src/actors/periodic.rs @@ -9,7 +9,7 @@ use crate::core::request::NullRequest; use crate::core::{ actor::{ActorNode, FromPropState, GenericActor}, inbound::{ForwardMessage, NullInbound, NullMessage}, - outbound::{Morph, OutboundChannel, OutboundHub}, + outbound::{Activate, OutboundChannel, OutboundHub}, runner::Runner, }; @@ -97,7 +97,7 @@ pub struct PeriodicOutbound { pub time_stamp: OutboundChannel, } -impl Morph for PeriodicOutbound { +impl Activate for PeriodicOutbound { fn extract(&mut self) -> Self { Self { time_stamp: self.time_stamp.extract(), diff --git a/src/actors/printer.rs b/src/actors/printer.rs index 0badfe3..5291c5b 100644 --- a/src/actors/printer.rs +++ b/src/actors/printer.rs @@ -1,5 +1,7 @@ use std::fmt::{Debug, Display}; +use hollywood_macros::actor_inputs; + use crate::core::{ request::NullRequest, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, InboundMessageNew, NullOutbound, NullState, OnMessage, @@ -22,12 +24,15 @@ impl Default for PrinterProp { /// Inbound message for the printer actor. #[derive(Clone, Debug)] -pub enum PrinterInboundMessage { +#[actor_inputs(PrinterInbound, {PrinterProp, NullState, NullOutbound, NullRequest})] +pub enum PrinterInboundMessage { /// Printable message. Printable(T), } -impl OnMessage for PrinterInboundMessage { +impl OnMessage + for PrinterInboundMessage +{ fn on_message( self, prop: &PrinterProp, @@ -43,7 +48,7 @@ impl OnMessage for PrinterIn } } -impl InboundMessageNew +impl InboundMessageNew for PrinterInboundMessage { fn new(_inbound_name: String, msg: T) -> Self { @@ -51,7 +56,7 @@ impl InboundMessageNew } } -/// Generic printer actor. +/// Printer actor. pub type Printer = Actor, NullState, NullOutbound, NullRequest>; impl @@ -69,50 +74,3 @@ impl format!("Printer({})", prop.topic) } } - -/// Builder for the generic printer. -pub struct PrinterInbound { - /// Inbound channel to receive printable messages. - pub printable: InboundChannel>, -} - -impl InboundMessage - for PrinterInboundMessage -{ - type Prop = PrinterProp; - type State = NullState; - type OutboundHub = NullOutbound; - type RequestHub = NullRequest; - - fn inbound_channel(&self) -> String { - match self { - PrinterInboundMessage::Printable(_) => "Printable".to_owned(), - } - } -} - -impl - InboundHub> - for PrinterInbound -{ - fn from_builder( - builder: &mut ActorBuilder< - PrinterProp, - NullState, - NullOutbound, - NullRequest, - PrinterInboundMessage, - >, - actor_name: &str, - ) -> Self { - let m = InboundChannel::new( - builder.context, - actor_name, - &builder.sender, - PrinterInboundMessage::Printable(T::default()).inbound_channel(), - ); - builder.forward.insert(m.name.clone(), Box::new(m.clone())); - - PrinterInbound { printable: m } - } -} diff --git a/src/actors/zip.rs b/src/actors/zip.rs new file mode 100644 index 0000000..8614e33 --- /dev/null +++ b/src/actors/zip.rs @@ -0,0 +1,62 @@ +use std::cmp::Reverse; +use std::fmt::Debug; +use std::fmt::Display; + +use hollywood_macros::zip_n; + +use crate::compute::Context; +use crate::core::request::NullRequest; +use crate::core::Activate; +use crate::core::Actor; +use crate::core::DefaultRunner; +use crate::core::FromPropState; +use crate::core::InboundChannel; +use crate::core::InboundHub; +use crate::core::InboundMessage; +use crate::core::InboundMessageNew; +use crate::core::NullProp; +use crate::core::OnMessage; +use crate::core::OutboundChannel; +use crate::core::OutboundHub; + +/// Type of the Xth inbound channel for the zip actor. +#[derive(Clone, Debug, Default)] +pub struct ZipPair { + /// Key to associate message from different inbound channels with. + pub key: Key, + /// The value to be zipped. + pub value: Value, +} + +impl PartialEq for ZipPair { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for ZipPair {} + +impl PartialOrd for ZipPair { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ZipPair { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.key.cmp(&other.key) + } +} + + +zip_n!(2); +zip_n!(3); +zip_n!(4); +zip_n!(5); +zip_n!(6); +zip_n!(7); +zip_n!(8); +zip_n!(9); +zip_n!(10); +zip_n!(11); +zip_n!(12); diff --git a/src/compute.rs b/src/compute.rs index 96bbd13..9df60cc 100644 --- a/src/compute.rs +++ b/src/compute.rs @@ -4,12 +4,10 @@ pub use context::Context; /// The compute graph of actors. pub mod pipeline; -pub use pipeline::Pipeline; pub(crate) use pipeline::CancelRequest; +pub use pipeline::Pipeline; /// The graph topology. pub mod topology; -pub(crate) use topology::Topology; pub(crate) use topology::ActorNode; - - +pub(crate) use topology::Topology; diff --git a/src/compute/context.rs b/src/compute/context.rs index d648967..fb6d4c2 100644 --- a/src/compute/context.rs +++ b/src/compute/context.rs @@ -2,9 +2,7 @@ use std::marker::PhantomData; use std::sync::Arc; use crate::compute::{CancelRequest, Pipeline, Topology}; -use crate::core::{ - InboundChannel, InboundMessage, OutboundChannel, OutboundConnection, ActorNode, -}; +use crate::core::{ActorNode, InboundChannel, InboundMessage, OutboundChannel, OutboundConnection}; /// The context of the compute graph which is used to configure the network topology. /// @@ -35,7 +33,7 @@ impl Context { self.cancel_request_sender_template.clone() } - /// Registers an outbound channel for cancel request. + /// Registers an outbound channel for cancel request. /// /// Upon receiving a cancel request the registered outbound channel, the execution of the /// pipeline will be stopped. @@ -49,7 +47,6 @@ impl Context { })); } - fn new() -> Self { let (cancel_request_sender_template, cancel_request_receiver) = tokio::sync::mpsc::channel(1); @@ -84,13 +81,14 @@ impl Context { } pub(crate) fn connect_impl< - T: Clone + std::fmt::Debug + Sync + Send + 'static, + T0: Clone + std::fmt::Debug + Sync + Send + 'static, + T1: Clone + std::fmt::Debug + Sync + Send + 'static, M: InboundMessage, >( &mut self, - outbound: &mut OutboundChannel, - inbound: &mut InboundChannel, + outbound: &mut OutboundChannel, + inbound: &mut InboundChannel, ) { self.topology.connect(outbound, inbound); - } + } } diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs index 20256f9..a50b34a 100644 --- a/src/compute/pipeline.rs +++ b/src/compute/pipeline.rs @@ -65,8 +65,8 @@ impl Pipeline { /// Executes the compute graph. /// - /// It consumes the self, starts execution of the pipeline and returns a future (since it is - /// an async function) that resolves to the pipeline itself. The future is completed when all + /// It consumes the self, starts execution of the pipeline and returns a future (since it is + /// an async function) that resolves to the pipeline itself. The future is completed when all /// actors have completed their execution. /// /// In particular, [ActorNode::run()] is called for each actor in the pipeline in a dedicated diff --git a/src/compute/topology.rs b/src/compute/topology.rs index b401bd9..b5c0b2a 100644 --- a/src/compute/topology.rs +++ b/src/compute/topology.rs @@ -149,12 +149,13 @@ impl Topology { } pub(crate) fn connect< - T: Clone + std::fmt::Debug + Sync + Send + 'static, + T0: Clone + std::fmt::Debug + Sync + Send + 'static, + T1: Clone + std::fmt::Debug + Sync + Send + 'static, M: InboundMessage, >( &mut self, - outbound: &mut OutboundChannel, - inbound: &mut InboundChannel, + outbound: &mut OutboundChannel, + inbound: &mut InboundChannel, ) { let output_parent_idx = self .unique_idx_name_pairs diff --git a/src/core.rs b/src/core.rs index 2047f92..f4d50dc 100644 --- a/src/core.rs +++ b/src/core.rs @@ -2,8 +2,8 @@ /// Actor pub mod actor; -pub use actor::{Actor, FromPropState}; pub(crate) use actor::ActorNode; +pub use actor::{Actor, FromPropState}; /// Actor builder pub mod actor_builder; @@ -17,13 +17,14 @@ pub use inbound::{ OnMessage, }; -/// Outbound +/// Outbound pub mod outbound; pub(crate) use outbound::OutboundConnection; -pub use outbound::{Morph, NullOutbound, OutboundChannel, OutboundHub}; +pub use outbound::{Activate, NullOutbound, OutboundChannel, OutboundHub}; /// Request pub mod request; +pub use request::{NullRequest, RequestHub}; /// Connection pub mod connection; diff --git a/src/core/actor.rs b/src/core/actor.rs index f67f8ba..8aef207 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -80,7 +80,7 @@ pub trait FromPropState< let mut builder = ActorBuilder::new(context, &actor_name, prop, initial_state); - let request = Request::from_context_and_parent(&actor_name, &builder.sender); + let request = Request::from_parent_and_sender(&actor_name, &builder.sender); let inbound = Inbound::from_builder(&mut builder, &actor_name); builder.build::(inbound, out, request) diff --git a/src/core/actor_builder.rs b/src/core/actor_builder.rs index 7c6c995..c7ffe82 100644 --- a/src/core/actor_builder.rs +++ b/src/core/actor_builder.rs @@ -1,5 +1,3 @@ - - use crate::compute::context::Context; use crate::core::{ actor::GenericActor, @@ -14,14 +12,7 @@ use super::request::RequestHub; /// Creates actor from its components. /// /// Used in [`InboundHub::from_builder`] public interface. -pub struct ActorBuilder< - 'a, - Prop, - State, - OutboundHub, - Request: RequestHub, - M: InboundMessage, -> { +pub struct ActorBuilder<'a, Prop, State, OutboundHub, Request: RequestHub, M: InboundMessage> { /// unique identifier of the actor pub actor_name: String, prop: Prop, diff --git a/src/core/connection/outbound_connection.rs b/src/core/connection/outbound_connection.rs index 533a800..9c9a4f6 100644 --- a/src/core/connection/outbound_connection.rs +++ b/src/core/connection/outbound_connection.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use crate::core::{outbound::GenericConnection, Morph}; +use crate::core::{outbound::GenericConnection, Activate}; -use super::{ConnectionRegister, ConnectionEnum}; +use super::{ConnectionEnum, ConnectionRegister}; pub(crate) struct ConnectionConfig { pub connection_register: ConnectionRegister, @@ -37,7 +37,6 @@ pub(crate) struct ActiveConnection { pub maybe_register_landing_pad: Option>>, } - impl ConnectionEnum { pub fn new() -> Self { Self::Config(ConnectionConfig::new()) @@ -68,7 +67,7 @@ impl ConnectionEnum { } } -impl Morph for ConnectionEnum { +impl Activate for ConnectionEnum { fn extract(&mut self) -> Self { match self { Self::Config(config) => Self::Active(ActiveConnection { diff --git a/src/core/connection/request_connection.rs b/src/core/connection/request_connection.rs index 5b4d8da..fc640a4 100644 --- a/src/core/connection/request_connection.rs +++ b/src/core/connection/request_connection.rs @@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::Arc}; use tokio::sync::mpsc::error::SendError; -use crate::core::{InboundMessage, InboundMessageNew, Morph}; +use crate::core::{InboundMessage, InboundMessageNew, Activate}; use super::{RequestConnectionEnum, RequestConnectionRegister}; @@ -103,7 +103,7 @@ impl RequestConnectionEnum { } } -impl Morph for RequestConnectionEnum { +impl Activate for RequestConnectionEnum { fn extract(&mut self) -> Self { match self { Self::Config(config) => Self::Active(ActiveRequestConnection { diff --git a/src/core/outbound.rs b/src/core/outbound.rs index 8f26d3f..2a279ab 100644 --- a/src/core/outbound.rs +++ b/src/core/outbound.rs @@ -1,13 +1,13 @@ use std::{marker::PhantomData, sync::Arc}; use tokio::sync::mpsc::error::SendError; +use super::connection::ConnectionEnum; use crate::compute::context::Context; use crate::core::inbound::{InboundChannel, InboundMessage, InboundMessageNew}; - -use super::connection::ConnectionEnum; +use std::fmt::{Debug, Formatter}; /// OutboundHub is a collection of outbound channels for the actor. -pub trait OutboundHub: Send + Sync + 'static + Morph { +pub trait OutboundHub: Send + Sync + 'static + Activate { /// Creates the OutboundHub from context and the actor name. fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self; } @@ -16,7 +16,7 @@ pub trait OutboundHub: Send + Sync + 'static + Morph { #[derive(Debug, Clone)] pub struct NullOutbound {} -impl Morph for NullOutbound { +impl Activate for NullOutbound { fn extract(&mut self) -> Self { Self {} } @@ -39,7 +39,7 @@ pub struct OutboundChannel { pub(crate) connection_register: ConnectionEnum, } -impl OutboundChannel { +impl OutboundChannel { /// Create a new outbound for actor in provided context. pub fn new(context: &mut Context, name: String, actor_name: &str) -> Self { context.assert_unique_outbound_name(name.clone(), actor_name); @@ -52,35 +52,56 @@ impl OutboundChann } /// Connect the outbound channel from this actor to the inbound channel of another actor. - pub fn connect>( + pub fn connect>( &mut self, ctx: &mut Context, - inbound: &mut InboundChannel, + inbound: &mut InboundChannel, ) { ctx.connect_impl(self, inbound); - self.connection_register.push(Arc::new(OutboundConnection { - sender: inbound.sender.clone(), - inbound_channel: inbound.name.clone(), - phantom: PhantomData {}, - })); + self.connection_register + .push(Arc::new(OutboundConnection:: { + sender: inbound.sender.clone(), + inbound_channel: inbound.name.clone(), + phantom: PhantomData, + })); + } + + /// Connect the outbound channel of type OutT to the inbound channel of another type InT. + /// The user provided adapter function is used to convert from OutT to InT. + pub fn connect_with_adapter< + InT: Default + Clone + Send + Sync + std::fmt::Debug + 'static, + M: InboundMessageNew, + >( + &mut self, + ctx: &mut Context, + adapter: fn(OutT) -> InT, + inbound: &mut InboundChannel, + ) { + ctx.connect_impl(self, inbound); + self.connection_register + .push(Arc::new(OutboundConnectionWithAdapter:: { + sender: inbound.sender.clone(), + inbound_channel: inbound.name.clone(), + adapter, + })); } /// Send a message to the connected inbound channels to other actors. - pub fn send(&self, msg: T) { + pub fn send(&self, msg: OutT) { self.connection_register.send(msg); } } -/// Trait for morphing state of an outbound channel. -pub trait Morph { - /// Extract outbound channel and returns it. +/// Outbound/request channel activation +pub trait Activate { + /// Extract outbound/request channel and returns it. fn extract(&mut self) -> Self; - /// Activates the outbound channel to be used. + /// Activates the outbound/request channel to be used. fn activate(&mut self); } -impl Morph for OutboundChannel { +impl Activate for OutboundChannel { fn activate(&mut self) { self.connection_register.activate(); } @@ -94,19 +115,36 @@ impl Morph for OutboundChannel { } } -#[derive(Debug, Clone)] -pub(crate) struct OutboundConnection { +#[derive(Clone, Debug)] +pub(crate) struct OutboundConnection { pub(crate) sender: tokio::sync::mpsc::Sender, pub(crate) inbound_channel: String, - pub(crate) phantom: PhantomData, + pub(crate) phantom: std::marker::PhantomData, +} + +#[derive(Clone)] +pub(crate) struct OutboundConnectionWithAdapter { + pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) inbound_channel: String, + pub(crate) adapter: fn(Out) -> InT, +} + +impl Debug for OutboundConnectionWithAdapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OutboundConnection") + .field("inbound_channel", &self.inbound_channel) + .finish() + } } pub(crate) trait GenericConnection: Send + Sync { fn send_impl(&self, msg: T); } -impl> GenericConnection for OutboundConnection { - fn send_impl(&self, msg: T) { +impl> GenericConnection + for OutboundConnection +{ + fn send_impl(&self, msg: Out) { let msg = M::new(self.inbound_channel.clone(), msg); let c = self.sender.clone(); let handler = tokio::spawn(async move { @@ -120,3 +158,21 @@ impl> GenericConnection for OutboundC std::mem::drop(handler); } } + +impl> GenericConnection + for OutboundConnectionWithAdapter +{ + fn send_impl(&self, msg: Out) { + let msg = M::new(self.inbound_channel.clone(), (self.adapter)(msg)); + let c = self.sender.clone(); + let handler = tokio::spawn(async move { + match c.send(msg).await { + Ok(_) => {} + Err(SendError(_)) => { + println!("SendError"); + } + } + }); + std::mem::drop(handler); + } +} diff --git a/src/core/request.rs b/src/core/request.rs index 4e27fa1..44c074f 100644 --- a/src/core/request.rs +++ b/src/core/request.rs @@ -5,12 +5,12 @@ use crate::compute::Context; use super::connection::request_connection::RequestConnection; use super::connection::RequestConnectionEnum; -use super::{InboundChannel, InboundMessage, InboundMessageNew, Morph}; +use super::{InboundChannel, InboundMessage, InboundMessageNew, Activate}; -/// A request hub is used to send requests to other actors. -pub trait RequestHub: Send + Sync + 'static + Morph { +/// A request hub is used to send requests to other actors which will reply later. +pub trait RequestHub: Send + Sync + 'static + Activate { /// Create a new request hub for an actor. - fn from_context_and_parent(actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self; + fn from_parent_and_sender(actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self; } /// A request message with a reply channel. @@ -70,18 +70,18 @@ pub struct ReplyMessage { pub reply: Reply, } -/// OutboundChannel is a connections for messages which are sent to a downstream actor. +/// RequestChannel is a connections for messages which are sent to a downstream actor. pub struct RequestChannel { - /// Unique name of the outbound. + /// Unique name of the request channel. pub name: String, - /// Name of the actor that sends the outbound messages. + /// Name of the actor that sends the request messages. pub actor_name: String, - pub(crate) connection_register: RequestConnectionEnum>, + pub(crate) connection_register: RequestConnectionEnum>, pub(crate) sender: tokio::sync::mpsc::Sender, } -impl Morph for RequestChannel { +impl Activate for RequestChannel { fn extract(&mut self) -> Self { Self { name: self.name.clone(), @@ -102,7 +102,7 @@ impl< M: InboundMessageNew>, > RequestChannel { - /// Create a new outbound for actor in provided context. + /// Create a new request channel for actor in provided context. pub fn new(name: String, actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self { Self { name: name.clone(), @@ -112,7 +112,7 @@ impl< } } - /// Connect the outbound channel from this actor to the inbound channel of another actor. + /// Connect the request channel from this actor to the inbound channel of another actor. pub fn connect>>( &mut self, _ctx: &mut Context, @@ -125,7 +125,7 @@ impl< })); } - /// Send a message to the connected inbound channels to other actors. + /// Send a message to the connected inbound channels of other actors. pub fn send_request(&self, msg: Request) { let (sender, receiver) = tokio::sync::oneshot::channel(); let msg = RequestMessage { @@ -149,12 +149,12 @@ impl< pub struct NullRequest {} impl RequestHub for NullRequest { - fn from_context_and_parent(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender) -> Self { + fn from_parent_and_sender(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender) -> Self { Self {} } } -impl Morph for NullRequest { +impl Activate for NullRequest { fn extract(&mut self) -> Self { Self {} } diff --git a/src/core/runner.rs b/src/core/runner.rs index f753491..357c2de 100644 --- a/src/core/runner.rs +++ b/src/core/runner.rs @@ -1,5 +1,3 @@ - - use crate::core::{ inbound::{InboundHub, InboundMessage}, outbound::OutboundHub, diff --git a/src/core/value.rs b/src/core/value.rs index 2ef32ff..29c8218 100644 --- a/src/core/value.rs +++ b/src/core/value.rs @@ -2,8 +2,6 @@ #[derive(Clone, Debug, Default)] pub struct NullState {} - /// Empty prop - for actors without props. #[derive(Clone, Debug, Default)] pub struct NullProp {} - diff --git a/src/example_actors.rs b/src/example_actors.rs index de0975d..a34ea83 100644 --- a/src/example_actors.rs +++ b/src/example_actors.rs @@ -4,7 +4,7 @@ pub mod moving_average; /// One dimensional robot Kalman filter example. -/// +/// /// ```text /// * Periodic_0 * /// | time_stamp | @@ -27,5 +27,3 @@ pub mod moving_average; /// *Printer(filter s* * DrawActor_0 * /// ``` pub mod one_dim_robot; - - diff --git a/src/example_actors/moving_average.rs b/src/example_actors/moving_average.rs index 7da0c3e..ce90990 100644 --- a/src/example_actors/moving_average.rs +++ b/src/example_actors/moving_average.rs @@ -3,7 +3,7 @@ use crate::macros::*; // needed for actor_outputs macro pub use crate::compute::Context; use crate::core::request::NullRequest; -pub use crate::core::{Morph, OutboundChannel, OutboundHub}; +pub use crate::core::{Activate, OutboundChannel, OutboundHub}; // needed for actor_inputs macro pub use crate::core::{ @@ -11,7 +11,7 @@ pub use crate::core::{ }; // needed for actor macro -pub use crate::core::{Actor, FromPropState, DefaultRunner}; +pub use crate::core::{Actor, DefaultRunner, FromPropState}; /// Outbound hub for the MovingAverage. #[actor_outputs] diff --git a/src/example_actors/one_dim_robot.rs b/src/example_actors/one_dim_robot.rs index f52e873..4696920 100644 --- a/src/example_actors/one_dim_robot.rs +++ b/src/example_actors/one_dim_robot.rs @@ -11,9 +11,9 @@ pub mod filter; pub use filter::{Filter, NamedFilterState}; /// Drawing actor for the one dimensional robot. -/// +/// /// Draws "ascii art" of the robot and the filter state to the console. -/// +/// /// ```text /// time:2.25 /// ⡏⢹⢀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⡀ diff --git a/src/example_actors/one_dim_robot/draw.rs b/src/example_actors/one_dim_robot/draw.rs index 98282dc..c85e17c 100644 --- a/src/example_actors/one_dim_robot/draw.rs +++ b/src/example_actors/one_dim_robot/draw.rs @@ -1,3 +1,4 @@ +use crate::actors::zip::Tuple3; use crate::core::request::NullRequest; use crate::core::InboundMessageNew; use crate::core::NullOutbound; @@ -11,12 +12,8 @@ use drawille::Canvas; #[derive(Clone, Debug)] #[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound,NullRequest})] pub enum DrawInboundMessage { - /// True position of the robot. - TruePos(Stamped), - /// True range measurement. - TrueRange(Stamped), - /// Filter estimate of the robot's position. - FilterEst(NamedFilterState), + /// Tuple of true pos, true range and filter state + Zipped(Tuple3, Stamped, NamedFilterState>), } /// Draw actor for one-dim-robot example. @@ -29,45 +26,26 @@ impl OnMessage for DrawInboundMessage { self, _prop: &NullProp, state: &mut Self::State, - outbound: &Self::OutboundHub, + _outbound: &Self::OutboundHub, _request: &Self::RequestHub, ) { match self { - DrawInboundMessage::TruePos(msg) => { - state.true_pos(msg.clone(), outbound); + DrawInboundMessage::Zipped(msg) => { + state.draw(msg.item0.clone(), msg.item1.clone(), msg.item2.clone()); } - DrawInboundMessage::TrueRange(msg) => { - state.true_range(msg.clone(), outbound); - } - DrawInboundMessage::FilterEst(msg) => { - state.filter_est(msg.clone(), outbound); - } - } - } -} - -impl InboundMessageNew> for DrawInboundMessage { - fn new(inbound_channel: String, msg: Stamped) -> Self { - match inbound_channel.as_str() { - "TruePos" => DrawInboundMessage::TruePos(msg), - _ => panic!("Unknown inbound name {}", inbound_channel), } } } -impl InboundMessageNew> for DrawInboundMessage { - fn new(inbound_channel: String, msg: Stamped) -> Self { +impl InboundMessageNew, Stamped, NamedFilterState>> + for DrawInboundMessage +{ + fn new( + inbound_channel: String, + msg: Tuple3, Stamped, NamedFilterState>, + ) -> Self { match inbound_channel.as_str() { - "TrueRange" => DrawInboundMessage::TrueRange(msg), - _ => panic!("Unknown inbound name {}", inbound_channel), - } - } -} - -impl InboundMessageNew for DrawInboundMessage { - fn new(inbound_channel: String, msg: NamedFilterState) -> Self { - match inbound_channel.as_str() { - "FilterEst" => DrawInboundMessage::FilterEst(msg), + "Zipped" => DrawInboundMessage::Zipped(msg), _ => panic!("Unknown inbound name {}", inbound_channel), } } @@ -75,39 +53,17 @@ impl InboundMessageNew for DrawInboundMessage { /// State of the draw actor. #[derive(Clone, Debug, Default)] -pub struct DrawState { - /// The most recent true position. - pub true_robot: Option>, - /// The most recent true range measurement. - pub true_range: Option>, - /// The most recent filter estimate. - pub filter_est: Option, -} +pub struct DrawState {} impl DrawState { - /// Visualize the true robot state by saving it to the state and invoking [DrawState::draw]. - pub fn true_pos(&mut self, p: Stamped, _: &NullOutbound) { - self.true_robot = Some(p); - - self.draw(); - } - - /// Visualize the range measurement by saving it to the state and invoking [DrawState::draw]. - pub fn true_range(&mut self, p: Stamped, _: &NullOutbound) { - self.true_range = Some(p); - - self.draw(); - } - - /// Visualize the filter estimate by saving it to the state and invoking [DrawState::draw]. - pub fn filter_est(&mut self, p: NamedFilterState, _: &NullOutbound) { - self.filter_est = Some(p); - self.draw(); - } - /// Draw the current state to the console if all information of the most recent timestamp is /// available. - pub fn draw(&mut self) { + pub fn draw( + &mut self, + true_robot: Stamped, + true_range: Stamped, + filter_est: NamedFilterState, + ) { let factor = 6.0; let width_pixels: u32 = 100; @@ -124,71 +80,65 @@ impl DrawState { let mut canvas = Canvas::new(width_pixels, height_pixels); - if let Some(p) = &self.true_robot { - let true_x = p.value.position; - if let Some(r) = &self.true_range { - if let Some(est) = &self.filter_est { - if r.time == p.time && r.time == est.state.time { - let x_left_ground = pixels_from_meter(true_x - 0.25, 0.0); - let x_right_ground = pixels_from_meter(true_x + 0.25, 0.0); - let x_left_up = pixels_from_meter(true_x - 0.25, 1.5); - let x_right_up = pixels_from_meter(true_x + 0.25, 1.5); - canvas.line_colored( - x_left_ground.0, - x_left_ground.1, - x_left_up.0, - x_left_up.1, - drawille::PixelColor::Blue, - ); - canvas.line_colored( - x_left_up.0, - x_left_up.1, - x_right_up.0, - x_right_up.1, - drawille::PixelColor::Blue, - ); - canvas.line_colored( - x_right_ground.0, - x_right_ground.1, - x_right_up.0, - x_right_up.1, - drawille::PixelColor::Blue, - ); - - let range_start = pixels_from_meter(p.value.position + 0.5, 1.0); - let range_endpoint = pixels_from_meter(p.value.position + r.value, 1.0); - canvas.line_colored( - range_start.0, - range_start.1, - range_endpoint.0, - range_endpoint.1, - drawille::PixelColor::Red, - ); - - let std = est.state.robot_position.covariance.sqrt(); - - let believe_min = pixels_from_meter(p.value.position - 3.0 * std, 1.0); - let believe_max = pixels_from_meter(p.value.position + 3.0 * std, 1.0); - - canvas.line_colored( - believe_min.0, - 5, - believe_min.0, - 10, - drawille::PixelColor::Green, - ); - canvas.line_colored( - believe_max.0, - 5, - believe_max.0, - 10, - drawille::PixelColor::Green, - ); - - println!("time:{}\n{}", r.time, canvas.frame()); - } - } - } - } + let true_x = true_robot.value.position; + let x_left_ground = pixels_from_meter(true_x - 0.25, 0.0); + let x_right_ground = pixels_from_meter(true_x + 0.25, 0.0); + let x_left_up = pixels_from_meter(true_x - 0.25, 1.5); + let x_right_up = pixels_from_meter(true_x + 0.25, 1.5); + canvas.line_colored( + x_left_ground.0, + x_left_ground.1, + x_left_up.0, + x_left_up.1, + drawille::PixelColor::Blue, + ); + canvas.line_colored( + x_left_up.0, + x_left_up.1, + x_right_up.0, + x_right_up.1, + drawille::PixelColor::Blue, + ); + canvas.line_colored( + x_right_ground.0, + x_right_ground.1, + x_right_up.0, + x_right_up.1, + drawille::PixelColor::Blue, + ); + + let p = filter_est.state.clone(); + let r = true_range; + let range_start = pixels_from_meter(p.pos_vel_acc.mean.x + 0.5, 1.0); + let range_endpoint = pixels_from_meter(p.pos_vel_acc.mean.x + r.value, 1.0); + canvas.line_colored( + range_start.0, + range_start.1, + range_endpoint.0, + range_endpoint.1, + drawille::PixelColor::Red, + ); + + let std = filter_est.state.pos_vel_acc.covariance[(0, 0)].sqrt(); + + let believe_min = pixels_from_meter(p.pos_vel_acc.mean.x - 3.0 * std, 1.0); + let believe_max = pixels_from_meter(p.pos_vel_acc.mean.x + 3.0 * std, 1.0); + + canvas.line_colored( + believe_min.0, + 5, + believe_min.0, + 10, + drawille::PixelColor::Green, + ); + canvas.line_colored( + believe_max.0, + 5, + believe_max.0, + 10, + drawille::PixelColor::Green, + ); + + println!("time:{}\n{}", r.time, canvas.frame()); } } diff --git a/src/example_actors/one_dim_robot/filter.rs b/src/example_actors/one_dim_robot/filter.rs index 1e75c0a..bb10f31 100644 --- a/src/example_actors/one_dim_robot/filter.rs +++ b/src/example_actors/one_dim_robot/filter.rs @@ -3,8 +3,8 @@ use std::fmt::{Debug, Display}; use crate::compute::Context; use crate::core::request::{NullRequest, RequestMessage}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, - InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, + Activate, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, + InboundMessage, InboundMessageNew, NullProp, OnMessage, OutboundChannel, OutboundHub, }; use crate::example_actors::one_dim_robot::{RangeMeasurementModel, Stamped}; use hollywood_macros::{actor, actor_inputs, actor_outputs}; @@ -13,7 +13,7 @@ use super::sim::PingPong; /// Inbound channels for the filter actor. #[derive(Clone, Debug)] -#[actor_inputs(FilterInbound,{NullProp, FilterState,FilterOutbound,NullRequest})] +#[actor_inputs(FilterInbound,{NullProp, FilterState, FilterOutbound, NullRequest})] pub enum FilterInboundMessage { /// noisy velocity measurements NoisyVelocity(Stamped), @@ -76,8 +76,10 @@ impl InboundMessageNew> for FilterInboundMessage { pub struct FilterState { /// time of the last prediction or update pub time: f64, + /// Monotonically increasing sequence number + pub seq: u64, /// belief about the robot's position - pub robot_position: PositionBelieve, + pub pos_vel_acc: PositionBelieve, } impl Display for FilterState { @@ -85,7 +87,7 @@ impl Display for FilterState { write!( f, "(time: {}, robot_position: {})", - self.time, self.robot_position.mean + self.time, self.pos_vel_acc.mean ) } } @@ -116,16 +118,20 @@ impl Display for NamedFilterState { #[derive(Clone, Debug)] pub struct PositionBelieve { /// mean of the position believe - pub mean: f64, + pub mean: nalgebra::Vector3, /// covariance of the position believe - pub covariance: f64, + pub covariance: nalgebra::Matrix3, } impl Default for PositionBelieve { fn default() -> Self { Self { - mean: 0.0, - covariance: 100.0, + mean: nalgebra::Vector3::new(0.0, 0.0, 0.0), + covariance: nalgebra::Matrix3::new( + 100.0, 0.0, 0.0, // + 0.0, 100.0, 0.0, // + 0.0, 0.0, 100.0, + ), } } } @@ -140,9 +146,43 @@ impl FilterState { let dt = noisy_velocity.time - self.time; self.time = noisy_velocity.time; - self.robot_position.mean += noisy_velocity.value * dt; - const VELOCITY_STD_DEV: f64 = 0.1; - self.robot_position.covariance += VELOCITY_STD_DEV * VELOCITY_STD_DEV * dt; + // 1. Random-walk acceleration motion model + self.pos_vel_acc.mean[0] += + self.pos_vel_acc.mean[1] * dt + 0.5 * self.pos_vel_acc.mean[2] * dt * dt; + self.pos_vel_acc.mean[1] += self.pos_vel_acc.mean[2] * dt; + let f = nalgebra::Matrix3::new(1.0, dt, 0.5 * dt * dt, 0.0, 1.0, dt, 0.0, 0.0, 1.0); + let acceleration_noise_variance = 0.5; + let q = nalgebra::Matrix3::new( + 0.25 * dt.powi(4), + 0.5 * dt.powi(3), + 0.5 * dt.powi(2) * acceleration_noise_variance, + 0.5 * dt.powi(3), + dt.powi(2), + dt * acceleration_noise_variance, + 0.5 * dt.powi(2) * acceleration_noise_variance, + dt * acceleration_noise_variance, + acceleration_noise_variance, + ); + self.pos_vel_acc.covariance = f * self.pos_vel_acc.covariance * f.transpose() + q; + + // 2. Update velocity based on the velocity measurement + // (strictly speaking this is an update, not a prediction) + let h_velocity = nalgebra::Matrix1x3::new(0.0, 1.0, 0.0); + let predicted_velocity = self.pos_vel_acc.mean[1]; + let innovation_velocity = noisy_velocity.value - predicted_velocity; + const VELOCITY_MEASUREMENT_NOISE: f64 = 0.1; + let r_velocity = + nalgebra::Matrix1::new(VELOCITY_MEASUREMENT_NOISE * VELOCITY_MEASUREMENT_NOISE); + let kalman_gain_velocity = self.pos_vel_acc.covariance + * h_velocity.transpose() + * (h_velocity * self.pos_vel_acc.covariance * h_velocity.transpose() + r_velocity) + .try_inverse() + .unwrap(); + self.pos_vel_acc.mean += kalman_gain_velocity * innovation_velocity; + let identity = nalgebra::Matrix3::identity(); + self.pos_vel_acc.covariance = + (identity - kalman_gain_velocity * h_velocity) * self.pos_vel_acc.covariance; + outbound.predicted_state.send(NamedFilterState::new( "Predicted: ".to_owned(), self.clone(), @@ -153,17 +193,21 @@ impl FilterState { /// /// Updates the robot's position based on the range measurement. pub fn update(&mut self, noisy_range: &Stamped, outbound: &FilterOutbound) { - let predicted_range = Self::RANGE_MODEL.range(self.robot_position.mean); - const RANGE_STD_DEV: f64 = 1.5 * RangeMeasurementModel::RANGE_STD_DEV; - - let innovation = noisy_range.value - predicted_range; - - let mat_h = Self::RANGE_MODEL.dx_range(); - let innovation_covariance = self.robot_position.covariance + RANGE_STD_DEV * RANGE_STD_DEV; - let kalman_gain = mat_h * self.robot_position.covariance / innovation_covariance; - self.robot_position.mean += kalman_gain * innovation; - self.robot_position.covariance *= 1.0 - kalman_gain * mat_h; - + // Update position based on the range measurement + let h = nalgebra::Matrix1x3::new(1.0, 0.0, 0.0); + let predicted_range = Self::RANGE_MODEL.range(self.pos_vel_acc.mean[0]); + let innovation = predicted_range - noisy_range.value; + const RANGE_STD_DEV: f64 = RangeMeasurementModel::RANGE_STD_DEV; + let r = nalgebra::Matrix1::new(RANGE_STD_DEV * RANGE_STD_DEV); + let kalman_gain = self.pos_vel_acc.covariance + * h.transpose() + * (h * self.pos_vel_acc.covariance * h.transpose() + r) + .try_inverse() + .unwrap(); + self.pos_vel_acc.mean += kalman_gain * innovation; + let identity = nalgebra::Matrix3::identity(); + self.pos_vel_acc.covariance = (identity - kalman_gain * h) * self.pos_vel_acc.covariance; + self.seq += 1; outbound .updated_state .send(NamedFilterState::new("Updated: ".to_owned(), self.clone())); diff --git a/src/example_actors/one_dim_robot/model.rs b/src/example_actors/one_dim_robot/model.rs index d3a032a..9c89455 100644 --- a/src/example_actors/one_dim_robot/model.rs +++ b/src/example_actors/one_dim_robot/model.rs @@ -5,15 +5,18 @@ use std::fmt::{Debug, Display}; pub struct Stamped { /// Timestamp of the value. pub time: f64, + /// Monotonic sequence counter + pub seq: u64, /// The value. pub value: T, } impl Stamped { /// Creates a new value with a timestamp. - pub fn from_stamp_and_value(time: f64, value: &T) -> Self { + pub fn from_stamp_counter_and_value(time: f64, seq: u64, value: &T) -> Self { Self { time, + seq, value: value.clone(), } } diff --git a/src/example_actors/one_dim_robot/sim.rs b/src/example_actors/one_dim_robot/sim.rs index 9c7058a..70ab8c3 100644 --- a/src/example_actors/one_dim_robot/sim.rs +++ b/src/example_actors/one_dim_robot/sim.rs @@ -5,15 +5,18 @@ use rand_distr::{Distribution, Normal}; use crate::compute::Context; use crate::core::request::{ReplyMessage, RequestChannel, RequestHub}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, - InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, + Activate, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, + InboundMessage, InboundMessageNew, NullProp, OnMessage, OutboundChannel, OutboundHub, }; use crate::example_actors::one_dim_robot::{RangeMeasurementModel, Robot, Stamped}; use crate::macros::*; +/// Ping-pong request message. #[derive(Clone, Debug, Default)] pub struct PingPong { + /// time-stamp of the request message pub ping: f64, + /// time-stamp of the reply message pub pong: f64, } @@ -73,6 +76,8 @@ pub struct SimState { pub shutdown_time: f64, /// Current time. pub time: f64, + /// Monotonic sequence counter + pub seq: u64, /// True position and velocity of the robot. pub true_robot: Robot, } @@ -82,14 +87,16 @@ impl SimState { /// One step of the simulation. pub fn process_time_stamp(&mut self, time: f64, outbound: &SimOutbound, request: &SimRequest) { + let dt = time - self.time; self.time = time; - self.true_robot.position += self.true_robot.velocity * time; - self.true_robot.velocity = 0.25 * (0.25 * time).cos(); + self.true_robot.position += self.true_robot.velocity * dt; + self.true_robot.velocity = 2.5 * (0.25 * time).cos(); let true_range = Self::RANGE_MODEL.range(self.true_robot.position); const RANGE_STD_DEV: f64 = RangeMeasurementModel::RANGE_STD_DEV; let range_normal = Normal::new(0.0, RANGE_STD_DEV).unwrap(); - let noisy_range = true_range + range_normal.sample(&mut rand::thread_rng()); + let s = range_normal.sample(&mut rand::thread_rng()); + let noisy_range = true_range + s; const VELOCITY_STD_DEV: f64 = 0.01; let noisy_velocity = self.true_robot.velocity @@ -99,20 +106,41 @@ impl SimState { outbound .true_robot - .send(Stamped::from_stamp_and_value(time, &self.true_robot)); + .send(Stamped::from_stamp_counter_and_value( + time, + self.seq, + &self.true_robot, + )); outbound .true_range - .send(Stamped::from_stamp_and_value(time, &true_range)); + .send(Stamped::from_stamp_counter_and_value( + time, + self.seq, + &true_range, + )); outbound .noisy_range - .send(Stamped::from_stamp_and_value(time, &noisy_range)); - outbound.true_velocity.send(Stamped::from_stamp_and_value( - time, - &self.true_robot.velocity, - )); + .send(Stamped::from_stamp_counter_and_value( + time, + self.seq, + &noisy_range, + )); + outbound + .true_velocity + .send(Stamped::from_stamp_counter_and_value( + time, + self.seq, + &self.true_robot.velocity, + )); outbound .noisy_velocity - .send(Stamped::from_stamp_and_value(time, &noisy_velocity)); + .send(Stamped::from_stamp_counter_and_value( + time, + self.seq, + &noisy_velocity, + )); + + self.seq += 1; if time == 5.0 { request.ping_pong.send_request(time); @@ -138,30 +166,8 @@ pub struct SimOutbound { } /// Request of the simulation actor. +#[actor_requests] pub struct SimRequest { /// Check time-stamp of receiver pub ping_pong: RequestChannel, } - -impl RequestHub for SimRequest { - fn from_context_and_parent( - actor_name: &str, - sender: &tokio::sync::mpsc::Sender, - ) -> Self { - Self { - ping_pong: RequestChannel::new(actor_name.to_owned(), "ping_pong", sender), - } - } -} - -impl Morph for SimRequest { - fn extract(&mut self) -> Self { - Self { - ping_pong: self.ping_pong.extract(), - } - } - - fn activate(&mut self) { - self.ping_pong.activate(); - } -} diff --git a/src/introspect.rs b/src/introspect.rs index eebf112..c66013d 100644 --- a/src/introspect.rs +++ b/src/introspect.rs @@ -1,3 +1,2 @@ /// The flow graph. pub mod flow_graph; - diff --git a/src/introspect/flow_graph.rs b/src/introspect/flow_graph.rs index 1b74bbd..9fceddc 100644 --- a/src/introspect/flow_graph.rs +++ b/src/introspect/flow_graph.rs @@ -297,8 +297,9 @@ impl FlowGraph { let node_id = self .topology - .unique_idx_name_pairs.get_node_idx(&super_node.actor).unwrap(); - + .unique_idx_name_pairs + .get_node_idx(&super_node.actor) + .unwrap(); for i in self .topology diff --git a/src/lib.rs b/src/lib.rs index 66dd242..733cd80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![deny(missing_docs)] + //! # Hollywood //! //! Hollywood is an actor framework for Rust. @@ -20,18 +22,19 @@ //! //! The library is organized in the following modules: //! -//! - The [macros] module contains the three macros that are used to define an actors with -//! minimal boilerplate. //! - The [core] module contains the core structs and traits of the library. [Actor](core::Actor) //! is a generic struct that represents an actor. [InboundHub](core::InboundHub) is the trait //! which represents the collection of inbound channels of an actor. Similarly, //! [OutboundHub](core::OutboundHub) is the trait which represents the collection of outbound //! channels of an actor. -//! +//! //! Most importantly, [OnMessage](core::OnMessage) is the main entry point for user code and sets //! the behavior of a user-defines actor. [OnMessage::on_message()](core::OnMessage::on_message()) //! processes incoming messages, updates the actor's state and sends messages to downstream actors //! in the pipeline. +//! +//! - The [macros] module contains the three macros that are used to define an actors with +//! minimal boilerplate. //! //! - The [compute] module contains the [Context](compute::Context) and //! [Pipeline](compute::Pipeline) which are used to configure a set of actors, connect @@ -39,10 +42,10 @@ //! //! - The [actors] module contains a set of predefined actors that can be used as part of a compute //! pipelines. -//! +//! //! - The [introspect] module contains a some visualization tools to inspect the compute pipeline. //! -//! - The [examples] module contains a set of examples actors that demonstrate how to use the library. +//! - The [example_actors] module contains a set of examples actors that demonstrate how to use the library. //! //! ## Example: moving average //! @@ -98,7 +101,7 @@ //! Note that MovingAverageState implements [Default] trait through the derive macro, and hence //! moving_average is initialized to 0.0 which is the default value for f64. An explicit //! implementation of the [Default] trait can be used to set the values of member fields as done for -//! the [examples::moving_average::MovingAverageProp] struct here. +//! the [example_actors::moving_average::MovingAverageProp] struct here. //! //! ### Inbound hub //! @@ -303,8 +306,9 @@ pub mod example_actors; /// following order: /// /// 1. [actor_outputs](macros::actor_outputs) -/// 2. [actor_inputs](macros::actor_inputs) which depends on 1. -/// 3. [actor](macros::actor) which depends on 1. and 2. +/// 2. [actor_requests](macros::actor_requests) +/// 3. [actor_inputs](macros::actor_inputs) which depends on 1. and 2. +/// 4. [actor](macros::actor) which depends on 1., 2. and 3. /// /// The documentation in this module is rather technical. For a more practical introduction, please /// refer to the examples in the root of the [crate](crate#example-moving-average). @@ -330,13 +334,36 @@ pub mod macros { /// user-specified name CHANNEL* and a user specified type TYPE*. /// /// Effect: The macro generates the [OutboundHub](crate::core::OutboundHub) and - /// [Morph](crate::core::Morph) implementations for the provided struct OUTBOUND. + /// [Activate](crate::core::Activate) implementations for the provided struct OUTBOUND. /// /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] /// and [macro@actor]. /// pub use hollywood_macros::actor_outputs; + /// This macro generates the boilerplate for the request hub struct it is applied to. + /// + /// Macro template: + /// + /// ``` text + /// #[actor_requests] + /// pub struct REQUEST { + /// pub CHANNEL0: RequestChannel, + /// pub CHANNEL1: RequestChannel, + /// ... + /// } + /// ``` + /// + /// Here, REQUEST is the user-specified name of the struct. The struct shall be defined right + /// after the macro invocation. The request struct consists of one or more request channels. + /// Each request channel has name CHANNEL*, a request type REQ_TYPE*, a reply type REPL_TYPE*, + /// and a message type M*. + /// + /// Effect: The macro generates the [RequestHub](crate::core::RequestHub) and + /// [Activate](crate::core::Activate) implementations for the provided struct REQUEST. + /// + pub use hollywood_macros::actor_requests; + /// This macro generates the boilerplate for the inbound hub of an actor. /// /// Macro template: @@ -357,7 +384,9 @@ pub mod macros { /// /// Prerequisite: /// - The OUTBOUND struct is defined and implements [OutboundHub](crate::core::OutboundHub) - /// and [Morph](crate::core::Morph), typically using the [macro@actor_outputs] macro. + /// and [Activate](crate::core::Activate), typically using the [macro@actor_outputs] macro. + /// - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and + /// [Activate](crate::core::Activate), e.g. using the [actor_requests] macro. /// - The PROP and STATE structs are defined. /// /// Effects: @@ -366,8 +395,6 @@ pub mod macros { /// [InboundHub](crate::core::InboundHub) trait for it. /// - Implements the [InboundMessage](crate::core::InboundMessage) trait for INBOUND_MESSAGE. /// - /// This is the second of three macros to define an actor. The other two are - /// [macro@actor_outputs] and [macro@actor]. pub use hollywood_macros::actor_inputs; /// This macro generates the boilerplate to define an new actor type. @@ -376,7 +403,7 @@ pub mod macros { /// /// ``` text /// #[actor(INBOUND_MESSAGE)] - /// type ACTOR = Actor; + /// type ACTOR = Actor; /// ``` /// /// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined @@ -384,7 +411,9 @@ pub mod macros { /// /// Prerequisites: /// - The OUTBOUND struct is defined and implements (OutboundHub)[crate::core::OutboundHub] - /// and [Morph](crate::core::Morph), e.g. using the [actor_outputs] macro. + /// and [Activate](crate::core::Activate), e.g. using the [actor_outputs] macro. + /// - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and + /// [Activate](crate::core::Activate), e.g. using the [actor_requests] macro. /// - The INBOUND_MESSAGE enum is defined and implements /// [InboundMessage](crate::core::InboundMessage), as well as the INBOUND /// struct is defined and implements the [InboundHub](crate::core::InboundHub) trait, e.g. @@ -395,7 +424,6 @@ pub mod macros { /// - This macro implements the [FromPropState](crate::core::FromPropState) trait for the ACTOR /// type. /// - /// This is the last of the three macros that need to be used to define a new actor type. The - /// first one is [macro@actor_outputs], the second one is [macro@actor_inputs]. pub use hollywood_macros::actor; + }