clippy & format
mwylde committed Dec 7, 2024
1 parent 6091a22 commit ae6a850
Showing 54 changed files with 622 additions and 397 deletions.
4 changes: 1 addition & 3 deletions crates/arroyo-api/src/pipelines.rs
@@ -11,7 +11,6 @@ use petgraph::{Direction, EdgeDirection};
 use std::collections::HashMap;
 use std::num::ParseIntError;
 use std::str::FromStr;
-use std::string::ParseError;
 use std::time::{Duration, SystemTime};
 
 use crate::{compiler_service, connection_profiles, jobs, types};
@@ -25,7 +24,7 @@ use arroyo_rpc::api_types::{JobCollection, PaginationQueryParams, PipelineCollec
 use arroyo_rpc::grpc::api::{ArrowProgram, ConnectorOp};
 
 use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
-use arroyo_datastream::logical::{LogicalNode, LogicalProgram, OperatorName};
+use arroyo_datastream::logical::{LogicalProgram, OperatorName};
 use arroyo_df::{ArroyoSchemaProvider, CompiledSql, SqlConfig};
 use arroyo_formats::ser::ArrowSerializer;
 use arroyo_rpc::formats::Format;
@@ -55,7 +54,6 @@ use crate::{connection_tables, to_micros};
 use arroyo_rpc::config::config;
 use arroyo_types::to_millis;
 use cornucopia_async::{Database, DatabaseSource};
-use petgraph::prelude::EdgeRef;
 
 async fn compile_sql<'a>(
     query: String,
8 changes: 5 additions & 3 deletions crates/arroyo-connectors/src/filesystem/delta.rs
@@ -183,9 +183,11 @@ impl Connector for DeltaLakeConnector {
                     LocalParquetFileSystemSink::new(write_path.to_string(), table, config),
                 )))
             }
-            (Some(FormatSettings::Parquet { .. }), false) => Ok(ConstructedOperator::from_operator(
-                Box::new(ParquetFileSystemSink::new(table, config)),
-            )),
+            (Some(FormatSettings::Parquet { .. }), false) => {
+                Ok(ConstructedOperator::from_operator(Box::new(
+                    ParquetFileSystemSink::new(table, config),
+                )))
+            }
             _ => bail!("Delta Lake sink only supports Parquet format"),
         }
     }
16 changes: 9 additions & 7 deletions crates/arroyo-connectors/src/filesystem/mod.rs
@@ -230,17 +230,17 @@ impl Connector for FileSystemConnector {
         config: OperatorConfig,
     ) -> Result<ConstructedOperator> {
         match &table.table_type {
-            TableType::Source { .. } => {
-                Ok(ConstructedOperator::from_source(Box::new(FileSystemSourceFunc {
+            TableType::Source { .. } => Ok(ConstructedOperator::from_source(Box::new(
+                FileSystemSourceFunc {
                     table: table.table_type.clone(),
                     format: config
                         .format
                         .ok_or_else(|| anyhow!("format required for FileSystem source"))?,
                     framing: config.framing.clone(),
                     bad_data: config.bad_data.clone(),
                     file_states: HashMap::new(),
-                })))
-            }
+                },
+            ))),
             TableType::Sink {
                 file_settings: _,
                 format_settings,
@@ -264,9 +264,11 @@ impl Connector for FileSystemConnector {
                     LocalJsonFileSystemSink::new(write_path.to_string(), table, config),
                 )))
             }
-            (Some(FormatSettings::Json { .. }), false) => Ok(ConstructedOperator::from_operator(
-                Box::new(JsonFileSystemSink::new(table, config)),
-            )),
+            (Some(FormatSettings::Json { .. }), false) => {
+                Ok(ConstructedOperator::from_operator(Box::new(
+                    JsonFileSystemSink::new(table, config),
+                )))
+            }
             (None, _) => bail!("have to have some format settings"),
         }
     }
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use arrow::record_batch::RecordBatch;
+use arroyo_operator::context::Collector;
 use arroyo_operator::{context::OperatorContext, operator::ArrowOperator};
 use arroyo_rpc::{
     grpc::rpc::{GlobalKeyedTableConfig, TableConfig, TableEnum},
@@ -12,8 +13,7 @@ use bincode::config;
 use prost::Message;
 use std::fmt::Debug;
 use std::{collections::HashMap, time::SystemTime};
-use tracing::{info};
-use arroyo_operator::context::Collector;
+use tracing::info;
 
 pub struct TwoPhaseCommitterOperator<TPC: TwoPhaseCommitter> {
     committer: TPC,
@@ -155,7 +155,7 @@ impl<TPC: TwoPhaseCommitter> ArrowOperator for TwoPhaseCommitterOperator<TPC> {
         );
         tables
     }
-    
+
     fn is_committing(&self) -> bool {
         true
     }
@@ -184,7 +184,12 @@ impl<TPC: TwoPhaseCommitter> ArrowOperator for TwoPhaseCommitterOperator<TPC> {
         }
     }
 
