Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.2.2 #3

Merged
merged 3 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "README.md"
repository = "https://github.com/farm-ng/hollywood/"
version = "0.2.1"
version = "0.2.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
15 changes: 10 additions & 5 deletions examples/moving_average.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
use hollywood::actors::printer::PrinterProp;
use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::ActorFacade;
use hollywood::core::{FromPropState, NullState};

use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp};
use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};

///
pub async fn run_moving_average_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 1.0);
let mut moving_average = MovingAverage::new_default_init_state(
let mut moving_average = MovingAverage::from_prop_and_state(
context,
MovingAverageProp {
alpha: 0.3,
..Default::default()
},
MovingAverageState {
moving_average: 0.0,
},
);
let mut time_printer = Printer::<f64>::new_default_init_state(
let mut time_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "time".to_string(),
},
NullState {},
);
let mut average_printer = Printer::<f64>::new_default_init_state(
let mut average_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "average".to_string(),
},
NullState {},
);
timer
.outbound
Expand Down
15 changes: 10 additions & 5 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use hollywood::actors::Periodic;
use hollywood::actors::Printer;
use hollywood::compute::Context;
use hollywood::core::*;
use hollywood::examples::one_dim_robot::draw::DrawState;
use hollywood::examples::one_dim_robot::filter::FilterState;
use hollywood::examples::one_dim_robot::{
DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped,
};

async fn run_robot_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 0.25);
let mut sim = Sim::new_with_state(
let mut sim = Sim::from_prop_and_state(
context,
NullProp {},
SimState {
Expand All @@ -22,21 +24,24 @@ async fn run_robot_example() {
},
},
);
let mut filter = Filter::new_default_init_state(context, NullProp {});
let mut filter_state_printer = Printer::<NamedFilterState>::new_default_init_state(
let mut filter = Filter::from_prop_and_state(context, NullProp {}, FilterState::default());
let mut filter_state_printer = Printer::<NamedFilterState>::from_prop_and_state(
context,
PrinterProp {
topic: "filter state".to_owned(),
},
NullState::default(),
);
let mut truth_printer = Printer::<Stamped<Robot>>::new_default_init_state(
let mut truth_printer = Printer::<Stamped<Robot>>::from_prop_and_state(
context,
PrinterProp {
topic: "truth".to_owned(),
},
NullState::default(),
);

let mut draw_actor = DrawActor::new_default_init_state(context, NullProp {});
let mut draw_actor =
DrawActor::from_prop_and_state(context, NullProp {}, DrawState::default());

timer
.outbound
Expand Down
36 changes: 36 additions & 0 deletions examples/print_ticks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use hollywood::actors::printer::PrinterProp;
use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::*;

///
pub async fn run_tick_print_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 1.0);
let mut time_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "time".to_string(),
},
NullState::default(),
);
timer
.outbound
.time_stamp
.connect(context, &mut time_printer.inbound.printable);

});

pipeline.print_flow_graph();
pipeline.run().await;
}

fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
run_tick_print_example().await;
})
}
2 changes: 1 addition & 1 deletion hollywood_macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "../README.md"
repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
version = "0.2.1"
version = "0.2.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
Expand Down
2 changes: 1 addition & 1 deletion hollywood_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
#( #attrs )*
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>;

impl ActorFacade<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
for #actor_name
{
fn name_hint(prop: &#prop) -> String {
Expand Down
42 changes: 14 additions & 28 deletions src/actors/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@ use async_trait::async_trait;

use crate::compute::context::Context;
use crate::core::{
actor::{ActorFacade, ActorNode, DormantActorNode, GenericActor},
actor::{FromPropState, ActorNode, DormantActorNode, GenericActor},
inbound::{ForwardMessage, NullInbound, NullMessage},
outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub},
runner::Runner,
value::Value,
};
use crate::macros::*;


/// Outbound hub of periodic actor, which consists of a single outbound channel.
#[actor_outputs]
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}


