Skip to content

Commit

Permalink
Change the API so that we have a builder pattern for LocalTransportCh…
Browse files Browse the repository at this point in the history
…annel
  • Loading branch information
ihaveint committed Sep 29, 2023
1 parent f0b8461 commit 83a5212
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 107 deletions.
4 changes: 2 additions & 2 deletions chorus_book/src/guide-projector.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ To create a `Projector`, you need to provide the target location and the transpo
# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel};
# use chorus_lib::core::{ChoreographyLocation, Projector};
# use chorus_lib::{LocationSet};
# let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob)>::new();
# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob);
# let alice_transport = LocalTransport::new(Alice, transport_channel.clone());
# #[derive(ChoreographyLocation)]
# struct Alice;
Expand All @@ -31,7 +31,7 @@ To execute a choreography, you need to call the `epp_and_run` method on the `Pro
# use chorus_lib::transport::local::{LocalTransport, LocalTransportChannel};
# use chorus_lib::core::{ChoreographyLocation, Projector, Choreography, ChoreoOp};
# use chorus_lib::{LocationSet};
# let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob)>::new();
# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob);
# let alice_transport = LocalTransport::new(Alice, transport_channel.clone());
# #[derive(ChoreographyLocation)]
# struct Alice;
Expand Down
27 changes: 18 additions & 9 deletions chorus_book/src/guide-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ChoRus provides two built-in transports: `local` and `http`.

### The Local Transport

The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. Each `local` transport is defined over `LocalTransportChannel`, which contains the set of `ChoreographyLocation` that the `local` transport operates on. You can build a `LocalTransportChannel` by importing the `LocalTransportChannel` stsruct from the `chorus_lib` crate.
The `local` transport is used to execute choreographies on the same machine on different threads. This is useful for testing and prototyping. Each `local` transport is defined over `LocalTransportChannel`, which contains the set of `ChoreographyLocation` that the `local` transport operates on. You can build a `LocalTransportChannel` by importing the `LocalTransportChannel` struct from the `chorus_lib` crate.

```rust
# extern crate chorus_lib;
Expand All @@ -20,7 +20,7 @@ The `local` transport is used to execute choreographies on the same machine on d
# struct Bob;
use chorus_lib::transport::local::LocalTransportChannel;

