Fixes
mwylde committed Dec 9, 2024
1 parent 128c6c7 commit 3123ec6
Showing 11 changed files with 149 additions and 113 deletions.
45 changes: 29 additions & 16 deletions crates/arroyo-api/src/pipelines.rs
@@ -24,7 +24,9 @@ use arroyo_rpc::api_types::{JobCollection, PaginationQueryParams, PipelineCollec
use arroyo_rpc::grpc::api::{ArrowProgram, ConnectorOp};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_datastream::logical::{LogicalProgram, OperatorName};
use arroyo_datastream::logical::{
ChainedLogicalOperator, LogicalNode, LogicalProgram, OperatorChain, OperatorName,
};
use arroyo_df::{ArroyoSchemaProvider, CompiledSql, SqlConfig};
use arroyo_formats::ser::ArrowSerializer;
use arroyo_rpc::formats::Format;
@@ -54,6 +56,7 @@ use crate::{connection_tables, to_micros};
use arroyo_rpc::config::config;
use arroyo_types::to_millis;
use cornucopia_async::{Database, DatabaseSource};
use petgraph::prelude::EdgeRef;

async fn compile_sql<'a>(
query: String,
@@ -333,21 +336,31 @@ pub(crate) async fn create_pipeline_int<'a>(
};
if should_replace {
if enable_sinks {
todo!("enable sinks")
// let new_idx = g.add_node(LogicalNode {
// operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
// description: "Preview sink".to_string(),
// operator_name: OperatorName::ConnectorSink,
// operator_config: default_sink().encode_to_vec(),
// parallelism: 1,
// });
// let edges: Vec<_> = g
// .edges_directed(idx, Direction::Incoming)
// .map(|e| (e.source(), e.weight().clone()))
// .collect();
// for (source, weight) in edges {
// g.add_edge(source, new_idx, weight);
// }
let new_idx = g.add_node(LogicalNode {
node_id: g.node_weights().map(|n| n.node_id).max().unwrap() + 1,
description: "Preview sink".to_string(),
operator_chain: OperatorChain::new(ChainedLogicalOperator {
operator_id: format!(
"{}_1",
g.node_weight(idx)
.unwrap()
.operator_chain
.first()
.operator_id
),
operator_name: OperatorName::ConnectorSink,
operator_config: default_sink().encode_to_vec(),
}),
parallelism: 1,
});

let edges: Vec<_> = g
.edges_directed(idx, Direction::Incoming)
.map(|e| (e.source(), e.weight().clone()))
.collect();
for (source, weight) in edges {
g.add_edge(source, new_idx, weight);
}
} else {
g.node_weight_mut(idx)
.unwrap()
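Note: this hunk replaces the todo!("enable sinks") stub (and the commented-out draft) with a working implementation: a preview ConnectorSink node is appended with a node_id one past the current maximum, and every edge that pointed into the replaced node is mirrored onto the new sink. A minimal sketch of the petgraph pattern, with illustrative node and edge weights standing in for Arroyo's LogicalNode/LogicalEdge:

```rust
use petgraph::graph::DiGraph;
use petgraph::prelude::EdgeRef;
use petgraph::Direction;

fn main() {
    // Illustrative graph; &str and u32 stand in for Arroyo's weights.
    let mut g: DiGraph<&str, u32> = DiGraph::new();
    let src = g.add_node("source");
    let old_sink = g.add_node("old_sink");
    g.add_edge(src, old_sink, 0);

    // Collect first: the edge iterator borrows the graph, so it must be
    // released before we mutate — which is why the real code collects too.
    let incoming: Vec<_> = g
        .edges_directed(old_sink, Direction::Incoming)
        .map(|e| (e.source(), *e.weight()))
        .collect();

    let preview = g.add_node("preview_sink");
    for (source, weight) in incoming {
        g.add_edge(source, preview, weight);
    }
    assert_eq!(g.edge_count(), 2);
}
```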
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/mqtt/source/test.rs
@@ -148,7 +148,7 @@ impl MqttTopicTester {
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("value", DataType::Utf8, false),
Field::new("value", DataType::UInt64, false),
])),
0,
)),
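Note: the test schema's value field changes from Utf8 to UInt64. A sketch of the corrected schema using the arrow-schema crate; the timestamp field's name is illustrative, since the diff shows only its type:

```rust
use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn main() {
    // "timestamp" is an assumed field name for illustration.
    let schema = Schema::new(vec![
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new("value", DataType::UInt64, false),
    ]);
    assert_eq!(schema.field(1).data_type(), &DataType::UInt64);
}
```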
12 changes: 10 additions & 2 deletions crates/arroyo-datastream/src/logical.rs
@@ -160,6 +160,13 @@ pub struct OperatorChain {
}

impl OperatorChain {
pub fn new(operator: ChainedLogicalOperator) -> Self {
Self {
operators: vec![operator],
edges: vec![],
}
}

pub fn iter(&self) -> impl Iterator<Item = (&ChainedLogicalOperator, Option<&ArroyoSchema>)> {
self.operators
.iter()
@@ -242,13 +249,14 @@ impl Debug for LogicalNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
"{}[{}]",
self.operator_chain
.operators
.iter()
.map(|op| op.operator_id.clone())
.collect::<Vec<_>>()
.join(" -> ")
.join(" -> "),
self.parallelism
)
}
}
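Note: besides the new OperatorChain::new convenience constructor, the Debug output for LogicalNode now appends the node's parallelism. A stand-alone sketch of the resulting format:

```rust
use std::fmt::{self, Debug, Formatter};

// Stand-in for LogicalNode: a chain of operator ids plus a parallelism.
struct Node {
    ops: Vec<String>,
    parallelism: usize,
}

impl Debug for Node {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Same shape as the new impl: ids joined by " -> ", with the
        // parallelism appended in brackets.
        write!(f, "{}[{}]", self.ops.join(" -> "), self.parallelism)
    }
}

fn main() {
    let n = Node {
        ops: vec!["source_1".into(), "window_2".into()],
        parallelism: 4,
    };
    assert_eq!(format!("{n:?}"), "source_1 -> window_2[4]");
}
```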
4 changes: 3 additions & 1 deletion crates/arroyo-operator/src/context.rs
@@ -27,7 +27,8 @@ use std::time::{Instant, SystemTime};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;
use tracing::warn;
use tracing::log::debug;
use tracing::{trace, warn};

pub type QueueItem = ArrowMessage;

@@ -697,6 +698,7 @@ impl ArrowCollector {
}

pub async fn broadcast(&mut self, message: SignalMessage) {
trace!("[{}] Broadcast {:?}", self.chain_info, message);
for out_node in &self.out_qs {
for q in out_node {
q.send(ArrowMessage::Signal(message.clone()))
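Note: broadcast signals are now traced per chain. Trace-level events are filtered out by default, so a subscriber must opt in to see the new line; a sketch assuming the tracing-subscriber crate:

```rust
use tracing::{trace, Level};

fn main() {
    // Raise the subscriber's max level so trace! output is visible
    // (assumes the tracing-subscriber crate; message is illustrative).
    tracing_subscriber::fmt().with_max_level(Level::TRACE).init();
    trace!("[chain] Broadcast example-signal");
}
```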
12 changes: 7 additions & 5 deletions crates/arroyo-operator/src/operator.rs
@@ -845,11 +845,11 @@ impl ChainedOperator {
final_collector,
};

Box::pin(
self.operator
.on_close(final_message, &mut self.context, &mut collector),
)
.await;
self.operator
.on_close(final_message, &mut self.context, &mut collector)
.await;

Box::pin(next.on_close(final_message, final_collector)).await;
}
None => {
self.operator
@@ -874,6 +874,8 @@ async fn operator_run_behavior(

ready.wait().await;

info!("Running node {}", chain_info);

control_tx
.send(ControlResp::TaskStarted {
node_id: chain_info.node_id,
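Note: the first on_close call drops its Box::pin; only the descent into the next operator in the chain stays boxed, because a recursive async call graph must box somewhere to keep the future's size finite. A generic sketch of that pattern (assuming tokio for the runtime; the types are illustrative, not Arroyo's):

```rust
use std::future::Future;
use std::pin::Pin;

// A chain node whose shutdown runs its own hook, then recurses into the
// next node — the same shape as ChainedOperator::on_close above.
struct Chain {
    name: String,
    next: Option<Box<Chain>>,
}

impl Chain {
    fn on_close(&self) -> Pin<Box<dyn Future<Output = ()> + '_>> {
        Box::pin(async move {
            // The non-recursive work needs no extra boxing...
            println!("closing {}", self.name);
            // ...but the recursive await does, or the compiler could not
            // size the async state machine.
            if let Some(next) = &self.next {
                next.on_close().await;
            }
        })
    }
}

#[tokio::main]
async fn main() {
    let chain = Chain {
        name: "map".into(),
        next: Some(Box::new(Chain { name: "sink".into(), next: None })),
    };
    chain.on_close().await;
}
```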
2 changes: 1 addition & 1 deletion crates/arroyo-rpc/default.toml
@@ -10,7 +10,7 @@ worker-heartbeat-timeout = "30s"
healthy-duration = "2m"
worker-startup-time = "10m"
task-startup-time = "2m"
chaining.enabled = true
chaining.enabled = false

[pipeline.compaction]
enabled = false
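Note: operator chaining is now off by default. A sketch of how a dotted TOML flag like this deserializes, using serde and the toml crate rather than Arroyo's actual config machinery:

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct Chaining {
    enabled: bool,
}

#[derive(Deserialize)]
struct Pipeline {
    chaining: Chaining,
}

fn main() {
    // A dotted key ("chaining.enabled = false") is equivalent to a
    // nested table in TOML.
    let cfg: Pipeline = toml::from_str("chaining.enabled = false").unwrap();
    assert!(!cfg.chaining.enabled);
}
```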
90 changes: 45 additions & 45 deletions crates/arroyo-sql-testing/src/smoke_tests.rs
@@ -243,32 +243,36 @@ fn set_internal_parallelism(graph: &mut Graph<LogicalNode, LogicalEdge>, paralle
.any(|(c, _)| c.operator_name == OperatorName::ExpressionWatermark)
})
.collect();

let indices: Vec<_> = graph
.node_indices()
.filter(|index| {
graph
.node_weight(*index)
.unwrap()
.operator_chain
.iter()
.any(|(c, _)| match c.operator_name {
OperatorName::ExpressionWatermark
| OperatorName::ConnectorSource
| OperatorName::ConnectorSink => false,
_ => {
for watermark_node in watermark_nodes.iter() {
if has_path_connecting(&graph.clone(), *watermark_node, *index, None) {
return true;
!watermark_nodes.contains(index)
&& graph
.node_weight(*index)
.unwrap()
.operator_chain
.iter()
.any(|(c, _)| match c.operator_name {
OperatorName::ExpressionWatermark
| OperatorName::ConnectorSource
| OperatorName::ConnectorSink => false,
_ => {
for watermark_node in watermark_nodes.iter() {
if has_path_connecting(&*graph, *watermark_node, *index, None) {
return true;
}
}
false
}
false
}
})
})
})
.collect();

for node in indices {
graph.node_weight_mut(node).unwrap().parallelism = parallelism;
}

if parallelism > 1 {
let mut edges_to_make_shuffle = vec![];
for node in graph.externals(Direction::Outgoing) {
@@ -298,10 +302,10 @@ fn set_internal_parallelism(graph: &mut Graph<LogicalNode, LogicalEdge>, paralle
async fn run_and_checkpoint(
job_id: Arc<String>,
program: Program,
tasks_per_operator: HashMap<String, usize>,
control_rx: &mut Receiver<ControlResp>,
checkpoint_interval: i32,
) {
let tasks_per_operator = program.tasks_per_operator();
let engine = Engine::for_local(program, job_id.to_string());
let running_engine = engine.start().await;
info!("Smoke test checkpointing enabled");
Expand Down Expand Up @@ -353,34 +357,38 @@ async fn finish_from_checkpoint(
run_until_finished(&running_engine, control_rx).await;
}

fn tasks_per_operator(graph: &LogicalGraph) -> HashMap<String, usize> {
graph
.node_weights()
.flat_map(|node| {
node.operator_chain
.iter()
.map(|(op, _)| (op.operator_id.clone(), node.parallelism))
})
.collect()
}

#[allow(clippy::too_many_arguments)]
async fn run_pipeline_and_assert_outputs(
job_id: &str,
mut graph: LogicalGraph,
restore_from: Option<CheckpointMetadata>,
checkpoint_interval: i32,
output_location: String,
golden_output_location: String,
udfs: &[LocalUdf],
primary_keys: Option<&[&str]>,
) {
let (control_tx, mut control_rx) = channel(16);

// remove output_location before running the pipeline
if std::path::Path::new(&output_location).exists() {
std::fs::remove_file(&output_location).unwrap();
}

println!("Running completely");

let (control_tx, mut control_rx) = channel(128);
run_completely(
job_id,
Program::local_from_logical(
job_id.to_string(),
&graph,
udfs,
restore_from.as_ref(),
control_tx.clone(),
)
.await,
Program::local_from_logical(job_id.to_string(), &graph, udfs, None, control_tx).await,
output_location.clone(),
golden_output_location.clone(),
primary_keys,
@@ -394,16 +402,13 @@ async fn run_pipeline_and_assert_outputs(
set_internal_parallelism(&mut graph, 2);
}

let (control_tx, mut control_rx) = channel(128);

println!("Run and checkpoint");
run_and_checkpoint(
Arc::new(job_id.to_string()),
Program::local_from_logical(
job_id.to_string(),
&graph,
udfs,
restore_from.as_ref(),
control_tx.clone(),
)
.await,
Program::local_from_logical(job_id.to_string(), &graph, udfs, None, control_tx).await,
tasks_per_operator(&graph),
&mut control_rx,
checkpoint_interval,
)
@@ -413,16 +418,12 @@
set_internal_parallelism(&mut graph, 3);
}

let (control_tx, mut control_rx) = channel(128);

println!("Finish from checkpoint");
finish_from_checkpoint(
job_id,
Program::local_from_logical(
job_id.to_string(),
&graph,
udfs,
restore_from.as_ref(),
control_tx,
)
.await,
Program::local_from_logical(job_id.to_string(), &graph, udfs, Some(3), control_tx).await,
&mut control_rx,
)
.await;
@@ -673,7 +674,6 @@ pub async fn correctness_run_codegen(
run_pipeline_and_assert_outputs(
&test_name,
logical_program.graph,
None,
checkpoint_interval,
physical_output,
golden_output_location,
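Note: two things change in set_internal_parallelism: watermark nodes themselves are now excluded from the parallelism override, and the reachability check reborrows with &*graph instead of cloning the whole graph on every call, since petgraph's has_path_connecting only needs a shared reference. A small sketch of that call on an illustrative graph:

```rust
use petgraph::algo::has_path_connecting;
use petgraph::graph::DiGraph;

fn main() {
    let mut graph: DiGraph<&str, ()> = DiGraph::new();
    let watermark = graph.add_node("watermark");
    let window = graph.add_node("window");
    graph.add_edge(watermark, window, ());

    // &graph here (or &*graph through a &mut borrow, as in the test code)
    // reborrows; no clone is needed because the algorithm only reads.
    assert!(has_path_connecting(&graph, watermark, window, None));
}
```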
@@ -759,7 +759,7 @@ impl ArrowOperator for SessionAggregatingWindowFunc {
}

async fn on_start(&mut self, ctx: &mut OperatorContext) {
let start_times_map: &mut GlobalKeyedView<usize, Option<SystemTime>> =
let start_times_map: &mut GlobalKeyedView<u32, Option<SystemTime>> =
ctx.table_manager.get_global_keyed_state("e").await.unwrap();
let start_time = start_times_map
.get_all()
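Note: the global keyed state's key type changes from usize to u32, matching the key type the operator actually persists. A hypothetical stand-in for GlobalKeyedView showing the on_start pattern of reading all stored values and taking the earliest start time:

```rust
use std::collections::HashMap;
use std::time::SystemTime;

// Hypothetical simplification of the real table-backed view: a typed map
// whose key type must match what was written at checkpoint time.
struct GlobalKeyedView<K, V> {
    data: HashMap<K, V>,
}

impl<K: std::hash::Hash + Eq, V> GlobalKeyedView<K, V> {
    fn get_all(&self) -> &HashMap<K, V> {
        &self.data
    }
}

fn main() {
    let state: GlobalKeyedView<u32, Option<SystemTime>> = GlobalKeyedView {
        data: HashMap::from([(0u32, Some(SystemTime::UNIX_EPOCH))]),
    };
    // Earliest recorded start time across subtasks, as on_start computes.
    let earliest = state.get_all().values().flatten().min();
    println!("{earliest:?}");
}
```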
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/arrow/watermark_generator.rs
@@ -140,7 +140,7 @@ impl ArrowOperator for WatermarkGenerator {
// send final watermark on close
collector
.broadcast_watermark(
// this is in the year 2554, far enough out be close to inifinity,
// this is in the year 2554, far enough out be close to infinity,
// but can still be formatted.
Watermark::EventTime(from_nanos(u64::MAX as u128)),
)
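Note: only the comment's spelling is fixed here, but the claim is easy to check: u64::MAX nanoseconds after the Unix epoch lands around the year 2554, far enough out to act as a final watermark while still being formattable. The arithmetic:

```rust
fn main() {
    // u64::MAX nanoseconds -> seconds -> Julian years past 1970.
    let secs = u64::MAX / 1_000_000_000; // ~18.4 billion seconds
    let years = secs / 31_557_600; // seconds per Julian year
    println!("~ year {}", 1970 + years); // prints "~ year 2554"
}
```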