Skip to content

Commit

Permalink
Test and relax bounds
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 4, 2023
1 parent 06ae60f commit 63c378c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq
}
}

impl<T, D: Data, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
impl<T, D: 'static, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
// internal method for use by `Session`.
#[inline]
fn give(&mut self, data: D) {
Expand Down Expand Up @@ -123,7 +123,7 @@ impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> wh
}
}

impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
impl<'a, T, D: 'static, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give(&mut self, data: D) {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::progress::Timestamp;
use crate::Data;

/// Converts to a timely `Stream`.
pub trait ToStream<T: Timestamp, D: Data> {
pub trait ToStream<T: Timestamp, D> {
/// Converts to a timely `Stream`.
///
/// # Examples
Expand All @@ -32,7 +32,7 @@ pub trait ToStream<T: Timestamp, D: Data> {
fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> OwnedStream<S, Vec<D>>;
}

impl<T: Timestamp, I: IntoIterator+'static> ToStream<T, I::Item> for I where I::Item: Data {
impl<T: Timestamp, I: IntoIterator+'static> ToStream<T, I::Item> for I {
fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> OwnedStream<S, Vec<I::Item>> {

source(scope, "ToStream", |capability, info| {
Expand Down
25 changes: 25 additions & 0 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,28 @@ where
.finish()
}
}

#[cfg(test)]
mod tests {
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::{Operator, ToStream};

#[derive(Debug, Eq, PartialEq)]
struct NotClone;

#[test]
fn test_non_clone_stream() {
crate::example(|scope| {
let _ = vec![NotClone]
.to_stream(scope)
.sink(Pipeline, "check non-clone", |input| {
while let Some((_time, data)) = input.next() {
for datum in data.iter() {
assert_eq!(datum, &NotClone);
}
}
});
});
}

}

0 comments on commit 63c378c

Please sign in to comment.