Skip to content

Commit

Permalink
reduce diff to master
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 4, 2023
1 parent 5734380 commit 06ae60f
Show file tree
Hide file tree
Showing 11 changed files with 18 additions and 22 deletions.
5 changes: 2 additions & 3 deletions kafkaesque/src/kafka_source.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use timely::Data;
use timely::dataflow::Scope;
use timely::dataflow::{OwnedStream, Scope};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::channels::pushers::PushOwned;

use rdkafka::Message;
use rdkafka::consumer::{ConsumerContext, BaseConsumer};
use timely::dataflow::channels::pushers::PushOwned;
use timely::dataflow::stream::OwnedStream;

/// Constructs a stream of data from a Kafka consumer.
///
Expand Down
1 change: 0 additions & 1 deletion timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use timely::dataflow::*;
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::stream::{OwnedStream, StreamLike};

fn main() {

Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
use std::{fmt::{self, Debug}, marker::PhantomData};
use timely_container::PushPartitioned;

use crate::communication::{Push, Pull, Data};
use crate::communication::{Push, Pull};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::Container;
use crate::{ExchangeData, Container};

use crate::worker::AsWorker;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
Expand Down Expand Up @@ -69,9 +69,9 @@ impl<C, D, F: FnMut(&D)->u64+'static> ExchangeCore<C, D, F> {
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
impl<T: Timestamp, C, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
where
C: Data + Clone + PushPartitioned<Item=D>,
C: ExchangeData + PushPartitioned<Item=D>,
{
type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct TeeCore<T, D> {
/// [TeeCore] specialized to `Vec`-based container.
pub type Tee<T, D> = TeeCore<T, Vec<D>>;

impl<T: Data, D: Container+Clone> Push<BundleCore<T, D>> for TeeCore<T, D> {
impl<T: Data, D: Container+Data> Push<BundleCore<T, D>> for TeeCore<T, D> {
#[inline]
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
let mut pushers = self.shared.borrow_mut();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait BranchWhen<G: Scope, C: Container>: Sized {
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>);
}

impl<G: Scope, C: Container + Clone, S: StreamLike<G, C>> BranchWhen<G, C> for S {
impl<G: Scope, C: Container + Data, S: StreamLike<G, C>> BranchWhen<G, C> for S {
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/operators/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
//! and there are several default implementations, including a linked-list, Rust's MPSC
//! queue, and a binary serializer wrapping any `W: Write`.
use crate::dataflow::Scope;
use crate::dataflow::{Scope, StreamLike};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;

use crate::Container;
use crate::dataflow::stream::StreamLike;
use crate::progress::ChangeBatch;
use crate::progress::Timestamp;

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::progress::Timestamp;

use super::EventCore;
use super::event::EventIteratorCore;
use crate::Container;
use crate::{Container, Data};

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
Expand All @@ -62,7 +62,7 @@ pub trait Replay<T: Timestamp, C> : Sized {
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>;
}

impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
impl<T: Timestamp, C: Container+Data, I> Replay<T, C> for I
where I : IntoIterator,
<I as IntoIterator>::Item: EventIteratorCore<T, C>+'static {
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>{
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/concat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Merges the contents of multiple streams.

use crate::Container;
use crate::{Container, Data};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{OwnedStream, StreamLike, Scope};

Expand All @@ -23,7 +23,7 @@ pub trait Concat<G: Scope, D: Container, S: StreamLike<G, D>> {
fn concat(self, other: S) -> OwnedStream<G, D>;
}

impl<G: Scope, D: Container+Clone, S: StreamLike<G, D>> Concat<G, D, S> for S {
impl<G: Scope, D: Container+Data, S: StreamLike<G, D>> Concat<G, D, S> for S {
fn concat(self, other: S) -> OwnedStream<G, D> {
self.scope().concatenate([self, other])
}
Expand Down Expand Up @@ -52,7 +52,7 @@ pub trait Concatenate<G: Scope, D: Container, S: StreamLike<G, D>> {
I: IntoIterator<Item=S>;
}

impl<G: Scope, D: Container+Clone> Concatenate<G, D, OwnedStream<G, D>> for OwnedStream<G, D> {
impl<G: Scope, D: Container+Data> Concatenate<G, D, OwnedStream<G, D>> for OwnedStream<G, D> {
fn concatenate<I>(self, sources: I) -> OwnedStream<G, D>
where
I: IntoIterator<Item=OwnedStream<G, D>>,
Expand All @@ -61,7 +61,7 @@ impl<G: Scope, D: Container+Clone> Concatenate<G, D, OwnedStream<G, D>> for Owne
}
}

impl<G: Scope, D: Container+Clone, S: StreamLike<G, D>> Concatenate<G, D, S> for &G {
impl<G: Scope, D: Container+Data, S: StreamLike<G, D>> Concatenate<G, D, S> for &G {
fn concatenate<I>(self, sources: I) -> OwnedStream<G, D>
where
I: IntoIterator<Item=S>
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

//! Methods to construct generic streaming and blocking unary operators.
use crate::dataflow::channels::pushers::PushOwned;
use crate::dataflow::channels::pact::ParallelizationContractCore;

use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
Expand All @@ -12,7 +13,6 @@ use super::builder_rc::OperatorBuilder;
use crate::dataflow::operators::generic::OperatorInfo;
use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
use crate::Container;
use crate::dataflow::channels::pushers::PushOwned;

/// Methods to construct generic streaming and blocking operators.
pub trait Operator<G: Scope, D1: Container> {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
impl<G, D, D2, F, S> Partition<G, D, D2, F> for S
where
G: Scope,
D: Data + Clone,
D: Data,
D2: Data,
F: Fn(D) -> (u64, D2) + 'static,
S: StreamLike<G, Vec<D>>,
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use crate::progress::{Source, Target};

use crate::communication::Push;
use crate::dataflow::Scope;
use crate::dataflow::channels::pushers::{TeeHelper, PushOwned};
use crate::dataflow::channels::pushers::{TeeCore, TeeHelper, PushOwned};
use crate::dataflow::channels::BundleCore;
use std::fmt::{self, Debug};
use crate::Container;
use crate::dataflow::channels::pushers::TeeCore;

/// Common behavior for all streams. Streams belong to a scope and carry data.
///
Expand Down

0 comments on commit 06ae60f

Please sign in to comment.