Skip to content

Commit

Permalink
Fix flatcontainer example (#573)
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jul 2, 2024
1 parent 121a9a8 commit 59a9aea
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ jobs:
# clutter target/debug/deps with multiple copies of things.
run: for file in $(find mdbook -name '*.md'); do rustdoc --test $file -L ./target/debug/deps; done
- run: cargo test
- run: cargo test --features bincode
15 changes: 9 additions & 6 deletions timely/examples/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
#[cfg(feature = "bincode")]
use {
std::collections::HashMap,
timely::container::flatcontainer::{Containerized, FlatStack},
timely::container::CapacityContainerBuilder,
timely::container::flatcontainer::{RegionPreference, FlatStack},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
Expand All @@ -13,18 +14,18 @@ use {
#[cfg(feature = "bincode")]
fn main() {

type Container = FlatStack<<(String, i64) as Containerized>::Region>;
type Container = FlatStack<<(String, i64) as RegionPreference>::Region>;

// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let mut input = <InputHandleCore<_, Container>>::new();
let mut input = <InputHandleCore<_, CapacityContainerBuilder<Container>>>::new();
let mut probe = ProbeHandle::new();

// create a new input, exchange data, and inspect its output
worker.dataflow::<usize, _, _>(|scope| {
input
.to_stream(scope)
.unary::<Container, _, _, _>(
.unary(
Pipeline,
"Split",
|_cap, _info| {
Expand All @@ -40,7 +41,8 @@ fn main() {
}
},
)
.unary_frontier::<Container, _, _, _>(
.container::<Container>()
.unary_frontier(
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
"WordCount",
|_capability, _info| {
Expand Down Expand Up @@ -79,6 +81,7 @@ fn main() {
}
},
)
.container::<Container>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});
Expand All @@ -98,4 +101,4 @@ fn main() {
#[cfg(not(feature = "bincode"))]
fn main() {
eprintln!("Example requires feature bincode.");
}
}

0 comments on commit 59a9aea

Please sign in to comment.