Merge pull request #17998 from petrosagg/async-output-handles
timely-util: async output handles
petrosagg authored Mar 10, 2023
2 parents 5450489 + 98b3c85, commit a5849e8
Showing 6 changed files with 187 additions and 87 deletions.
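
Note: the recurring change across the files below is the same. Instead of activating a synchronous timely output handle, opening a session, and making sure both are dropped before the next await point, operators now call an awaitable `give` (or `give_container`) directly on an async output handle. The toy below is a minimal, self-contained sketch of that idea only; `ToyOutputHandle`, `Capability`, and the buffer-size constant are stand-ins, not Materialize's `mz_timely_util::builder_async` implementation, and it assumes the tokio crate for the async runtime.

```rust
use std::sync::mpsc::{channel, Sender};

/// Stand-in for a timely capability at some timestamp.
#[derive(Clone, Debug)]
struct Capability(u64);

/// Toy async output handle: it owns its buffer and decides when to flush,
/// so callers can simply `give(...).await` without scoping sessions around
/// await points.
struct ToyOutputHandle<D> {
    buffer: Vec<(u64, D)>,
    downstream: Sender<Vec<(u64, D)>>,
    max_buffer: usize,
}

impl<D> ToyOutputHandle<D> {
    async fn give(&mut self, cap: &Capability, data: D) {
        self.buffer.push((cap.0, data));
        if self.buffer.len() >= self.max_buffer {
            // Flush eagerly so nothing can get stuck in the buffer while the
            // operator future is suspended at a later await point.
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            let batch = std::mem::take(&mut self.buffer);
            let _ = self.downstream.send(batch);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = channel();
    let mut output = ToyOutputHandle { buffer: Vec::new(), downstream: tx, max_buffer: 2 };
    let cap = Capability(0);
    for i in 0..5u32 {
        // The call shape used throughout this PR: `output.give(&cap, data).await`.
        output.give(&cap, i).await;
    }
    output.flush();
    drop(output); // close the channel so the receiver loop below terminates
    while let Ok(batch) = rx.recv() {
        println!("flushed batch: {batch:?}");
    }
}
```

With this shape, the "drop the activated handle before yielding" dance that several of the deleted comments below describe is the handle's responsibility rather than the caller's.
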
src/compute/src/sink/persist_sink.rs (2 additions, 6 deletions)
@@ -460,9 +460,7 @@ where
batch_description
);

- let mut output = output.activate();
- let mut session = output.session(&cap);
- session.give(batch_description);
+ output.give(&cap, batch_description).await;

// WIP: We downgrade our capability so that downstream
// operators (writer and appender) can know when all the
@@ -823,9 +821,7 @@ where
}
};

- let mut output = output.activate();
- let mut session = output.session(&cap);
- session.give(batch_or_data);
+ output.give(&cap, batch_or_data).await;
}
}
} else {
src/persist-client/src/operators/shard_source.rs (4 additions, 8 deletions)
@@ -309,8 +309,6 @@ where
}
Some(Ok(ListenEvent::Progress(progress))) => {
let session_cap = cap_set.delayed(&current_ts);
- let mut descs_output = descs_output.activate();
- let mut descs_session = descs_output.session(&session_cap);

// NB: in order to play nice with downstream operators whose invariants
// depend on seeing the full contents of an individual batch, we must
@@ -328,7 +326,7 @@ where
// doing instead here, but this has seemed to work
// okay so far. Continue to revisit as necessary.
let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
- descs_session.give((worker_idx, part_desc.into_exchangeable_part()));
+ descs_output.give(&session_cap, (worker_idx, part_desc.into_exchangeable_part())).await;
}
bytes_emitted
};
@@ -480,12 +478,10 @@ where
// outputs or sessions across await points, which
// would prevent messages from being flushed from
// the shared timely output buffer.
- let mut fetched_output = fetched_output.activate();
- let mut tokens_output = tokens_output.activate();
- fetched_output.session(&cap).give(fetched);
+ fetched_output.give(&cap, fetched).await;
tokens_output
- .session(&cap)
- .give(token.into_exchangeable_part());
+ .give(&cap, token.into_exchangeable_part())
+ .await;
}
}
}
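
Note: for context on the comment retained above ("outputs or sessions across await points ..."): with timely's synchronous handles, data given to a session is buffered and only guaranteed to reach downstream operators once the buffer fills or the session and activated handle are dropped, which is why holding them across an `.await` could strand data inside the operator. A minimal sketch of that synchronous API, using plain timely rather than Materialize's wrappers and assuming a recent timely release:

```rust
use timely::dataflow::operators::generic::operator::source;
use timely::dataflow::operators::Inspect;

fn main() {
    // `timely::example` runs a single-worker dataflow to completion.
    timely::example(|scope| {
        source(scope, "Numbers", |capability, _info| {
            let mut cap = Some(capability);
            move |output| {
                if let Some(cap) = cap.take() {
                    // Synchronous API: the session borrows `output`, and its
                    // buffer is flushed when the session is dropped at the end
                    // of this scope. There is no `.await` anywhere here, which
                    // is exactly the invariant the old code had to maintain by
                    // hand inside an async operator body.
                    let mut session = output.session(&cap);
                    for i in 0..10 {
                        session.give(i);
                    }
                }
            }
        })
        .inspect(|x| println!("emitted: {x}"));
    });
}
```

The `give(&cap, ...).await` calls above make the flush the handle's problem instead, so the operator can emit to `descs_output`, `fetched_output`, and `tokens_output` with awaits in between.
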
src/storage/src/decode/mod.rs (9 additions, 8 deletions)
@@ -582,9 +582,8 @@ where
headers.as_deref(),
);

- let mut output = output.activate();
if value_bytes_remaining.is_empty() {
- output.session(&cap).give(DecodeResult {
+ let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| {
DecodeError {
@@ -596,11 +595,12 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
- });
+ };
+ output.give(&cap, result).await;
value_buf = vec![];
break;
} else {
- output.session(&cap).give(DecodeResult {
+ let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| {
DecodeError {
@@ -612,7 +612,8 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
- });
+ };
+ output.give(&cap, result).await;
}
if is_err {
// If decoding has gone off the rails, we can no longer be sure that the delimiters are correct, so it
@@ -650,8 +651,7 @@ where
headers.as_deref(),
);

- let mut output = output.activate();
- output.session(&cap).give(DecodeResult {
+ let result = DecodeResult {
key: None,
value: Some(value.map(|r| (r, 1)).map_err(|inner| DecodeError {
kind: inner,
@@ -661,7 +661,8 @@ where
upstream_time_millis,
partition: partition.clone(),
metadata,
- });
+ };
+ output.give(&cap, result).await;
}
}
}
src/storage/src/source/source_reader_pipeline.rs (24 additions, 49 deletions)
@@ -44,7 +44,6 @@ use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::capture::capture::Capture;
use timely::dataflow::operators::capture::Event;
- use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::{Broadcast, CapabilitySet, Partition};
use timely::dataflow::{Scope, Stream};
use timely::progress::frontier::MutableAntichain;
@@ -68,7 +67,9 @@ use mz_storage_client::types::errors::SourceError;
use mz_storage_client::types::sources::encoding::SourceDataEncoding;
use mz_storage_client::types::sources::{MzOffset, SourceConnection, SourceTimestamp, SourceToken};
use mz_timely_util::antichain::AntichainExt;
- use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
+ use mz_timely_util::builder_async::{
+     AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
+ };
use mz_timely_util::capture::UnboundedTokioCapture;
use mz_timely_util::operator::StreamExt as _;

@@ -356,10 +357,10 @@
tokio::pin!(source_stream);
tokio::pin!(resume_uppers);

- health_output.activate().session(&health_cap).give((worker_id, HealthStatusUpdate {
+ health_output.give(&health_cap, (worker_id, HealthStatusUpdate {
update: HealthStatus::Starting,
should_halt: false,
- }));
+ })).await;
let mut prev_status = HealthStatusUpdate {
update: HealthStatus::Starting,
should_halt: false,
@@ -402,7 +403,6 @@
// We want to efficiently batch up messages that are ready. To do that we will
// activate the output handle here and then drain the currently available
// messages until we either run out of messages or run out of time.
- let mut output = output.activate();
while timer.elapsed() < YIELD_INTERVAL {
match source_stream.next().now_or_never() {
Some(Some(SourceMessageType::Message(message, cap, diff))) => {
@@ -420,10 +420,7 @@

if prev_status != new_status_update {
prev_status = new_status_update.clone();
- health_output
- .activate()
- .session(&health_cap)
- .give((worker_id, new_status_update));
+ health_output.give(&health_cap, (worker_id, new_status_update)).await;
}

if let Ok(message) = &message {
@@ -438,7 +435,7 @@
// If cap is not beyond emit_cap we can't re-use emit_cap so
// flush the current batch
Some(emit_cap) => if !PartialOrder::less_equal(emit_cap, &cap) {
- output.session(&emit_cap).give_container(&mut batch);
+ output.give_container(&*emit_cap, &mut batch).await;
batch.clear();
*emit_cap = cap;
},
@@ -448,12 +445,12 @@
}
Some(Some(SourceMessageType::SourceStatus(new_status))) => {
prev_status = new_status.clone();
- health_output.activate().session(&health_cap).give((worker_id, new_status));
+ health_output.give(&health_cap, (worker_id, new_status)).await;
}
Some(None) => {
trace!("timely-{worker_id} source({id}): source ended, dropping capabilities");
if let Some(emit_cap) = emit_cap.take() {
- output.session(&emit_cap).give_container(&mut batch);
+ output.give_container(&emit_cap, &mut batch).await;
batch.clear();
}
return;
@@ -462,14 +459,10 @@
}
}
if let Some(emit_cap) = emit_cap.take() {
- output.session(&emit_cap).give_container(&mut batch);
+ output.give_container(&emit_cap, &mut batch).await;
batch.clear();
}
assert!(batch.is_empty());
- // Now we drop the activated output handle to force timely to emit any pending
- // batch. It's crucial that this happens before our attempt to yield otherwise
- // the buffer would get stuck in this operator.
- drop(output);
if timer.elapsed() > YIELD_INTERVAL {
tokio::task::yield_now().await;
}
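
Note: the batching logic itself is unchanged in substance, but it is worth spelling out now that the flushes are `give_container(..).await` calls: messages drained from the source stream are accumulated into `batch` under a single capability `emit_cap`, and the batch is flushed whenever a message arrives whose time cannot be reached from `emit_cap`. A self-contained sketch of just that decision, with plain `u64` timestamps standing in for timely capabilities and a vector standing in for the output (the real code compares capabilities with timely's `PartialOrder` and flushes via the async handle's `give_container`):

```rust
/// Stand-in for `output.give_container(&emit_cap, &mut batch).await`: emit the
/// whole batch downstream at the batch's timestamp.
fn flush(downstream: &mut Vec<(u64, Vec<&'static str>)>, emit_ts: u64, batch: &mut Vec<&'static str>) {
    if !batch.is_empty() {
        downstream.push((emit_ts, std::mem::take(batch)));
    }
}

fn main() {
    // (timestamp, payload) pairs as they might drain from the source stream.
    let messages = [(2u64, "a"), (2, "b"), (1, "c"), (3, "d")];

    let mut downstream: Vec<(u64, Vec<&'static str>)> = Vec::new();
    let mut batch: Vec<&'static str> = Vec::new();
    let mut emit_ts: Option<u64> = None;

    for (ts, payload) in messages {
        match emit_ts {
            // If the message's time cannot be reached from the batch's time we
            // cannot reuse the batch's capability, so flush and start over.
            Some(current) if !(current <= ts) => {
                flush(&mut downstream, current, &mut batch);
                emit_ts = Some(ts);
            }
            Some(_) => {}
            None => emit_ts = Some(ts),
        }
        batch.push(payload);
    }

    // End of input: flush whatever is left, mirroring the
    // `if let Some(emit_cap) = emit_cap.take()` arms above.
    if let Some(current) = emit_ts.take() {
        flush(&mut downstream, current, &mut batch);
    }

    for (ts, contents) in &downstream {
        println!("flushed at time {ts}: {contents:?}");
    }
}
```
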
@@ -742,15 +735,10 @@ where
&initial_batch.updates
);

- // Out of an abundance of caution, do not hold the output handle
- // across an await, and drop it before we downgrade the capability.
- {
- let mut remap_output = remap_output.activate();
- let cap = cap_set.delayed(cap_set.first().unwrap());
- let mut session = remap_output.session(&cap);
- session.give_vec(&mut initial_batch.updates);
- cap_set.downgrade(initial_batch.upper);
- }
+ let cap = cap_set.delayed(cap_set.first().unwrap());
+ remap_output.give_container(&cap, &mut initial_batch.updates).await;
+ drop(cap);
+ cap_set.downgrade(initial_batch.upper);

let mut ticker = tokio::time::interval(timestamp_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -771,14 +759,8 @@
remap_trace_batch.upper.pretty()
);

- // Out of an abundance of caution, do not hold the output handle
- // across an await, and drop it before we downgrade the capability.
- {
- let mut remap_output = remap_output.activate();
- let cap = cap_set.delayed(cap_set.first().unwrap());
- let mut session = remap_output.session(&cap);
- session.give_vec(&mut remap_trace_batch.updates);
- }
+ let cap = cap_set.delayed(cap_set.first().unwrap());
+ remap_output.give_container(&cap, &mut remap_trace_batch.updates).await;

// If the last remap trace closed the input, we no longer
// need to (or can) advance the timestamper.
@@ -790,14 +772,8 @@

let mut remap_trace_batch = timestamper.advance().await;

- // Out of an abundance of caution, do not hold the output handle
- // across an await, and drop it before we downgrade the capability.
- {
- let mut remap_output = remap_output.activate();
- let cap = cap_set.delayed(cap_set.first().unwrap());
- let mut session = remap_output.session(&cap);
- session.give_vec(&mut remap_trace_batch.updates);
- }
+ let cap = cap_set.delayed(cap_set.first().unwrap());
+ remap_output.give_container(&cap, &mut remap_trace_batch.updates).await;

cap_set.downgrade(remap_trace_batch.upper);
}
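
Note: all three hunks above replace the same defensive pattern. The old code opened a scope so the activated handle and session were dropped, and therefore flushed, before `cap_set.downgrade(...)` ran; the new code awaits `give_container` and then downgrades, with the explicit `drop(cap)` in the first hunk releasing the delayed capability before the downgrade. The toy below is a simplified model of the invariant those deleted comments were protecting, not of timely's actual progress machinery: data for a time must be flushed downstream before the upstream stops holding that time open.

```rust
/// Stand-in for a downstream operator: it tracks the frontier the upstream has
/// promised (no more data at times earlier than this) and panics if that
/// promise is broken.
struct Downstream {
    frontier: u64,
    received: Vec<(u64, &'static str)>,
}

impl Downstream {
    fn recv(&mut self, time: u64, data: &'static str) {
        assert!(time >= self.frontier, "data arrived for an already-closed time");
        self.received.push((time, data));
    }
    fn advance_frontier(&mut self, to: u64) {
        self.frontier = to;
    }
}

/// Stand-in for a buffered output handle.
struct BufferedOutput {
    buffer: Vec<(u64, &'static str)>,
}

impl BufferedOutput {
    fn give(&mut self, time: u64, data: &'static str) {
        self.buffer.push((time, data));
    }
    /// With the async handles this flush happens as part of `give(...).await`;
    /// with the old synchronous handles it happened when the activated handle
    /// and session were dropped, which is why the deleted code scoped them in
    /// a block before calling `cap_set.downgrade(...)`.
    fn flush(&mut self, downstream: &mut Downstream) {
        for (time, data) in self.buffer.drain(..) {
            downstream.recv(time, data);
        }
    }
}

fn main() {
    let mut downstream = Downstream { frontier: 0, received: Vec::new() };
    let mut output = BufferedOutput { buffer: Vec::new() };

    // Emit a remap batch at time 5, flush it, and only then downgrade to 6.
    output.give(5, "remap update");
    output.flush(&mut downstream);
    downstream.advance_frontier(6);

    // Downgrading before the flush would leave data at time 5 in the buffer
    // after downstream already believes time 5 is closed; the assert in
    // `recv` would then fire on the late flush.
    println!("downstream saw: {:?}", downstream.received);
}
```
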
@@ -973,7 +949,6 @@ where
// Accumulate updates to offsets for Prometheus and system table metrics collection
let mut metric_updates = BTreeMap::new();

- let mut output = reclocked_output.activate();
let mut total_processed = 0;
for ((message, from_ts, diff), into_ts) in timestamper.reclock(msgs) {
let into_ts = into_ts.expect("reclock for update not beyond upper failed");
@@ -983,11 +958,11 @@
diff,
&mut bytes_read,
&cap_set,
- &mut output,
+ &mut reclocked_output,
&mut metric_updates,
into_ts,
id,
- );
+ ).await;
total_processed += 1;
}
// The loop above might have completely emptied batches. We can now remove them
@@ -1132,15 +1107,15 @@ where
///
/// TODO: This function is a bit of a mess rn but hopefully this function makes
/// the existing mess more obvious and points towards ways to improve it.
- fn handle_message<K, V, T, D>(
+ async fn handle_message<K, V, T, D>(
message: Result<SourceMessage<K, V>, SourceReaderError>,
time: T,
diff: D,
bytes_read: &mut usize,
cap_set: &CapabilitySet<mz_repr::Timestamp>,
- output_handle: &mut OutputHandle<
+ output_handle: &mut AsyncOutputHandle<
mz_repr::Timestamp,
- (usize, Result<SourceOutput<K, V, D>, SourceError>),
+ Vec<(usize, Result<SourceOutput<K, V, D>, SourceError>)>,
Tee<mz_repr::Timestamp, (usize, Result<SourceOutput<K, V, D>, SourceError>)>,
>,
metric_updates: &mut BTreeMap<PartitionId, (MzOffset, mz_repr::Timestamp, Diff)>,
Expand Down Expand Up @@ -1191,7 +1166,7 @@ fn handle_message<K, V, T, D>(
};

let ts_cap = cap_set.delayed(&ts);
output_handle.session(&ts_cap).give(output);
output_handle.give(&ts_cap, output).await;
match metric_updates.entry(partition) {
Entry::Occupied(mut entry) => {
entry.insert((offset, ts, entry.get().2 + 1));
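
Note: the signature change above is the knock-on effect of the new handles. Because `give` is now awaitable, `handle_message` itself becomes an `async fn`, takes an `AsyncOutputHandle` (whose middle type parameter is the container, `Vec<...>`, rather than the item type), and is awaited at its call site in the reclocking loop. A minimal sketch of that shape with toy types, not Materialize's, assuming tokio for the runtime:

```rust
/// Toy stand-ins: `Cap` for a capability, `Handle` for an async output handle.
struct Cap(u64);
struct Handle {
    emitted: Vec<(u64, String)>,
}

impl Handle {
    async fn give(&mut self, cap: &Cap, data: String) {
        self.emitted.push((cap.0, data));
    }
}

/// Mirrors the shape of the new `handle_message`: because the output handle's
/// `give` is async, any helper that emits through it must itself be async and
/// take the handle as `&mut`.
async fn handle_message(output: &mut Handle, cap: &Cap, payload: &str) {
    output.give(cap, payload.to_uppercase()).await;
}

#[tokio::main]
async fn main() {
    let mut output = Handle { emitted: Vec::new() };
    let cap = Cap(0);
    for payload in ["a", "b", "c"] {
        // The call site awaits the helper, as in the reclocking loop above.
        handle_message(&mut output, &cap, payload).await;
    }
    println!("emitted: {:?}", output.emitted);
}
```
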
(Two additional changed files are not shown in this view.)