/// A periodic actor.
///
Expand All @@ -20,7 +31,7 @@ pub type Periodic =
impl Periodic {
/// Create a new periodic actor, with a period of `period` seconds.
pub fn new_with_period(context: &mut Context, period: f64) -> Periodic {
Periodic::new_with_state(
Periodic::from_prop_and_state(
context,
PeriodicProp {
period,
Expand All @@ -35,7 +46,7 @@ impl Periodic {
}

impl
ActorFacade<
FromPropState<
PeriodicProp,
NullInbound,
PeriodicState,
Expand Down Expand Up @@ -85,32 +96,7 @@ impl Default for PeriodicState {

impl Value for PeriodicState {}

/// Outbound hub of periodic actor, which consists of a single outbound channel.
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}

impl Morph for PeriodicOutbound {
fn extract(&mut self) -> Self {
Self {
time_stamp: self.time_stamp.extract(),
}
}

fn activate(&mut self) {
self.time_stamp.activate();
}
}

impl OutboundHub for PeriodicOutbound {
fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
Self {
time_stamp: OutboundChannel::<f64>::new(context, "time_stamp".to_owned(), actor_name),
}
}
}

/// The custom runner for the periodic actor.
pub struct PeriodicRunner {}
Expand Down
6 changes: 3 additions & 3 deletions src/actors/printer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::{Debug, Display};

use crate::core::{
Actor, ActorBuilder, DefaultRunner, ActorFacade, InboundChannel, InboundHub, InboundMessage,
Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage,
InboundMessageNew, NullOutbound, NullState, OnMessage, Value,
};

Expand Down Expand Up @@ -51,7 +51,7 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
pub type Printer<T> = Actor<PrinterProp, PrinterInbound<T>, NullState, NullOutbound>;

impl<T: Clone + Sync + Send + 'static + Debug + Display + Default>
ActorFacade<
FromPropState<
PrinterProp,
PrinterInbound<T>,
NullState,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<T: Clone + Debug + Display + Default + Sync + Send + 'static>
) -> Self {
let m = InboundChannel::new(
builder.context,
actor_name.clone(),
actor_name,
&builder.sender,
PrinterInboundMessage::Printable(T::default()).inbound_channel(),
);
Expand Down
27 changes: 15 additions & 12 deletions src/compute/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ use crate::core::{
pub struct Context {
pub(crate) actors: Vec<Box<dyn DormantActorNode + Send>>,
pub(crate) topology: Topology,
pub(crate) cancel_request_request_inbound: InboundChannel<bool, CancelRequest>,
pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender<CancelRequest>,
pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver<CancelRequest>,
}

impl Context {
const CONTEXT_NAME: &str = "CONTEXT";

/// Create a new context.
///
/// This is the main entry point to configure the compute graph. The network topology is defined
Expand All @@ -33,27 +31,32 @@ impl Context {
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender<CancelRequest> {
self.cancel_request_sender_template.clone()
}

/// Registers an outbound channel for cancel request.
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn register_cancel_requester(&mut self, outbound: &mut OutboundChannel<()>) {
outbound
.connection_register
.push(Arc::new(OutboundConnection {
sender: self.cancel_request_request_inbound.sender.clone(),
inbound_channel: self.cancel_request_request_inbound.name.clone(),
sender: self.cancel_request_sender_template.clone(),
inbound_channel: "CANCEL".to_string(),
phantom: PhantomData {},
}));
}


fn new() -> Self {
let (exit_request_sender, cancel_request_receiver) = tokio::sync::mpsc::channel(1);
let (cancel_request_sender_template, cancel_request_receiver) =
tokio::sync::mpsc::channel(1);
Self {
actors: vec![],
topology: Topology::new(),
cancel_request_request_inbound: InboundChannel::<bool, CancelRequest> {
name: CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL .to_owned(),
actor_name: Self::CONTEXT_NAME.to_owned(),
sender: exit_request_sender,
phantom: std::marker::PhantomData {},
},
cancel_request_sender_template,
cancel_request_receiver,
}
}
Expand Down
27 changes: 20 additions & 7 deletions src/compute/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum CancelRequest {
impl CancelRequest {
/// Unique name for cancel request inbound channel. This special inbound channel is not
/// associated with any actor but with the pipeline itself.
pub const CANCEL_REQUEST_INBOUND_CHANNEL: &str = "CANCEL";
pub const CANCEL_REQUEST_INBOUND_CHANNEL: &'static str = "CANCEL";
}

impl InboundMessage for CancelRequest {
Expand All @@ -41,6 +41,8 @@ impl InboundMessageNew<()> for CancelRequest {
pub struct Pipeline {
actors: Vec<Box<dyn ActorNode + Send>>,
topology: Topology,
/// We have this here to keep receiver alive
pub cancel_request_sender_template: Option<tokio::sync::mpsc::Sender<CancelRequest>>,
cancel_request_receiver: Option<tokio::sync::mpsc::Receiver<CancelRequest>>,
}

Expand All @@ -53,6 +55,7 @@ impl Pipeline {
let compute_graph = Pipeline {
actors: active,
topology: context.topology,
cancel_request_sender_template: Some(context.cancel_request_sender_template),
cancel_request_receiver: Some(context.cancel_request_receiver),
};
compute_graph.topology.analyze_graph_topology();
Expand Down Expand Up @@ -88,12 +91,17 @@ impl Pipeline {
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();

let h_exit = tokio::spawn(async move {
let msg = cancel_request_receiver.recv().await.unwrap();
match msg {
CancelRequest::Cancel(_) => {
match cancel_request_receiver.recv().await {
Some(msg) => {
println!("Cancel requested");

let _ = exit_tx.send(cancel_request_receiver);
match msg {
CancelRequest::Cancel(_) => {
let _ = exit_tx.send(cancel_request_receiver);
}
}
}
None => {
println!("Cancel request channel closed");
}
}
});
Expand All @@ -110,7 +118,12 @@ impl Pipeline {

handles.push(h);
}
h_exit.await.unwrap();
match h_exit.await {
Ok(_) => {}
Err(err) => {
println!("Error in cancel request handler: {}", err);
}
}
kill_sender.send(()).unwrap();
for h in handles {
h.await.unwrap();
Expand Down
Loading
Loading