-    async fn process_batch(&mut self, batch: RecordBatch, _ctx: &mut OperatorContext, _: &mut dyn Collector) {
+    async fn process_batch(
+        &mut self,
+        batch: RecordBatch,
+        _ctx: &mut OperatorContext,
+        _: &mut dyn Collector,
+    ) {
         self.committer
             .insert_batch(batch)
             .await
19 changes: 15 additions & 4 deletions crates/arroyo-connectors/src/filesystem/source.rs
@@ -60,7 +60,11 @@ impl SourceOperator for FileSystemSourceFunc {
         "FileSystem".to_string()
     }
 
-    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
+    async fn run(
+        &mut self,
+        ctx: &mut SourceContext,
+        collector: &mut SourceCollector,
+    ) -> SourceFinishType {
         match self.run_int(ctx, collector).await {
             Ok(s) => s,
             Err(e) => {
@@ -83,7 +87,11 @@ impl FileSystemSourceFunc {
         }
     }
 
-    async fn run_int(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> Result<SourceFinishType, UserError> {
+    async fn run_int(
+        &mut self,
+        ctx: &mut SourceContext,
+        collector: &mut SourceCollector,
+    ) -> Result<SourceFinishType, UserError> {
         let (storage_provider, regex_pattern) = match &self.table {
             TableType::Source {
                 path,
@@ -164,7 +172,10 @@ impl FileSystemSourceFunc {
                 continue;
             }
 
-            if let Some(finish_type) = self.read_file(ctx, collector, &storage_provider, &obj_key).await? {
+            if let Some(finish_type) = self
+                .read_file(ctx, collector, &storage_provider, &obj_key)
+                .await?
+            {
                 return Ok(finish_type);
             }
         }
@@ -399,7 +410,7 @@ impl FileSystemSourceFunc {
     async fn process_control_message(
         &mut self,
         ctx: &mut SourceContext,
-        collector: &mut SourceCollector, 
+        collector: &mut SourceCollector,
         control_message: ControlMessage,
     ) -> Option<SourceFinishType> {
         match control_message {
30 changes: 16 additions & 14 deletions crates/arroyo-connectors/src/fluvio/mod.rs
@@ -175,8 +175,8 @@ impl Connector for FluvioConnector {
         config: OperatorConfig,
     ) -> anyhow::Result<ConstructedOperator> {
         match table.type_ {
-            TableType::Source { offset } => {
-                Ok(ConstructedOperator::from_source(Box::new(FluvioSourceFunc {
+            TableType::Source { offset } => Ok(ConstructedOperator::from_source(Box::new(
+                FluvioSourceFunc {
                     topic: table.topic,
                     endpoint: table.endpoint.clone(),
                     offset_mode: offset,
@@ -185,18 +185,20 @@ impl Connector for FluvioConnector {
                         .ok_or_else(|| anyhow!("format required for fluvio source"))?,
                     framing: config.framing,
                     bad_data: config.bad_data,
-                })))
-            }
-            TableType::Sink { .. } => Ok(ConstructedOperator::from_operator(Box::new(FluvioSinkFunc {
-                topic: table.topic,
-                endpoint: table.endpoint,
-                producer: None,
-                serializer: ArrowSerializer::new(
-                    config
-                        .format
-                        .ok_or_else(|| anyhow!("format required for fluvio sink"))?,
-                ),
-            }))),
+                },
+            ))),
+            TableType::Sink { .. } => Ok(ConstructedOperator::from_operator(Box::new(
+                FluvioSinkFunc {
+                    topic: table.topic,
+                    endpoint: table.endpoint,
+                    producer: None,
+                    serializer: ArrowSerializer::new(
+                        config
+                            .format
+                            .ok_or_else(|| anyhow!("format required for fluvio sink"))?,
+                    ),
+                },
+            ))),
         }
     }
 }
25 changes: 18 additions & 7 deletions crates/arroyo-connectors/src/fluvio/sink.rs
@@ -38,17 +38,23 @@ impl ArrowOperator for FluvioSinkFunc {
                 self.producer = Some(producer);
             }
             Err(e) => {
-                ctx.error_reporter.report_error(
-                    "Failed to construct Fluvio producer".to_string(),
-                    e.to_string(),
-                )
-                .await;
+                ctx.error_reporter
+                    .report_error(
+                        "Failed to construct Fluvio producer".to_string(),
+                        e.to_string(),
+                    )
+                    .await;
                 panic!("Failed to construct Fluvio producer: {:?}", e);
             }
         }
     }
 
-    async fn process_batch(&mut self, batch: RecordBatch, _: &mut OperatorContext, _: &mut dyn Collector) {
+    async fn process_batch(
+        &mut self,
+        batch: RecordBatch,
+        _: &mut OperatorContext,
+        _: &mut dyn Collector,
+    ) {
         let values = self.serializer.serialize(&batch);
         for v in values {
             self.producer
@@ -60,7 +66,12 @@ impl ArrowOperator for FluvioSinkFunc {
         }
     }
 
-    async fn handle_checkpoint(&mut self, _: CheckpointBarrier, _: &mut OperatorContext, _: &mut dyn Collector) {
+    async fn handle_checkpoint(
+        &mut self,
+        _: CheckpointBarrier,
+        _: &mut OperatorContext,
+        _: &mut dyn Collector,
+    ) {
         self.producer.as_mut().unwrap().flush().await.unwrap();
     }
 }
23 changes: 16 additions & 7 deletions crates/arroyo-connectors/src/fluvio/source.rs
@@ -48,7 +48,11 @@ impl SourceOperator for FluvioSourceFunc {
         global_table_config("f", "fluvio source state")
     }
 
-    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
+    async fn run(
+        &mut self,
+        ctx: &mut SourceContext,
+        collector: &mut SourceCollector,
+    ) -> SourceFinishType {
         collector.initialize_deserializer(
             self.format.clone(),
             self.framing.clone(),
@@ -108,7 +112,9 @@ impl FluvioSourceFunc {
         let has_state = !state.is_empty();
 
         let parts: Vec<_> = (0..partitions)
-            .filter(|i| *i % ctx.task_info.parallelism as usize == ctx.task_info.task_index as usize)
+            .filter(|i| {
+                *i % ctx.task_info.parallelism as usize == ctx.task_info.task_index as usize
+            })
             .map(|i| {
                 let offset = state
                     .get(&(i as u32))
@@ -139,7 +145,11 @@ impl FluvioSourceFunc {
         Ok(streams)
     }
 
-    async fn run_int(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> Result<SourceFinishType, UserError> {
+    async fn run_int(
+        &mut self,
+        ctx: &mut SourceContext,
+        collector: &mut SourceCollector,
+    ) -> Result<SourceFinishType, UserError> {
         let mut streams = self
             .get_consumer(ctx)
             .await
@@ -148,10 +158,9 @@ impl FluvioSourceFunc {
         if streams.is_empty() {
             warn!("Fluvio Consumer {}-{} is subscribed to no partitions, as there are more subtasks than partitions... setting idle",
                 ctx.task_info.operator_id, ctx.task_info.task_index);
-            collector.broadcast(SignalMessage::Watermark(
-                Watermark::Idle,
-            ))
-            .await;
+            collector
+                .broadcast(SignalMessage::Watermark(Watermark::Idle))
+                .await;
         }
 
         let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
9 changes: 4 additions & 5 deletions crates/arroyo-connectors/src/kafka/sink/test.rs
@@ -7,9 +7,8 @@ use std::time::{Duration, SystemTime};
 use arrow::array::{RecordBatch, UInt32Array};
 use arrow::datatypes::Field;
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use async_trait::async_trait;
 use arroyo_formats::ser::ArrowSerializer;
-use arroyo_operator::context::{Collector, OperatorContext};
+use arroyo_operator::context::OperatorContext;
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_rpc::df::ArroyoSchema;
 use arroyo_rpc::formats::{Format, JsonFormat};
@@ -156,7 +155,7 @@ async fn test_kafka_checkpoint_flushes() {
 
         sink_with_writes
             .sink
-            .process_batch(batch, &mut sink_with_writes.ctx, &mut DummyCollector{})
+            .process_batch(batch, &mut sink_with_writes.ctx, &mut DummyCollector {})
             .await;
     }
     let barrier = CheckpointBarrier {
@@ -167,7 +166,7 @@ async fn test_kafka_checkpoint_flushes() {
     };
     sink_with_writes
         .sink
-        .handle_checkpoint(barrier, &mut sink_with_writes.ctx, &mut DummyCollector{})
+        .handle_checkpoint(barrier, &mut sink_with_writes.ctx, &mut DummyCollector {})
         .await;
 
     for message in 1u32..200 {
@@ -194,7 +193,7 @@ async fn test_kafka() {
 
     sink_with_writes
         .sink
-        .process_batch(batch, &mut sink_with_writes.ctx, &mut DummyCollector{})
+        .process_batch(batch, &mut sink_with_writes.ctx, &mut DummyCollector {})
         .await;
     sink_with_writes
         .sink
