Skip to content

Commit

Permalink
Remove StreamLike trait
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Dec 11, 2024
1 parent 0fdc20c commit b229162
Show file tree
Hide file tree
Showing 38 changed files with 283 additions and 326 deletions.
8 changes: 4 additions & 4 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ fn main() {
}).unwrap(); // asserts error-free execution;
}

trait UnionFind<G: Scope> {
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>>;
trait UnionFind {
fn union_find(self) -> Self;
}

impl<G: Scope, S: StreamLike<G, Vec<(usize, usize)>>> UnionFind<G> for S {
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>> {
impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
fn union_find(self) -> Stream<G, (usize, usize)> {

self.unary(Pipeline, "UnionFind", |_,_| {

Expand Down
2 changes: 2 additions & 0 deletions timely/examples/wordcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ fn main() {
// create a new input, exchange data, and inspect its output
worker.dataflow::<usize,_,_>(|scope| {
input.to_stream(scope)
.container::<Vec<_>>()
.flat_map(|(text, diff): (String, i64)|
text.split_whitespace()
.map(move |word| (word.to_owned(), diff))
.collect::<Vec<_>>()
)
.container::<Vec<_>>()
.unary_frontier(exchange, "WordCount", |_capability, _info| {

let mut queues = HashMap::new();
Expand Down
4 changes: 3 additions & 1 deletion timely/src/dataflow/channels/pushers/owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ impl<T, D> PushOwned<T, D> {
}

/// Set the downstream pusher.
pub fn set<P: Push<Message<T, D>> + 'static>(self, pusher: P) {
///
/// Consumes `Self` as only a single pusher can be set.
pub fn set_pusher<P: Push<Message<T, D>> + 'static>(self, pusher: P) {
*self.0.borrow_mut() = Some(Box::new(pusher));
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! });
//! ```
pub use self::stream::{StreamCore, Stream, StreamLike, OwnedStream};
pub use self::stream::{StreamTee, Stream, StreamCore};
pub use self::scopes::{Scope, ScopeParent};

pub use self::operators::core::input::Handle as InputHandleCore;
Expand Down
14 changes: 4 additions & 10 deletions timely/src/dataflow/operators/aggregation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::hash::Hash;
use std::collections::HashMap;

use crate::ExchangeData;
use crate::dataflow::{Scope, StreamLike, OwnedStream};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

Expand Down Expand Up @@ -64,22 +64,16 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
self,
fold: F,
emit: E,
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq;
hash: H) -> Stream<S, R> where S::Timestamp: Eq;
}

impl<G, K, V, S> Aggregate<G, K, V> for S
where
G: Scope,
K: ExchangeData + Hash + Eq + Clone,
V: ExchangeData,
S: StreamLike<G, Vec<(K, V)>>,
{
impl<S: Scope, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {

fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
self,
fold: F,
emit: E,
hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp: Eq {
hash: H) -> Stream<S, R> where S::Timestamp: Eq {

let mut aggregates = HashMap::new();
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
Expand Down
14 changes: 4 additions & 10 deletions timely/src/dataflow/operators/aggregation/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::hash::Hash;
use std::collections::HashMap;

use crate::ExchangeData;
use crate::dataflow::{OwnedStream, Scope, StreamLike};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

Expand Down Expand Up @@ -51,23 +51,17 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
I: IntoIterator<Item=R>, // type of output iterator
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
H: Fn(&K)->u64+'static, // "hash" function for keys
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq ;
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
}

impl<G, K, V, S> StateMachine<G, K, V> for S
where
G: Scope,
K: ExchangeData + Hash + Eq + Clone,
V: ExchangeData,
S: StreamLike<G, Vec<(K, V)>>,
{
impl<S: Scope, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
fn state_machine<
R: 'static, // output type
D: Default+'static, // per-key state (data)
I: IntoIterator<Item=R>, // type of output iterator
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
H: Fn(&K)->u64+'static, // "hash" function for keys
>(self, fold: F, hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp : Hash+Eq {
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {

let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
let mut states = HashMap::new(); // keys -> state
Expand Down
18 changes: 9 additions & 9 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, OwnedStream, StreamLike};
use crate::dataflow::{Scope, Stream, StreamCore};
use crate::Container;

/// Extension trait for `Stream`.
Expand Down Expand Up @@ -31,14 +31,14 @@ pub trait Branch<S: Scope, D> {
fn branch(
self,
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>);
) -> (Stream<S, D>, Stream<S, D>);
}

impl<G: Scope, D: 'static, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
impl<S: Scope, D: 'static> Branch<S, D> for Stream<S, D> {
fn branch(
self,
condition: impl Fn(&G::Timestamp, &D) -> bool + 'static,
) -> (OwnedStream<G, Vec<D>>, OwnedStream<G, Vec<D>>) {
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
Expand Down Expand Up @@ -69,7 +69,7 @@ impl<G: Scope, D: 'static, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
}

/// Extension trait for `Stream`.
pub trait BranchWhen<G: Scope, C: Container>: Sized {
pub trait BranchWhen<T>: Sized {
/// Takes one input stream and splits it into two output streams.
/// For each time, the supplied closure is called. If it returns `true`,
/// the records for that will be sent to the second returned stream, otherwise
Expand All @@ -89,11 +89,11 @@ pub trait BranchWhen<G: Scope, C: Container>: Sized {
/// after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
/// });
/// ```
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>);
fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}

impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> BranchWhen<G, C> for S {
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>) {
impl<S: Scope, C: Container + 'static> BranchWhen<S::Timestamp> for StreamCore<S, C> {
fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Broadcast records to all workers.
use crate::ExchangeData;
use crate::dataflow::{OwnedStream, StreamLike, Scope};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::{Map, Exchange};

/// Broadcast records to all workers.
pub trait Broadcast<G: Scope, D: ExchangeData> {
pub trait Broadcast<D: ExchangeData> {
/// Broadcast records to all workers.
///
/// # Examples
Expand All @@ -18,11 +18,11 @@ pub trait Broadcast<G: Scope, D: ExchangeData> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn broadcast(self) -> OwnedStream<G, Vec<D>>;
fn broadcast(self) -> Self;
}

impl<G: Scope, D: ExchangeData + Clone, S: StreamLike<G, Vec<D>>> Broadcast<G, D> for S {
fn broadcast(self) -> OwnedStream<G, Vec<D>> {
impl<G: Scope, D: ExchangeData + Clone> Broadcast<D> for Stream<G, D> {
fn broadcast(self) -> Stream<G, D> {

// NOTE: Simplified implementation due to underlying motion
// in timely dataflow internals. Optimize once they have
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! and there are several default implementations, including a linked-list, Rust's MPSC
//! queue, and a binary serializer wrapping any `W: Write`.
use crate::dataflow::{Scope, StreamLike};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
Expand All @@ -17,7 +17,7 @@ use crate::progress::Timestamp;
use super::{Event, EventPusher};

/// Capture a stream of timestamped data for later replay.
pub trait Capture<G: Scope, C: Container + 'static> : Sized {
pub trait Capture<T: Timestamp, C: Container + 'static> : Sized {
/// Captures a stream of timestamped data for later replay.
///
/// # Examples
Expand Down Expand Up @@ -103,18 +103,18 @@ pub trait Capture<G: Scope, C: Container + 'static> : Sized {
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// ```
fn capture_into<P: EventPusher<G::Timestamp, C>+'static>(self, pusher: P);
fn capture_into<P: EventPusher<T, C>+'static>(self, pusher: P);

/// Captures a stream using Rust's MPSC channels.
fn capture(self) -> ::std::sync::mpsc::Receiver<Event<G::Timestamp, C>> {
fn capture(self) -> ::std::sync::mpsc::Receiver<Event<T, C>> {
let (send, recv) = ::std::sync::mpsc::channel();
self.capture_into(send);
recv
}
}

impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> Capture<G, C> for S {
fn capture_into<P: EventPusher<G::Timestamp, C>+'static>(self, mut event_pusher: P) {
impl<S: Scope, C: Container + 'static> Capture<S::Timestamp, C> for StreamCore<S, C> {
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! allowing the replay to occur in a timely dataflow computation with more or fewer workers
//! than that in which the stream was captured.
use crate::dataflow::{Scope, OwnedStream};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
Expand All @@ -50,24 +50,24 @@ use crate::Container;

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
/// Replays `self` into the provided scope, as a `OwnedStream<S, C>`.
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> OwnedStream<S, C> {
/// Replays `self` into the provided scope, as a `StreamCore<S, C>`.
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
/// Replays `self` into the provided scope, as a `OwnedStream<S, C>`.
/// Replays `self` into the provided scope, as a `StreamCore<S, C>`.
///
/// The `period` argument allows the specification of a re-activation period, where the operator
/// will re-activate itself every so often. The `None` argument instructs the operator not to
/// re-activate itself.
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>;
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
}

impl<T: Timestamp, C: Container + Clone + 'static, I> Replay<T, C> for I
where
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
{
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>{
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>{

let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

Expand Down
33 changes: 17 additions & 16 deletions timely/src/dataflow/operators/core/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use crate::Container;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{OwnedStream, StreamLike, Scope};
use crate::dataflow::{StreamCore, Scope};

/// Merge the contents of two streams.
pub trait Concat<G: Scope, C: Container, S: StreamLike<G, C>> {
pub trait Concat<G: Scope, C: Container> {
/// Merge the contents of two streams.
///
/// # Examples
Expand All @@ -16,21 +16,21 @@ pub trait Concat<G: Scope, C: Container, S: StreamLike<G, C>> {
/// timely::example(|scope| {
///
/// let stream = (0..10).to_stream(scope).tee();
/// stream.concat(&stream)
/// stream.owned().concat(stream.owned())
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concat(self, other: S) -> OwnedStream<G, C>;
fn concat(self, other: StreamCore<G, C>) -> StreamCore<G, C>;
}

impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> Concat<G, C, S> for S {
fn concat(self, other: S) -> OwnedStream<G, C> {
impl<G: Scope, C: Container + 'static> Concat<G, C> for StreamCore<G, C> {
fn concat(self, other: StreamCore<G, C>) -> StreamCore<G, C> {
self.scope().concatenate([self, other])
}
}

/// Merge the contents of multiple streams.
pub trait Concatenate<G: Scope, C: Container, S: StreamLike<G, C>> {
pub trait Concatenate<G: Scope, C: Container> {
/// Merge the contents of multiple streams.
///
/// # Examples
Expand All @@ -43,28 +43,29 @@ pub trait Concatenate<G: Scope, C: Container, S: StreamLike<G, C>> {
/// (0..10).to_stream(scope),
/// (0..10).to_stream(scope)];
///
/// scope.concatenate(streams)
/// scope.clone()
/// .concatenate(streams)
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concatenate<I>(self, sources: I) -> OwnedStream<G, C>
fn concatenate<I>(self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=S>;
I: IntoIterator<Item=StreamCore<G, C>>;
}

impl<G: Scope, C: Container + 'static> Concatenate<G, C, OwnedStream<G, C>> for OwnedStream<G, C> {
fn concatenate<I>(self, sources: I) -> OwnedStream<G, C>
impl<G: Scope, C: Container + 'static> Concatenate<G, C> for StreamCore<G, C> {
fn concatenate<I>(self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=OwnedStream<G, C>>
I: IntoIterator<Item=StreamCore<G, C>>
{
self.scope().concatenate(Some(self).into_iter().chain(sources))
}
}

impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> Concatenate<G, C, S> for &G {
fn concatenate<I>(self, sources: I) -> OwnedStream<G, C>
impl<G: Scope, C: Container + 'static> Concatenate<G, C> for G {
fn concatenate<I>(self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=S>
I: IntoIterator<Item=StreamCore<G, C>>
{

// create an operator builder.
Expand Down
Loading

0 comments on commit b229162

Please sign in to comment.