Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request reply & refactor #4

Merged
merged 1 commit into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,18 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "README.md"
repository = "https://github.com/farm-ng/hollywood/"
version = "0.2.2"
version = "0.3.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# executor feature needed
tokio = { version = "1.28.0", features = ["full"] }
tokio-stream = "0.1.14"
enum-map = "3.0.0-0.gat.0"
async-trait = "0.1.51"
rand = "0.8.4"
drawille = "0.3.0"
grid = "0.13.0"
hollywood_macros = { version = "0.3.0", path = "hollywood_macros" }
petgraph = "0.6.3"
uuid = { version = "1.3.3", features = ["v4"] }
strum_macros = "0.25"
strum = { version = "0.25", features = ["derive"] }
hollywood_macros = { version = "0.2.1", path = "hollywood_macros" }
rand = "0.8.4"
rand_distr = "0.4.3"
nalgebra = "0.32.2"
grid = "0.11.0"
drawille = "0.3.0"
# executor feature needed
tokio = { version = "1.28.0", features = ["full"] }
tokio-stream = "0.1.14"
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# hollywood
# hollywood

Hollywood is an actor framework, with focus on representing actors with heterogeneous
inputs and outputs which are arranged in a non-cyclic compute graph/pipeline. The design
intend is simplicity and minimal boilerplate code.
2 changes: 1 addition & 1 deletion examples/moving_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::{FromPropState, NullState};

use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};
use hollywood::example_actors::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};

