From 83a5212927d99f5e027b7b640e71df9fbecae769 Mon Sep 17 00:00:00 2001 From: Soroush Zare Date: Wed, 27 Sep 2023 19:20:51 -0700 Subject: [PATCH] Change the API so that we have a builder pattern for LocalTransportChannel --- chorus_book/src/guide-projector.md | 4 +-- chorus_book/src/guide-transport.md | 27 ++++++++++----- chorus_book/src/header.txt | 2 +- chorus_lib/examples/bookseller.rs | 2 +- chorus_lib/examples/bookseller2.rs | 6 +++- chorus_lib/examples/hello.rs | 11 +++--- chorus_lib/examples/loc-poly.rs | 6 +++- chorus_lib/examples/tic-tac-toe.rs | 12 +++---- chorus_lib/src/core.rs | 27 ++++----------- chorus_lib/src/transport.rs | 25 ++++---------- chorus_lib/src/transport/http.rs | 39 +++++++-------------- chorus_lib/src/transport/local.rs | 54 +++++++++++++++++++++--------- 12 files changed, 108 insertions(+), 107 deletions(-) diff --git a/chorus_book/src/guide-projector.md b/chorus_book/src/guide-projector.md index 7c01015..0a84c40 100644 --- a/chorus_book/src/guide-projector.md +++ b/chorus_book/src/guide-projector.md @@ -11,7 +11,7 @@ To create a `Projector`, you need to provide the target location and the transpo # use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; # use chorus_lib::core::{ChoreographyLocation, Projector}; # use chorus_lib::{LocationSet}; -# let transport_channel = LocalTransportChannel::::new(); +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); # let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # #[derive(ChoreographyLocation)] # struct Alice; @@ -31,7 +31,7 @@ To execute a choreography, you need to call the `epp_and_run` method on the `Pro # use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel}; # use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp}; # use chorus_lib::{LocationSet}; -# let transport_channel = LocalTransportChannel::::new(); +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); # let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # #[derive(ChoreographyLocation)] # struct Alice; diff --git a/chorus_book/src/guide-transport.md b/chorus_book/src/guide-transport.md index 311f8fa..504030e 100644 --- a/chorus_book/src/guide-transport.md +++ b/chorus_book/src/guide-transport.md @@ -8,7 +8,7 @@ ChoRus provides two built-in transports: `local` and `http`. ### The Local Transport -The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. Each `local` transport is defined over `LocalTransportChannel`, which contains the set of `ChoreographyLocation` that the `local` transport operates on. You can build a `LocalTransportChannel` by importing the `LocalTransportChannel` stsruct from the `chorus_lib` crate. +The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. Each `local` transport is defined over `LocalTransportChannel`, which contains the set of `ChoreographyLocation` that the `local` transport operates on. You can build a `LocalTransportChannel` by importing the `LocalTransportChannel` struct from the `chorus_lib` crate. ```rust # extern crate chorus_lib; @@ -20,7 +20,7 @@ The `local` transport is used to execute choreographies on the same machine on d # struct Bob; use chorus_lib::transport::local::LocalTransportChannel; -let transport_channel = LocalTransportChannel::::new(); +let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); ``` To use the `local` transport, first import the `LocalTransport` struct from the `chorus_lib` crate. @@ -34,7 +34,7 @@ Then build the transport by using the `LocalTransport::new` associated function, # #[derive(ChoreographyLocation)] # struct Alice; # use chorus_lib::transport::local::LocalTransportChannel; -# let transport_channel = LocalTransportChannel::::new(); +# let transport_channel = LocalTransportChannel::new().with(Alice); use chorus_lib::transport::local::{LocalTransport}; let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); @@ -58,7 +58,7 @@ Because of the nature of the `Local` transport, you must use the same `LocalTran # fn run(self, op: &impl ChoreoOp) { # } # } -let transport_channel = LocalTransportChannel::::new(); +let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); let mut handles: Vec> = Vec::new(); { // create a transport for Alice @@ -89,23 +89,32 @@ To use the `http` transport, import the `HttpTransport` struct and the `HttpTran use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig}; ``` -The primary constructor requires an argument of type `HttpTransportConfig`. To create an instance of this configuration, utilize the builder pattern. Start with `HttpTransportConfig::for_target(target_location, target_information)` and then chain additional locations using the `.with(other_location, other_location_information)` method. Conclude with `.build()`. In this context, `target_location` refers to the target `ChoreographyLocation`, and `target_information` is specifically a tuple of `(host_name: String, port: u16)`. Subsequent calls to `.with()` allow you to add more locations and their respective information. For the `HttpTransport`, think of `HttpTransportConfig` as a mapping from locations to their hostnames and ports. However, for other generic transports, the corresponding information might vary, potentially diverging from the `(host_name, port)` format presented here. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type. +The primary constructor requires an argument of type `HttpTransportConfig`. To create an instance of this configuration, start with `HttpTransportConfig::for_target(target_location, (hostname, port))`. It will create set a projection target and the hostname and port to listen on. Then, provide information to connect to other locations by method-chaining the `.with(other_location, (hostname, port))` method. You can think of `HttpTransportConfig` as a mapping from locations to their hostnames and ports. ```rust {{#include ./header.txt}} # use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig}; let config = HttpTransportConfig::for_target(Alice, ("localhost".to_string(), 8080)) - .with(Bob, ("localhost".to_string(), 8081)) - .build(); + .with(Bob, ("localhost".to_string(), 8081)); -let transport = HttpTransport::new(&config); +let transport = HttpTransport::new(config); ``` In the above example, the transport will start the HTTP server on port 8080 on localhost. If Alice needs to send a message to Bob, it will use `http://localhost:8081` as the destination. ## Creating a Custom Transport -You can also create your own transport by implementing the `Transport` trait. See the API documentation for more details. +You can also create your own transport by implementing the `Transport` trait. It might be helpful first build a `TransportConfig` to have the the information that you need for each `ChoreographyLocation`, and then have a constructor that takes the `TransportConfig` and builds the `Transport` based on it. While the syntax is similar to `HttpTransportConfig`, the type of information for each `ChoreographyLocation` might diverge from the `(host_name, port)` format presented here. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type. + +```rust +{{#include ./header.txt}} +# use chorus_lib::transport::TransportConfig; +let config = TransportConfig::for_target(Alice, ()) + .with(Bob, ("localhost".to_string(), 8081)) + .with(Carol, ("localhost".to_string(), 8082)); +``` + +See the API documentation for more details. ### Note on the location set of the Choreography diff --git a/chorus_book/src/header.txt b/chorus_book/src/header.txt index 60d16f4..7231c30 100644 --- a/chorus_book/src/header.txt +++ b/chorus_book/src/header.txt @@ -8,7 +8,7 @@ # struct Bob; # #[derive(ChoreographyLocation)] # struct Carol; -# let transport_channel = LocalTransportChannel::::new(); +# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob).with(Carol); # let alice_transport = LocalTransport::new(Alice, transport_channel.clone()); # let bob_transport = LocalTransport::new(Bob, transport_channel.clone()); # let carol_transport = LocalTransport::new(Carol, transport_channel.clone()); \ No newline at end of file diff --git a/chorus_lib/examples/bookseller.rs b/chorus_lib/examples/bookseller.rs index 4115bc9..7889aa6 100644 --- a/chorus_lib/examples/bookseller.rs +++ b/chorus_lib/examples/bookseller.rs @@ -73,7 +73,7 @@ impl Choreography for BooksellerChoreography { } fn main() { - let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new().with(Seller).with(Buyer); let transport_seller = LocalTransport::new(Seller, transport_channel.clone()); let transport_buyer = LocalTransport::new(Buyer, transport_channel.clone()); diff --git a/chorus_lib/examples/bookseller2.rs b/chorus_lib/examples/bookseller2.rs index 4ea5be3..729d5af 100644 --- a/chorus_lib/examples/bookseller2.rs +++ b/chorus_lib/examples/bookseller2.rs @@ -143,7 +143,11 @@ fn main() { i }; - let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new() + .with(Seller) + .with(Buyer1) + .with(Buyer2); + let seller_projector = Arc::new(Projector::new( Seller, LocalTransport::new(Seller, transport_channel.clone()), diff --git a/chorus_lib/examples/hello.rs b/chorus_lib/examples/hello.rs index beeb9d5..ba8bcfd 100644 --- a/chorus_lib/examples/hello.rs +++ b/chorus_lib/examples/hello.rs @@ -40,19 +40,20 @@ impl Choreography for HelloWorldChoreography { fn main() { let mut handles: Vec> = Vec::new(); // Create a transport channel - let transport_channel = LocalTransportChannel::::new(); + // let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); // Run the choreography in two threads { - let transport_channel = transport_channel.clone(); - let transport = LocalTransport::new(Alice, transport_channel); + // let transport_channel = transport_channel.clone(); + let transport = LocalTransport::new(Alice, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Alice, transport); p.epp_and_run(HelloWorldChoreography); })); } { - let transport_channel = transport_channel.clone(); - let transport = LocalTransport::new(Bob, transport_channel); + // let transport_channel = transport_channel.clone(); + let transport = LocalTransport::new(Bob, transport_channel.clone()); handles.push(thread::spawn(move || { let p = Projector::new(Bob, transport); p.epp_and_run(HelloWorldChoreography); diff --git a/chorus_lib/examples/loc-poly.rs b/chorus_lib/examples/loc-poly.rs index ad6f09a..e6b8e0b 100644 --- a/chorus_lib/examples/loc-poly.rs +++ b/chorus_lib/examples/loc-poly.rs @@ -57,7 +57,11 @@ impl Choreography> for MainChoreography { } fn main() { - let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new() + .with(Alice) + .with(Bob) + .with(Carol); + let mut handles = vec![]; { let transport = LocalTransport::new(Alice, transport_channel.clone()); diff --git a/chorus_lib/examples/tic-tac-toe.rs b/chorus_lib/examples/tic-tac-toe.rs index 200b797..79abb03 100644 --- a/chorus_lib/examples/tic-tac-toe.rs +++ b/chorus_lib/examples/tic-tac-toe.rs @@ -305,9 +305,9 @@ fn main() { args.opponent_hostname.as_str().to_string(), args.opponent_port, ), - ) - .build(); - let transport = HttpTransport::new(&config); + ); + + let transport = HttpTransport::new(config); let projector = Projector::new(PlayerX, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.local(brain), @@ -325,9 +325,9 @@ fn main() { args.opponent_hostname.as_str().to_string(), args.opponent_port, ), - ) - .build(); - let transport = HttpTransport::new(&config); + ); + + let transport = HttpTransport::new(config); let projector = Projector::new(PlayerO, transport); projector.epp_and_run(TicTacToeChoreography { brain_for_x: projector.remote(PlayerX), diff --git a/chorus_lib/src/core.rs b/chorus_lib/src/core.rs index 4fe152e..f2ba43e 100644 --- a/chorus_lib/src/core.rs +++ b/chorus_lib/src/core.rs @@ -96,12 +96,11 @@ pub trait HList { /// returns fn to_string_list() -> Vec<&'static str>; } + /// end of HList -#[derive(Clone)] pub struct HNil; -/// An element of HList -#[derive(Clone)] +/// An element of HList pub struct HCons(Head, Tail); impl HList for HNil { @@ -171,22 +170,6 @@ where { } -/// Equal trait -pub trait Equal {} - -// Base case: HNil is equal to HNil -impl Equal for HNil {} - -// Recursive case: Head::Tail is equal to L if -// 1. Head is a member of L -// 2. Tail is equal to the remainder of L -impl Equal> for HCons -where - Head: Member, - Tail: Equal, -{ -} - /// Provides a method to work with located values at the current location pub struct Unwrapper { phantom: PhantomData, @@ -280,7 +263,11 @@ pub trait Choreography { /// Provides methods to send and receive messages. /// /// The trait provides methods to send and receive messages between locations. Implement this trait to define a custom transport. -pub trait Transport { +/// +/// The type parameter `L` is the location set that the transport is operating on. +/// +/// The type paramter `TargetLocation` is the target `ChoreographyLocation`. +pub trait Transport { /// Returns a list of locations. fn locations(&self) -> Vec; /// Sends a message from `from` to `to`. diff --git a/chorus_lib/src/transport.rs b/chorus_lib/src/transport.rs index 3329bd1..9f8209a 100644 --- a/chorus_lib/src/transport.rs +++ b/chorus_lib/src/transport.rs @@ -13,11 +13,11 @@ use std::marker::PhantomData; pub struct TransportConfig { /// The information about locations - pub info: HashMap, + info: HashMap, /// The information about the target choreography - pub target_info: (TargetLocation, TargetInfoType), + target_info: (TargetLocation, TargetInfoType), /// The struct is parametrized by the location set (`L`). - pub location_set: PhantomData, + location_set: PhantomData, } impl @@ -38,30 +38,17 @@ impl { /// Adds information about a new `ChoreographyLocation`. pub fn with( - self, + mut self, _location: NewLocation, info: InfoType, ) -> TransportConfig, InfoType, TargetLocation, TargetInfoType> where { - let mut new_info = HashMap::new(); - for (k, v) in self.info.into_iter() { - new_info.insert(k, v); - } - new_info.insert(NewLocation::name().to_string(), info); + self.info.insert(NewLocation::name().to_string(), info); - TransportConfig { - info: new_info, - target_info: self.target_info, - location_set: PhantomData, - } - } - - /// Finalize the `TransportConfig`. - pub fn build(self) -> TransportConfig { TransportConfig { info: self.info, - location_set: PhantomData, target_info: self.target_info, + location_set: PhantomData, } } } diff --git a/chorus_lib/src/transport/http.rs b/chorus_lib/src/transport/http.rs index 53a574d..0c20a58 100644 --- a/chorus_lib/src/transport/http.rs +++ b/chorus_lib/src/transport/http.rs @@ -26,15 +26,6 @@ pub type HttpTransportConfig = TransportConfig -#[derive(Clone)] -pub struct HttpConfig { - /// The information about locations - pub info: HashMap, - /// The struct is parametrized by the location set (`L`). - pub location_set: PhantomData, -} - /// The HTTP transport. pub struct HttpTransport { config: HashMap, @@ -48,7 +39,7 @@ pub struct HttpTransport { impl HttpTransport { /// Creates a new `HttpTransport` instance from the configuration. - pub fn new(http_config: &HttpTransportConfig) -> Self + pub fn new(http_config: HttpTransportConfig) -> Self where TLocation: Member, { @@ -60,8 +51,6 @@ impl HttpTransport { Arc::new(m.into()) }; - let info = &http_config.info; - let (_, (hostname, port)) = &http_config.target_info; let server = Arc::new(Server::http(format!("{}:{}", hostname, port)).unwrap()); let join_handle = Some({ @@ -98,7 +87,7 @@ impl HttpTransport { let agent = AgentBuilder::new().build(); Self { - config: info.clone(), + config: http_config.info, agent, join_handle, server, @@ -116,7 +105,9 @@ impl Drop for HttpTransport { } } -impl Transport for HttpTransport { +impl Transport + for HttpTransport +{ fn locations(&self) -> Vec { Vec::from_iter(self.config.keys().map(|s| s.clone())) } @@ -165,22 +156,20 @@ mod tests { let mut handles = Vec::new(); { let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9010)) - .with(Bob, ("localhost".to_string(), 9011)) - .build(); + .with(Bob, ("localhost".to_string(), 9011)); handles.push(thread::spawn(move || { wait.recv().unwrap(); // wait for Bob to start - let transport = HttpTransport::new(&config); + let transport = HttpTransport::new(config); transport.send::(Alice::name(), Bob::name(), &v); })); } { let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9011)) - .with(Alice, ("localhost".to_string(), 9010)) - .build(); + .with(Alice, ("localhost".to_string(), 9010)); handles.push(thread::spawn(move || { - let transport = HttpTransport::new(&config); + let transport = HttpTransport::new(config); signal.send(()).unwrap(); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); @@ -199,25 +188,23 @@ mod tests { let mut handles = Vec::new(); { let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9020)) - .with(Bob, ("localhost".to_string(), 9021)) - .build(); + .with(Bob, ("localhost".to_string(), 9021)); handles.push(thread::spawn(move || { signal.send(()).unwrap(); - let transport = HttpTransport::new(&config); + let transport = HttpTransport::new(config); transport.send::(Alice::name(), Bob::name(), &v); })); } { let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9021)) - .with(Alice, ("localhost".to_string(), 9020)) - .build(); + .with(Alice, ("localhost".to_string(), 9020)); handles.push(thread::spawn(move || { // wait for Alice to start, which forces Alice to retry wait.recv().unwrap(); sleep(Duration::from_millis(100)); - let transport = HttpTransport::new(&config); + let transport = HttpTransport::new(config); let v2 = transport.receive::(Alice::name(), Bob::name()); assert_eq!(v, v2); })); diff --git a/chorus_lib/src/transport/local.rs b/chorus_lib/src/transport/local.rs index 511b3bc..3ba58b3 100644 --- a/chorus_lib/src/transport/local.rs +++ b/chorus_lib/src/transport/local.rs @@ -7,37 +7,60 @@ use serde_json; use std::marker::PhantomData; -#[cfg(test)] use crate::LocationSet; -use crate::core::{ChoreographyLocation, HList, Portable, Transport}; +use crate::core::{ChoreographyLocation, HCons, HList, Portable, Transport}; use crate::utils::queue::BlockingQueue; type QueueMap = HashMap>>; /// A Transport channel used between multiple `Transport`s. -pub struct LocalTransportChannel { +pub struct LocalTransportChannel { /// The location set where the channel is defined on. - pub location_set: std::marker::PhantomData, - queue_map: QueueMap, + location_set: std::marker::PhantomData, + queue_map: Arc, +} + +impl Clone for LocalTransportChannel { + fn clone(&self) -> Self { + LocalTransportChannel { + location_set: PhantomData, + queue_map: self.queue_map.clone(), + } + } +} + +impl LocalTransportChannel { + /// Creates a new `LocalTransportChannel` instance + pub fn new() -> Self { + Self { + location_set: PhantomData, + queue_map: HashMap::new().into(), + } + } } impl LocalTransportChannel { - /// Creates a `LocalTransportChannel`. - pub fn new() -> Arc> { + /// Adds a new location to the set of locations in the `LocalTransportChannel`. + pub fn with( + self, + _location: NewLocation, + ) -> LocalTransportChannel> { let mut queue_map: QueueMap = HashMap::new(); - for sender in L::to_string_list() { + let mut str_list = L::to_string_list(); + str_list.push(NewLocation::name()); + for sender in &str_list { let mut n = HashMap::new(); - for receiver in L::to_string_list() { + for receiver in &str_list { n.insert(receiver.to_string(), BlockingQueue::new()); } queue_map.insert(sender.to_string(), n); } - Arc::new(LocalTransportChannel { + LocalTransportChannel { location_set: PhantomData, - queue_map, - }) + queue_map: queue_map.into(), + } } } @@ -46,17 +69,16 @@ impl LocalTransportChannel { /// This transport uses a blocking queue to allow for communication between threads. Each location must be executed in its thread. /// /// Unlike network-based transports, all locations must share the same `LocalTransport` instance. The struct implements `Clone` so that it can be shared across threads. -#[derive(Clone)] pub struct LocalTransport { internal_locations: Vec, location_set: PhantomData, - local_channel: Arc>, + local_channel: LocalTransportChannel, target_location: PhantomData, } impl LocalTransport { /// Creates a new `LocalTransport` instance from a Target `ChoreographyLocation` and a `LocalTransportChannel`. - pub fn new(_target: TargetLocation, local_channel: Arc>) -> Self { + pub fn new(_target: TargetLocation, local_channel: LocalTransportChannel) -> Self { let locations_list = L::to_string_list(); let mut locations_vec = Vec::new(); @@ -120,7 +142,7 @@ mod tests { fn test_local_transport() { let v = 42; - let transport_channel = LocalTransportChannel::::new(); + let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob); let mut handles = Vec::new(); {