diff --git a/chorus_book/src/guide-input-and-output.md b/chorus_book/src/guide-input-and-output.md index 25bd8ea..82a1954 100644 --- a/chorus_book/src/guide-input-and-output.md +++ b/chorus_book/src/guide-input-and-output.md @@ -43,7 +43,7 @@ let choreo = DemoChoreography { input: "World".to_string(), }; -let projector = projector!(LocationSet!(Alice), Alice, transport); +let projector = Projector::new(Alice, alice_transport); projector.epp_and_run(choreo); ``` @@ -93,7 +93,7 @@ To run the sample choreography above at Alice, we use the `local` method to cons # }); # } # } -let projector_for_alice = projector!(LocationSet!(Alice), Alice, transport); +let projector_for_alice = Projector::new(Alice, alice_transport); // Because the target of the projector is Alice, the located value is available at Alice. let string_at_alice: Located = projector_for_alice.local("Hello, World!".to_string()); // Instantiate the choreography with the located value @@ -120,7 +120,7 @@ For Bob, we use the `remote` method to construct the located value. # }); # } # } -let projector_for_bob = projector!(LocationSet!(Alice, Bob), Bob, transport); +let projector_for_bob = Projector::new(Bob, bob_transport); // Construct a remote located value at Alice. The actual value is not required. let string_at_alice = projector_for_bob.remote(Alice); // Instantiate the choreography with the located value @@ -161,7 +161,7 @@ impl Choreography for DemoChoreography { # } # } let choreo = DemoChoreography; -let projector = projector!(LocationSet!(Alice), Alice, transport); +let projector = Projector::new(Alice, alice_transport); let output = projector.epp_and_run(choreo); assert_eq!(output, "Hello, World!".to_string()); ``` @@ -183,7 +183,7 @@ impl Choreography> for DemoChoreography { } } -let projector = projector!(LocationSet!(Alice), Alice, transport); +let projector = Projector::new(Alice, alice_transport); let output = projector.epp_and_run(DemoChoreography); let string_at_alice = projector.unwrap(output); assert_eq!(string_at_alice, "Hello, World!".to_string()); diff --git a/chorus_book/src/guide-projector.md b/chorus_book/src/guide-projector.md index d8abf63..0a84c40 100644 --- a/chorus_book/src/guide-projector.md +++ b/chorus_book/src/guide-projector.md @@ -4,21 +4,20 @@ Projector is responsible for performing the end-point projection and executing t ## Creating a Projector -To create a `Projector`, you need to provide the set of locations it can work with, the target location, and the transport. You should use the `projector!` macro instead of directly instantiating a Projector. +To create a `Projector`, you need to provide the target location and the transport. ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; # use chorus_lib::core::{ChoreographyLocation, Projector}; -# use chorus_lib::{LocationSet, projector}; -# let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); +# use chorus_lib::{LocationSet}; +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; -# - -let projector = projector!(LocationSet!(Alice, Bob), Alice, transport); +let projector = Projector::new(Alice, alice_transport); ``` Notice that the `Projector` is parameterized by its target location type. You will need one projector for each location to execute choreography. @@ -29,10 +28,11 @@ To execute a choreography, you need to call the `epp_and_run` method on the `Pro ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; # use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp}; -# use chorus_lib::{LocationSet, projector}; -# let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); +# use chorus_lib::{LocationSet}; +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] @@ -43,35 +43,8 @@ To execute a choreography, you need to call the `epp_and_run` method on the `Pro # fn run(self, op: &impl ChoreoOp) { # } # } -# - -# let projector = projector!(LocationSet!(Alice), Alice, transport); +# let projector = Projector::new(Alice, alice_transport); projector.epp_and_run(HelloWorldChoreography); ``` If the choreography has a return value, the `epp_and_run` method will return the value. We will discuss the return values in the [Input and Output](./guide-input-and-output.md) section. - -### Note on the location set of the Choreography - -Keep in mind that when calling `epp_and_run`, you will get a compile error if the location set of the `Choreography` is not a subset of the location set of the `Projector`. In other words, the `Projector` should be allowed to do end-point projection into every `ChoreographyLocation` that `Choreography` can talk about. So this will fail: - -```rust, compile_fail -# extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; -# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp}; -# use chorus_lib::{LocationSet, projector}; -# let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); -# #[derive(ChoreographyLocation)] -# struct Alice; -# #[derive(ChoreographyLocation)] -# struct Bob; -struct HelloWorldChoreography; -impl Choreography for HelloWorldChoreography { - type L = LocationSet!(Alice, Bob); - fn run(self, op: &impl ChoreoOp) { - } -} - -let projector = projector!(LocationSet!(Alice), Alice, transport); -projector.epp_and_run(HelloWorldChoreography); -``` diff --git a/chorus_book/src/guide-transport.md b/chorus_book/src/guide-transport.md index 310a533..504030e 100644 --- a/chorus_book/src/guide-transport.md +++ b/chorus_book/src/guide-transport.md @@ -8,25 +8,46 @@ ChoRus provides two built-in transports: `local` and `http`. ### The Local Transport -The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. - -To use the `local` transport, import the `LocalTransport` struct from the `chorus_lib` crate. +The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. Each `local` transport is defined over `LocalTransportChannel`, which contains the set of `ChoreographyLocation` that the `local` transport operates on. You can build a `LocalTransportChannel` by importing the `LocalTransportChannel` struct from the `chorus_lib` crate. ```rust # extern crate chorus_lib; -use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::core::{ChoreographyLocation}; +# use chorus_lib::{LocationSet}; +# #[derive(ChoreographyLocation)] +# struct Alice; +# #[derive(ChoreographyLocation)] +# struct Bob; +use chorus_lib::transport::local::LocalTransportChannel; + +let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); ``` -You can construct a `LocalTransport` instance by passing a slice of locations to the `from` method. +To use the `local` transport, first import the `LocalTransport` struct from the `chorus_lib` crate. + +Then build the transport by using the `LocalTransport::new` associated function, which takes a target location (explained in the [Projector section](./guide-projector.md)) and the `LocalTransportChannel`. -Because of the nature of the `Local` transport, you must use the same `LocalTransport` instance for all locations. You can `clone` the `LocalTransport` instance and pass it to the threads. +```rust +# extern crate chorus_lib; +# use chorus_lib::core::{ChoreographyLocation}; +# use chorus_lib::{LocationSet}; +# #[derive(ChoreographyLocation)] +# struct Alice; +# use chorus_lib::transport::local::LocalTransportChannel; +# let transport_channel = LocalTransportChannel::new().with(Alice); +use chorus_lib::transport::local::{LocalTransport}; + +let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); +``` + +Because of the nature of the `Local` transport, you must use the same `LocalTransportChannel` instance for all locations. You can `clone` the `LocalTransprotChannel` instance and pass it to each `Projector::new` constructor. ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; # use std::thread; # use chorus_lib::core::{ChoreographyLocation, ChoreoOp, Choreography, Projector}; -# use chorus_lib::{LocationSet, projector}; +# use chorus_lib::{LocationSet}; # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] @@ -37,23 +58,21 @@ Because of the nature of the `Local` transport, you must use the same `LocalTran # fn run(self, op: &impl ChoreoOp) { # } # } - - +let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); let mut handles: Vec> = Vec::new(); -let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); { - // create a clone for Alice - let transport = transport.clone(); + // create a transport for Alice + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Alice, transport); + let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); })); } { // create another for Bob - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Bob, transport); + let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); })); } @@ -63,27 +82,64 @@ let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); The `http` transport is used to execute choreographies on different machines. This is useful for executing choreographies in a distributed system. -To use the `http` transport, import the `HttpTransport` struct from the `chorus_lib` crate. +To use the `http` transport, import the `HttpTransport` struct and the `HttpTransportConfig` type alias from the `chorus_lib` crate. ```rust # extern crate chorus_lib; -use chorus_lib::transport::http::HttpTransport; +use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig}; ``` -The `new` constructor takes the name of the projection target and "configuration" of type `std::collections::HashMap<&'static str, (&'static str, u32)>`. The configuration is a map from location names to the hostname and port of the location. +The primary constructor requires an argument of type `HttpTransportConfig`. To create an instance of this configuration, start with `HttpTransportConfig::for_target(target_location, (hostname, port))`. It will create set a projection target and the hostname and port to listen on. Then, provide information to connect to other locations by method-chaining the `.with(other_location, (hostname, port))` method. You can think of `HttpTransportConfig` as a mapping from locations to their hostnames and ports. ```rust {{#include ./header.txt}} -# use chorus_lib::transport::http::HttpTransport; -# use std::collections::HashMap; -let mut config = HashMap::new(); -config.insert(Alice::name(), ("localhost", 8080)); -config.insert(Bob::name(), ("localhost", 8081)); -let transport = HttpTransport::new(Alice::name(), &config); +# use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig}; +let config = HttpTransportConfig::for_target(Alice, ("localhost".to_string(), 8080)) + .with(Bob, ("localhost".to_string(), 8081)); + +let transport = HttpTransport::new(config); ``` In the above example, the transport will start the HTTP server on port 8080 on localhost. If Alice needs to send a message to Bob, it will use `http://localhost:8081` as the destination. ## Creating a Custom Transport -You can also create your own transport by implementing the `Transport` trait. See the API documentation for more details. +You can also create your own transport by implementing the `Transport` trait. It might be helpful first build a `TransportConfig` to have the the information that you need for each `ChoreographyLocation`, and then have a constructor that takes the `TransportConfig` and builds the `Transport` based on it. While the syntax is similar to `HttpTransportConfig`, the type of information for each `ChoreographyLocation` might diverge from the `(host_name, port)` format presented here. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type. + +```rust +{{#include ./header.txt}} +# use chorus_lib::transport::TransportConfig; +let config = TransportConfig::for_target(Alice, ()) + .with(Bob, ("localhost".to_string(), 8081)) + .with(Carol, ("localhost".to_string(), 8082)); +``` + +See the API documentation for more details. + + +### Note on the location set of the Choreography + +Note that when calling `epp_and_run` on a `Projector`, you will get a compile error if the location set of the `Choreography` is not a subset of the location set of the `Transport`. In other words, the `Transport` should have information about every `ChoreographyLocation` that `Choreography` can talk about. So this will fail: + +```rust, compile_fail +# extern crate chorus_lib; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; +# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp}; +# use chorus_lib::{LocationSet}; + +# #[derive(ChoreographyLocation)] +# struct Alice; +# #[derive(ChoreographyLocation)] +# struct Bob; +struct HelloWorldChoreography; +impl Choreography for HelloWorldChoreography { + type L = LocationSet!(Alice, Bob); + fn run(self, op: &impl ChoreoOp) { + } +} + +let transport_channel = LocalTransportChannel::::new(); +let transport = LocalTransport::new(Alice, transport_channel.clone()); +let projector = Projector::new(Alice, transport); +projector.epp_and_run(HelloWorldChoreography); +``` diff --git a/chorus_book/src/header.txt b/chorus_book/src/header.txt index 2115638..7231c30 100644 --- a/chorus_book/src/header.txt +++ b/chorus_book/src/header.txt @@ -1,11 +1,14 @@ # extern crate chorus_lib; # use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector, Located, Superposition, Runner}; -# use chorus_lib::transport::local::LocalTransport; -# use chorus_lib::{LocationSet, projector}; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; +# use chorus_lib::{LocationSet}; # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; # #[derive(ChoreographyLocation)] # struct Carol; -# let transport = LocalTransport::from(&[Alice::name(), Bob::name(), Carol::name()]); +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob).with(Carol); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); +# let bob_transport = LocalTransport::new(Bob, transport_channel.clone()); +# let carol_transport = LocalTransport::new(Carol, transport_channel.clone()); \ No newline at end of file diff --git a/chorus_lib/examples/bookseller.rs b/chorus_lib/examples/bookseller.rs index 39d7c20..7889aa6 100644 --- a/chorus_lib/examples/bookseller.rs +++ b/chorus_lib/examples/bookseller.rs @@ -5,9 +5,9 @@ use std::thread; use chrono::NaiveDate; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation}; -use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; +use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; +use chorus_lib::LocationSet; fn get_book(title: &str) -> Option<(i32, NaiveDate)> { match title.trim() { @@ -73,9 +73,12 @@ impl Choreography for BooksellerChoreography { } fn main() { - let transport = LocalTransport::from(&[Seller::name(), Buyer::name()]); - let seller_projector = projector!(LocationSet!(Buyer, Seller), Seller, transport.clone()); - let buyer_projector = projector!(LocationSet!(Buyer, Seller), Buyer, transport.clone()); + let transport_channel = LocalTransportChannel::new().with(Seller).with(Buyer); + let transport_seller = LocalTransport::new(Seller, transport_channel.clone()); + let transport_buyer = LocalTransport::new(Buyer, transport_channel.clone()); + + let seller_projector = Projector::new(Seller, transport_seller); + let buyer_projector = Projector::new(Buyer, transport_buyer); let mut handles: Vec> = Vec::new(); handles.push(thread::spawn(move || { diff --git a/chorus_lib/examples/bookseller2.rs b/chorus_lib/examples/bookseller2.rs index ca6f68f..729d5af 100644 --- a/chorus_lib/examples/bookseller2.rs +++ b/chorus_lib/examples/bookseller2.rs @@ -1,13 +1,14 @@ extern crate chorus_lib; +use std::collections::HashMap; +use std::sync::Arc; use std::thread; -use std::{collections::HashMap, sync::Arc}; +use chorus_lib::LocationSet; use chorus_lib::{ - core::{ChoreoOp, Choreography, ChoreographyLocation, Located}, - transport::local::LocalTransport, + core::{ChoreoOp, Choreography, ChoreographyLocation, Located, Projector}, + transport::local::{LocalTransport, LocalTransportChannel}, }; -use chorus_lib::{projector, LocationSet}; use chrono::NaiveDate; #[derive(ChoreographyLocation)] @@ -142,21 +143,22 @@ fn main() { i }; - let transport = LocalTransport::from(&[Seller::name(), Buyer1::name(), Buyer2::name()]); - let seller_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), + let transport_channel = LocalTransportChannel::new() + .with(Seller) + .with(Buyer1) + .with(Buyer2); + + let seller_projector = Arc::new(Projector::new( Seller, - transport.clone() + LocalTransport::new(Seller, transport_channel.clone()), )); - let buyer1_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), + let buyer1_projector = Arc::new(Projector::new( Buyer1, - transport.clone() + LocalTransport::new(Buyer1, transport_channel.clone()), )); - let buyer2_projector = Arc::new(projector!( - LocationSet!(Seller, Buyer1, Buyer2), + let buyer2_projector = Arc::new(Projector::new( Buyer2, - transport.clone() + LocalTransport::new(Buyer2, transport_channel.clone()), )); println!("Tries to buy HoTT with one buyer"); diff --git a/chorus_lib/examples/hello.rs b/chorus_lib/examples/hello.rs index bac0aff..ba8bcfd 100644 --- a/chorus_lib/examples/hello.rs +++ b/chorus_lib/examples/hello.rs @@ -2,9 +2,9 @@ extern crate chorus_lib; use std::thread; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation}; -use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; +use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; +use chorus_lib::LocationSet; // --- Define two locations (Alice and Bob) --- @@ -39,21 +39,23 @@ impl Choreography for HelloWorldChoreography { fn main() { let mut handles: Vec> = Vec::new(); - // Create a local transport - let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); - + // Create a transport channel + // let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); // Run the choreography in two threads { - let transport = transport.clone(); + // let transport_channel = transport_channel.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Alice, transport); + let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); })); } { - let transport = transport.clone(); + // let transport_channel = transport_channel.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { - let p = projector!(LocationSet!(Alice, Bob), Bob, transport); + let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); })); } diff --git a/chorus_lib/examples/loc-poly.rs b/chorus_lib/examples/loc-poly.rs index 88a56a9..e6b8e0b 100644 --- a/chorus_lib/examples/loc-poly.rs +++ b/chorus_lib/examples/loc-poly.rs @@ -2,9 +2,11 @@ extern crate chorus_lib; use std::fmt::Debug; use std::thread; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Located, Portable}; -use chorus_lib::transport::local::LocalTransport; -use chorus_lib::{projector, LocationSet}; +use chorus_lib::core::{ + ChoreoOp, Choreography, ChoreographyLocation, Located, Portable, Projector, +}; +use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; +use chorus_lib::LocationSet; #[derive(ChoreographyLocation)] struct Alice; @@ -55,20 +57,24 @@ impl Choreography> for MainChoreography { } fn main() { - let transport = LocalTransport::from(&[Alice::name(), Bob::name(), Carol::name()]); + let transport_channel = LocalTransportChannel::new() + .with(Alice) + .with(Bob) + .with(Carol); + let mut handles = vec![]; { - let transport = transport.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(|| { - let p = projector!(LocationSet!(Alice, Bob), Alice, transport); + let p = Projector::new(Alice, transport); let v = p.epp_and_run(MainChoreography); assert_eq!(p.unwrap(v), 110); })); } { - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(|| { - let p = projector!(LocationSet!(Alice, Bob), Bob, transport); + let p = Projector::new(Bob, transport); p.epp_and_run(MainChoreography); })); } diff --git a/chorus_lib/examples/tic-tac-toe.rs b/chorus_lib/examples/tic-tac-toe.rs index a92da03..79abb03 100644 --- a/chorus_lib/examples/tic-tac-toe.rs +++ b/chorus_lib/examples/tic-tac-toe.rs @@ -1,14 +1,17 @@ /// Choreographic tik-tak-toe game extern crate chorus_lib; +use chorus_lib::transport::http::HttpTransportConfig; use chorus_lib::{ - core::{ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Located, Serialize}, - projector, + core::{ + ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Located, Projector, Serialize, + }, transport::http::HttpTransport, LocationSet, }; + use clap::Parser; -use std::{collections::HashMap, io::Write}; +use std::io::Write; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; #[derive(Serialize, Deserialize, Debug)] @@ -292,28 +295,40 @@ fn main() { match args.player { 'X' => { - let mut config = HashMap::new(); - config.insert(PlayerX::name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerO::name(), - (args.opponent_hostname.as_str(), args.opponent_port), + let config = HttpTransportConfig::for_target( + PlayerX, + (args.hostname.as_str().to_string(), args.port), + ) + .with( + PlayerO, + ( + args.opponent_hostname.as_str().to_string(), + args.opponent_port, + ), ); - let transport = HttpTransport::new(PlayerX::name(), &config); - let projector = projector!(LocationSet!(PlayerX, PlayerO), PlayerX, transport); + + let transport = HttpTransport::new(config); + let projector = Projector::new(PlayerX, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.local(brain), brain_for_o: projector.remote(PlayerO), }); } 'O' => { - let mut config = HashMap::new(); - config.insert(PlayerO::name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerX::name(), - (args.opponent_hostname.as_str(), args.opponent_port), + let config = HttpTransportConfig::for_target( + PlayerO, + (args.hostname.as_str().to_string(), args.port), + ) + .with( + PlayerX, + ( + args.opponent_hostname.as_str().to_string(), + args.opponent_port, + ), ); - let transport = HttpTransport::new(PlayerO::name(), &config); - let projector = projector!(LocationSet!(PlayerX, PlayerO), PlayerO, transport); + + let transport = HttpTransport::new(config); + let projector = Projector::new(PlayerO, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.remote(PlayerX), brain_for_o: projector.local(brain), diff --git a/chorus_lib/src/core.rs b/chorus_lib/src/core.rs index d59bf41..f2ba43e 100644 --- a/chorus_lib/src/core.rs +++ b/chorus_lib/src/core.rs @@ -96,8 +96,10 @@ pub trait HList { /// returns fn to_string_list() -> Vec<&'static str>; } + /// end of HList pub struct HNil; + /// An element of HList pub struct HCons(Head, Tail); @@ -125,7 +127,7 @@ where macro_rules! LocationSet { () => { $crate::core::HNil }; ($head:ty $(,)*) => { $crate::core::HCons<$head, $crate::core::HNil> }; - ($head:ty, $($tail:tt)*) => { $crate::core::HCons<$head, LocationSet!($($tail)*)> }; + ($head:ty, $($tail:tt)*) => { $crate::core::HCons<$head, $crate::LocationSet!($($tail)*)> }; } /// Marker @@ -261,7 +263,11 @@ pub trait Choreography { /// Provides methods to send and receive messages. /// /// The trait provides methods to send and receive messages between locations. Implement this trait to define a custom transport. -pub trait Transport { +/// +/// The type parameter `L` is the location set that the transport is operating on. +/// +/// The type paramter `TargetLocation` is the target `ChoreographyLocation`. +pub trait Transport { /// Returns a list of locations. fn locations(&self) -> Vec; /// Sends a message from `from` to `to`. @@ -271,7 +277,7 @@ pub trait Transport { } /// Provides a method to perform end-point projection. -pub struct Projector +pub struct Projector, Index> where L1: Member, { @@ -281,15 +287,7 @@ where index: PhantomData, } -/// Macro to make Projector -#[macro_export] -macro_rules! projector { - ($al_type:ty, $target:expr, $transport:expr) => { - $crate::core::Projector::<$al_type, _, _, _>::new($target, $transport) - }; -} - -impl Projector +impl, Index> Projector where L1: Member, { @@ -338,13 +336,16 @@ where where L: Subset, { - struct EppOp<'a, L: HList, L1: ChoreographyLocation, B: Transport> { + struct EppOp<'a, L: HList, L1: ChoreographyLocation, LS: HList, B: Transport> { target: PhantomData, transport: &'a B, locations: Vec, marker: PhantomData, + projector_location_set: PhantomData, } - impl<'a, L: HList, T: ChoreographyLocation, B: Transport> ChoreoOp for EppOp<'a, L, T, B> { + impl<'a, L: HList, T: ChoreographyLocation, LS: HList, B: Transport> ChoreoOp + for EppOp<'a, L, T, LS, B> + { fn locally( &self, _location: L1, @@ -406,11 +407,12 @@ where where M: HList + Subset, { - let op: EppOp<'a, M, T, B> = EppOp { + let op: EppOp<'a, M, T, LS, B> = EppOp { target: PhantomData::, transport: &self.transport, locations: self.transport.locations(), marker: PhantomData::, + projector_location_set: PhantomData::, }; choreo.run(&op) } @@ -429,6 +431,7 @@ where transport: self.transport, locations: locs_vec.clone(), marker: PhantomData::, + projector_location_set: PhantomData::, }; return choreo.run(&op); } @@ -436,11 +439,12 @@ where R::remote() } } - let op: EppOp<'a, L, L1, B> = EppOp { + let op: EppOp<'a, L, L1, LS, B> = EppOp { target: PhantomData::, transport: &self.transport, locations: self.transport.locations(), marker: PhantomData::, + projector_location_set: PhantomData::, }; choreo.run(&op) } diff --git a/chorus_lib/src/transport.rs b/chorus_lib/src/transport.rs index a7f854b..9f8209a 100644 --- a/chorus_lib/src/transport.rs +++ b/chorus_lib/src/transport.rs @@ -2,3 +2,53 @@ pub mod http; pub mod local; + +use crate::core::{ChoreographyLocation, HCons, HList}; +use crate::LocationSet; +use std::collections::HashMap; +use std::marker::PhantomData; + +/// A generic struct for configuration of `Transport`. +#[derive(Clone)] +pub struct TransportConfig +{ + /// The information about locations + info: HashMap, + /// The information about the target choreography + target_info: (TargetLocation, TargetInfoType), + /// The struct is parametrized by the location set (`L`). + location_set: PhantomData, +} + +impl + TransportConfig +{ + /// A transport for a given target. + pub fn for_target(location: TargetLocation, info: TargetInfoType) -> Self { + Self { + info: HashMap::new(), + target_info: (location, info), + location_set: PhantomData, + } + } +} + +impl + TransportConfig +{ + /// Adds information about a new `ChoreographyLocation`. + pub fn with( + mut self, + _location: NewLocation, + info: InfoType, + ) -> TransportConfig, InfoType, TargetLocation, TargetInfoType> +where { + self.info.insert(NewLocation::name().to_string(), info); + + TransportConfig { + info: self.info, + target_info: self.target_info, + location_set: PhantomData, + } + } +} diff --git a/chorus_lib/src/transport/http.rs b/chorus_lib/src/transport/http.rs index ef98f84..0c20a58 100644 --- a/chorus_lib/src/transport/http.rs +++ b/chorus_lib/src/transport/http.rs @@ -3,6 +3,8 @@ use std::thread; use std::{collections::HashMap, sync::Arc}; +use std::marker::PhantomData; + use retry::{ delay::{jitter, Fixed}, retry, @@ -10,46 +12,51 @@ use retry::{ use tiny_http::Server; use ureq::{Agent, AgentBuilder}; +use crate::transport::TransportConfig; + use crate::{ - core::{Portable, Transport}, + core::{ChoreographyLocation, HList, Member, Portable, Transport}, utils::queue::BlockingQueue, }; +type QueueMap = HashMap>; +/// A type alias for `TransportConfig`s used for building `HttpTransport` +pub type HttpTransportConfig = TransportConfig; + /// The header name for the source location. const HEADER_SRC: &str = "X-CHORUS-SOURCE"; /// The HTTP transport. -pub struct HttpTransport { +pub struct HttpTransport { config: HashMap, agent: Agent, - queue_map: Arc>>, server: Arc, join_handle: Option>, + location_set: PhantomData, + queue_map: Arc, + target_location: PhantomData, } -impl HttpTransport { - /// Creates a new `HttpTransport` instance from the projection target and a configuration. - pub fn new(at: &'static str, config: &HashMap<&str, (&str, u16)>) -> Self { - let config = HashMap::from_iter( - config - .iter() - .map(|(k, (hostname, port))| (k.to_string(), (hostname.to_string(), *port))), - ); - let locs = Vec::from_iter(config.keys().map(|s| s.clone())); - - let queue_map = { +impl HttpTransport { + /// Creates a new `HttpTransport` instance from the configuration. + pub fn new(http_config: HttpTransportConfig) -> Self + where + TLocation: Member, + { + let queue_map: Arc = { let mut m = HashMap::new(); - for loc in &locs { + for loc in L::to_string_list() { m.insert(loc.to_string(), BlockingQueue::new()); } - Arc::new(m) + Arc::new(m.into()) }; - let (hostname, port) = config.get(at).unwrap(); + let (_, (hostname, port)) = &http_config.target_info; let server = Arc::new(Server::http(format!("{}:{}", hostname, port)).unwrap()); let join_handle = Some({ let server = server.clone(); let queue_map = queue_map.clone(); + thread::spawn(move || { for mut request in server.incoming_requests() { let mut body = String::new(); @@ -80,23 +87,27 @@ impl HttpTransport { let agent = AgentBuilder::new().build(); Self { - config, + config: http_config.info, agent, - queue_map, join_handle, server, + location_set: PhantomData, + queue_map, + target_location: PhantomData, } } } -impl Drop for HttpTransport { +impl Drop for HttpTransport { fn drop(&mut self) { self.server.unblock(); self.join_handle.take().map(thread::JoinHandle::join); } } -impl Transport for HttpTransport { +impl Transport + for HttpTransport +{ fn locations(&self) -> Vec { Vec::from_iter(self.config.keys().map(|s| s.clone())) } @@ -133,26 +144,32 @@ mod tests { #[derive(ChoreographyLocation)] struct Bob; + #[derive(ChoreographyLocation)] + struct Carol; + #[test] fn test_http_transport() { let v = 42; - let mut config = HashMap::new(); + let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice::name(), ("localhost", 9010)); - config.insert(Bob::name(), ("localhost", 9011)); + let mut handles = Vec::new(); { - let config = config.clone(); + let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9010)) + .with(Bob, ("localhost".to_string(), 9011)); + handles.push(thread::spawn(move || { wait.recv().unwrap(); // wait for Bob to start - let transport = HttpTransport::new(Alice::name(), &config); + let transport = HttpTransport::new(config); transport.send::(Alice::name(), Bob::name(), &v); })); } { - let config = config.clone(); + let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9011)) + .with(Alice, ("localhost".to_string(), 9010)); + handles.push(thread::spawn(move || { - let transport = HttpTransport::new(Bob::name(), &config); + let transport = HttpTransport::new(config); signal.send(()).unwrap(); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); @@ -166,26 +183,28 @@ mod tests { #[test] fn test_http_transport_retry() { let v = 42; - let mut config = HashMap::new(); let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice::name(), ("localhost", 9020)); - config.insert(Bob::name(), ("localhost", 9021)); + let mut handles = Vec::new(); { - let config = config.clone(); + let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9020)) + .with(Bob, ("localhost".to_string(), 9021)); + handles.push(thread::spawn(move || { signal.send(()).unwrap(); - let transport = HttpTransport::new(Alice::name(), &config); + let transport = HttpTransport::new(config); transport.send::(Alice::name(), Bob::name(), &v); })); } { - let config = config.clone(); + let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9021)) + .with(Alice, ("localhost".to_string(), 9020)); + handles.push(thread::spawn(move || { // wait for Alice to start, which forces Alice to retry wait.recv().unwrap(); sleep(Duration::from_millis(100)); - let transport = HttpTransport::new(Bob::name(), &config); + let transport = HttpTransport::new(config); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); diff --git a/chorus_lib/src/transport/local.rs b/chorus_lib/src/transport/local.rs index ac08261..3ba58b3 100644 --- a/chorus_lib/src/transport/local.rs +++ b/chorus_lib/src/transport/local.rs @@ -5,52 +5,107 @@ use std::sync::Arc; use serde_json; -use crate::core::{Portable, Transport}; +use std::marker::PhantomData; + +use crate::LocationSet; + +use crate::core::{ChoreographyLocation, HCons, HList, Portable, Transport}; use crate::utils::queue::BlockingQueue; type QueueMap = HashMap>>; -/// The local transport. -/// -/// This transport uses a blocking queue to allow for communication between threads. Each location must be executed in its thread. -/// -/// Unlike network-based transports, all locations must share the same `LocalTransport` instance. The struct implements `Clone` so that it can be shared across threads. -#[derive(Clone)] -pub struct LocalTransport { - internal_locations: Vec, +/// A Transport channel used between multiple `Transport`s. +pub struct LocalTransportChannel { + /// The location set where the channel is defined on. + location_set: std::marker::PhantomData, queue_map: Arc, } -impl LocalTransport { - /// Creates a new `LocalTransport` instance from a list of locations. - pub fn from(locations: &[&str]) -> Self { +impl Clone for LocalTransportChannel { + fn clone(&self) -> Self { + LocalTransportChannel { + location_set: PhantomData, + queue_map: self.queue_map.clone(), + } + } +} + +impl LocalTransportChannel { + /// Creates a new `LocalTransportChannel` instance + pub fn new() -> Self { + Self { + location_set: PhantomData, + queue_map: HashMap::new().into(), + } + } +} + +impl LocalTransportChannel { + /// Adds a new location to the set of locations in the `LocalTransportChannel`. + pub fn with( + self, + _location: NewLocation, + ) -> LocalTransportChannel> { let mut queue_map: QueueMap = HashMap::new(); - for sender in locations.clone() { + let mut str_list = L::to_string_list(); + str_list.push(NewLocation::name()); + for sender in &str_list { let mut n = HashMap::new(); - for receiver in locations.clone() { + for receiver in &str_list { n.insert(receiver.to_string(), BlockingQueue::new()); } queue_map.insert(sender.to_string(), n); } + + LocalTransportChannel { + location_set: PhantomData, + queue_map: queue_map.into(), + } + } +} + +/// The local transport. +/// +/// This transport uses a blocking queue to allow for communication between threads. Each location must be executed in its thread. +/// +/// Unlike network-based transports, all locations must share the same `LocalTransport` instance. The struct implements `Clone` so that it can be shared across threads. +pub struct LocalTransport { + internal_locations: Vec, + location_set: PhantomData, + local_channel: LocalTransportChannel, + target_location: PhantomData, +} + +impl LocalTransport { + /// Creates a new `LocalTransport` instance from a Target `ChoreographyLocation` and a `LocalTransportChannel`. + pub fn new(_target: TargetLocation, local_channel: LocalTransportChannel) -> Self { + let locations_list = L::to_string_list(); + let mut locations_vec = Vec::new(); - for loc in locations.clone() { + for loc in locations_list.clone() { locations_vec.push(loc.to_string()); } + LocalTransport { - queue_map: Arc::new(queue_map), internal_locations: locations_vec, + location_set: PhantomData, + local_channel, + target_location: PhantomData, } } } -impl Transport for LocalTransport { +impl Transport + for LocalTransport +{ fn locations(&self) -> Vec { return self.internal_locations.clone(); } fn send(&self, from: &str, to: &str, data: &T) -> () { let data = serde_json::to_string(data).unwrap(); - self.queue_map + self.local_channel + .queue_map .get(from) .unwrap() .get(to) @@ -59,7 +114,14 @@ impl Transport for LocalTransport { } fn receive(&self, from: &str, at: &str) -> T { - let data = self.queue_map.get(from).unwrap().get(at).unwrap().pop(); + let data = self + .local_channel + .queue_map + .get(from) + .unwrap() + .get(at) + .unwrap() + .pop(); serde_json::from_str(&data).unwrap() } } @@ -79,16 +141,18 @@ mod tests { #[test] fn test_local_transport() { let v = 42; - let transport = LocalTransport::from(&[Alice::name(), Bob::name()]); + + let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); + let mut handles = Vec::new(); { - let transport = transport.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { transport.send::(Alice::name(), Bob::name(), &v); })); } { - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2);