///
pub async fn run_moving_average_example() {
Expand Down
9 changes: 6 additions & 3 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use hollywood::actors::Periodic;
use hollywood::actors::Printer;
use hollywood::compute::Context;
use hollywood::core::*;
use hollywood::examples::one_dim_robot::draw::DrawState;
use hollywood::examples::one_dim_robot::filter::FilterState;
use hollywood::examples::one_dim_robot::{
use hollywood::example_actors::one_dim_robot::draw::DrawState;
use hollywood::example_actors::one_dim_robot::filter::FilterState;
use hollywood::example_actors::one_dim_robot::{
DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped,
};

Expand Down Expand Up @@ -63,6 +63,9 @@ async fn run_robot_example() {
sim.outbound
.true_robot
.connect(context, &mut truth_printer.inbound.printable);


sim.request.ping_pong.connect(context, &mut filter.inbound.ping_pong_request);
context.register_cancel_requester(&mut sim.outbound.cancel_request);

filter
Expand Down
7 changes: 4 additions & 3 deletions hollywood_macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "../README.md"
repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
version = "0.2.2"
version = "0.3.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
proc-macro = true

[dependencies]
syn = { version = "2.0.18", features = ["full"] }
quote = "1.0.9"
convert_case = "0.6.0"
quote = "1.0.9"
syn = { version = "2.0.18", features = ["full"] }

24 changes: 17 additions & 7 deletions hollywood_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
let prop_type = &args.prop_type;
let state_type = &args.state_type;
let output_type = &args.output_type;
let request_type = &args.request_type;

let inbound = fields.iter().map(|variant| {
let variant_name = variant.ident.clone();
Expand Down Expand Up @@ -197,6 +198,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
type Prop = #prop_type;
type State = #state_type;
type OutboundHub = #output_type;
type RequestHub = #request_type;

fn inbound_channel(&self) -> String {
match self {
Expand All @@ -205,9 +207,9 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
}
}

impl InboundHub<#prop_type, #state_type, #output_type, #name> for #struct_name {
impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name {

fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type, #name>,
fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>,
actor_name: &str) -> Self {
#(#from_builder_inbounds)*

Expand Down Expand Up @@ -258,6 +260,8 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let mut maybe_inbounds = None;
let mut maybe_state = None;
let mut maybe_outputs = None;
let mut maybe_requests = None;

if let Item::Type(item_type) = inbound_clone {
if let Type::Path(type_path) = *item_type.ty {
if type_path.path.segments.last().unwrap().ident != "Actor" {
Expand All @@ -267,10 +271,10 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
}
for segment in type_path.path.segments {
if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments {
if angle_bracketed_args.args.len() != 4 {
if angle_bracketed_args.args.len() != 5 {
return Error::new_spanned(
&angle_bracketed_args,
"Expected three type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS>",
"Expected 5 type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS, REQUESTS>",
)
.to_compile_error()
.into();
Expand All @@ -279,6 +283,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
maybe_inbounds = Some(angle_bracketed_args.args[1].clone());
maybe_state = Some(angle_bracketed_args.args[2].clone());
maybe_outputs = Some(angle_bracketed_args.args[3].clone());
maybe_requests = Some(angle_bracketed_args.args[4].clone());
}
}
} else {
Expand All @@ -294,16 +299,17 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let inbound = maybe_inbounds.unwrap();
let state_type = maybe_state.unwrap();
let out = maybe_outputs.unwrap();
let requests = maybe_requests.unwrap();

let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out> };
let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> };

let gen = quote! {

///
#( #attrs )*
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>;
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>;

impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type>
for #actor_name
{
fn name_hint(prop: &#prop) -> String {
Expand All @@ -320,6 +326,7 @@ struct ActorInbound {
prop_type: Ident,
state_type: Ident,
output_type: Ident,
request_type: Ident,
}

impl Parse for ActorInbound {
Expand All @@ -333,11 +340,14 @@ impl Parse for ActorInbound {
let state_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let output_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let request_type: Ident = content.parse()?;
Ok(ActorInbound {
struct_name,
prop_type,
state_type,
output_type,
request_type
})
}
}
File renamed without changes.
108 changes: 56 additions & 52 deletions src/actors/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::compute::context::Context;
use crate::core::connection::ConnectionEnum;

use crate::core::request::NullRequest;
use crate::core::{
actor::{FromPropState, ActorNode, DormantActorNode, GenericActor},
actor::{ActorNode, FromPropState, GenericActor},
inbound::{ForwardMessage, NullInbound, NullMessage},
outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub},
outbound::{Morph, OutboundChannel, OutboundHub},
runner::Runner,
value::Value,
};
use crate::macros::*;


/// Outbound hub of periodic actor, which consists of a single outbound channel.
#[actor_outputs]
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}


/// A periodic actor.
///
/// This is an actor that periodically sends a message to its outbound.
pub type Periodic =
GenericActor<PeriodicProp, NullInbound, PeriodicState, PeriodicOutbound, PeriodicRunner>;
pub type Periodic = GenericActor<
PeriodicProp,
NullInbound,
PeriodicState,
PeriodicOutbound,
NullRequest,
PeriodicRunner,
>;

impl Periodic {
/// Create a new periodic actor, with a period of `period` seconds.
Expand All @@ -51,7 +48,8 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
NullRequest,
PeriodicRunner,
> for Periodic
{
Expand All @@ -76,8 +74,6 @@ impl Default for PeriodicProp {
}
}

impl Value for PeriodicProp {}

/// State of the periodic actor.
#[derive(Clone, Debug)]
pub struct PeriodicState {
Expand All @@ -94,9 +90,32 @@ impl Default for PeriodicState {
}
}

impl Value for PeriodicState {}
/// Outbound hub of periodic actor, which consists of a single outbound channel.
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}

impl Morph for PeriodicOutbound {
fn extract(&mut self) -> Self {
Self {
time_stamp: self.time_stamp.extract(),
}
}

fn activate(&mut self) {
self.time_stamp.activate();
}
}

impl OutboundHub for PeriodicOutbound {
fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
Self {
time_stamp: OutboundChannel::<f64>::new(context, "time_stamp".to_owned(), actor_name),
}
}
}

/// The custom runner for the periodic actor.
pub struct PeriodicRunner {}
Expand All @@ -107,16 +126,17 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> for PeriodicRunner
{
/// Create a new dormant actor.
fn new_dormant_actor(
/// Create a new actor node.
fn new_actor_node(
name: String,
prop: PeriodicProp,
state: PeriodicState,
_receiver: tokio::sync::mpsc::Receiver<
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
>,
_forward: std::collections::HashMap<
String,
Expand All @@ -125,54 +145,36 @@ impl
PeriodicProp,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> + Send
+ Sync,
>,
>,
outbound: PeriodicOutbound,
) -> Box<dyn DormantActorNode + Send + Sync> {
Box::new(DormantPeriodic {
_request: NullRequest,
) -> Box<dyn ActorNode + Send + Sync> {
Box::new(PeriodicActor {
name: name.clone(),
prop,
init_state: state.clone(),
outbound,
})
}
}

/// The dormant periodic actor.
pub struct DormantPeriodic {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
outbound: PeriodicOutbound,
}

impl DormantActorNode for DormantPeriodic {
fn activate(mut self: Box<Self>) -> Box<dyn ActorNode + Send> {
self.outbound.activate();
Box::new(ActivePeriodic {
name: self.name.clone(),
prop: self.prop.clone(),
init_state: self.init_state.clone(),
state: None,
outbound: Arc::new(self.outbound),
outbound: Some(outbound),
})
}
}

/// The active periodic actor.
pub struct ActivePeriodic {
pub struct PeriodicActor {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
state: Option<PeriodicState>,
outbound: Arc<PeriodicOutbound>,
outbound: Option<PeriodicOutbound>,
}

#[async_trait]
impl ActorNode for ActivePeriodic {
impl ActorNode for PeriodicActor {
fn name(&self) -> &String {
&self.name
}
Expand All @@ -182,6 +184,8 @@ impl ActorNode for ActivePeriodic {
}

async fn run(&mut self, mut kill: tokio::sync::broadcast::Receiver<()>) {
let mut outbound = self.outbound.take().unwrap();
outbound.activate();
self.reset();

let state = self.state.as_mut().unwrap();
Expand All @@ -190,7 +194,7 @@ impl ActorNode for ActivePeriodic {
(1000.0 * self.prop.period) as u64,
));

let conns = Arc::new(self.outbound.clone());
let conns = Arc::new(outbound);

loop {
interval.tick().await;
Expand Down
Loading
Loading