Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improper handling bad Kafka brokers #414

Open
NeejWeej opened this issue Dec 9, 2024 · 2 comments
Open

Improper handling bad Kafka brokers #414

NeejWeej opened this issue Dec 9, 2024 · 2 comments
Labels
adapter: kafka Issues and PRs related to the Apache Kafka adapter type: bug Concrete, reproducible bugs

Comments

@NeejWeej
Copy link
Collaborator

NeejWeej commented Dec 9, 2024

Describe the bug
If the broker specified is not running, then sometimes the graph just hangs, and cannot be killed with ctrl + C

If group_id is NOT set, then having either a publish or subscribe call from the KafkaAdapterManager will cause the graph to just hang, and not be killed by ctrl + C
If group_id IS set, then having JUST a subscribe call doesn't cause an issue (graph runs, no error, but subscribing doesnt work since the broker is bad), but having a publish call causes the hang described above.

To Reproduce

import os
from datetime import datetime, timedelta
import csp
from csp.adapters.kafka import (
    KafkaAdapterManager,
    RawTextMessageMapper,
)

class JSONMessage(csp.Struct):
    text: str

@csp.graph
def json_graph():
    # bad broker
    broker = "localhost:9092"
    group_id = None
    # group_id = "foo1919191"
    kerberos_keytab = "MY_KEYTAB"
    user_principal = "ME"
    ssl_ca_location = "/MY_SSL_LOCATION"
    kafka = KafkaAdapterManager(
        group_id=group_id,
        broker=broker,
        sasl_kerberos_keytab=kerberos_keytab,
        sasl_kerberos_principal=user_principal,
        ssl_ca_location=ssl_ca_location,
        auth=True,
    )
    topic = "my_test_topic"
    field_map = {"": "text"}
    msg_mapper = RawTextMessageMapper()
    data = kafka.subscribe(ts_type=JSONMessage, msg_mapper=msg_mapper, topic=topic, field_map=field_map, key="test_key124")
    csp.print("data", data)
    kafka.publish(msg_mapper, x=csp.const("heyyyy"), topic="my_test_topic", key="test_key124")
print('starting...')
csp.run(json_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

Expected behavior
An error or warning if the broker is invalid

@robambalu
Copy link
Collaborator

@NeejWeej please confirm if you see this with the latest 0.0.6 release

@NeejWeej
Copy link
Collaborator Author

NeejWeej commented Dec 9, 2024

On 0.0.6 I see this behavior:

Regardless of if group_id is set, having JUST a subscribe call throws this error now:

RuntimeError: KafkaAdapterManager.cpp:forceShutdown:140:RuntimeException: Kafka fatal error. Local: All broker connections are down1/1 brokers are down

Including a publish call (regardless of if group_id is set) still causes the hang.

@timkpaine timkpaine added adapter: kafka Issues and PRs related to the Apache Kafka adapter type: bug Concrete, reproducible bugs labels Dec 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
adapter: kafka Issues and PRs related to the Apache Kafka adapter type: bug Concrete, reproducible bugs
Projects
None yet
Development

No branches or pull requests

3 participants