Skip to content

Commit

Permalink
Merge branch 'ArroyoSystems:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuliquan authored Nov 1, 2024
2 parents 3c87e6d + 4f56baf commit 5a94ee1
Show file tree
Hide file tree
Showing 50 changed files with 802 additions and 138 deletions.
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl Connector for BlackholeConnector {
format: None,
bad_data: None,
framing: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/filesystem/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Connector for DeltaLakeConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl Connector for FileSystemConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl FileSystemSourceFunc {
line = line_reader.next() => {
match line.transpose()? {
Some(line) => {
ctx.deserialize_slice(line.as_bytes(), SystemTime::now()).await?;
ctx.deserialize_slice(line.as_bytes(), SystemTime::now(), None).await?;
records_read += 1;
if ctx.should_flush() {
ctx.flush_buffer().await?;
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/fluvio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl Connector for FluvioConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/fluvio/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl FluvioSourceFunc {
match message {
Some((_, Ok(msg))) => {
let timestamp = from_millis(msg.timestamp().max(0) as u64);
ctx.deserialize_slice(msg.value(), timestamp).await?;
ctx.deserialize_slice(msg.value(), timestamp, None).await?;

if ctx.should_flush() {
ctx.flush_buffer().await?;
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl Connector for ImpulseConnector {
format: None,
bad_data: None,
framing: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
32 changes: 28 additions & 4 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::Connection;
use arroyo_operator::connector::{Connection, MetadataDef};
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
Expand Down Expand Up @@ -214,6 +215,7 @@ impl Connector for KafkaConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down Expand Up @@ -306,6 +308,27 @@ impl Connector for KafkaConnector {
}
}

/// Declares the metadata columns a Kafka source can attach to each record.
///
/// The `name` values here are the keys users reference in their connection
/// schema; each one must be resolvable by the source when it builds the
/// per-record metadata map.
fn metadata_defs(&self) -> &'static [MetadataDef] {
    // Static table: (metadata key, Arrow type exposed to SQL).
    const KAFKA_METADATA: &[MetadataDef] = &[
        MetadataDef { name: "offset_id", data_type: DataType::Int64 },
        MetadataDef { name: "partition", data_type: DataType::Int32 },
        MetadataDef { name: "topic", data_type: DataType::Utf8 },
        MetadataDef { name: "timestamp", data_type: DataType::Int64 },
    ];
    KAFKA_METADATA
}

fn from_options(
&self,
name: &str,
Expand Down Expand Up @@ -383,6 +406,7 @@ impl Connector for KafkaConnector {
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.metadata_fields,
})))
}
TableType::Sink {
Expand Down Expand Up @@ -622,7 +646,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand All @@ -644,7 +668,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down Expand Up @@ -678,7 +702,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down
25 changes: 23 additions & 2 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use arroyo_formats::de::FieldValueType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp};
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};

use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
Expand Down Expand Up @@ -35,6 +36,7 @@ pub struct KafkaSourceFunc {
pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
pub client_configs: HashMap<String, String>,
pub messages_per_second: NonZeroU32,
pub metadata_fields: Vec<MetadataField>,
}

#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
Expand Down Expand Up @@ -178,7 +180,26 @@ impl KafkaSourceFunc {
.ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record",
"The message read from Kafka did not contain a message timestamp"))?;

ctx.deserialize_slice(v, from_millis(timestamp as u64)).await?;
let topic = msg.topic();

let connector_metadata = if !self.metadata_fields.is_empty() {
let mut connector_metadata = HashMap::new();
for f in &self.metadata_fields {
connector_metadata.insert(&f.field_name, match f.key.as_str() {
"offset_id" => FieldValueType::Int64(msg.offset()),
"partition" => FieldValueType::Int32(msg.partition()),
"topic" => FieldValueType::String(topic),
"timestamp" => FieldValueType::Int64(timestamp),
k => unreachable!("Invalid metadata key '{}'", k),
});
}
Some(connector_metadata)
} else {
None
};

ctx.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?;


if ctx.should_flush() {
ctx.flush_buffer().await?;
Expand Down
101 changes: 100 additions & 1 deletion crates/arroyo-connectors/src/kafka/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, RawStringFormat};
use arroyo_rpc::grpc::rpc::{CheckpointMetadata, OperatorCheckpointMetadata, OperatorMetadata};
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp};
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp, MetadataField};
use arroyo_types::{
single_item_hash_map, to_micros, ArrowMessage, CheckpointBarrier, SignalMessage, TaskInfo,
};
Expand Down Expand Up @@ -87,6 +87,7 @@ impl KafkaTopicTester {
schema_resolver: None,
client_configs: HashMap::new(),
messages_per_second: NonZeroU32::new(100).unwrap(),
metadata_fields: vec![],
});

let (to_control_tx, control_rx) = channel(128);
Expand Down Expand Up @@ -342,3 +343,101 @@ async fn test_kafka() {
)
.await;
}

