diff --git a/Cargo.toml b/Cargo.toml index fc39199..deedead 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["chorus_lib", "chorus_derive"] [workspace.package] -version = "0.1.3" +version = "0.2.0" edition = "2021" authors = ["Shun Kashiwa "] homepage = "https://lsd-ucsc.github.io/ChoRus/" diff --git a/chorus_book/src/guide-choreography.md b/chorus_book/src/guide-choreography.md index 1a5db7f..1480948 100644 --- a/chorus_book/src/guide-choreography.md +++ b/chorus_book/src/guide-choreography.md @@ -9,7 +9,8 @@ struct HelloWorldChoreography; // 2. Implement the `Choreography` trait impl Choreography for HelloWorldChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) { // 3. Use the `op` parameter to access operators op.locally(Alice, |_| { println!("Hello, World!"); @@ -20,6 +21,8 @@ impl Choreography for HelloWorldChoreography { `Choreography` must implement the `run` method which defines the behavior of the system. The `run` method takes a reference to an object that implements the `ChoreoOp` trait. The `ChoreoOp` trait provides choreographic operators such as `locally` and `comm`. +Also, each `Choreography` has an associated `LocationSet` type, `L`; this is the `LocationSet` that the `Choreography` can operate on. + ## Choreographic Operators Inside the `run` method, you can use the `op` parameter to access choreographic operators. @@ -33,7 +36,8 @@ The `locally` operator is used to perform a computation at a single location. It # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { op.locally(Alice, |_| { println!("Hello, World!"); }); @@ -48,7 +52,8 @@ The closure can return a value to create a located value. Located values are val # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { // This value is only available at Alice let num_at_alice: Located = op.locally(Alice, |_| { 42 @@ -64,7 +69,8 @@ The computation closure takes `Unwrapper`. Using the `Unwrapper`, you can get a # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { let num_at_alice: Located = op.locally(Alice, |_| { 42 }); @@ -79,12 +85,13 @@ op.locally(Alice, |un| { Note that you can unwrap a located value only at the location where the located value is available. If you try to unwrap a located value at a different location, the program will fail to compile. -```rust,compile_fail +```rust, compile_fail {{#include ./header.txt}} # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice, Bob); +# fn run(self, op: &impl ChoreoOp) { // This code will fail to compile let num_at_alice = op.locally(Alice, |_| { 42 }); op.locally(Bob, |un| { @@ -106,7 +113,8 @@ The `comm` operator is used to perform a communication between two locations. It # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice, Bob); +# fn run(self, op: &impl ChoreoOp) { // This value is only available at Alice let num_at_alice: Located = op.locally(Alice, |_| { 42 @@ -131,7 +139,8 @@ The `broadcast` operator is used to perform a broadcast from a single location t # # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { // This value is only available at Alice let num_at_alice: Located = op.locally(Alice, |_| { 42 @@ -144,10 +153,33 @@ let num: i32 = op.broadcast(Alice, num_at_alice); Because all locations receive the value, the return type of the `broadcast` operator is a normal value, not a located value. This means that the value can be used for control flow. -```rust,ignore +```rust, ignore if num == 42 { println!("The number is 42!"); } else { println!("The number is not 42!"); } ``` + +### Note on invalid values for Choreography::L + +You'll get a compile error if you try to work with a `ChoreographyLocation` that is not a member of `L`. + +```rust, compile_fail +# {{#include ./header.txt}} +# // 1. Define a struct +# struct HelloWorldChoreography; + +# // 2. Implement the `Choreography` trait +// ... +impl Choreography for HelloWorldChoreography { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) { + // this will fail + op.locally(Bob, |_| { + println!("Hello, World!"); + }); + } +} +``` + diff --git a/chorus_book/src/guide-colocally.md b/chorus_book/src/guide-colocally.md index e65beb8..d563bc5 100644 --- a/chorus_book/src/guide-colocally.md +++ b/chorus_book/src/guide-colocally.md @@ -20,7 +20,8 @@ This protocol can be implemented as follows: struct DemoChoreography; impl Choreography for DemoChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice, Bob, Carol); + fn run(self, op: &impl ChoreoOp) { let x_at_alice = op.locally(Alice, |_| { get_random_number() }); @@ -51,7 +52,8 @@ struct BobCarolChoreography { x_at_bob: Located, }; impl Choreography for BobCarolChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Bob, Carol); + fn run(self, op: &impl ChoreoOp) { let is_even_at_bob: Located = op.locally(Bob, |un| { let x = un.unwrap(&self.x_at_bob); x % 2 == 0 @@ -68,7 +70,7 @@ impl Choreography for BobCarolChoreography { } ``` -Notice that the `BobCarolChoreography` only describes the behavior of Bob and Carol. Since Alice does not appear in this choreography, we can use the `colocally` operator in the main choreography to execute the sub-choreography only on Bob and Carol. +Notice that `BobCarolChoreography` only describes the behavior of Bob and Carol (see its location set `L`). `colocally` is an operator to execute a choreography only at locations that is included in the location set. In this case, if we invoke `BobCarolChoreography` with `colocally` in the main choreography, it will only be executed at Bob and Carol and not at Alice. ```rust {{#include ./header.txt}} @@ -79,7 +81,8 @@ Notice that the `BobCarolChoreography` only describes the behavior of Bob and Ca # x_at_bob: Located, # }; # impl Choreography for BobCarolChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Bob, Carol); +# fn run(self, op: &impl ChoreoOp) { # let is_even_at_bob: Located = op.locally(Bob, |un| { # let x = un.unwrap(&self.x_at_bob); # x % 2 == 0 @@ -96,12 +99,13 @@ Notice that the `BobCarolChoreography` only describes the behavior of Bob and Ca # } struct MainChoreography; impl Choreography for MainChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice, Bob, Carol); + fn run(self, op: &impl ChoreoOp) { let x_at_alice = op.locally(Alice, |_| { get_random_number() }); let x_at_bob = op.comm(Alice, Bob, &x_at_alice); - op.colocally(&[Bob.name(), Carol.name()], BobCarolChoreography { + op.colocally(BobCarolChoreography { x_at_bob, }); } @@ -131,7 +135,8 @@ struct BobCarolChoreography { }; impl Choreography for BobCarolChoreography { - fn run(self, op: &impl ChoreoOp) -> BobCarolResult { + type L = LocationSet!(Bob, Carol); + fn run(self, op: &impl ChoreoOp) -> BobCarolResult { let is_even_at_bob: Located = op.locally(Bob, |un| { let x = un.unwrap(&self.x_at_bob); x % 2 == 0 @@ -154,7 +159,8 @@ impl Choreography for BobCarolChoreography { struct MainChoreography; impl Choreography for MainChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice, Bob, Carol); + fn run(self, op: &impl ChoreoOp) { let x_at_alice = op.locally(Alice, |_| { get_random_number() }); @@ -162,7 +168,7 @@ impl Choreography for MainChoreography { let BobCarolResult { is_even_at_bob, is_even_at_carol, - } = op.colocally(&[Bob.name(), Carol.name()], BobCarolChoreography { + } = op.colocally(BobCarolChoreography { x_at_bob, }); // can access is_even_at_bob and is_even_at_carol using `locally` on Bob and Carol diff --git a/chorus_book/src/guide-higher-order-choreography.md b/chorus_book/src/guide-higher-order-choreography.md index 0d75d77..d7c1750 100644 --- a/chorus_book/src/guide-higher-order-choreography.md +++ b/chorus_book/src/guide-higher-order-choreography.md @@ -22,8 +22,9 @@ When you implement the `Choreography` trait, you have access to the `sub_choreo` # struct HigherOrderChoreography { # sub_choreo: C, # }; -impl Choreography for HigherOrderChoreography { - fn run(self, op: &impl ChoreoOp) { +impl> Choreography for HigherOrderChoreography { + type L = LocationSet!(Alice, Bob); + fn run(self, op: &impl ChoreoOp) { op.call(self.sub_choreo); } } @@ -45,8 +46,9 @@ struct HigherOrderChoreography> + SubChoreo _marker: PhantomData, }; -impl> + SubChoreography> Choreography for HigherOrderChoreography { - fn run(self, op: &impl ChoreoOp) { +impl, L = LocationSet!(Alice)> + SubChoreography> Choreography for HigherOrderChoreography { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) { let num_at_alice = op.locally(Alice, |_| { 42 }); diff --git a/chorus_book/src/guide-input-and-output.md b/chorus_book/src/guide-input-and-output.md index 24d177d..82a1954 100644 --- a/chorus_book/src/guide-input-and-output.md +++ b/chorus_book/src/guide-input-and-output.md @@ -18,7 +18,8 @@ struct DemoChoreography { } impl Choreography for DemoChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(); + fn run(self, op: &impl ChoreoOp) { println!("Input: {}", self.input); } } @@ -32,7 +33,8 @@ You can construct an instance of the choreography with the input and pass it to # input: String, # } # impl Choreography for DemoChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { # println!("Input: {}", self.input); # } # } @@ -40,7 +42,8 @@ You can construct an instance of the choreography with the input and pass it to let choreo = DemoChoreography { input: "World".to_string(), }; -let projector = Projector::new(Alice, transport); + +let projector = Projector::new(Alice, alice_transport); projector.epp_and_run(choreo); ``` @@ -55,7 +58,8 @@ struct DemoChoreography { } impl Choreography for DemoChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) { op.locally(Alice, |un| { let input = un.unwrap(&self.input); println!("Input at Alice: {}", input); @@ -81,14 +85,15 @@ To run the sample choreography above at Alice, we use the `local` method to cons # } # # impl Choreography for DemoChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { # op.locally(Alice, |un| { # let input = un.unwrap(&self.input); # println!("Input at Alice: {}", input); # }); # } # } -let projector_for_alice = Projector::new(Alice, transport); +let projector_for_alice = Projector::new(Alice, alice_transport); // Because the target of the projector is Alice, the located value is available at Alice. let string_at_alice: Located = projector_for_alice.local("Hello, World!".to_string()); // Instantiate the choreography with the located value @@ -107,14 +112,15 @@ For Bob, we use the `remote` method to construct the located value. # } # # impl Choreography for DemoChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice, Bob); +# fn run(self, op: &impl ChoreoOp) { # op.locally(Alice, |un| { # let input = un.unwrap(&self.input); # println!("Input at Alice: {}", input); # }); # } # } -let projector_for_bob = Projector::new(Bob, transport); +let projector_for_bob = Projector::new(Bob, bob_transport); // Construct a remote located value at Alice. The actual value is not required. let string_at_alice = projector_for_bob.remote(Alice); // Instantiate the choreography with the located value @@ -135,7 +141,8 @@ To do so, we specify the output type to the `Choreography` trait and return the struct DemoChoreography; impl Choreography for DemoChoreography { - fn run(self, op: &impl ChoreoOp) -> String { + type L = LocationSet!(); + fn run(self, op: &impl ChoreoOp) -> String { "Hello, World!".to_string() } } @@ -148,12 +155,13 @@ impl Choreography for DemoChoreography { # struct DemoChoreography; # # impl Choreography for DemoChoreography { -# fn run(self, op: &impl ChoreoOp) -> String { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) -> String { # "Hello, World!".to_string() # } # } let choreo = DemoChoreography; -let projector = Projector::new(Alice, transport); +let projector = Projector::new(Alice, alice_transport); let output = projector.epp_and_run(choreo); assert_eq!(output, "Hello, World!".to_string()); ``` @@ -167,14 +175,15 @@ You can use the `Located` as a return type of the `run` method to return struct DemoChoreography; impl Choreography> for DemoChoreography { - fn run(self, op: &impl ChoreoOp) -> Located { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) -> Located { op.locally(Alice, |_| { "Hello, World!".to_string() }) } } -let projector = Projector::new(Alice, transport); +let projector = Projector::new(Alice, alice_transport); let output = projector.epp_and_run(DemoChoreography); let string_at_alice = projector.unwrap(output); assert_eq!(string_at_alice, "Hello, World!".to_string()); diff --git a/chorus_book/src/guide-location-polymorphism.md b/chorus_book/src/guide-location-polymorphism.md index 057aec7..ef0ef1f 100644 --- a/chorus_book/src/guide-location-polymorphism.md +++ b/chorus_book/src/guide-location-polymorphism.md @@ -11,7 +11,8 @@ struct LocationPolymorphicChoreography { } impl Choreography for LocationPolymorphicChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(L1); + fn run(self, op: &impl ChoreoOp) { op.locally(self.location, |_| { println!("Hello, World!"); }); diff --git a/chorus_book/src/guide-locations.md b/chorus_book/src/guide-locations.md index 8de42e9..f26ffe9 100644 --- a/chorus_book/src/guide-locations.md +++ b/chorus_book/src/guide-locations.md @@ -26,6 +26,23 @@ The `ChoreographyLocation` trait provides the `name` method, which returns the n # #[derive(ChoreographyLocation)] # struct Bob; # -let name = Alice.name(); +let name = Alice::name(); assert_eq!(name, "Alice"); ``` + +## Location Set + +A `LocationSet` is a special type representing a set of `ChoreographyLocation` types. It's used to ensure type safety within the system, and you'll see its application in future sections. To build a `LocationSet` type, you can use the `LocationSet` macro from the `chorus_lib` crate. + +```rust +# extern crate chorus_lib; +# use chorus_lib::core::ChoreographyLocation; +# #[derive(ChoreographyLocation)] +# struct Alice; +# +# #[derive(ChoreographyLocation)] +# struct Bob; +use chorus_lib::core::LocationSet; + +type L = LocationSet!(Alice, Bob); +``` diff --git a/chorus_book/src/guide-projector.md b/chorus_book/src/guide-projector.md index ba78909..23bd0c2 100644 --- a/chorus_book/src/guide-projector.md +++ b/chorus_book/src/guide-projector.md @@ -4,22 +4,22 @@ Projector is responsible for performing the end-point projection and executing t ## Creating a Projector -To create a `Projector`, you need to provide the location and the transport. +To create a `Projector`, you need to provide the target location and the transport. ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; -# use chorus_lib::core::{ChoreographyLocation, Projector}; -# let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; +# use chorus_lib::core::{ChoreographyLocation, Projector, LocationSet}; # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; -# -let projector = Projector::new(Alice, transport); +# let transport_channel = LocalTransportChannelBuilder::new().with(Alice).with(Bob).build(); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); +let projector = Projector::new(Alice, alice_transport); ``` -Notice that the `Projector` is parameterized by the location type. You will need one projector for each location to execute choreography. +Notice that the `Projector` is parameterized by its target location type. You will need one projector for each location to execute choreography. ## Executing a Choreography @@ -27,20 +27,21 @@ To execute a choreography, you need to call the `epp_and_run` method on the `Pro ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; -# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp}; -# let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; +# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp, LocationSet}; +# let transport_channel = LocalTransportChannelBuilder::new().with(Alice).with(Bob).build(); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice); +# fn run(self, op: &impl ChoreoOp) { # } # } -# -# let projector = Projector::new(Alice, transport); +# let projector = Projector::new(Alice, alice_transport); projector.epp_and_run(HelloWorldChoreography); ``` diff --git a/chorus_book/src/guide-runner.md b/chorus_book/src/guide-runner.md index f92428c..1f501ff 100644 --- a/chorus_book/src/guide-runner.md +++ b/chorus_book/src/guide-runner.md @@ -8,7 +8,8 @@ To use `Runner`, construct an instance using the `new` constructor, and then cal {{#include ./header.txt}} # struct DemoChoreography; # impl Choreography for DemoChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(); +# fn run(self, op: &impl ChoreoOp) { # } # } let runner = Runner::new(); @@ -26,7 +27,8 @@ struct SumChoreography { y_at_bob: Located, } impl Choreography> for SumChoreography { - fn run(self, op: &impl ChoreoOp) -> Located { + type L = LocationSet!(Alice, Bob, Carol); + fn run(self, op: &impl ChoreoOp) -> Located { let x_at_carol = op.comm(Alice, Carol, &self.x_at_alice); let y_at_carol = op.comm(Bob, Carol, &self.y_at_bob); op.locally(Carol, |un| { diff --git a/chorus_book/src/guide-transport.md b/chorus_book/src/guide-transport.md index e4ec268..363ede8 100644 --- a/chorus_book/src/guide-transport.md +++ b/chorus_book/src/guide-transport.md @@ -10,36 +10,64 @@ ChoRus provides two built-in transports: `local` and `http`. The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. -To use the `local` transport, import the `LocalTransport` struct from the `chorus_lib` crate. +To use the local transport, we first need to create a `LocalTransportChannel`, which works as a channel between threads and allows them to send messages to each other. To do so, we use the `LocalTransportChannelBuilder` struct from the `chorus_lib` crate. ```rust # extern crate chorus_lib; -use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::core::{ChoreographyLocation, LocationSet}; +# #[derive(ChoreographyLocation)] +# struct Alice; +# #[derive(ChoreographyLocation)] +# struct Bob; +use chorus_lib::transport::local::LocalTransportChannelBuilder; + +let transport_channel = LocalTransportChannelBuilder::new() + .with(Alice) + .with(Bob) + .build(); ``` -You can construct a `LocalTransport` instance by passing a slice of locations to the `from` method. +Using the `with` method, we add locations to the channel. When we call `build`, it will create an instance of `LocalTransportChannel`. -Because of the nature of the `Local` transport, you must use the same `LocalTransport` instance for all locations. You can `clone` the `LocalTransport` instance and pass it to the threads. +Then, create a transport by using the `LocalTransport::new` function, which takes a target location (explained in the [Projector section](./guide-projector.md)) and the `LocalTransportChannel`. ```rust # extern crate chorus_lib; -# use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::core::{ChoreographyLocation, LocationSet}; +# #[derive(ChoreographyLocation)] +# struct Alice; +# use chorus_lib::transport::local::LocalTransportChannelBuilder; +# let transport_channel = LocalTransportChannelBuilder::new().with(Alice).build(); +use chorus_lib::transport::local::{LocalTransport}; + +let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); +``` + +Because of the nature of the `Local` transport, you must use the same `LocalTransportChannel` instance for all locations. You can `clone` the `LocalTransportChannel` instance and pass it to each `Projector::new` constructor. + +```rust +# extern crate chorus_lib; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; # use std::thread; -# use chorus_lib::core::{ChoreographyLocation, ChoreoOp, Choreography, Projector}; +# use chorus_lib::core::{ChoreographyLocation, ChoreoOp, Choreography, Projector, LocationSet}; # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; # struct HelloWorldChoreography; # impl Choreography for HelloWorldChoreography { -# fn run(self, op: &impl ChoreoOp) { +# type L = LocationSet!(Alice, Bob); +# fn run(self, op: &impl ChoreoOp) { # } # } +let transport_channel = LocalTransportChannelBuilder::new() + .with(Alice) + .with(Bob) + .build(); let mut handles: Vec> = Vec::new(); -let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); { - // create a clone for Alice - let transport = transport.clone(); + // create a transport for Alice + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); @@ -47,7 +75,7 @@ let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); } { // create another for Bob - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); @@ -59,27 +87,65 @@ let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); The `http` transport is used to execute choreographies on different machines. This is useful for executing choreographies in a distributed system. -To use the `http` transport, import the `HttpTransport` struct from the `chorus_lib` crate. +To use the `http` transport, import `HttpTransport` and `HttpTransportConfigBuilder` from the `chorus_lib` crate. ```rust # extern crate chorus_lib; -use chorus_lib::transport::http::HttpTransport; +use chorus_lib::transport::http::{HttpTransport, HttpTransportConfigBuilder}; ``` -The `new` constructor takes the name of the projection target and "configuration" of type `std::collections::HashMap<&'static str, (&'static str, u32)>`. The configuration is a map from location names to the hostname and port of the location. +We need to construct a `HttpTransportConfig` using the `HttpTransportConfigBuilder`. First, we specify the target location and the hostname and port to listen on using the `for_target` method. Then, we specify the other locations and their `(hostname, port)` pairs using the `with` method. ```rust {{#include ./header.txt}} -# use chorus_lib::transport::http::HttpTransport; -# use std::collections::HashMap; -let mut config = HashMap::new(); -config.insert(Alice.name(), ("localhost", 8080)); -config.insert(Bob.name(), ("localhost", 8081)); -let transport = HttpTransport::new(Alice.name(), &config); +# use chorus_lib::transport::http::{HttpTransport, HttpTransportConfigBuilder}; +// `Alice` listens on port 8080 on localhost +let config = HttpTransportConfigBuilder::for_target(Alice, ("localhost", 8080)) + // Connect to `Bob` on port 8081 on localhost + .with(Bob, ("localhost", 8081)) + .build(); +let transport = HttpTransport::new(config); ``` In the above example, the transport will start the HTTP server on port 8080 on localhost. If Alice needs to send a message to Bob, it will use `http://localhost:8081` as the destination. ## Creating a Custom Transport -You can also create your own transport by implementing the `Transport` trait. See the API documentation for more details. +You can also create your own transport by implementing the `Transport` trait. It might be helpful to first build a `TransportConfig` to have the the information that you need for each `ChoreographyLocation`, and then have a constructor that takes the `TransportConfig` and builds the `Transport` based on it. While the syntax is similar to `HttpTransportConfig`, which is `HttpTransportConfigBuilder::for_target(target_location, target_information)`, chained with information about other locations using the `.with(other_location, other_location_information)`, the type of information for each `ChoreographyLocation` might diverge from the `(host_name, port)` format presented in `HttpTransport`. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type. + +```rust +{{#include ./header.txt}} +# use chorus_lib::transport::TransportConfigBuilder; +let config = TransportConfigBuilder::for_target(Alice, ()) + .with(Bob, ("localhost", 8081)) + .with(Carol, ("localhost", 8082)) + .build(); +``` + +See the API documentation for more details. + +### Note on the location set of the Choreography + +Note that when calling `epp_and_run` on a `Projector`, you will get a compile error if the location set of the `Choreography` is not a subset of the location set of the `Transport`. In other words, the `Transport` should have information about every `ChoreographyLocation` that `Choreography` can talk about. So this will fail: + +```rust, compile_fail +# extern crate chorus_lib; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; +# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp, LocationSet}; + +# #[derive(ChoreographyLocation)] +# struct Alice; +# #[derive(ChoreographyLocation)] +# struct Bob; +struct HelloWorldChoreography; +impl Choreography for HelloWorldChoreography { + type L = LocationSet!(Alice, Bob); + fn run(self, op: &impl ChoreoOp) { + } +} + +let transport_channel = LocalTransportChannelBuilder::new().with(Alice).build(); +let transport = LocalTransport::new(Alice, transport_channel.clone()); +let projector = Projector::new(Alice, transport); +projector.epp_and_run(HelloWorldChoreography); +``` diff --git a/chorus_book/src/header.txt b/chorus_book/src/header.txt index c5ecb20..f24bfe6 100644 --- a/chorus_book/src/header.txt +++ b/chorus_book/src/header.txt @@ -1,10 +1,13 @@ # extern crate chorus_lib; -# use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector, Located, Superposition, Runner}; -# use chorus_lib::transport::local::LocalTransport; +# use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector, Located, Superposition, Runner, LocationSet}; +# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; # #[derive(ChoreographyLocation)] # struct Alice; # #[derive(ChoreographyLocation)] # struct Bob; # #[derive(ChoreographyLocation)] # struct Carol; -# let transport = LocalTransport::from(&[Alice.name(), Bob.name(), Carol.name()]); \ No newline at end of file +# let transport_channel = LocalTransportChannelBuilder::new().with(Alice).with(Bob).with(Carol).build(); +# let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); +# let bob_transport = LocalTransport::new(Bob, transport_channel.clone()); +# let carol_transport = LocalTransport::new(Carol, transport_channel.clone()); \ No newline at end of file diff --git a/chorus_derive/src/lib.rs b/chorus_derive/src/lib.rs index 3f7278d..371427e 100644 --- a/chorus_derive/src/lib.rs +++ b/chorus_derive/src/lib.rs @@ -7,7 +7,7 @@ pub fn derive_choreography_location(input: TokenStream) -> TokenStream { let DeriveInput { ident, .. } = parse_macro_input!(input); let output = quote! { impl ChoreographyLocation for #ident { - fn name(&self) -> &'static str { + fn name() -> &'static str { stringify!(#ident) } } diff --git a/chorus_lib/Cargo.toml b/chorus_lib/Cargo.toml index 4634724..560c75c 100644 --- a/chorus_lib/Cargo.toml +++ b/chorus_lib/Cargo.toml @@ -13,7 +13,7 @@ keywords = ["choreography"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -chorus_derive = { version = "0.1.0", path = "../chorus_derive" } +chorus_derive = { version = "0.2.0", path = "../chorus_derive" } retry = "2.0.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.104" diff --git a/chorus_lib/examples/bookseller.rs b/chorus_lib/examples/bookseller.rs index 4db81fa..2237be3 100644 --- a/chorus_lib/examples/bookseller.rs +++ b/chorus_lib/examples/bookseller.rs @@ -3,9 +3,10 @@ extern crate chorus_lib; use std::io; use std::thread; +use chorus_lib::transport::local::LocalTransportChannelBuilder; use chrono::NaiveDate; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, LocationSet, Projector}; use chorus_lib::transport::local::LocalTransport; fn get_book(title: &str) -> Option<(i32, NaiveDate)> { @@ -26,7 +27,8 @@ struct Buyer; struct BooksellerChoreography; impl Choreography for BooksellerChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Seller, Buyer); + fn run(self, op: &impl ChoreoOp) { let title_at_buyer = op.locally(Buyer, |_| { println!("Enter the title of the book to buy (TAPL or HoTT)"); let mut title = String::new(); @@ -71,9 +73,15 @@ impl Choreography for BooksellerChoreography { } fn main() { - let transport = LocalTransport::from(&[Seller.name(), Buyer.name()]); - let seller_projector = Projector::new(Seller, transport.clone()); - let buyer_projector = Projector::new(Buyer, transport.clone()); + let transport_channel = LocalTransportChannelBuilder::new() + .with(Seller) + .with(Buyer) + .build(); + let transport_seller = LocalTransport::new(Seller, transport_channel.clone()); + let transport_buyer = LocalTransport::new(Buyer, transport_channel.clone()); + + let seller_projector = Projector::new(Seller, transport_seller); + let buyer_projector = Projector::new(Buyer, transport_buyer); let mut handles: Vec> = Vec::new(); handles.push(thread::spawn(move || { diff --git a/chorus_lib/examples/bookseller2.rs b/chorus_lib/examples/bookseller2.rs index 9b73086..b62c0da 100644 --- a/chorus_lib/examples/bookseller2.rs +++ b/chorus_lib/examples/bookseller2.rs @@ -1,11 +1,12 @@ extern crate chorus_lib; +use std::collections::HashMap; +use std::sync::Arc; use std::thread; -use std::{collections::HashMap, sync::Arc}; use chorus_lib::{ - core::{ChoreoOp, Choreography, ChoreographyLocation, Located, Projector}, - transport::local::LocalTransport, + core::{ChoreoOp, Choreography, ChoreographyLocation, Located, LocationSet, Projector}, + transport::local::{LocalTransport, LocalTransportChannelBuilder}, }; use chrono::NaiveDate; @@ -35,7 +36,8 @@ impl Decider for OneBuyerDecider { } impl Choreography> for OneBuyerDecider { - fn run(self, op: &impl ChoreoOp) -> Located { + type L = LocationSet!(Buyer1, Buyer2); + fn run(self, op: &impl ChoreoOp) -> Located { let price = op.broadcast(Buyer1, self.price); return op.locally(Buyer1, |_| { const BUYER1_BUDGET: i32 = 100; @@ -55,7 +57,8 @@ impl Decider for TwoBuyerDecider { } impl Choreography> for TwoBuyerDecider { - fn run(self, op: &impl ChoreoOp) -> Located { + type L = LocationSet!(Buyer1, Buyer2); + fn run(self, op: &impl ChoreoOp) -> Located { let remaining = op.locally(Buyer1, |un| { const BUYER1_BUDGET: i32 = 100; return un.unwrap(&self.price) - BUYER1_BUDGET; @@ -76,10 +79,11 @@ struct BooksellerChoreography>> { title: Located, } -impl> + Decider> +impl, L = LocationSet!(Buyer1, Buyer2)> + Decider> Choreography, Buyer1>> for BooksellerChoreography { - fn run(self, op: &impl ChoreoOp) -> Located, Buyer1> { + type L = LocationSet!(Buyer1, Buyer2, Seller); + fn run(self, op: &impl ChoreoOp) -> Located, Buyer1> { let title_at_seller = op.comm(Buyer1, Seller, &self.title); let price_at_seller = op.locally(Seller, |un| { let inventory = un.unwrap(&self.inventory); @@ -90,8 +94,7 @@ impl> + Decider> return i32::MAX; }); let price_at_buyer1 = op.comm(Seller, Buyer1, &price_at_seller); - let decision_at_buyer1 = - op.colocally(&[Buyer1.name(), Buyer2.name()], D::new(price_at_buyer1)); + let decision_at_buyer1 = op.colocally(D::new(price_at_buyer1)); struct GetDeliveryDateChoreography { inventory: Located, @@ -99,7 +102,8 @@ impl> + Decider> decision_at_buyer1: Located, } impl Choreography, Buyer1>> for GetDeliveryDateChoreography { - fn run(self, op: &impl ChoreoOp) -> Located, Buyer1> { + type L = LocationSet!(Buyer1, Seller); + fn run(self, op: &impl ChoreoOp) -> Located, Buyer1> { let decision = op.broadcast(Buyer1, self.decision_at_buyer1); if decision { let delivery_date_at_seller = op.locally(Seller, |un| { @@ -116,14 +120,11 @@ impl> + Decider> } } - return op.colocally( - &[Seller.name(), Buyer1.name()], - GetDeliveryDateChoreography { - inventory: self.inventory.clone(), - title_at_seller: title_at_seller.clone(), - decision_at_buyer1, - }, - ); + return op.colocally(GetDeliveryDateChoreography { + inventory: self.inventory.clone(), + title_at_seller: title_at_seller.clone(), + decision_at_buyer1, + }); } } @@ -141,10 +142,24 @@ fn main() { i }; - let transport = LocalTransport::from(&[Seller.name(), Buyer1.name(), Buyer2.name()]); - let seller_projector = Arc::new(Projector::new(Seller, transport.clone())); - let buyer1_projector = Arc::new(Projector::new(Buyer1, transport.clone())); - let buyer2_projector = Arc::new(Projector::new(Buyer2, transport.clone())); + let transport_channel = LocalTransportChannelBuilder::new() + .with(Seller) + .with(Buyer1) + .with(Buyer2) + .build(); + + let seller_projector = Arc::new(Projector::new( + Seller, + LocalTransport::new(Seller, transport_channel.clone()), + )); + let buyer1_projector = Arc::new(Projector::new( + Buyer1, + LocalTransport::new(Buyer1, transport_channel.clone()), + )); + let buyer2_projector = Arc::new(Projector::new( + Buyer2, + LocalTransport::new(Buyer2, transport_channel.clone()), + )); println!("Tries to buy HoTT with one buyer"); type OneBuyerBooksellerChoreography = BooksellerChoreography; diff --git a/chorus_lib/examples/hello.rs b/chorus_lib/examples/hello.rs index 70b65dc..f861ed2 100644 --- a/chorus_lib/examples/hello.rs +++ b/chorus_lib/examples/hello.rs @@ -2,8 +2,8 @@ extern crate chorus_lib; use std::thread; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Projector}; -use chorus_lib::transport::local::LocalTransport; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, LocationSet, Projector}; +use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; // --- Define two locations (Alice and Bob) --- @@ -18,7 +18,11 @@ struct HelloWorldChoreography; // Implement the `Choreography` trait for `HelloWorldChoreography` impl Choreography for HelloWorldChoreography { - fn run(self, op: &impl ChoreoOp) { + // Define the set of locations involved in the choreography. + // In this case, the set consists of `Alice` and `Bob` and + // the choreography can use theses locations. + type L = LocationSet!(Alice, Bob); + fn run(self, op: &impl ChoreoOp) { // Create a located value at Alice let msg_at_alice = op.locally(Alice, |_| { println!("Hello from Alice!"); @@ -37,18 +41,21 @@ impl Choreography for HelloWorldChoreography { fn main() { let mut handles: Vec> = Vec::new(); - // Create a local transport - let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); + // Create a transport channel + let transport_channel = LocalTransportChannelBuilder::new() + .with(Alice) + .with(Bob) + .build(); // Run the choreography in two threads { - let transport = transport.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); })); } { - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); diff --git a/chorus_lib/examples/input-output.rs b/chorus_lib/examples/input-output.rs index 47c875d..7664a65 100644 --- a/chorus_lib/examples/input-output.rs +++ b/chorus_lib/examples/input-output.rs @@ -1,5 +1,5 @@ extern crate chorus_lib; -use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Located}; +use chorus_lib::core::{ChoreoOp, Choreography, ChoreographyLocation, Located, LocationSet}; #[derive(ChoreographyLocation)] struct Alice; #[derive(ChoreographyLocation)] @@ -12,7 +12,8 @@ struct DemoChoreography { } impl Choreography for DemoChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice); + fn run(self, op: &impl ChoreoOp) { op.locally(Alice, |un| { let s = un.unwrap(&self.input); println!("Alice received: {}", s); diff --git a/chorus_lib/examples/loc-poly.rs b/chorus_lib/examples/loc-poly.rs new file mode 100644 index 0000000..b12b3af --- /dev/null +++ b/chorus_lib/examples/loc-poly.rs @@ -0,0 +1,84 @@ +extern crate chorus_lib; +use std::fmt::Debug; +use std::thread; + +use chorus_lib::core::{ + ChoreoOp, Choreography, ChoreographyLocation, Located, LocationSet, Portable, Projector, +}; +use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; + +#[derive(ChoreographyLocation)] +struct Alice; +#[derive(ChoreographyLocation)] +struct Bob; +#[derive(ChoreographyLocation)] +struct Carol; + +struct CommAndPrint { + sender: L1, + receiver: L2, + data: Located, +} + +impl Choreography> for CommAndPrint +where + V: Portable + Debug, + L1: ChoreographyLocation, + L2: ChoreographyLocation, +{ + type L = LocationSet!(L1, L2); + fn run(self, op: &impl ChoreoOp) -> Located { + let v = op.comm(self.sender, self.receiver, &self.data); + op.locally(self.receiver, |un| println!("{:?}", un.unwrap(&v))); + v + } +} + +struct MainChoreography; + +impl Choreography> for MainChoreography { + type L = LocationSet!(Alice, Bob); + + fn run(self, op: &impl ChoreoOp) -> Located { + let v1 = op.locally(Alice, |_| 100); + let v2 = op.call(CommAndPrint { + sender: Alice, + receiver: Bob, + data: v1, + }); + let v2 = op.locally(Bob, |un| un.unwrap(&v2) + 10); + return op.colocally(CommAndPrint { + sender: Bob, + receiver: Alice, + data: v2, + }); + } +} + +fn main() { + let transport_channel = LocalTransportChannelBuilder::new() + .with(Alice) + .with(Bob) + .with(Carol) + .build(); + + let mut handles = vec![]; + { + let transport = LocalTransport::new(Alice, transport_channel.clone()); + handles.push(thread::spawn(|| { + let p = Projector::new(Alice, transport); + let v = p.epp_and_run(MainChoreography); + assert_eq!(p.unwrap(v), 110); + })); + } + { + let transport = LocalTransport::new(Bob, transport_channel.clone()); + handles.push(thread::spawn(|| { + let p = Projector::new(Bob, transport); + p.epp_and_run(MainChoreography); + })); + } + for h in handles { + h.join().unwrap(); + } +} diff --git a/chorus_lib/examples/runner.rs b/chorus_lib/examples/runner.rs index 0727c44..e6d39ec 100644 --- a/chorus_lib/examples/runner.rs +++ b/chorus_lib/examples/runner.rs @@ -1,6 +1,6 @@ extern crate chorus_lib; use chorus_lib::core::{ - ChoreoOp, Choreography, ChoreographyLocation, Located, Runner, Superposition, + ChoreoOp, Choreography, ChoreographyLocation, Located, LocationSet, Runner, Superposition, }; #[derive(ChoreographyLocation)] @@ -25,7 +25,8 @@ struct BobCarolChoreography { } impl Choreography for BobCarolChoreography { - fn run(self, op: &impl ChoreoOp) -> BobCarolResult { + type L = LocationSet!(Bob, Carol); + fn run(self, op: &impl ChoreoOp) -> BobCarolResult { let is_even_at_bob: Located = op.locally(Bob, |un| { let x = un.unwrap(&self.x_at_bob); x % 2 == 0 @@ -48,16 +49,14 @@ impl Choreography for BobCarolChoreography { struct MainChoreography; impl Choreography for MainChoreography { - fn run(self, op: &impl ChoreoOp) { + type L = LocationSet!(Alice, Bob, Carol); + fn run(self, op: &impl ChoreoOp) { let x_at_alice = op.locally(Alice, |_| get_random_number()); let x_at_bob = op.comm(Alice, Bob, &x_at_alice); let BobCarolResult { is_even_at_bob, is_even_at_carol, - } = op.colocally( - &[Bob.name(), Carol.name()], - BobCarolChoreography { x_at_bob }, - ); + } = op.colocally(BobCarolChoreography { x_at_bob }); op.locally(Bob, |un| { let is_even = un.unwrap(&is_even_at_bob); assert!(is_even); diff --git a/chorus_lib/examples/tic-tac-toe.rs b/chorus_lib/examples/tic-tac-toe.rs index eab462a..04f8655 100644 --- a/chorus_lib/examples/tic-tac-toe.rs +++ b/chorus_lib/examples/tic-tac-toe.rs @@ -2,11 +2,15 @@ extern crate chorus_lib; use chorus_lib::{ - core::{Choreography, ChoreographyLocation, Deserialize, Located, Projector, Serialize}, - transport::http::HttpTransport, + core::{ + ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Located, LocationSet, Projector, + Serialize, + }, + transport::http::{HttpTransport, HttpTransportConfigBuilder}, }; + use clap::Parser; -use std::{collections::HashMap, io::Write}; +use std::io::Write; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; #[derive(Serialize, Deserialize, Debug)] @@ -225,7 +229,8 @@ struct TicTacToeChoreography { } impl Choreography for TicTacToeChoreography { - fn run(self, op: &impl chorus_lib::core::ChoreoOp) -> () { + type L = LocationSet!(PlayerX, PlayerO); + fn run(self, op: &impl ChoreoOp) -> () { let mut board = Board::new(); loop { let board_x = op.locally(PlayerX, |un| { @@ -286,15 +291,20 @@ fn main() { } else { Box::new(UserBrain::new(args.player)) }; + match args.player { 'X' => { - let mut config = HashMap::new(); - config.insert(PlayerX.name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerO.name(), + let config = HttpTransportConfigBuilder::for_target( + PlayerX, + (args.hostname.as_str(), args.port), + ) + .with( + PlayerO, (args.opponent_hostname.as_str(), args.opponent_port), - ); - let transport = HttpTransport::new(PlayerX.name(), &config); + ) + .build(); + + let transport = HttpTransport::new(config); let projector = Projector::new(PlayerX, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.local(brain), @@ -302,13 +312,17 @@ fn main() { }); } 'O' => { - let mut config = HashMap::new(); - config.insert(PlayerO.name(), (args.hostname.as_str(), args.port)); - config.insert( - PlayerX.name(), + let config = HttpTransportConfigBuilder::for_target( + PlayerO, + (args.hostname.as_str(), args.port), + ) + .with( + PlayerX, (args.opponent_hostname.as_str(), args.opponent_port), - ); - let transport = HttpTransport::new(PlayerO.name(), &config); + ) + .build(); + + let transport = HttpTransport::new(config); let projector = Projector::new(PlayerO, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.remote(PlayerX), diff --git a/chorus_lib/src/core.rs b/chorus_lib/src/core.rs index 5a37956..ed2d853 100644 --- a/chorus_lib/src/core.rs +++ b/chorus_lib/src/core.rs @@ -6,15 +6,28 @@ use std::marker::PhantomData; use serde::de::DeserializeOwned; // re-export so that users can use derive macros without importing serde +#[doc(no_inline)] pub use serde::{Deserialize, Serialize}; -/// Represents a location. It can be derived using `#[derive(ChoreographyLocation)]`. +/// Represents a location. +/// +/// It can be derived using `#[derive(ChoreographyLocation)]`. +/// +/// ``` +/// # use chorus_lib::core::ChoreographyLocation; +/// # +/// #[derive(ChoreographyLocation)] +/// struct Alice; +/// ``` pub trait ChoreographyLocation: Copy { /// Returns the name of the location as a string. - fn name(&self) -> &'static str; + fn name() -> &'static str; } -/// Represents a value that can be used in a choreography. ChoRus uses [serde](https://serde.rs/) to serialize and deserialize values. +/// Represents a value that can be used in a choreography. +/// +/// ChoRus uses [serde](https://serde.rs/) to serialize and deserialize values. +/// /// It can be derived using `#[derive(Serialize, Deserialize)]` as long as all the fields satisfy the `Portable` trait. pub trait Portable: Serialize + DeserializeOwned {} impl Portable for T {} @@ -89,6 +102,122 @@ where } } +// --- HList and Helpers --- + +/// heterogeneous list +#[doc(hidden)] +pub trait LocationSet { + /// returns + fn to_string_list() -> Vec<&'static str>; +} + +/// end of HList +#[doc(hidden)] +pub struct HNil; + +/// An element of HList +#[doc(hidden)] +pub struct HCons(Head, Tail); + +impl LocationSet for HNil { + fn to_string_list() -> Vec<&'static str> { + Vec::new() + } +} +impl LocationSet for HCons +where + Head: ChoreographyLocation, + Tail: LocationSet, +{ + fn to_string_list() -> Vec<&'static str> { + let mut v = Tail::to_string_list(); + v.push(Head::name()); + v + } +} + +// To export `LocationSet` under the `core` module, we define an internal macro and export it. +// This is because Rust does not allow us to export a macro from a module without re-exporting it. +// `__ChoRus_Internal_LocationSet` is the internal macro and it is configured not to be visible in the documentation. + +/// Macro to define a set of locations that a choreography is defined on. +/// +/// ``` +/// # use chorus_lib::core::{ChoreographyLocation, LocationSet}; +/// # +/// # #[derive(ChoreographyLocation)] +/// # struct Alice; +/// # #[derive(ChoreographyLocation)] +/// # struct Bob; +/// # #[derive(ChoreographyLocation)] +/// # struct Carol; +/// # +/// type L = LocationSet!(Alice, Bob, Carol); +/// ``` +#[doc(hidden)] +#[macro_export] +macro_rules! __ChoRus_Internal_LocationSet { + () => { $crate::core::HNil }; + ($head:ty $(,)*) => { $crate::core::HCons<$head, $crate::core::HNil> }; + ($head:ty, $($tail:tt)*) => { $crate::core::HCons<$head, $crate::core::LocationSet!($($tail)*)> }; +} + +#[doc(inline)] +pub use __ChoRus_Internal_LocationSet as LocationSet; + +/// Marker +#[doc(hidden)] +pub struct Here; +/// Marker +#[doc(hidden)] +pub struct There(Index); + +/// Check if a location is a member of a location set +/// +/// The trait is used to check if a location is a member of a location set. +/// +/// It takes two type parameters `L` and `Index`. `L` is a location set and `Index` is some type that is inferred by the compiler. +/// If a location `L1` is in `L`, then there exists a type `Index` such that `L1` implements `Member`. +pub trait Member { + /// A location set that is the remainder of `L` after removing the member. + type Remainder: LocationSet; +} + +impl Member, Here> for Head +where + Tail: LocationSet, +{ + type Remainder = Tail; +} +impl Member>, There> + for X +where + Head: ChoreographyLocation, + X: Member, TailIndex>, +{ + type Remainder = HCons; +} + +/// Check if a location set is a subset of another location set +/// +/// The trait is used to check if a location set is a subset of another location set. +/// +/// It takes two type parameters `L` and `Index`. `L` is a location set and `Index` is some type that is inferred by the compiler. +/// If a location set `M` is a subset of `L`, then there exists a type `Index` such that `M` implements `Subset`. +pub trait Subset {} + +// Base case: HNil is a subset of any set +impl Subset for HNil {} + +// Recursive case +impl Subset> + for HCons +where + Head: Member, + Tail: Subset, +{ +} + /// Provides a method to work with located values at the current location pub struct Unwrapper { phantom: PhantomData, @@ -105,7 +234,7 @@ impl Unwrapper { /// /// The trait provides methods to work with located values. An implementation of the trait is "injected" into /// a choreography at runtime and provides the actual implementation of the operators. -pub trait ChoreoOp { +pub trait ChoreoOp { /// Performs a computation at the specified location. /// /// `locally` performs a computation at a location, which are specified by `location` and `computation`, respectively. @@ -114,37 +243,51 @@ pub trait ChoreoOp { /// - `computation` is a function that takes an `Unwrapper`. Using the `Unwrapper`, the function can access located values at the location. /// /// The `computation` can return a value of type `V` and the value will be stored in a `Located` struct at the choreography level. - fn locally( + fn locally( &self, location: L1, computation: impl Fn(Unwrapper) -> V, - ) -> Located; + ) -> Located + where + L1: Member; /// Performs a communication between two locations. /// /// `comm` sends `data` from `sender` to `receiver`. The `data` must be a `Located` struct at the `sender` location /// and the value type must implement `Portable`. - fn comm( + fn comm( &self, sender: L1, receiver: L2, data: &Located, - ) -> Located; + ) -> Located + where + L1: Member, + L2: Member; /// Performs a broadcast from a location to all other locations. /// /// `broadcast` broadcasts `data` from `sender` to all other locations. The `data` must be a `Located` struct at the `sender` location. /// The method returns the non-located value. - fn broadcast( + fn broadcast( &self, sender: L1, data: Located, - ) -> V; + ) -> V + where + L1: Member; /// Calls a choreography. - fn call>(&self, choreo: C) -> R; + fn call>(&self, choreo: C) -> R + where + M: LocationSet + Subset; /// Calls a choreography on a subset of locations. - fn colocally>(&self, locations: &[&str], choreo: C) -> R; + fn colocally, Index>( + &self, + choreo: C, + ) -> R + where + S: Subset; } /// Represents a choreography. @@ -155,20 +298,26 @@ pub trait ChoreoOp { /// /// The trait provides a method `run` that takes an implementation of `ChoreoOp` and returns a value of type `R`. pub trait Choreography { + /// Locations + type L: LocationSet; /// A method that executes a choreography. /// /// The method takes an implementation of `ChoreoOp`. Inside the method, you can use the operators provided by `ChoreoOp` to define a choreography. /// /// The method returns a value of type `R`, which is the return type of the choreography. - fn run(self, op: &impl ChoreoOp) -> R; + fn run(self, op: &impl ChoreoOp) -> R; } /// Provides methods to send and receive messages. /// /// The trait provides methods to send and receive messages between locations. Implement this trait to define a custom transport. -pub trait Transport { +/// +/// The type parameter `L` is the location set that the transport is operating on. +/// +/// The type parameter `TargetLocation` is the target `ChoreographyLocation`. +pub trait Transport { /// Returns a list of locations. - fn locations(&self) -> Vec; + fn locations(&self) -> Vec<&'static str>; /// Sends a message from `from` to `to`. fn send(&self, from: &str, to: &str, data: &V) -> (); /// Receives a message from `from` to `at`. @@ -176,18 +325,33 @@ pub trait Transport { } /// Provides a method to perform end-point projection. -pub struct Projector { - target: L1, +pub struct Projector, Index> +where + L1: Member, +{ + target: PhantomData, transport: T, + location_set: PhantomData, + index: PhantomData, } -impl Projector { +impl, Index> + Projector +where + L1: Member, +{ /// Constructs a `Projector` struct. /// /// - `target` is the projection target of the choreography. /// - `transport` is an implementation of `Transport`. pub fn new(target: L1, transport: B) -> Self { - Projector { target, transport } + _ = target; + Projector { + target: PhantomData, + transport, + location_set: PhantomData, + index: PhantomData, + } } /// Constructs a `Located` struct located at the projection target using the actual value. @@ -200,13 +364,11 @@ impl Projector { /// Constructs a `Located` struct *NOT* located at the projection target. /// /// Use this method to run a choreography that takes a located value as an input. - /// - /// Note that the method panics at runtime if the projection target and the location of the value are the same. - pub fn remote(&self, l2: L2) -> Located { - // NOTE(shumbo): Ideally, this check should be done at the type level. - if self.target.name() == l2.name() { - panic!("Cannot create a remote value at the same location"); - } + pub fn remote(&self, at: L2) -> Located + where + L2: Member<>::Remainder, Index2>, + { + _ = at; Located::remote() } @@ -218,19 +380,35 @@ impl Projector { } /// Performs end-point projection and runs a choreography. - pub fn epp_and_run<'a, V, C: Choreography>(&'a self, choreo: C) -> V { - struct EppOp<'a, B: Transport> { - target: String, + pub fn epp_and_run<'a, V, L: LocationSet, C: Choreography, IndexSet>( + &'a self, + choreo: C, + ) -> V + where + L: Subset, + { + struct EppOp< + 'a, + L: LocationSet, + L1: ChoreographyLocation, + LS: LocationSet, + B: Transport, + > { + target: PhantomData, transport: &'a B, - locations: Vec, + locations: Vec<&'static str>, + marker: PhantomData, + projector_location_set: PhantomData, } - impl<'a, B: Transport> ChoreoOp for EppOp<'a, B> { - fn locally( + impl<'a, L: LocationSet, T: ChoreographyLocation, LS: LocationSet, B: Transport> + ChoreoOp for EppOp<'a, L, T, LS, B> + { + fn locally( &self, - location: L1, + _location: L1, computation: impl Fn(Unwrapper) -> V, ) -> Located { - if location.name() == self.target { + if L1::name() == T::name() { let unwrapper = Unwrapper { phantom: PhantomData, }; @@ -241,83 +419,104 @@ impl Projector { } } - fn comm( + fn comm< + L1: ChoreographyLocation, + L2: ChoreographyLocation, + V: Portable, + Index1, + Index2, + >( &self, - sender: L1, - receiver: L2, + _sender: L1, + _receiver: L2, data: &Located, ) -> Located { - if sender.name() == self.target { - self.transport.send( - sender.name(), - receiver.name(), - data.value.as_ref().unwrap(), - ); + if L1::name() == T::name() { + self.transport + .send(L1::name(), L2::name(), data.value.as_ref().unwrap()); Located::remote() - } else if receiver.name() == self.target { - let value = self.transport.receive(sender.name(), receiver.name()); + } else if L2::name() == T::name() { + let value = self.transport.receive(L1::name(), L2::name()); Located::local(value) } else { Located::remote() } } - fn broadcast( + fn broadcast( &self, - sender: L1, + _sender: L1, data: Located, ) -> V { - if sender.name() == self.target { + if L1::name() == T::name() { for dest in &self.locations { - if self.target != *dest { - self.transport.send(&self.target, &dest, &data.value); + if T::name() != *dest { + self.transport.send(&T::name(), &dest, &data.value); } } return data.value.unwrap(); } else { - self.transport.receive(sender.name(), &self.target) + self.transport.receive(L1::name(), &T::name()) } } - fn call>(&self, choreo: C) -> T { - choreo.run(self) + fn call>(&self, choreo: C) -> R + where + M: LocationSet + Subset, + { + let op: EppOp<'a, M, T, LS, B> = EppOp { + target: PhantomData::, + transport: &self.transport, + locations: self.transport.locations(), + marker: PhantomData::, + projector_location_set: PhantomData::, + }; + choreo.run(&op) } - fn colocally>( + fn colocally, Index>( &self, - locs: &[&str], choreo: C, - ) -> T { - let locs_vec = Vec::from_iter(locs.into_iter().map(|s| s.to_string())); + ) -> R { + let locs_vec = S::to_string_list(); + for location in &locs_vec { - let op = EppOp { - target: location.clone(), - transport: self.transport, - locations: locs_vec.clone(), - }; - if *location == self.target.to_string() { + if *location == T::name().to_string() { + let op = EppOp { + target: PhantomData::, + transport: self.transport, + locations: locs_vec, + marker: PhantomData::, + projector_location_set: PhantomData::, + }; return choreo.run(&op); } } - T::remote() + R::remote() } } - let op: EppOp<'a, B> = EppOp { - target: self.target.name().to_string(), + let op: EppOp<'a, L, L1, LS, B> = EppOp { + target: PhantomData::, transport: &self.transport, locations: self.transport.locations(), + marker: PhantomData::, + projector_location_set: PhantomData::, }; choreo.run(&op) } } /// Provides a method to run a choreography without end-point projection. -pub struct Runner; +pub struct Runner { + marker: PhantomData, +} -impl Runner { +impl Runner { /// Constructs a runner. pub fn new() -> Self { - Runner + Runner { + marker: PhantomData::, + } } /// Constructs a located value. @@ -335,10 +534,10 @@ impl Runner { } /// Runs a choreography directly - pub fn run<'a, V, C: Choreography>(&'a self, choreo: C) -> V { - struct RunOp; - impl ChoreoOp for RunOp { - fn locally( + pub fn run<'a, V, C: Choreography>(&'a self, choreo: C) -> V { + struct RunOp(PhantomData); + impl ChoreoOp for RunOp { + fn locally( &self, _location: L1, computation: impl Fn(Unwrapper) -> V, @@ -350,7 +549,13 @@ impl Runner { Located::local(value) } - fn comm( + fn comm< + L1: ChoreographyLocation, + L2: ChoreographyLocation, + V: Portable, + Index1, + Index2, + >( &self, _sender: L1, _receiver: L2, @@ -362,7 +567,7 @@ impl Runner { Located::local(serde_json::from_str(s.as_str()).unwrap()) } - fn broadcast( + fn broadcast( &self, _sender: L1, data: Located, @@ -370,19 +575,24 @@ impl Runner { data.value.unwrap() } - fn call>(&self, choreo: C) -> R { - choreo.run(self) + fn call>(&self, choreo: C) -> R + where + M: LocationSet + Subset, + { + let op: RunOp = RunOp(PhantomData); + choreo.run(&op) } - fn colocally>( + fn colocally, Index>( &self, - _locations: &[&str], choreo: C, ) -> R { - choreo.run(self) + let op = RunOp::(PhantomData); + choreo.run(&op) } } - choreo.run(&RunOp) + let op: RunOp = RunOp(PhantomData); + choreo.run(&op) } } diff --git a/chorus_lib/src/transport.rs b/chorus_lib/src/transport.rs index a7f854b..090cc1a 100644 --- a/chorus_lib/src/transport.rs +++ b/chorus_lib/src/transport.rs @@ -2,3 +2,99 @@ pub mod http; pub mod local; + +use crate::core::{ChoreographyLocation, HCons, LocationSet}; +use std::collections::HashMap; +use std::marker::PhantomData; + +/// A generic struct for configuration of `Transport`. +#[derive(Clone)] +pub struct TransportConfig<'a, Target: ChoreographyLocation, TargetInfo, L: LocationSet, Info> { + /// The information about locations + pub info: HashMap<&'static str, Info>, + /// The information about the target choreography + pub target_info: (Target, TargetInfo), + /// The struct is parametrized by the location set (`L`). + location_set: PhantomData, + lifetime: PhantomData<&'a ()>, +} + +/// A builder for `TransportConfig`. +/// +/// Use this builder to create a `TransportConfig` instance. +/// +/// # Examples +/// +/// ``` +/// use chorus_lib::core::{LocationSet, ChoreographyLocation}; +/// use chorus_lib::transport::TransportConfigBuilder; +/// +/// #[derive(ChoreographyLocation)] +/// struct Alice; +/// +/// #[derive(ChoreographyLocation)] +/// struct Bob; +/// +/// let transport_config = TransportConfigBuilder::for_target(Alice, "value_for_target".to_string()) +/// .with(Bob, "value_for_bob".to_string()) +/// .build(); +/// ``` +pub struct TransportConfigBuilder< + 'a, + Target: ChoreographyLocation, + TargetInfo, + L: LocationSet, + Info, +> { + target: (Target, TargetInfo), + location_set: PhantomData, + info: HashMap<&'static str, Info>, + lifetime: PhantomData<&'a ()>, +} + +impl<'a, Target: ChoreographyLocation, TargetInfo, Info> + TransportConfigBuilder<'a, Target, TargetInfo, LocationSet!(Target), Info> +{ + /// Creates a new `TransportConfigBuilder` instance for a given target. + pub fn for_target(target: Target, info: TargetInfo) -> Self { + Self { + target: (target, info), + location_set: PhantomData, + info: HashMap::new(), + lifetime: PhantomData, + } + } +} + +impl<'a, Target: ChoreographyLocation, TargetInfo, L: LocationSet, Info> + TransportConfigBuilder<'a, Target, TargetInfo, L, Info> +{ + /// Adds information about a new `ChoreographyLocation`. + /// + /// This method tells the builder that the choreography involves a new location and how to communicate with it. + pub fn with<'b, NewLocation: ChoreographyLocation>( + self, + location: NewLocation, + info: Info, + ) -> TransportConfigBuilder<'b, Target, TargetInfo, HCons, Info> { + _ = location; + let mut new_info = self.info; + new_info.insert(NewLocation::name(), info); + TransportConfigBuilder { + target: self.target, + location_set: PhantomData, + info: new_info, + lifetime: PhantomData, + } + } + + /// Builds a `TransportConfig` instance. + pub fn build<'b>(self) -> TransportConfig<'b, Target, TargetInfo, L, Info> { + TransportConfig { + info: self.info, + target_info: self.target, + location_set: PhantomData, + lifetime: PhantomData, + } + } +} diff --git a/chorus_lib/src/transport/http.rs b/chorus_lib/src/transport/http.rs index a253c4b..8427758 100644 --- a/chorus_lib/src/transport/http.rs +++ b/chorus_lib/src/transport/http.rs @@ -3,6 +3,8 @@ use std::thread; use std::{collections::HashMap, sync::Arc}; +use std::marker::PhantomData; + use retry::{ delay::{jitter, Fixed}, retry, @@ -11,45 +13,72 @@ use tiny_http::Server; use ureq::{Agent, AgentBuilder}; use crate::{ - core::{Portable, Transport}, + core::{ChoreographyLocation, LocationSet, Member, Portable, Transport}, + transport::{TransportConfig, TransportConfigBuilder}, utils::queue::BlockingQueue, }; +type QueueMap = HashMap<&'static str, BlockingQueue>; + +/// Config for `HttpTransport`. +pub type HttpTransportConfig<'a, L, Target> = + TransportConfig<'a, Target, (&'a str, u16), L, (&'a str, u16)>; + +/// A builder for `HttpTransportConfig`. +/// +/// # Examples +/// +/// ``` +/// # use chorus_lib::core::{LocationSet, ChoreographyLocation}; +/// # use chorus_lib::transport::http::HttpTransportConfigBuilder; +/// # +/// # #[derive(ChoreographyLocation)] +/// # struct Alice; +/// # +/// # #[derive(ChoreographyLocation)] +/// # struct Bob; +/// # +/// let transport_config = HttpTransportConfigBuilder::for_target(Alice, ("0.0.0.0", 9010)) +/// .with(Bob, ("example.com", 80)) +/// .build(); +/// ``` +pub type HttpTransportConfigBuilder<'a, Target, L> = + TransportConfigBuilder<'a, Target, (&'a str, u16), L, (&'a str, u16)>; + /// The header name for the source location. const HEADER_SRC: &str = "X-CHORUS-SOURCE"; /// The HTTP transport. -pub struct HttpTransport { - config: HashMap, +pub struct HttpTransport<'a, L: LocationSet, TLocation> { + config: HashMap<&'static str, (&'a str, u16)>, agent: Agent, - queue_map: Arc>>, server: Arc, join_handle: Option>, + location_set: PhantomData, + queue_map: Arc, + target_location: PhantomData, } -impl HttpTransport { - /// Creates a new `HttpTransport` instance from the projection target and a configuration. - pub fn new(at: &'static str, config: &HashMap<&str, (&str, u16)>) -> Self { - let config = HashMap::from_iter( - config - .iter() - .map(|(k, (hostname, port))| (k.to_string(), (hostname.to_string(), *port))), - ); - let locs = Vec::from_iter(config.keys().map(|s| s.clone())); - - let queue_map = { +impl<'a, L: LocationSet, TLocation: ChoreographyLocation> HttpTransport<'a, L, TLocation> { + /// Creates a new `HttpTransport` instance from the configuration. + pub fn new(http_config: HttpTransportConfig<'a, L, TLocation>) -> Self + where + TLocation: Member, + { + let queue_map: Arc = { let mut m = HashMap::new(); - for loc in &locs { - m.insert(loc.to_string(), BlockingQueue::new()); + for loc in L::to_string_list() { + m.insert(loc, BlockingQueue::new()); } - Arc::new(m) + Arc::new(m.into()) }; - let (hostname, port) = config.get(at).unwrap(); + let (_, (hostname, port)) = &http_config.target_info; let server = Arc::new(Server::http(format!("{}:{}", hostname, port)).unwrap()); let join_handle = Some({ let server = server.clone(); let queue_map = queue_map.clone(); + thread::spawn(move || { for mut request in server.incoming_requests() { let mut body = String::new(); @@ -80,25 +109,29 @@ impl HttpTransport { let agent = AgentBuilder::new().build(); Self { - config, + config: http_config.info, agent, - queue_map, join_handle, server, + location_set: PhantomData, + queue_map, + target_location: PhantomData, } } } -impl Drop for HttpTransport { +impl<'a, L: LocationSet, TLocation> Drop for HttpTransport<'a, L, TLocation> { fn drop(&mut self) { self.server.unblock(); self.join_handle.take().map(thread::JoinHandle::join); } } -impl Transport for HttpTransport { - fn locations(&self) -> Vec { - Vec::from_iter(self.config.keys().map(|s| s.clone())) +impl<'a, L: LocationSet, TLocation: ChoreographyLocation> Transport + for HttpTransport<'a, L, TLocation> +{ + fn locations(&self) -> Vec<&'static str> { + self.config.keys().cloned().collect() } fn send(&self, from: &str, to: &str, data: &V) -> () { @@ -133,28 +166,36 @@ mod tests { #[derive(ChoreographyLocation)] struct Bob; + #[derive(ChoreographyLocation)] + struct Carol; + #[test] fn test_http_transport() { let v = 42; - let mut config = HashMap::new(); + let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice.name(), ("localhost", 9010)); - config.insert(Bob.name(), ("localhost", 9011)); + let mut handles = Vec::new(); { - let config = config.clone(); + let config = HttpTransportConfigBuilder::for_target(Alice, ("0.0.0.0", 9010)) + .with(Bob, ("localhost", 9011)) + .build(); + handles.push(thread::spawn(move || { wait.recv().unwrap(); // wait for Bob to start - let transport = HttpTransport::new(Alice.name(), &config); - transport.send::(Alice.name(), Bob.name(), &v); + let transport = HttpTransport::new(config); + transport.send::(Alice::name(), Bob::name(), &v); })); } { - let config = config.clone(); + let config = HttpTransportConfigBuilder::for_target(Bob, ("0.0.0.0", 9011)) + .with(Alice, ("localhost", 9010)) + .build(); + handles.push(thread::spawn(move || { - let transport = HttpTransport::new(Bob.name(), &config); + let transport = HttpTransport::new(config); signal.send(()).unwrap(); - let v2 = transport.receive::(Alice.name(), Bob.name()); + let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); } @@ -166,27 +207,31 @@ mod tests { #[test] fn test_http_transport_retry() { let v = 42; - let mut config = HashMap::new(); let (signal, wait) = mpsc::channel::<()>(); - config.insert(Alice.name(), ("localhost", 9020)); - config.insert(Bob.name(), ("localhost", 9021)); + let mut handles = Vec::new(); { - let config = config.clone(); + let config = HttpTransportConfigBuilder::for_target(Alice, ("0.0.0.0", 9020)) + .with(Bob, ("localhost", 9021)) + .build(); + handles.push(thread::spawn(move || { signal.send(()).unwrap(); - let transport = HttpTransport::new(Alice.name(), &config); - transport.send::(Alice.name(), Bob.name(), &v); + let transport = HttpTransport::new(config); + transport.send::(Alice::name(), Bob::name(), &v); })); } { - let config = config.clone(); + let config = HttpTransportConfigBuilder::for_target(Bob, ("0.0.0.0", 9021)) + .with(Alice, ("localhost", 9020)) + .build(); + handles.push(thread::spawn(move || { // wait for Alice to start, which forces Alice to retry wait.recv().unwrap(); sleep(Duration::from_millis(100)); - let transport = HttpTransport::new(Bob.name(), &config); - let v2 = transport.receive::(Alice.name(), Bob.name()); + let transport = HttpTransport::new(config); + let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); } diff --git a/chorus_lib/src/transport/local.rs b/chorus_lib/src/transport/local.rs index 724c434..22be7d1 100644 --- a/chorus_lib/src/transport/local.rs +++ b/chorus_lib/src/transport/local.rs @@ -5,52 +5,156 @@ use std::sync::Arc; use serde_json; -use crate::core::{Portable, Transport}; +use std::marker::PhantomData; + +use crate::core::{ChoreographyLocation, HCons, LocationSet, Portable, Transport}; use crate::utils::queue::BlockingQueue; type QueueMap = HashMap>>; -/// The local transport. -/// -/// This transport uses a blocking queue to allow for communication between threads. Each location must be executed in its thread. -/// -/// Unlike network-based transports, all locations must share the same `LocalTransport` instance. The struct implements `Clone` so that it can be shared across threads. -#[derive(Clone)] -pub struct LocalTransport { - internal_locations: Vec, +/// A Transport channel used between multiple `Transport`s. +pub struct LocalTransportChannel { + /// The location set where the channel is defined on. + location_set: std::marker::PhantomData, queue_map: Arc, } -impl LocalTransport { - /// Creates a new `LocalTransport` instance from a list of locations. - pub fn from(locations: &[&str]) -> Self { +impl Clone for LocalTransportChannel { + fn clone(&self) -> Self { + LocalTransportChannel { + location_set: PhantomData, + queue_map: self.queue_map.clone(), + } + } +} + +impl LocalTransportChannel { + /// Creates a new `LocalTransportChannel` instance. + /// + /// You must specify the location set of the channel. The channel can only be used by locations in the set. + /// + /// In most cases, you should use `LocalTransportChannelBuilder` instead of calling this method directly. + /// + /// # Examples + /// ``` + /// use chorus_lib::transport::local::{LocalTransportChannel}; + /// use chorus_lib::core::{LocationSet, ChoreographyLocation}; + /// + /// #[derive(ChoreographyLocation)] + /// struct Alice; + /// + /// #[derive(ChoreographyLocation)] + /// struct Bob; + /// + /// let transport_channel = LocalTransportChannel::::new(); + /// ``` + pub fn new() -> LocalTransportChannel { let mut queue_map: QueueMap = HashMap::new(); - for sender in locations.clone() { + let str_list = L::to_string_list(); + for sender in &str_list { let mut n = HashMap::new(); - for receiver in locations.clone() { + for receiver in &str_list { n.insert(receiver.to_string(), BlockingQueue::new()); } queue_map.insert(sender.to_string(), n); } - let mut locations_vec = Vec::new(); - for loc in locations.clone() { - locations_vec.push(loc.to_string()); + + LocalTransportChannel { + location_set: PhantomData, + queue_map: queue_map.into(), + } + } +} + +/// A builder for `LocalTransportChannel`. +/// +/// Use this builder to create a `LocalTransportChannel` instance. +/// +/// # Examples +/// +/// ``` +/// use chorus_lib::core::{LocationSet, ChoreographyLocation}; +/// use chorus_lib::transport::local::{LocalTransportChannelBuilder}; +/// +/// #[derive(ChoreographyLocation)] +/// struct Alice; +/// +/// #[derive(ChoreographyLocation)] +/// struct Bob; +/// +/// let transport_channel = LocalTransportChannelBuilder::new() +/// .with(Alice) +/// .with(Bob) +/// .build(); +/// ``` +pub struct LocalTransportChannelBuilder { + location_set: PhantomData, +} + +impl LocalTransportChannelBuilder { + /// Creates a new `LocalTransportChannelBuilder` instance + pub fn new() -> Self { + Self { + location_set: PhantomData, } + } +} + +impl LocalTransportChannelBuilder { + /// Adds a new location to the set of locations in the `LocalTransportChannel`. + pub fn with( + &self, + location: NewLocation, + ) -> LocalTransportChannelBuilder> { + _ = location; + LocalTransportChannelBuilder { + location_set: PhantomData, + } + } + + /// Builds a `LocalTransportChannel` instance. + pub fn build(&self) -> LocalTransportChannel { + LocalTransportChannel::new() + } +} + +/// The local transport. +/// +/// This transport uses a blocking queue to allow for communication between threads. Each location must be executed in its thread. +/// +/// All locations must share the same `LocalTransportChannel` instance. `LocalTransportChannel` implements `Clone` so that it can be shared across threads. +pub struct LocalTransport { + internal_locations: Vec<&'static str>, + location_set: PhantomData, + local_channel: LocalTransportChannel, + target_location: PhantomData, +} + +impl LocalTransport { + /// Creates a new `LocalTransport` instance from a Target `ChoreographyLocation` and a `LocalTransportChannel`. + pub fn new(target: TargetLocation, local_channel: LocalTransportChannel) -> Self { + _ = target; + LocalTransport { - queue_map: Arc::new(queue_map), - internal_locations: locations_vec, + internal_locations: L::to_string_list(), + location_set: PhantomData, + local_channel, + target_location: PhantomData, } } } -impl Transport for LocalTransport { - fn locations(&self) -> Vec { +impl Transport + for LocalTransport +{ + fn locations(&self) -> Vec<&'static str> { return self.internal_locations.clone(); } fn send(&self, from: &str, to: &str, data: &T) -> () { let data = serde_json::to_string(data).unwrap(); - self.queue_map + self.local_channel + .queue_map .get(from) .unwrap() .get(to) @@ -59,7 +163,14 @@ impl Transport for LocalTransport { } fn receive(&self, from: &str, at: &str) -> T { - let data = self.queue_map.get(from).unwrap().get(at).unwrap().pop(); + let data = self + .local_channel + .queue_map + .get(from) + .unwrap() + .get(at) + .unwrap() + .pop(); serde_json::from_str(&data).unwrap() } } @@ -79,18 +190,23 @@ mod tests { #[test] fn test_local_transport() { let v = 42; - let transport = LocalTransport::from(&[Alice.name(), Bob.name()]); + + let transport_channel = LocalTransportChannelBuilder::new() + .with(Alice) + .with(Bob) + .build(); + let mut handles = Vec::new(); { - let transport = transport.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { - transport.send::(Alice.name(), Bob.name(), &v); + transport.send::(Alice::name(), Bob::name(), &v); })); } { - let transport = transport.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { - let v2 = transport.receive::(Alice.name(), Bob.name()); + let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); }