timely-util: async output handles #17998
Conversation
Force-pushed from 9181971 to 628c452
pub async fn give<C: CapabilityTrait<T>>(&mut self, cap: &C, data: D) {
    let mut handle = self.handle.borrow_mut();
    cap.session(&mut handle).give(data);
}
In principle this seems fine, but I'm wondering about performance implications. give is supposed to be very low cost, and including the session call plus async introduces significant overhead. It might be that we can't measure it at the moment, but I hope at some point we will. Because of this, I think we should build abstractions in terms of "batch of data" instead of "single item".
Calling session only adds an additional time comparison per call. If the time of the capability is the same, then multiple consecutive give calls will indeed create a batch of the data instead of emitting them in individual degenerate containers.
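The batching behavior described above can be modeled outside of timely: a handle remembers the time of the current buffer and only flushes when a give arrives with a different time, so the per-call cost is a single comparison. This is a simplified, self-contained sketch for illustration, not timely's actual OutputHandle:

```rust
// Simplified model of the session logic: consecutive `give` calls at the
// same time accumulate into one batch; a give at a new time flushes the
// previous batch. All names here are invented for the sketch.
struct ModelHandle {
    current_time: Option<u64>,
    buffer: Vec<String>,
    flushed_batches: Vec<(u64, Vec<String>)>,
}

impl ModelHandle {
    fn new() -> Self {
        ModelHandle { current_time: None, buffer: Vec::new(), flushed_batches: Vec::new() }
    }

    // The per-call overhead is just this time comparison.
    fn give(&mut self, time: u64, datum: String) {
        if self.current_time != Some(time) {
            self.flush();
            self.current_time = Some(time);
        }
        self.buffer.push(datum);
    }

    fn flush(&mut self) {
        if let Some(t) = self.current_time.take() {
            if !self.buffer.is_empty() {
                self.flushed_batches.push((t, std::mem::take(&mut self.buffer)));
            }
        }
    }
}

fn main() {
    let mut h = ModelHandle::new();
    h.give(0, "a".into());
    h.give(0, "b".into()); // same time: batched together with "a"
    h.give(1, "c".into()); // new time: previous batch is flushed
    h.flush();
    assert_eq!(h.flushed_batches.len(), 2);
}
```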
Also, the async call has the same overhead as a normal function call. In other words, give{_container}(..).await does not need to yield to timely to produce the message; in fact there is no await point in the implementation in this PR. But the plan is to associate output handles with fuel, so that when fuel == 0 the handle will yield for you and ensure that data isn't being buffered up in the operator.
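The planned fuel mechanism is not in this PR, but it can be roughly modeled as follows (the names, the refill amount, and the counters are all hypothetical; in the real async handle the yield would be an `.await` back to timely rather than a counter bump):

```rust
// Hypothetical model of the planned fuel mechanism: each send consumes one
// unit of fuel, and when fuel reaches zero the handle flushes its buffered
// data and yields, so data doesn't pile up inside the operator.
struct FueledHandle {
    fuel: usize,
    refill: usize,
    buffered: usize,
    yields: usize,
}

impl FueledHandle {
    fn new(refill: usize) -> Self {
        FueledHandle { fuel: refill, refill, buffered: 0, yields: 0 }
    }

    fn give(&mut self, _datum: u64) {
        self.buffered += 1;
        self.fuel -= 1;
        if self.fuel == 0 {
            // Flush downstream and yield to the runtime, then refill.
            self.buffered = 0;
            self.yields += 1;
            self.fuel = self.refill;
        }
    }
}

fn main() {
    let mut h = FueledHandle::new(4);
    for i in 0..10 {
        h.give(i);
    }
    assert_eq!(h.yields, 2);   // yielded after the 4th and 8th item
    assert_eq!(h.buffered, 2); // two items still buffered
}
```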
I should probably give a bit more context on why the sessions can't be exposed to users. A timely session borrows the buffer that is associated with an OutputHandle, which means that while a session is active nothing can touch the output handle. However, this is exactly what I want to do in this case. The AsyncOutputHandle is shared between the user code (the async logic) and the operator infrastructure, and whenever the user code yields for any reason the operator infrastructure needs to be able to call output.cease() on all the output handles so that data can be passed downstream.
This means that we can never give an actual timely Session type to user code, which could potentially keep it alive across an await point, leaving us unable to flush the output handle. So the compromise made by this implementation is to start a session for every give/give_container call.
And a final point (sorry for the multiple comments) is that if you look at the diff you'll find that all the async operators were already defensively creating temporary sessions, often in separate blocks, to ensure they are never held across await points[1][2]. So this implementation just codifies a pattern that we already had to enforce manually.
[1] https://github.com/MaterializeInc/materialize/pull/17998/files#diff-4d9165ee597139d5a552a908445a834a77fead765eb10b0b6a85f2a5b9e3ca56L479
[2] https://github.com/MaterializeInc/materialize/pull/17998/files#diff-4d9165ee597139d5a552a908445a834a77fead765eb10b0b6a85f2a5b9e3ca56L754
With the patch I sent above I see 4.9 seconds when creating the session for every give and 3.6 seconds for creating the session once, so a much smaller 1.36x slowdown. I'll send a timely PR with that patch.
Posted here TimelyDataflow/timely-dataflow#513
The check that the capability is valid for the output seems to be quite expensive. When removing it, the time drops from 5.1s to 3.3s. But it's there for a reason...
Your patch doesn't change the result. I guess cloning a Copy timestamp is basically free.
Hm, I might have messed up benchmarks. I agree that this microbenchmark shouldn't affect this PR. The efficiency can be brought back with enough elbow grease but it's pretty tricky and would require significantly more unsafe code I think.
In microbenchmarks (see MaterializeInc/materialize#17998 (comment)) the code in the PR performed better than the current version.

Signed-off-by: Petros Angelatos <[email protected]>
This seems fine! Thank you @petrosagg for the discussion about performance. I left some minor comments.
src/storage/src/decode/mod.rs
Outdated
DecodeResult {
    key: None,
    value: Some(value.map(|r| (r, 1)).map_err(|inner| {
        DecodeError {
            kind: inner,
            raw: None,
        }
    })),
    position: position.into(),
    upstream_time_millis,
    partition: partition.clone(),
    metadata,
},
Nit: extract the DecodeResult into a variable so the code stays readable. Here and below.
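The suggested refactor can be sketched with stand-in types (the real DecodeResult and DecodeError live in Materialize's storage crate and have more fields; everything below is simplified for illustration):

```rust
// Stand-ins for the real types, reduced to the fields relevant here.
#[derive(Debug, PartialEq)]
struct DecodeError {
    kind: String,
    raw: Option<Vec<u8>>,
}

#[derive(Debug, PartialEq)]
struct DecodeResult {
    key: Option<String>,
    value: Option<Result<(String, i64), DecodeError>>,
}

fn build_result(value: Result<String, String>) -> DecodeResult {
    // Binding the mapped value to a name first keeps the struct literal
    // flat and readable, which is what the review comment suggests.
    let value = value
        .map(|r| (r, 1))
        .map_err(|inner| DecodeError { kind: inner, raw: None });
    DecodeResult { key: None, value: Some(value) }
}

fn main() {
    let ok = build_result(Ok("row".to_string()));
    assert_eq!(ok.value, Some(Ok(("row".to_string(), 1))));
}
```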
good point, fixed
@@ -212,6 +219,110 @@ impl<'handle, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> Future
     }
 }

 // TODO: delete and use CapabilityTrait instead once TimelyDataflow/timely-dataflow#512 gets merged
 pub trait CapabilityTrait<T: Timestamp> {
     fn session<'a, D, P>(
You might want to add #[inline] to all implementations of session. We're not doing this much in Mz, but it seems we might want to start, especially when functions are called across crate boundaries.
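The shape of the suggestion looks roughly like this (the trait and types below are stand-ins, not the real CapabilityTrait); #[inline] on a trait method implementation makes the body available for inlining at call sites in other crates even without LTO:

```rust
// Stand-in trait to illustrate where the #[inline] attribute goes.
trait Cap {
    fn time(&self) -> u64;
}

struct MyCap(u64);

impl Cap for MyCap {
    // Marking the implementation #[inline] lets callers in other crates
    // inline this tiny accessor instead of paying a cross-crate call.
    #[inline]
    fn time(&self) -> u64 {
        self.0
    }
}

fn main() {
    assert_eq!(MyCap(7).time(), 7);
}
```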
// * We're erasing the lifetime but we guarantee through field order that the handle will
//   be dropped before the wrapper, thus manually enforcing the lifetime.
Maybe add a comment on AsyncOutputHandle that the field order must not be changed, or add a Drop impl?
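The field-order guarantee being discussed relies on the fact that Rust drops struct fields in declaration order. A small self-contained demonstration (all names invented):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Records its label into a shared log when dropped.
struct Tracked(&'static str, Rc<RefCell<Vec<&'static str>>>);

impl Drop for Tracked {
    fn drop(&mut self) {
        self.1.borrow_mut().push(self.0);
    }
}

// Rust drops fields in declaration order, so `handle` is always dropped
// before `buffer`. Reordering these fields would silently change that,
// which is why the review suggests documenting it or adding a Drop impl.
struct Wrapper {
    handle: Tracked,
    buffer: Tracked,
}

fn drop_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _w = Wrapper {
            handle: Tracked("handle", Rc::clone(&log)),
            buffer: Tracked("buffer", Rc::clone(&log)),
        };
    } // _w dropped here: first `handle`, then `buffer`
    let order = log.borrow().clone();
    order
}

fn main() {
    assert_eq!(drop_order(), vec!["handle", "buffer"]);
}
```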
This PR replaces the synchronous timely handles with async ones. Their main benefit is that it is totally fine to hold handles active across await points, which produces both better ergonomics and more efficient batch creation for the cases where we produce messages one by one. All the methods on the handle that send data are marked `async` so that the handle can automatically trigger a yield. The fuel system is not implemented in this PR but will follow soon after this merges.

Signed-off-by: Petros Angelatos <[email protected]>
Force-pushed from 628c452 to 98b3c85
Motivation
This PR replaces the synchronous timely handles with async ones. Their main benefit is that it is totally fine to hold handles active across await points, which produces both better ergonomics and more efficient batch creation for the cases where we produce messages one by one.
All the methods on the handle that send data are marked async so that the handle can automatically trigger a yield. The fuel system is not implemented in this PR but will follow soon after this merges.
Tips for reviewer
Checklist
$T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.