diff --git a/chorus_lib/examples/bookseller.rs b/chorus_lib/examples/bookseller.rs index 39d7c20..05e3906 100644 --- a/chorus_lib/examples/bookseller.rs +++ b/chorus_lib/examples/bookseller.rs @@ -5,9 +5,9 @@ use std::thread; use chrono::NaiveDate; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::LocationSet; fn get_book(title: &str) -> Option<(i32, NaiveDate)> { match title.trim() { @@ -73,9 +73,9 @@ impl Choreography for BooksellerChoreography { } fn main() { - let transport = LocalTransport::from(&[Seller::name(), Buyer::name()]); - let seller_projector = projector!(LocationSet!(Buyer, Seller), Seller, transport.clone()); - let buyer_projector = projector!(LocationSet!(Buyer, Seller), Buyer, transport.clone()); + let transport = LocalTransport::::new(); + let seller_projector = Projector::new(Seller, transport.clone()); + let buyer_projector = Projector::new(Buyer, transport.clone()); let mut handles: Vec> = Vec::new(); handles.push(thread::spawn(move || { diff --git a/chorus_lib/examples/bookseller2.rs b/chorus_lib/examples/bookseller2.rs index ca6f68f..3355fda 100644 --- a/chorus_lib/examples/bookseller2.rs +++ b/chorus_lib/examples/bookseller2.rs @@ -3,11 +3,11 @@ extern crate chorus_lib; use std::thread; use std::{collections::HashMap, sync::Arc}; +use chorus_lib::LocationSet; use chorus_lib::{ - core::{ChoreoOp, Choreography, ChoreographyLocation, Located}, + core::{ChoreoOp, Choreography, ChoreographyLocation, Located, Projector}, transport::local::LocalTransport, }; -use chorus_lib::{projector, LocationSet}; use chrono::NaiveDate; #[derive(ChoreographyLocation)] @@ -142,22 +142,10 @@ fn main() { i }; - let transport = LocalTransport::from(&[Seller::name(), Buyer1::name(), Buyer2::name()]); - let seller_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), - Seller, - transport.clone() - )); - let buyer1_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), - Buyer1, - transport.clone() - )); - let buyer2_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), - Buyer2, - transport.clone() - )); + let transport = LocalTransport::::new(); + let seller_projector = Arc::new(Projector::new(Seller, transport.clone())); + let buyer1_projector = Arc::new(Projector::new(Buyer1, transport.clone())); + let buyer2_projector = Arc::new(Projector::new(Buyer2, transport.clone())); println!("Tries to buy HoTT with one buyer"); type OneBuyerBooksellerChoreography = BooksellerChoreography; diff --git a/chorus_lib/examples/hello.rs b/chorus_lib/examples/hello.rs index bac0aff..dd2a416 100644 --- a/chorus_lib/examples/hello.rs +++ b/chorus_lib/examples/hello.rs @@ -2,9 +2,9 @@ extern crate chorus_lib; use std::thread; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::LocationSet; // --- Define two locations (Alice and Bob) --- @@ -40,20 +40,20 @@ impl Choreography for HelloWorldChoreography { fn main() { let mut handles: Vec> = Vec::new(); // Create a local transport - let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); + let transport = LocalTransport::::new(); // Run the choreography in two threads { let transport = transport.clone(); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Alice, transport); + let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); })); } { let transport = transport.clone(); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Bob, transport); + let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); })); } diff --git a/chorus_lib/examples/loc-poly.rs b/chorus_lib/examples/loc-poly.rs index 88a56a9..2ee491f 100644 --- a/chorus_lib/examples/loc-poly.rs +++ b/chorus_lib/examples/loc-poly.rs @@ -2,9 +2,11 @@ extern crate chorus_lib; use std::fmt::Debug; use std::thread; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Located, Portable}; +use chorus_lib::core::{ + ChoreoOp, Choreography, ChoreographyLocation, Located, Portable, Projector, +}; use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::LocationSet; #[derive(ChoreographyLocation)] struct Alice; @@ -55,12 +57,12 @@ impl Choreography> for MainChoreography { } fn main() { - let transport = LocalTransport::from(&[Alice::name(), Bob::name(), Carol::name()]); + let transport = LocalTransport::::new(); let mut handles = vec![]; { let transport = transport.clone(); handles.push(thread::spawn(|| { - let p = projector!(LocationSet!(Alice, Bob), Alice, transport); + let p = Projector::new(Alice, transport); let v = p.epp_and_run(MainChoreography); assert_eq!(p.unwrap(v), 110); })); @@ -68,7 +70,7 @@ fn main() { { let transport = transport.clone(); handles.push(thread::spawn(|| { - let p = projector!(LocationSet!(Alice, Bob), Bob, transport); + let p = Projector::new(Bob, transport); p.epp_and_run(MainChoreography); })); } diff --git a/chorus_lib/examples/tic-tac-toe.rs b/chorus_lib/examples/tic-tac-toe.rs index a92da03..581d86b 100644 --- a/chorus_lib/examples/tic-tac-toe.rs +++ b/chorus_lib/examples/tic-tac-toe.rs @@ -2,13 +2,15 @@ extern crate chorus_lib; use chorus_lib::{ - core::{ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Located, Serialize}, - projector, + core::{ + ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Located, Projector, Serialize, + }, + http_config, transport::http::HttpTransport, LocationSet, }; use clap::Parser; -use std::{collections::HashMap, io::Write}; +use std::io::Write; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; #[derive(Serialize, Deserialize, Debug)] @@ -292,28 +294,32 @@ fn main() { match args.player { 'X' => { - let mut config = HashMap::new(); - config.insert(PlayerX::name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerO::name(), - (args.opponent_hostname.as_str(), args.opponent_port), - ); - let transport = HttpTransport::new(PlayerX::name(), &config); - let projector = projector!(LocationSet!(PlayerX, PlayerO), PlayerX, transport); + // let mut config = HttpConfig::::new(); + // config.insert(PlayerX, (args.hostname.as_str(), args.port)); + // config.insert( + // PlayerO, + // (args.opponent_hostname.as_str(), args.opponent_port), + // ); + let config = http_config!(PlayerX: (args.hostname.as_str(), args.port), + PlayerO: (args.opponent_hostname.as_str(), args.opponent_port)); + let transport = HttpTransport::new(PlayerX, &config); + let projector = Projector::new(PlayerX, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.local(brain), brain_for_o: projector.remote(PlayerO), }); } 'O' => { - let mut config = HashMap::new(); - config.insert(PlayerO::name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerX::name(), - (args.opponent_hostname.as_str(), args.opponent_port), - ); - let transport = HttpTransport::new(PlayerO::name(), &config); - let projector = projector!(LocationSet!(PlayerX, PlayerO), PlayerO, transport); + // let mut config = HttpConfig::::new(); + // config.insert(PlayerO, (args.hostname.as_str(), args.port)); + // config.insert( + // PlayerX, + // (args.opponent_hostname.as_str(), args.opponent_port), + // ); + let config = http_config!(PlayerO: (args.hostname.as_str(), args.port), + PlayerX: (args.opponent_hostname.as_str(), args.opponent_port)); + let transport = HttpTransport::new(PlayerO, &config); + let projector = Projector::new(PlayerO, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.remote(PlayerX), brain_for_o: projector.local(brain), diff --git a/chorus_lib/src/core.rs b/chorus_lib/src/core.rs index d59bf41..e7a985e 100644 --- a/chorus_lib/src/core.rs +++ b/chorus_lib/src/core.rs @@ -97,8 +97,11 @@ pub trait HList { fn to_string_list() -> Vec<&'static str>; } /// end of HList +#[derive(Clone)] pub struct HNil; /// An element of HList + +#[derive(Clone)] pub struct HCons(Head, Tail); impl HList for HNil { @@ -125,7 +128,7 @@ where macro_rules! LocationSet { () => { $crate::core::HNil }; ($head:ty $(,)*) => { $crate::core::HCons<$head, $crate::core::HNil> }; - ($head:ty, $($tail:tt)*) => { $crate::core::HCons<$head, LocationSet!($($tail)*)> }; + ($head:ty, $($tail:tt)*) => { $crate::core::HCons<$head, $crate::LocationSet!($($tail)*)> }; } /// Marker @@ -261,7 +264,7 @@ pub trait Choreography { /// Provides methods to send and receive messages. /// /// The trait provides methods to send and receive messages between locations. Implement this trait to define a custom transport. -pub trait Transport { +pub trait Transport { /// Returns a list of locations. fn locations(&self) -> Vec; /// Sends a message from `from` to `to`. @@ -271,7 +274,7 @@ pub trait Transport { } /// Provides a method to perform end-point projection. -pub struct Projector +pub struct Projector, Index> where L1: Member, { @@ -281,15 +284,7 @@ where index: PhantomData, } -/// Macro to make Projector -#[macro_export] -macro_rules! projector { - ($al_type:ty, $target:expr, $transport:expr) => { - $crate::core::Projector::<$al_type, _, _, _>::new($target, $transport) - }; -} - -impl Projector +impl, Index> Projector where L1: Member, { @@ -338,13 +333,16 @@ where where L: Subset, { - struct EppOp<'a, L: HList, L1: ChoreographyLocation, B: Transport> { + struct EppOp<'a, L: HList, L1: ChoreographyLocation, LS: HList, B: Transport> { target: PhantomData, transport: &'a B, locations: Vec, marker: PhantomData, + location_set: PhantomData, } - impl<'a, L: HList, T: ChoreographyLocation, B: Transport> ChoreoOp for EppOp<'a, L, T, B> { + impl<'a, L: HList, T: ChoreographyLocation, LS: HList, B: Transport> ChoreoOp + for EppOp<'a, L, T, LS, B> + { fn locally( &self, _location: L1, @@ -406,11 +404,12 @@ where where M: HList + Subset, { - let op: EppOp<'a, M, T, B> = EppOp { + let op: EppOp<'a, M, T, LS, B> = EppOp { target: PhantomData::, transport: &self.transport, locations: self.transport.locations(), marker: PhantomData::, + location_set: PhantomData::, }; choreo.run(&op) } @@ -429,6 +428,7 @@ where transport: self.transport, locations: locs_vec.clone(), marker: PhantomData::, + location_set: PhantomData::, }; return choreo.run(&op); } @@ -436,11 +436,12 @@ where R::remote() } } - let op: EppOp<'a, L, L1, B> = EppOp { + let op: EppOp<'a, L, L1, LS, B> = EppOp { target: PhantomData::, transport: &self.transport, locations: self.transport.locations(), marker: PhantomData::, + location_set: PhantomData::, }; choreo.run(&op) } diff --git a/chorus_lib/src/transport/http.rs b/chorus_lib/src/transport/http.rs index ef98f84..4406444 100644 --- a/chorus_lib/src/transport/http.rs +++ b/chorus_lib/src/transport/http.rs @@ -3,6 +3,8 @@ use std::thread; use std::{collections::HashMap, sync::Arc}; +use core::marker::PhantomData; + use retry::{ delay::{jitter, Fixed}, retry, @@ -11,31 +13,72 @@ use tiny_http::Server; use ureq::{Agent, AgentBuilder}; use crate::{ - core::{Portable, Transport}, + core::{ChoreographyLocation, HList, Member, Portable, Transport}, utils::queue::BlockingQueue, }; /// The header name for the source location. const HEADER_SRC: &str = "X-CHORUS-SOURCE"; +/// A wrapper for HashMap +#[derive(Clone)] +pub struct HttpConfig { + info: HashMap, + location_set: PhantomData, +} + +impl HttpConfig { + /// Creates a new `HttpConfig`. + pub fn new() -> Self { + Self { + info: HashMap::new(), + location_set: PhantomData, + } + } + + /// Inserts new information about a location into the config. + pub fn insert(&mut self, _loc: C, (host, port): (&str, u16)) + where + C: Member, + { + self.info + .insert(C::name().to_string(), (host.to_string(), port)); + } +} + +/// This macro makes a `HttpConfig`. +#[macro_export] +macro_rules! http_config { + ( $( $loc:ident : ( $host:expr, $port:expr ) ),* $(,)? ) => { + { + let mut config = $crate::transport::http::HttpConfig::<$crate::LocationSet!($( $loc ),*)>::new(); + $( + config.insert($loc, ($host, $port)); + )* + config + } + }; +} + /// The HTTP transport. -pub struct HttpTransport { +pub struct HttpTransport { config: HashMap, agent: Agent, queue_map: Arc>>, server: Arc, join_handle: Option>, + location_set: PhantomData, } -impl HttpTransport { +impl HttpTransport { /// Creates a new `HttpTransport` instance from the projection target and a configuration. - pub fn new(at: &'static str, config: &HashMap<&str, (&str, u16)>) -> Self { - let config = HashMap::from_iter( - config - .iter() - .map(|(k, (hostname, port))| (k.to_string(), (hostname.to_string(), *port))), - ); - let locs = Vec::from_iter(config.keys().map(|s| s.clone())); + pub fn new(_loc: C, http_config: &HttpConfig) -> Self + where + C: Member, + { + let info = &http_config.info; + let at = C::name(); + let locs = Vec::from_iter(info.keys().map(|s| s.clone())); let queue_map = { let mut m = HashMap::new(); @@ -45,7 +88,7 @@ impl HttpTransport { Arc::new(m) }; - let (hostname, port) = config.get(at).unwrap(); + let (hostname, port) = info.get(at).unwrap(); let server = Arc::new(Server::http(format!("{}:{}", hostname, port)).unwrap()); let join_handle = Some({ let server = server.clone(); @@ -80,23 +123,24 @@ impl HttpTransport { let agent = AgentBuilder::new().build(); Self { - config, + config: info.clone(), agent, queue_map, join_handle, server, + location_set: PhantomData, } } } -impl Drop for HttpTransport { +impl Drop for HttpTransport { fn drop(&mut self) { self.server.unblock(); self.join_handle.take().map(thread::JoinHandle::join); } } -impl Transport for HttpTransport { +impl Transport for HttpTransport { fn locations(&self) -> Vec { Vec::from_iter(self.config.keys().map(|s| s.clone())) } @@ -136,23 +180,26 @@ mod tests { #[test] fn test_http_transport() { let v = 42; - let mut config = HashMap::new(); + let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice::name(), ("localhost", 9010)); - config.insert(Bob::name(), ("localhost", 9011)); + let config = http_config!( + Alice: ("localhost", 9010), + Bob: ("localhost", 9011) + ); + let mut handles = Vec::new(); { let config = config.clone(); handles.push(thread::spawn(move || { wait.recv().unwrap(); // wait for Bob to start - let transport = HttpTransport::new(Alice::name(), &config); + let transport = HttpTransport::new(Alice, &config); transport.send::(Alice::name(), Bob::name(), &v); })); } { let config = config.clone(); handles.push(thread::spawn(move || { - let transport = HttpTransport::new(Bob::name(), &config); + let transport = HttpTransport::new(Bob, &config); signal.send(()).unwrap(); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); @@ -166,16 +213,19 @@ mod tests { #[test] fn test_http_transport_retry() { let v = 42; - let mut config = HashMap::new(); let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice::name(), ("localhost", 9020)); - config.insert(Bob::name(), ("localhost", 9021)); + + let config = http_config!( + Alice: ("localhost", 9020), + Bob: ("localhost", 9021) + ); + let mut handles = Vec::new(); { let config = config.clone(); handles.push(thread::spawn(move || { signal.send(()).unwrap(); - let transport = HttpTransport::new(Alice::name(), &config); + let transport = HttpTransport::new(Alice, &config); transport.send::(Alice::name(), Bob::name(), &v); })); } @@ -185,7 +235,7 @@ mod tests { // wait for Alice to start, which forces Alice to retry wait.recv().unwrap(); sleep(Duration::from_millis(100)); - let transport = HttpTransport::new(Bob::name(), &config); + let transport = HttpTransport::new(Bob, &config); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); diff --git a/chorus_lib/src/transport/local.rs b/chorus_lib/src/transport/local.rs index ac08261..154cfdc 100644 --- a/chorus_lib/src/transport/local.rs +++ b/chorus_lib/src/transport/local.rs @@ -5,7 +5,9 @@ use std::sync::Arc; use serde_json; -use crate::core::{Portable, Transport}; +use core::marker::PhantomData; + +use crate::core::{HList, Portable, Transport}; use crate::utils::queue::BlockingQueue; type QueueMap = HashMap>>; @@ -16,34 +18,38 @@ type QueueMap = HashMap>>; /// /// Unlike network-based transports, all locations must share the same `LocalTransport` instance. The struct implements `Clone` so that it can be shared across threads. #[derive(Clone)] -pub struct LocalTransport { +pub struct LocalTransport { internal_locations: Vec, queue_map: Arc, + location_set: PhantomData, } -impl LocalTransport { +impl LocalTransport { /// Creates a new `LocalTransport` instance from a list of locations. - pub fn from(locations: &[&str]) -> Self { + pub fn new() -> Self { let mut queue_map: QueueMap = HashMap::new(); - for sender in locations.clone() { + let locations_list = L::to_string_list(); + + for sender in locations_list.clone() { let mut n = HashMap::new(); - for receiver in locations.clone() { + for receiver in locations_list.clone() { n.insert(receiver.to_string(), BlockingQueue::new()); } queue_map.insert(sender.to_string(), n); } let mut locations_vec = Vec::new(); - for loc in locations.clone() { + for loc in locations_list.clone() { locations_vec.push(loc.to_string()); } LocalTransport { queue_map: Arc::new(queue_map), internal_locations: locations_vec, + location_set: PhantomData, } } } -impl Transport for LocalTransport { +impl Transport for LocalTransport { fn locations(&self) -> Vec { return self.internal_locations.clone(); } @@ -79,7 +85,7 @@ mod tests { #[test] fn test_local_transport() { let v = 42; - let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); + let transport = LocalTransport::::new(); let mut handles = Vec::new(); { let transport = transport.clone();