// Integration test: a Kafka source configured with a metadata field mapping
// ("offset" column <- "offset_id" key) still deserializes and emits records.
// Requires a reachable Kafka broker on 0.0.0.0:9092 (see `server` below).
#[tokio::test]
async fn test_kafka_with_metadata_fields() {
    let mut kafka_topic_tester = KafkaTopicTester {
        topic: "__arroyo-source-test_metadata".to_string(),
        server: "0.0.0.0:9092".to_string(),
        group_id: Some("test-consumer-group".to_string()),
    };

    // Unique job id per run so consumer-group/checkpoint state doesn't collide
    // across test invocations.
    let mut task_info = arroyo_types::get_test_task_info();
    task_info.job_id = format!("kafka-job-{}", random::<u64>());

    kafka_topic_tester.create_topic().await;

    // Map the output column "offset" to the connector metadata key "offset_id"
    // (one of the keys exposed by KafkaConnector::metadata_defs).
    let metadata_fields = vec![MetadataField {
        field_name: "offset".to_string(),
        key: "offset_id".to_string(),
    }];

    // Build the source directly (bypassing connector config parsing) with the
    // metadata mapping installed.
    let mut kafka = KafkaSourceFunc {
        bootstrap_servers: kafka_topic_tester.server.clone(),
        topic: kafka_topic_tester.topic.clone(),
        group_id: kafka_topic_tester.group_id.clone(),
        group_id_prefix: None,
        offset_mode: SourceOffset::Earliest,
        format: Format::RawString(RawStringFormat {}),
        framing: None,
        bad_data: None,
        schema_resolver: None,
        client_configs: HashMap::new(),
        messages_per_second: NonZeroU32::new(100).unwrap(),
        metadata_fields,
    };

    let (_to_control_tx, control_rx) = channel(128);
    let (command_tx, _from_control_rx) = channel(128);
    let (data_tx, _recv) = batch_bounded(128);

    let checkpoint_metadata = None;

    // Output schema must include the mapped metadata column ("offset", Int64)
    // alongside the raw-string "value" column and the implicit timestamp.
    let mut ctx = ArrowContext::new(
        task_info.clone(),
        checkpoint_metadata,
        control_rx,
        command_tx,
        1,
        vec![],
        Some(ArroyoSchema::new_unkeyed(
            Arc::new(Schema::new(vec![
                Field::new(
                    "_timestamp",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("value", DataType::Utf8, false),
                Field::new("offset", DataType::Int64, false),
            ])),
            0,
        )),
        None,
        vec![vec![data_tx]],
        kafka.tables(),
    )
    .await;

    // Run the source on a background task; it is shut down below via the
    // graceful Stop control message.
    tokio::spawn(async move {
        kafka.run(&mut ctx).await;
    });

    let mut reader = kafka_topic_tester
        .get_source_with_reader(task_info.clone(), None)
        .await;
    let mut producer = kafka_topic_tester.get_producer();

    // Produce 21 JSON-serialized TestData messages; with RawStringFormat the
    // full JSON text becomes the "value" column, so the expected values are
    // the serialized strings themselves.
    let expected_messages: Vec<_> = (1u64..=21)
        .map(|i| {
            let data = TestData { i };
            producer.send_data(data.clone());
            serde_json::to_string(&data).unwrap()
        })
        .collect();

    // NOTE(review): this presumably asserts only the "value" column — the
    // contents of the "offset" metadata column are not directly checked here;
    // confirm against assert_next_message_record_values.
    reader
        .assert_next_message_record_values(expected_messages.into())
        .await;

    reader
        .to_control_tx
        .send(ControlMessage::Stop {
            mode: arroyo_rpc::grpc::rpc::StopMode::Graceful,
        })
        .await
        .unwrap();
}
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl Connector for KinesisConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
3 changes: 1 addition & 2 deletions crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ impl KinesisSourceFunc {
for record in records {
let data = record.data.into_inner();
let timestamp = record.approximate_arrival_timestamp.unwrap();

ctx.deserialize_slice(&data, from_nanos(timestamp.as_nanos() as u128))
ctx.deserialize_slice(&data, from_nanos(timestamp.as_nanos() as u128), None)
.await?;

if ctx.should_flush() {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub(crate) fn source_field(name: &str, field_type: FieldType) -> SourceField {
r#type: field_type,
},
nullable: false,
metadata_key: None,
}
}

Expand Down
12 changes: 11 additions & 1 deletion crates/arroyo-connectors/src/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::mqtt::sink::MqttSinkFunc;
use crate::mqtt::source::MqttSourceFunc;
use crate::pull_opt;
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::connector::{Connection, Connector, MetadataDef};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
Expand Down Expand Up @@ -183,6 +184,7 @@ impl Connector for MqttConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down Expand Up @@ -237,6 +239,13 @@ impl Connector for MqttConnector {
}
}

/// Declares the single metadata column an MQTT source can attach to each
/// record: the topic the message was received on.
fn metadata_defs(&self) -> &'static [MetadataDef] {
    const MQTT_METADATA: &[MetadataDef] = &[MetadataDef {
        data_type: DataType::Utf8,
        name: "topic",
    }];
    MQTT_METADATA
}

fn from_options(
&self,
name: &str,
Expand Down Expand Up @@ -282,6 +291,7 @@ impl Connector for MqttConnector {
)
.unwrap(),
subscribed: Arc::new(AtomicBool::new(false)),
metadata_fields: config.metadata_fields,
})),
TableType::Sink { retain } => OperatorNode::from_operator(Box::new(MqttSinkFunc {
config: profile,
Expand Down
Loading

0 comments on commit 5a94ee1

Please sign in to comment.