let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob)>::new();
let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob);
```

To use the `local` transport, first import the `LocalTransport` struct from the `chorus_lib` crate.
Expand All @@ -34,7 +34,7 @@ Then build the transport by using the `LocalTransport::new` associated function,
# #[derive(ChoreographyLocation)]
# struct Alice;
# use chorus_lib::transport::local::LocalTransportChannel;
# let transport_channel = LocalTransportChannel::<LocationSet!(Alice)>::new();
# let transport_channel = LocalTransportChannel::new().with(Alice);
use chorus_lib::transport::local::{LocalTransport};

let alice_transport = LocalTransport::new(Alice, transport_channel.clone());
Expand All @@ -58,7 +58,7 @@ Because of the nature of the `Local` transport, you must use the same `LocalTran
# fn run(self, op: &impl ChoreoOp<Self::L>) {
# }
# }
let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob)>::new();
let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob);
let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();
{
// create a transport for Alice
Expand Down Expand Up @@ -89,23 +89,32 @@ To use the `http` transport, import the `HttpTransport` struct and the `HttpTran
use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig};
```

The primary constructor requires an argument of type `HttpTransportConfig`. To create an instance of this configuration, utilize the builder pattern. Start with `HttpTransportConfig::for_target(target_location, target_information)` and then chain additional locations using the `.with(other_location, other_location_information)` method. Conclude with `.build()`. In this context, `target_location` refers to the target `ChoreographyLocation`, and `target_information` is specifically a tuple of `(host_name: String, port: u16)`. Subsequent calls to `.with()` allow you to add more locations and their respective information. For the `HttpTransport`, think of `HttpTransportConfig` as a mapping from locations to their hostnames and ports. However, for other generic transports, the corresponding information might vary, potentially diverging from the `(host_name, port)` format presented here. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type.
The primary constructor requires an argument of type `HttpTransportConfig`. To create an instance of this configuration, start with `HttpTransportConfig::for_target(target_location, (hostname, port))`. It will create set a projection target and the hostname and port to listen on. Then, provide information to connect to other locations by method-chaining the `.with(other_location, (hostname, port))` method. You can think of `HttpTransportConfig` as a mapping from locations to their hostnames and ports.

```rust
{{#include ./header.txt}}
# use chorus_lib::transport::http::{HttpTransport, HttpTransportConfig};
let config = HttpTransportConfig::for_target(Alice, ("localhost".to_string(), 8080))
.with(Bob, ("localhost".to_string(), 8081))
.build();
.with(Bob, ("localhost".to_string(), 8081));

let transport = HttpTransport::new(&config);
let transport = HttpTransport::new(config);
```

In the above example, the transport will start the HTTP server on port 8080 on localhost. If Alice needs to send a message to Bob, it will use `http://localhost:8081` as the destination.

## Creating a Custom Transport

You can also create your own transport by implementing the `Transport` trait. See the API documentation for more details.
You can also create your own transport by implementing the `Transport` trait. It might be helpful first build a `TransportConfig` to have the the information that you need for each `ChoreographyLocation`, and then have a constructor that takes the `TransportConfig` and builds the `Transport` based on it. While the syntax is similar to `HttpTransportConfig`, the type of information for each `ChoreographyLocation` might diverge from the `(host_name, port)` format presented here. In some cases, the `target_information` could even have a different type than the following `other_location_information` types. But all the `other_location_information`s should have the same type.

```rust
{{#include ./header.txt}}
# use chorus_lib::transport::TransportConfig;
let config = TransportConfig::for_target(Alice, ())
.with(Bob, ("localhost".to_string(), 8081))
.with(Carol, ("localhost".to_string(), 8082));
```

See the API documentation for more details.


### Note on the location set of the Choreography
Expand Down
2 changes: 1 addition & 1 deletion chorus_book/src/header.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# struct Bob;
# #[derive(ChoreographyLocation)]
# struct Carol;
# let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob, Carol)>::new();
# let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob).with(Carol);
# let alice_transport = LocalTransport::new(Alice, transport_channel.clone());
# let bob_transport = LocalTransport::new(Bob, transport_channel.clone());
# let carol_transport = LocalTransport::new(Carol, transport_channel.clone());
2 changes: 1 addition & 1 deletion chorus_lib/examples/bookseller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Choreography for BooksellerChoreography {
}

fn main() {
let transport_channel = LocalTransportChannel::<LocationSet!(Seller, Buyer)>::new();
let transport_channel = LocalTransportChannel::new().with(Seller).with(Buyer);
let transport_seller = LocalTransport::new(Seller, transport_channel.clone());
let transport_buyer = LocalTransport::new(Buyer, transport_channel.clone());

Expand Down
6 changes: 5 additions & 1 deletion chorus_lib/examples/bookseller2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ fn main() {
i
};

let transport_channel = LocalTransportChannel::<LocationSet!(Seller, Buyer1, Buyer2)>::new();
let transport_channel = LocalTransportChannel::new()
.with(Seller)
.with(Buyer1)
.with(Buyer2);

let seller_projector = Arc::new(Projector::new(
Seller,
LocalTransport::new(Seller, transport_channel.clone()),
Expand Down
11 changes: 6 additions & 5 deletions chorus_lib/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,20 @@ impl Choreography for HelloWorldChoreography {
fn main() {
let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();
// Create a transport channel
let transport_channel = LocalTransportChannel::<LocationSet!(Bob, Alice)>::new();
// let transport_channel = LocalTransportChannel::<LocationSet!(Bob, Alice)>::new();
let transport_channel = LocalTransportChannel::new().with(Alice).with(Bob);
// Run the choreography in two threads
{
let transport_channel = transport_channel.clone();
let transport = LocalTransport::new(Alice, transport_channel);
// let transport_channel = transport_channel.clone();
let transport = LocalTransport::new(Alice, transport_channel.clone());
handles.push(thread::spawn(move || {
let p = Projector::new(Alice, transport);
p.epp_and_run(HelloWorldChoreography);
}));
}
{
let transport_channel = transport_channel.clone();
let transport = LocalTransport::new(Bob, transport_channel);
// let transport_channel = transport_channel.clone();
let transport = LocalTransport::new(Bob, transport_channel.clone());
handles.push(thread::spawn(move || {
let p = Projector::new(Bob, transport);
p.epp_and_run(HelloWorldChoreography);
Expand Down
6 changes: 5 additions & 1 deletion chorus_lib/examples/loc-poly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ impl Choreography<Located<i32, Alice>> for MainChoreography {
}

fn main() {
let transport_channel = LocalTransportChannel::<LocationSet!(Alice, Bob, Carol)>::new();
let transport_channel = LocalTransportChannel::new()
.with(Alice)
.with(Bob)
.with(Carol);

let mut handles = vec![];
{
let transport = LocalTransport::new(Alice, transport_channel.clone());
Expand Down
12 changes: 6 additions & 6 deletions chorus_lib/examples/tic-tac-toe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,9 @@ fn main() {
args.opponent_hostname.as_str().to_string(),
args.opponent_port,
),
)
.build();
let transport = HttpTransport::new(&config);
);

let transport = HttpTransport::new(config);
let projector = Projector::new(PlayerX, transport);
projector.epp_and_run(TicTacToeChoreography {
brain_for_x: projector.local(brain),
Expand All @@ -325,9 +325,9 @@ fn main() {
args.opponent_hostname.as_str().to_string(),
args.opponent_port,
),
)
.build();
let transport = HttpTransport::new(&config);
);

let transport = HttpTransport::new(config);
let projector = Projector::new(PlayerO, transport);
projector.epp_and_run(TicTacToeChoreography {
brain_for_x: projector.remote(PlayerX),
Expand Down
27 changes: 7 additions & 20 deletions chorus_lib/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,11 @@ pub trait HList {
/// returns
fn to_string_list() -> Vec<&'static str>;
}

/// end of HList
#[derive(Clone)]
pub struct HNil;
/// An element of HList

#[derive(Clone)]
/// An element of HList
pub struct HCons<Head, Tail>(Head, Tail);

impl HList for HNil {
Expand Down Expand Up @@ -171,22 +170,6 @@ where
{
}

/// Equal trait
pub trait Equal<L: HList, Index> {}

// Base case: HNil is equal to HNil
impl Equal<HNil, Here> for HNil {}

// Recursive case: Head::Tail is equal to L if
// 1. Head is a member of L
// 2. Tail is equal to the remainder of L
impl<L: HList, Head, Tail, Index1, Index2> Equal<L, HCons<Index1, Index2>> for HCons<Head, Tail>
where
Head: Member<L, Index1>,
Tail: Equal<Head::Remainder, Index2>,
{
}

/// Provides a method to work with located values at the current location
pub struct Unwrapper<L1: ChoreographyLocation> {
phantom: PhantomData<L1>,
Expand Down Expand Up @@ -280,7 +263,11 @@ pub trait Choreography<R = ()> {
/// Provides methods to send and receive messages.
///
/// The trait provides methods to send and receive messages between locations. Implement this trait to define a custom transport.
pub trait Transport<L, TargetLocation> {
///
/// The type parameter `L` is the location set that the transport is operating on.
///
/// The type paramter `TargetLocation` is the target `ChoreographyLocation`.
pub trait Transport<L: HList, TargetLocation: ChoreographyLocation> {
/// Returns a list of locations.
fn locations(&self) -> Vec<String>;
/// Sends a message from `from` to `to`.
Expand Down
25 changes: 6 additions & 19 deletions chorus_lib/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use std::marker::PhantomData;
pub struct TransportConfig<L: HList, InfoType, TargetLocation: ChoreographyLocation, TargetInfoType>
{
/// The information about locations
pub info: HashMap<String, InfoType>,
info: HashMap<String, InfoType>,
/// The information about the target choreography
pub target_info: (TargetLocation, TargetInfoType),
target_info: (TargetLocation, TargetInfoType),
/// The struct is parametrized by the location set (`L`).
pub location_set: PhantomData<L>,
location_set: PhantomData<L>,
}

impl<InfoType, TargetLocation: ChoreographyLocation, TargetInfoType>
Expand All @@ -38,30 +38,17 @@ impl<L: HList, InfoType, TargetLocation: ChoreographyLocation, TargetInfoType>
{
/// Adds information about a new `ChoreographyLocation`.
pub fn with<NewLocation: ChoreographyLocation>(
self,
mut self,
_location: NewLocation,
info: InfoType,
) -> TransportConfig<HCons<NewLocation, L>, InfoType, TargetLocation, TargetInfoType>
where {
let mut new_info = HashMap::new();
for (k, v) in self.info.into_iter() {
new_info.insert(k, v);
}
new_info.insert(NewLocation::name().to_string(), info);
self.info.insert(NewLocation::name().to_string(), info);

TransportConfig {
info: new_info,
target_info: self.target_info,
location_set: PhantomData,
}
}

/// Finalize the `TransportConfig`.
pub fn build(self) -> TransportConfig<L, InfoType, TargetLocation, TargetInfoType> {
TransportConfig {
info: self.info,
location_set: PhantomData,
target_info: self.target_info,
location_set: PhantomData,
}
}
}
39 changes: 13 additions & 26 deletions chorus_lib/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ pub type HttpTransportConfig<L, Target> = TransportConfig<L, (String, u16), Targ
/// The header name for the source location.
const HEADER_SRC: &str = "X-CHORUS-SOURCE";

/// A wrapper for HashMap<String, (String, u16)>
#[derive(Clone)]
pub struct HttpConfig<L: HList> {
/// The information about locations
pub info: HashMap<String, (String, u16)>,
/// The struct is parametrized by the location set (`L`).
pub location_set: PhantomData<L>,
}

/// The HTTP transport.
pub struct HttpTransport<L: HList, TLocation> {
config: HashMap<String, (String, u16)>,
Expand All @@ -48,7 +39,7 @@ pub struct HttpTransport<L: HList, TLocation> {

impl<L: HList, TLocation: ChoreographyLocation> HttpTransport<L, TLocation> {
/// Creates a new `HttpTransport` instance from the configuration.
pub fn new<Index>(http_config: &HttpTransportConfig<L, TLocation>) -> Self
pub fn new<Index>(http_config: HttpTransportConfig<L, TLocation>) -> Self
where
TLocation: Member<L, Index>,
{
Expand All @@ -60,8 +51,6 @@ impl<L: HList, TLocation: ChoreographyLocation> HttpTransport<L, TLocation> {
Arc::new(m.into())
};

let info = &http_config.info;

let (_, (hostname, port)) = &http_config.target_info;
let server = Arc::new(Server::http(format!("{}:{}", hostname, port)).unwrap());
let join_handle = Some({
Expand Down Expand Up @@ -98,7 +87,7 @@ impl<L: HList, TLocation: ChoreographyLocation> HttpTransport<L, TLocation> {
let agent = AgentBuilder::new().build();

Self {
config: info.clone(),
config: http_config.info,
agent,
join_handle,
server,
Expand All @@ -116,7 +105,9 @@ impl<L: HList, TLocation> Drop for HttpTransport<L, TLocation> {
}
}

impl<L: HList, TLocation> Transport<L, TLocation> for HttpTransport<L, TLocation> {
impl<L: HList, TLocation: ChoreographyLocation> Transport<L, TLocation>
for HttpTransport<L, TLocation>
{
fn locations(&self) -> Vec<String> {
Vec::from_iter(self.config.keys().map(|s| s.clone()))
}
Expand Down Expand Up @@ -165,22 +156,20 @@ mod tests {
let mut handles = Vec::new();
{
let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9010))
.with(Bob, ("localhost".to_string(), 9011))
.build();
.with(Bob, ("localhost".to_string(), 9011));

handles.push(thread::spawn(move || {
wait.recv().unwrap(); // wait for Bob to start
let transport = HttpTransport::new(&config);
let transport = HttpTransport::new(config);
transport.send::<i32>(Alice::name(), Bob::name(), &v);
}));
}
{
let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9011))
.with(Alice, ("localhost".to_string(), 9010))
.build();
.with(Alice, ("localhost".to_string(), 9010));

handles.push(thread::spawn(move || {
let transport = HttpTransport::new(&config);
let transport = HttpTransport::new(config);
signal.send(()).unwrap();
let v2 = transport.receive::<i32>(Alice::name(), Bob::name());
assert_eq!(v, v2);
Expand All @@ -199,25 +188,23 @@ mod tests {
let mut handles = Vec::new();
{
let config = HttpTransportConfig::for_target(Alice, ("0.0.0.0".to_string(), 9020))
.with(Bob, ("localhost".to_string(), 9021))
.build();
.with(Bob, ("localhost".to_string(), 9021));

handles.push(thread::spawn(move || {
signal.send(()).unwrap();
let transport = HttpTransport::new(&config);
let transport = HttpTransport::new(config);
transport.send::<i32>(Alice::name(), Bob::name(), &v);
}));
}
{
let config = HttpTransportConfig::for_target(Bob, ("0.0.0.0".to_string(), 9021))
.with(Alice, ("localhost".to_string(), 9020))
.build();
.with(Alice, ("localhost".to_string(), 9020));

handles.push(thread::spawn(move || {
// wait for Alice to start, which forces Alice to retry
wait.recv().unwrap();
sleep(Duration::from_millis(100));
let transport = HttpTransport::new(&config);
let transport = HttpTransport::new(config);
let v2 = transport.receive::<i32>(Alice::name(), Bob::name());
assert_eq!(v, v2);
}));
Expand Down
Loading

0 comments on commit 83a5212

Please sign in to comment.