diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index 0b9d15595..b6dd7891c 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -21,8 +21,16 @@ name: "Exclamation processor" topics: - name: TEST_TOPIC_0 creation-mode: create-if-not-exists + schema: + type: string + keySchema: + type: string - name: TEST_TOPIC_1 creation-mode: create-if-not-exists + schema: + type: string + keySchema: + type: string pipeline: - name: "Process using Python" id: "test-python-processor" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py index b88aff991..26669cb5f 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py @@ -26,14 +26,29 @@ from langstream import Record, CommitCallback, SimpleRecord from .kafka_serialization import ( STRING_SERIALIZER, - DOUBLE_SERIALIZER, - LONG_SERIALIZER, BOOLEAN_SERIALIZER, + SHORT_SERIALIZER, + INTEGER_SERIALIZER, + LONG_SERIALIZER, + FLOAT_SERIALIZER, + DOUBLE_SERIALIZER, + BYTEARRAY_SERIALIZER, ) from .topic_connector import TopicConsumer, TopicProducer STRING_DESERIALIZER = StringDeserializer() +SERIALIZERS = { + "org.apache.kafka.common.serialization.StringSerializer": STRING_SERIALIZER, + "org.apache.kafka.common.serialization.BooleanSerializer": BOOLEAN_SERIALIZER, + "org.apache.kafka.common.serialization.ShortSerializer": SHORT_SERIALIZER, + "org.apache.kafka.common.serialization.IntegerSerializer": INTEGER_SERIALIZER, + "org.apache.kafka.common.serialization.LongSerializer": LONG_SERIALIZER, + 
"org.apache.kafka.common.serialization.FloatSerializer": FLOAT_SERIALIZER, + "org.apache.kafka.common.serialization.DoubleSerializer": DOUBLE_SERIALIZER, + "org.apache.kafka.common.serialization.ByteArraySerializer": BYTEARRAY_SERIALIZER, +} + def apply_default_configuration(streaming_cluster, configs): if "admin" in streaming_cluster["configuration"]: @@ -308,8 +323,8 @@ class KafkaTopicProducer(TopicProducer): def __init__(self, configs): self.configs = configs.copy() self.topic = self.configs.pop("topic") - self.key_serializer = self.configs.pop("key.serializer") - self.value_serializer = self.configs.pop("value.serializer") + self.key_serializer = SERIALIZERS[self.configs.pop("key.serializer")] + self.value_serializer = SERIALIZERS[self.configs.pop("value.serializer")] self.producer: Optional[Producer] = None self.commit_callback: Optional[CommitCallback] = None self.delivery_failure: Optional[Exception] = None @@ -342,8 +357,8 @@ def write(self, records: List[Record]): ) self.producer.produce( self.topic, - value=STRING_SERIALIZER(record.value()), - key=STRING_SERIALIZER(record.key()), + value=self.value_serializer(record.value()), + key=self.key_serializer(record.key()), headers=headers, on_delivery=self.on_delivery, ) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_serialization.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_serialization.py index faf6651fd..1b6a2fd72 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_serialization.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_serialization.py @@ -60,9 +60,46 @@ def __call__(self, obj, ctx=None): return b"\x01" if obj else b"\x00" +class ShortSerializer(Serializer): + """ + Serializes int to int16 bytes. 
+ + See Also: + `ShortSerializer Javadoc `_ + """ # noqa: E501 + + def __call__(self, obj, ctx=None): + """ + Serializes int as int16 bytes. + + Args: + obj (object): object to be serialized + + ctx (SerializationContext): Metadata pertaining to the serialization + operation + + Note: + None objects are represented as Kafka Null. + + Raises: + SerializerError if an error occurs during serialization + + Returns: + int16 bytes if obj is not None, else None + """ + + if obj is None: + return None + + try: + return _struct.pack(">h", obj) + except _struct.error as e: + raise SerializationError(str(e)) + + class LongSerializer(Serializer): """ - Serializes int to int32 bytes. + Serializes int to int64 bytes. See Also: `LongSerializer Javadoc `_ @@ -97,8 +134,83 @@ def __call__(self, obj, ctx=None): raise SerializationError(str(e)) + +class FloatSerializer(Serializer): + """ + Serializes float to IEEE 754 binary32. + + See Also: + `FloatSerializer Javadoc `_ + + """ # noqa: E501 + + def __call__(self, obj, ctx=None): + """ + Args: + obj (object): object to be serialized + + ctx (SerializationContext): Metadata pertaining to the serialization + operation + + Note: + None objects are represented as Kafka Null. + + Raises: + SerializerError if an error occurs during serialization. + + Returns: + IEEE 754 binary32 bytes if obj is not None, otherwise None + """ + + if obj is None: + return None + + try: + return _struct.pack(">f", obj) + except _struct.error as e: + raise SerializationError(str(e)) + + +class ByteArraySerializer(Serializer): + """ + Serializes bytes. + + See Also: + `ByteArraySerializer Javadoc `_ + + """ # noqa: E501 + + def __call__(self, obj, ctx=None): + """ + Args: + obj (object): object to be serialized + + ctx (SerializationContext): Metadata pertaining to the serialization + operation + + Note: + None objects are represented as Kafka Null. + + Raises: + SerializerError if an error occurs during serialization. 
+ + Returns: + the bytes + """ + + if obj is None: + return None + + if not isinstance(obj, bytes): + raise SerializationError(f"ByteArraySerializer cannot serialize {obj}") + + return obj + + STRING_SERIALIZER = StringSerializer() -DOUBLE_SERIALIZER = DoubleSerializer() +BOOLEAN_SERIALIZER = BooleanSerializer() +SHORT_SERIALIZER = ShortSerializer() INTEGER_SERIALIZER = IntegerSerializer() LONG_SERIALIZER = LongSerializer() -BOOLEAN_SERIALIZER = BooleanSerializer() +FLOAT_SERIALIZER = FloatSerializer() +DOUBLE_SERIALIZER = DoubleSerializer() +BYTEARRAY_SERIALIZER = ByteArraySerializer() diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_kafka_connection.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_kafka_connection.py index 9a4585dee..80bf9e4b4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_kafka_connection.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_kafka_connection.py @@ -82,7 +82,7 @@ def test_kafka_topic_connection(): agentId: testAgentId configuration: className: langstream_runtime.tests.test_kafka_connection.TestSuccessProcessor - """ # noqa + """ # noqa: E501 config = yaml.safe_load(config_yaml) @@ -241,7 +241,7 @@ def test_kafka_dlq(): errorHandlerConfiguration: retries: 5 onFailure: dead-letter - """ # noqa + """ # noqa: E501 config = yaml.safe_load(config_yaml) @@ -293,6 +293,60 @@ def test_producer_error(): sink.write([SimpleRecord("will fail")]) +def test_producer_serializers(): + with KafkaContainer(image=KAFKA_IMAGE) as container: + bootstrap_server = container.get_bootstrap_server() + + consumer = Consumer( + { + "bootstrap.servers": bootstrap_server, + "group.id": "foo", + "auto.offset.reset": "earliest", + } + ) + consumer.subscribe([OUTPUT_TOPIC]) + + config_yaml = f""" + streamingCluster: + type: kafka + configuration: + admin: + 
bootstrap.servers: {bootstrap_server} + """ + + config = yaml.safe_load(config_yaml) + + for serializer, record_value, message_value in [ + ("StringSerializer", "test", b"test"), + ("BooleanSerializer", True, b"\x01"), + ("ShortSerializer", 42, b"\x00\x2A"), + ("IntegerSerializer", 42, b"\x00\x00\x00\x2A"), + ("LongSerializer", 42, b"\x00\x00\x00\x00\x00\x00\x00\x2A"), + ("FloatSerializer", 42.0, b"\x42\x28\x00\x00"), + ("DoubleSerializer", 42.0, b"\x40\x45\x00\x00\x00\x00\x00\x00"), + ("ByteArraySerializer", b"test", b"test"), + ]: + sink = kafka_connection.create_topic_producer( + "id", + config["streamingCluster"], + { + "topic": OUTPUT_TOPIC, + "key.serializer": "org.apache.kafka.common.serialization." + + serializer, + "value.serializer": "org.apache.kafka.common.serialization." + + serializer, + }, + ) + sink.start() + sink.write([SimpleRecord(record_value, key=record_value)]) + + message = consumer.poll(5) + assert message.value() == message_value + assert message.key() == message_value + + sink.close() + + class TestSuccessProcessor(SingleRecordProcessor): def process_record(self, record: Record) -> List[Record]: headers = record.headers().copy()