Support producer serializer config in Python kafka connector (#357)
cbornet authored Sep 6, 2023
1 parent 7760d41 commit 796b3d6
Showing 4 changed files with 200 additions and 11 deletions.
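In short, the Python Kafka connector's topic producer now honours the standard Kafka key.serializer and value.serializer settings (given as fully-qualified Java class names) and maps them to equivalent Python serializers, instead of always serializing keys and values as strings. A minimal sketch of such a producer configuration, mirroring the new test at the bottom of this diff (the topic name and bootstrap address below are placeholders):

producer_config = {
    "topic": "OUTPUT_TOPIC",  # placeholder topic name
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.LongSerializer",
}
streaming_cluster = {
    "type": "kafka",
    "configuration": {"admin": {"bootstrap.servers": "localhost:9092"}},  # placeholder address
}
# As in the new test below, the producer would then be created and used roughly as:
#   sink = kafka_connection.create_topic_producer("id", streaming_cluster, producer_config)
#   sink.start()
#   sink.write([SimpleRecord(42, key="some-key")])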
@@ -21,8 +21,16 @@ name: "Exclamation processor"
topics:
- name: TEST_TOPIC_0
creation-mode: create-if-not-exists
schema:
type: string
keySchema:
type: string
- name: TEST_TOPIC_1
creation-mode: create-if-not-exists
schema:
type: string
keySchema:
type: string
pipeline:
- name: "Process using Python"
id: "test-python-processor"
@@ -26,14 +26,29 @@
from langstream import Record, CommitCallback, SimpleRecord
from .kafka_serialization import (
STRING_SERIALIZER,
DOUBLE_SERIALIZER,
LONG_SERIALIZER,
BOOLEAN_SERIALIZER,
SHORT_SERIALIZER,
INTEGER_SERIALIZER,
LONG_SERIALIZER,
FLOAT_SERIALIZER,
DOUBLE_SERIALIZER,
BYTEARRAY_SERIALIZER,
)
from .topic_connector import TopicConsumer, TopicProducer

STRING_DESERIALIZER = StringDeserializer()

SERIALIZERS = {
"org.apache.kafka.common.serialization.StringSerializer": STRING_SERIALIZER,
"org.apache.kafka.common.serialization.BooleanSerializer": BOOLEAN_SERIALIZER,
"org.apache.kafka.common.serialization.ShortSerializer": SHORT_SERIALIZER,
"org.apache.kafka.common.serialization.IntegerSerializer": INTEGER_SERIALIZER,
"org.apache.kafka.common.serialization.LongSerializer": LONG_SERIALIZER,
"org.apache.kafka.common.serialization.FloatSerializer": FLOAT_SERIALIZER,
"org.apache.kafka.common.serialization.DoubleSerializer": DOUBLE_SERIALIZER,
"org.apache.kafka.common.serialization.ByteArraySerializer": BYTEARRAY_SERIALIZER,
}


def apply_default_configuration(streaming_cluster, configs):
if "admin" in streaming_cluster["configuration"]:
@@ -308,8 +323,8 @@ class KafkaTopicProducer(TopicProducer):
def __init__(self, configs):
self.configs = configs.copy()
self.topic = self.configs.pop("topic")
self.key_serializer = self.configs.pop("key.serializer")
self.value_serializer = self.configs.pop("value.serializer")
self.key_serializer = SERIALIZERS[self.configs.pop("key.serializer")]
self.value_serializer = SERIALIZERS[self.configs.pop("value.serializer")]
self.producer: Optional[Producer] = None
self.commit_callback: Optional[CommitCallback] = None
self.delivery_failure: Optional[Exception] = None
@@ -342,8 +357,8 @@ def write(self, records: List[Record]):
)
self.producer.produce(
self.topic,
value=STRING_SERIALIZER(record.value()),
key=STRING_SERIALIZER(record.key()),
value=self.value_serializer(record.value()),
key=self.key_serializer(record.key()),
headers=headers,
on_delivery=self.on_delivery,
)
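The dispatch itself is a plain dictionary lookup: the configured Java class name selects a Python callable from SERIALIZERS, and write() applies that callable to each record's key and value before passing the resulting bytes to produce(). One consequence is that an unsupported class name fails with a KeyError when the producer is constructed, not when records are written. A standalone sketch of the mechanism (the two lambda entries are simplified stand-ins, not the connector's serializers, which also handle None values and an optional serialization context):

import struct

SERIALIZERS_SKETCH = {
    "org.apache.kafka.common.serialization.StringSerializer": lambda v: v.encode("utf-8"),
    "org.apache.kafka.common.serialization.LongSerializer": lambda v: struct.pack(">q", v),
}

configs = {
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.LongSerializer",
}
key_serializer = SERIALIZERS_SKETCH[configs.pop("key.serializer")]    # KeyError if the class name is unsupported
value_serializer = SERIALIZERS_SKETCH[configs.pop("value.serializer")]

assert key_serializer("answer") == b"answer"
assert value_serializer(42) == b"\x00\x00\x00\x00\x00\x00\x00\x2a"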
@@ -60,9 +60,46 @@ def __call__(self, obj, ctx=None):
return b"\x01" if obj else b"\x00"


class ShortSerializer(Serializer):
"""
Serializes int to int16 bytes.
See Also:
`ShortSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java>`_
""" # noqa: E501

def __call__(self, obj, ctx=None):
"""
Serializes int as int16 bytes.
Args:
obj (object): object to be serialized
ctx (SerializationContext): Metadata pertaining to the serialization
operation
Note:
None objects are represented as Kafka Null.
Raises:
SerializerError if an error occurs during serialization
Returns:
int16 bytes if obj is not None, else None
"""

if obj is None:
return None

try:
return _struct.pack(">h", obj)
except _struct.error as e:
raise SerializationError(str(e))


class LongSerializer(Serializer):
"""
Serializes int to int32 bytes.
Serializes int to int64 bytes.
See Also:
`LongSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java>`_
@@ -97,8 +134,83 @@ def __call__(self, obj, ctx=None):
raise SerializationError(str(e))


class FloatSerializer(Serializer):
"""
Serializes float to IEEE 754 binary32.
See Also:
`FloatSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/FloatSerializer.html>`_
""" # noqa: E501

def __call__(self, obj, ctx=None):
"""
Args:
obj (object): object to be serialized
ctx (SerializationContext): Metadata pertaining to the serialization
operation
Note:
None objects are represented as Kafka Null.
Raises:
SerializerError if an error occurs during serialization.
Returns:
IEEE 754 binary32 bytes if obj is not None, otherwise None
"""

if obj is None:
return None

try:
return _struct.pack(">f", obj)
except _struct.error as e:
raise SerializationError(str(e))


class ByteArraySerializer(Serializer):
"""
Serializes bytes.
See Also:
`ByteArraySerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/ByteArraySerializer.html>`_
""" # noqa: E501

def __call__(self, obj, ctx=None):
"""
Args:
obj (object): object to be serialized
ctx (SerializationContext): Metadata pertaining to the serialization
operation
Note:
None objects are represented as Kafka Null.
Raises:
SerializerError if an error occurs during serialization.
Returns:
the bytes
"""

if obj is None:
return None

if not isinstance(obj, bytes):
raise SerializationError(f"ByteArraySerializer cannot serialize {obj}")

return obj


STRING_SERIALIZER = StringSerializer()
DOUBLE_SERIALIZER = DoubleSerializer()
BOOLEAN_SERIALIZER = BooleanSerializer()
SHORT_SERIALIZER = ShortSerializer()
INTEGER_SERIALIZER = IntegerSerializer()
LONG_SERIALIZER = LongSerializer()
BOOLEAN_SERIALIZER = BooleanSerializer()
FLOAT_SERIALIZER = FloatSerializer()
DOUBLE_SERIALIZER = DoubleSerializer()
BYTEARRAY_SERIALIZER = ByteArraySerializer()
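For reference, the numeric serializers are thin wrappers over big-endian struct packing, the same byte layout the Java serializers produce, and this is where the byte patterns asserted in the new test further down come from. A quick check of the equivalences with plain struct calls:

import struct

# Big-endian packings matching the expected bytes in test_producer_serializers.
assert struct.pack(">h", 42) == b"\x00\x2a"                            # short / int16
assert struct.pack(">i", 42) == b"\x00\x00\x00\x2a"                    # integer / int32
assert struct.pack(">q", 42) == b"\x00\x00\x00\x00\x00\x00\x00\x2a"    # long / int64
assert struct.pack(">f", 42.0) == b"\x42\x28\x00\x00"                  # float / IEEE 754 binary32
assert struct.pack(">d", 42.0) == b"\x40\x45\x00\x00\x00\x00\x00\x00"  # double / IEEE 754 binary64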
@@ -82,7 +82,7 @@ def test_kafka_topic_connection():
agentId: testAgentId
configuration:
className: langstream_runtime.tests.test_kafka_connection.TestSuccessProcessor
""" # noqa
""" # noqa: E501

config = yaml.safe_load(config_yaml)

@@ -241,7 +241,7 @@ def test_kafka_dlq():
errorHandlerConfiguration:
retries: 5
onFailure: dead-letter
""" # noqa
""" # noqa: E501

config = yaml.safe_load(config_yaml)

@@ -293,6 +293,60 @@ def test_producer_error():
sink.write([SimpleRecord("will fail")])


def test_producer_serializers():
with KafkaContainer(image=KAFKA_IMAGE) as container:
bootstrap_server = container.get_bootstrap_server()

consumer = Consumer(
{
"bootstrap.servers": bootstrap_server,
"group.id": "foo",
"auto.offset.reset": "earliest",
}
)
consumer.subscribe([OUTPUT_TOPIC])

config_yaml = f"""
streamingCluster:
type: kafka
configuration:
admin:
bootstrap.servers: {bootstrap_server}
"""

config = yaml.safe_load(config_yaml)

for serializer, record_value, message_value in [
("StringSerializer", "test", b"test"),
("BooleanSerializer", True, b"\x01"),
("ShortSerializer", 42, b"\x00\x2A"),
("IntegerSerializer", 42, b"\x00\x00\x00\x2A"),
("LongSerializer", 42, b"\x00\x00\x00\x00\x00\x00\x00\x2A"),
("FloatSerializer", 42.0, b"\x42\x28\x00\x00"),
("DoubleSerializer", 42.0, b"\x40\x45\x00\x00\x00\x00\x00\x00"),
("ByteArraySerializer", b"test", b"test"),
]:
sink = kafka_connection.create_topic_producer(
"id",
config["streamingCluster"],
{
"topic": OUTPUT_TOPIC,
"key.serializer": "org.apache.kafka.common.serialization."
+ serializer,
"value.serializer": "org.apache.kafka.common.serialization."
+ serializer,
},
)
sink.start()
sink.write([SimpleRecord(record_value, key=record_value)])

message = consumer.poll(5)
assert message.value() == message_value
assert message.key() == message_value

sink.close()


class TestSuccessProcessor(SingleRecordProcessor):
def process_record(self, record: Record) -> List[Record]:
headers = record.headers().copy()