timely-util: async output handles
This PR replaces the synchronous timely output handles with async ones.
Their main benefit is that they can safely be held active across await
points, which gives both better ergonomics and more efficient batch
creation in the cases where we produce messages one by one.

All the methods on the handle that send data are marked `async` so that
the handle can automatically trigger a yield. The fuel system is not
implemented in this PR, but it will follow soon after this merges.

Signed-off-by: Petros Angelatos <[email protected]>
petrosagg committed Mar 10, 2023
1 parent d51e13f commit 98b3c85
Showing 6 changed files with 187 additions and 87 deletions.
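For reference, the shape of the change at each call site, sketched from the edits below (`output`, `cap`, `data`, and `batch` stand in for the concrete variables in each file; operator setup is elided):

```rust
// Before: a synchronous OutputHandle. The activation and session had to be
// dropped before any `.await`, otherwise messages sat in the operator's buffer.
{
    let mut output = output.activate();
    let mut session = output.session(&cap);
    session.give(data);
}

// After: an AsyncOutputHandle. Sending is itself an await point, so the
// handle can stay live across other awaits.
output.give(&cap, data).await;

// Whole containers go through the same handle:
output.give_container(&cap, &mut batch).await;
```

Making every send an `await` point is also what will let the handle trigger the yield/fuel behaviour described above once that follow-up lands.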
8 changes: 2 additions & 6 deletions src/compute/src/sink/persist_sink.rs
@@ -460,9 +460,7 @@ where
batch_description
);

let mut output = output.activate();
let mut session = output.session(&cap);
session.give(batch_description);
output.give(&cap, batch_description).await;

// WIP: We downgrade our capability so that downstream
// operators (writer and appender) can know when all the
@@ -823,9 +821,7 @@ where
}
};

let mut output = output.activate();
let mut session = output.session(&cap);
session.give(batch_or_data);
output.give(&cap, batch_or_data).await;
}
}
} else {
12 changes: 4 additions & 8 deletions src/persist-client/src/operators/shard_source.rs
@@ -309,8 +309,6 @@ where
}
Some(Ok(ListenEvent::Progress(progress))) => {
let session_cap = cap_set.delayed(&current_ts);
let mut descs_output = descs_output.activate();
let mut descs_session = descs_output.session(&session_cap);

// NB: in order to play nice with downstream operators whose invariants
// depend on seeing the full contents of an individual batch, we must
@@ -328,7 +326,7 @@ where
// doing instead here, but this has seemed to work
// okay so far. Continue to revisit as necessary.
let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
descs_session.give((worker_idx, part_desc.into_exchangeable_part()));
descs_output.give(&session_cap, (worker_idx, part_desc.into_exchangeable_part())).await;
}
bytes_emitted
};
@@ -480,12 +478,10 @@ where
// outputs or sessions across await points, which
// would prevent messages from being flushed from
// the shared timely output buffer.
let mut fetched_output = fetched_output.activate();
let mut tokens_output = tokens_output.activate();
fetched_output.session(&cap).give(fetched);
fetched_output.give(&cap, fetched).await;
tokens_output
.session(&cap)
.give(token.into_exchangeable_part());
.give(&cap, token.into_exchangeable_part())
.await;
}
}
}
17 changes: 9 additions & 8 deletions src/storage/src/decode/mod.rs
@@ -582,9 +582,8 @@ where
headers.as_deref(),
);

let mut output = output.activate();
if value_bytes_remaining.is_empty() {
output.session(&cap).give(DecodeResult {
let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| {
DecodeError {
@@ -596,11 +595,12 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
});
};
output.give(&cap, result).await;
value_buf = vec![];
break;
} else {
output.session(&cap).give(DecodeResult {
let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| {
DecodeError {
@@ -612,7 +612,8 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
});
};
output.give(&cap, result).await;
}
if is_err {
// If decoding has gone off the rails, we can no longer be sure that the delimiters are correct, so it
@@ -650,8 +651,7 @@ where
headers.as_deref(),
);

let mut output = output.activate();
output.session(&cap).give(DecodeResult {
let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| DecodeError {
kind: inner,
Expand All @@ -661,7 +661,8 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
});
};
output.give(&cap, result).await;
}
}
}
73 changes: 24 additions & 49 deletions src/storage/src/source/source_reader_pipeline.rs
@@ -44,7 +44,6 @@ use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::capture::capture::Capture;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::{Broadcast, CapabilitySet, Partition};
use timely::dataflow::{Scope, Stream};
use timely::progress::frontier::MutableAntichain;
@@ -68,7 +67,9 @@ use mz_storage_client::types::errors::SourceError;
use mz_storage_client::types::sources::encoding::SourceDataEncoding;
use mz_storage_client::types::sources::{MzOffset, SourceConnection, SourceTimestamp, SourceToken};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
use mz_timely_util::builder_async::{
AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
};
use mz_timely_util::capture::UnboundedTokioCapture;
use mz_timely_util::operator::StreamExt as _;

@@ -366,10 +367,10 @@
tokio::pin!(source_stream);
tokio::pin!(resume_uppers);

health_output.activate().session(&health_cap).give((worker_id, HealthStatusUpdate {
health_output.give(&health_cap, (worker_id, HealthStatusUpdate {
update: HealthStatus::Starting,
should_halt: false,
}));
})).await;
let mut prev_status = HealthStatusUpdate {
update: HealthStatus::Starting,
should_halt: false,
@@ -412,7 +413,6 @@
// We want to efficiently batch up messages that are ready. To do that we will
// activate the output handle here and then drain the currently available
// messages until we either run out of messages or run out of time.
let mut output = output.activate();
while timer.elapsed() < YIELD_INTERVAL {
match source_stream.next().now_or_never() {
Some(Some(SourceMessageType::Message(message, cap, diff))) => {
@@ -430,10 +430,7 @@

if prev_status != new_status_update {
prev_status = new_status_update.clone();
health_output
.activate()
.session(&health_cap)
.give((worker_id, new_status_update));
health_output.give(&health_cap, (worker_id, new_status_update)).await;
}

if let Ok(message) = &message {
@@ -448,7 +445,7 @@
// If cap is not beyond emit_cap we can't re-use emit_cap so
// flush the current batch
Some(emit_cap) => if !PartialOrder::less_equal(emit_cap, &cap) {
output.session(&emit_cap).give_container(&mut batch);
output.give_container(&*emit_cap, &mut batch).await;
batch.clear();
*emit_cap = cap;
},
@@ -458,12 +455,12 @@
}
Some(Some(SourceMessageType::SourceStatus(new_status))) => {
prev_status = new_status.clone();
health_output.activate().session(&health_cap).give((worker_id, new_status));
health_output.give(&health_cap, (worker_id, new_status)).await;
}
Some(None) => {
trace!("timely-{worker_id} source({id}): source ended, dropping capabilities");
if let Some(emit_cap) = emit_cap.take() {
output.session(&emit_cap).give_container(&mut batch);
output.give_container(&emit_cap, &mut batch).await;
batch.clear();
}
return;
@@ -472,14 +469,10 @@
}
}
if let Some(emit_cap) = emit_cap.take() {
output.session(&emit_cap).give_container(&mut batch);
output.give_container(&emit_cap, &mut batch).await;
batch.clear();
}
assert!(batch.is_empty());
// Now we drop the activated output handle to force timely to emit any pending
// batch. It's crucial that this happens before our attempt to yield otherwise
// the buffer would get stuck in this operator.
drop(output);
if timer.elapsed() > YIELD_INTERVAL {
tokio::task::yield_now().await;
}
@@ -751,15 +744,10 @@
&initial_batch.updates
);

// Out of an abundance of caution, do not hold the output handle
// across an await, and drop it before we downgrade the capability.
{
let mut remap_output = remap_output.activate();
let cap = cap_set.delayed(cap_set.first().unwrap());
let mut session = remap_output.session(&cap);
session.give_vec(&mut initial_batch.updates);
cap_set.downgrade(initial_batch.upper);
}
let cap = cap_set.delayed(cap_set.first().unwrap());
remap_output.give_container(&cap, &mut initial_batch.updates).await;
drop(cap);
cap_set.downgrade(initial_batch.upper);

let mut ticker = tokio::time::interval(timestamp_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -780,14 +768,8 @@
remap_trace_batch.upper.pretty()
);

// Out of an abundance of caution, do not hold the output handle
// across an await, and drop it before we downgrade the capability.
{
let mut remap_output = remap_output.activate();
let cap = cap_set.delayed(cap_set.first().unwrap());
let mut session = remap_output.session(&cap);
session.give_vec(&mut remap_trace_batch.updates);
}
let cap = cap_set.delayed(cap_set.first().unwrap());
remap_output.give_container(&cap, &mut remap_trace_batch.updates).await;

// If the last remap trace closed the input, we no longer
// need to (or can) advance the timestamper.
@@ -799,14 +781,8 @@

let mut remap_trace_batch = timestamper.advance().await;

// Out of an abundance of caution, do not hold the output handle
// across an await, and drop it before we downgrade the capability.
{
let mut remap_output = remap_output.activate();
let cap = cap_set.delayed(cap_set.first().unwrap());
let mut session = remap_output.session(&cap);
session.give_vec(&mut remap_trace_batch.updates);
}
let cap = cap_set.delayed(cap_set.first().unwrap());
remap_output.give_container(&cap, &mut remap_trace_batch.updates).await;

cap_set.downgrade(remap_trace_batch.upper);
}
@@ -981,7 +957,6 @@
// Accumulate updates to offsets for Prometheus and system table metrics collection
let mut metric_updates = BTreeMap::new();

let mut output = reclocked_output.activate();
let mut total_processed = 0;
for ((message, from_ts, diff), into_ts) in timestamper.reclock(msgs) {
let into_ts = into_ts.expect("reclock for update not beyond upper failed");
@@ -991,11 +966,11 @@
diff,
&mut bytes_read,
&cap_set,
&mut output,
&mut reclocked_output,
&mut metric_updates,
into_ts,
id,
);
).await;
total_processed += 1;
}
// The loop above might have completely emptied batches. We can now remove them
@@ -1140,15 +1115,15 @@
///
/// TODO: This function is a bit of a mess rn but hopefully this function makes
/// the existing mess more obvious and points towards ways to improve it.
fn handle_message<K, V, T, D>(
async fn handle_message<K, V, T, D>(
message: Result<SourceMessage<K, V>, SourceReaderError>,
time: T,
diff: D,
bytes_read: &mut usize,
cap_set: &CapabilitySet<mz_repr::Timestamp>,
output_handle: &mut OutputHandle<
output_handle: &mut AsyncOutputHandle<
mz_repr::Timestamp,
(usize, Result<SourceOutput<K, V, D>, SourceError>),
Vec<(usize, Result<SourceOutput<K, V, D>, SourceError>)>,
Tee<mz_repr::Timestamp, (usize, Result<SourceOutput<K, V, D>, SourceError>)>,
>,
metric_updates: &mut BTreeMap<PartitionId, (MzOffset, mz_repr::Timestamp, Diff)>,
Expand Down Expand Up @@ -1199,7 +1174,7 @@ fn handle_message<K, V, T, D>(
};

let ts_cap = cap_set.delayed(&ts);
output_handle.session(&ts_cap).give(output);
output_handle.give(&ts_cap, output).await;
match metric_updates.entry(partition) {
Entry::Occupied(mut entry) => {
entry.insert((offset, ts, entry.get().2 + 1));
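The source_reader_pipeline.rs changes above also keep their manual coalescing: messages produced one by one are pushed into a container and flushed with a single `give_container` call once the capability they were minted under can no longer be reused. A condensed, hypothetical sketch of that pattern (not an excerpt; `next_message`, `output`, and the timely types are assumed to be in scope from the surrounding async operator):

```rust
let mut batch = Vec::new();
let mut emit_cap: Option<Capability<Timestamp>> = None;

while let Some((message, cap)) = next_message().await {
    if let Some(current) = &emit_cap {
        // The new capability is not beyond the batch's capability, so the
        // batch cannot absorb this message: flush and start a new batch.
        if !PartialOrder::less_equal(current, &cap) {
            output.give_container(current, &mut batch).await;
            batch.clear();
            emit_cap = None;
        }
    }
    if emit_cap.is_none() {
        emit_cap = Some(cap);
    }
    batch.push(message);
}

// Flush whatever is left before the capability is dropped.
if let Some(cap) = emit_cap.take() {
    output.give_container(&cap, &mut batch).await;
}
```

Coalescing like this keeps the number of awaited sends proportional to the number of capability changes rather than the number of messages.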