Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segmentation faults (and the like) and failure to get assignments in multithreaded/asyncio pytest environment #1797

Open
5 of 7 tasks
andreaimprovised opened this issue Aug 13, 2024 · 3 comments

Comments

@andreaimprovised
Copy link

Description

I have a pytest suite that will:

  1. Create a kafka testcontainer as a module fixture
  2. Create a bunch of topics as a module fixture using the admin client
  3. For each test case:
    i. Create one Consumer object per topic in a threadpool, all with auto.offset.reset=earliest
    ii. Subscribe to them in a threadpool.
    iii. Wait for each Consumer to receive an assignment.
    iv. In a threadpool, create a bunch of Producers, send messages to the topics, and flush.
    v. Wait for each Consumer to process the correct number of messages.
    vi. Finally, close all the consumers, regardless of exceptions raised elsewhere.

This test suite is notoriously prone to segmentation faults and the like that crash the entire interpreter and are very disruptive.

I have heard the confluent_kafka is thread safe, but I have not experienced that to be the case. And I'm open to the possibility that this is user error on my part. If so, please, show me the way.

The errors tend to happen after a first test case has run and during the second test case where the Consumer is attempting to monitor for assignments.

There are many different presentations:

INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type XMIT_BUF (0x8)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
Fatal Python error: Aborted
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type NODE_UPDATE (0x7)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type CONNECT (0x35)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type REPLY:GET_REBALANCE_PROTOCOL (0x4000003a)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.
segmentation fault
INTERNAL ERROR: librdkafka rd_kafka_poll_cb:4113: Can't handle op type REPLY:NODE_UPDATE (0x40000007)
Assertion failed: (!*"INTERNAL ERROR IN LIBRDKAFKA"), function rd_kafka_poll_cb, file rdkafka.c, line 4113.

Additionally, if the timeout argument to poll() is set, the consumer never appears to get an assignment at all.

How to reproduce

Here's a pretty elaborate script that reliably reproduces the total range of errors that I see with a lot of different knobs to tweak.

import argparse
import asyncio
import logging
import os
import sys
import threading
import time
from asyncio import CancelledError, Task
from contextlib import ExitStack
from functools import partial

import anyio.to_thread
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
from confluent_kafka.admin import AdminClient, NewTopic

logging.basicConfig(level=logging.INFO)
confluent_kafka_logger = logging.getLogger("confluent_kafka")
confluent_kafka_logger.setLevel(logging.INFO)
confluent_kafka_logger.addHandler(logging.StreamHandler())


CONFIG = {
    "NUM_TOPICS": 1,
    "NUM_MESSAGES": 1,
    "NUM_EXPERIMENTS": sys.maxsize,
    "TOPIC_PREFIX": "test-topic",
    "MAX_NUM_THREADS": 200,
    "KAFKA_BOOTSTRAP_SERVERS": None,
    "LOCK_CONSUMER_OPERATIONS": False,
    "POLL_TIMEOUT": None,
    "CHECK_ASSIGNMENTS_TIMEOUT": 15,
    "CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT": 15,
    **os.environ,
}


SINGLETONS = {}


def recreate_topic(topic_name):
    kafka_config = {
        "bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"],
    }
    admin_client = AdminClient(kafka_config)

    topics = admin_client.list_topics(timeout=10).topics

    if topic_name in topics:
        print(f"Deleting existing topic '{topic_name}'...")
        fs = admin_client.delete_topics([topic_name], operation_timeout=30)
        for topic, f in fs.items():
            try:
                f.result()
                print(f"Topic '{topic}' successfully deleted.")
            except KafkaException as e:
                print(f"Failed to delete topic '{topic}': {e}")

    while topic_name in admin_client.list_topics().topics:
        time.sleep(1)

    print(f"Confirmed that topic {topic_name} does not exist.")

    print(f"Creating topic '{topic_name}'...")
    new_topic = NewTopic(topic_name, num_partitions=3, replication_factor=1)
    try:
        admin_client.create_topics([new_topic], operation_timeout=30)
        print(f"Topic '{topic_name}' created.")
    except KafkaException as e:
        print(f"Failed to create topic '{topic_name}': {e}")

    # Loop until topic exists
    while topic_name not in admin_client.list_topics().topics:
        time.sleep(1)

    print(f"Confirmed that topic {topic_name} exists.")


class LockingConsumer:
    """A patched version of the Consumer class that locks operations."""

    def __init__(self, config: dict, logger):
        self.config = config
        self.consumer = Consumer(config, logger=logger)
        self.lock = threading.Lock()

    def __getattr__(self, item):
        obj = getattr(self.consumer, item)
        if callable(obj):
            return self._wrap_callable(obj)

    def _wrap_callable(self, func):

        def wrapped(*args, **kwargs):
            with self.lock:
                return func(*args, **kwargs)

        return wrapped


class AsyncConsumer:

    def __init__(self, topic_name, num_messages):
        self.topic_name = topic_name
        self.num_messages = num_messages
        consumer_config = {
            "bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"],
            "group.id": f"test_group.{topic_name}",
            "auto.offset.reset": "latest",
            "enable.auto.commit": "false",
        }
        if CONFIG["LOCK_CONSUMER_OPERATIONS"]:
            consumer_class = LockingConsumer
        else:
            consumer_class = Consumer

        self.consumer = consumer_class(consumer_config, logger=confluent_kafka_logger)

        self.task: Task | None = None

    def _log(self, msg):
        print(f"Consumer({self.topic_name}): {msg}.")

    async def assignments(self):
        return await anyio.to_thread.run_sync(
            self.consumer.assignment, limiter=SINGLETONS["limiter"]
        )

    async def start(self):
        self._log("Subscribing.")
        await anyio.to_thread.run_sync(
            self.consumer.subscribe, [self.topic_name], limiter=SINGLETONS["limiter"]
        )
        try:
            self.task = asyncio.create_task(self.poll_loop())
        except CancelledError:
            self._log("Timeout.")
        except KafkaException as e:
            self._log(f"{e}")

    async def close(self):
        if self.task is not None and not self.task.done():
            self._log("Cancelling poll_loop().")
            self.task.cancel()
        else:
            self._log("poll_loop() already done.")
        self._log("Closing consumer")
        await anyio.to_thread.run_sync(
            self.consumer.close, limiter=SINGLETONS["limiter"]
        )
        self._log("Consumer closed")

    async def poll_loop(self):
        while True:
            self._log("Polling.")
            msg = await anyio.to_thread.run_sync(
                (
                    self.consumer.poll
                    if CONFIG["POLL_TIMEOUT"] is None
                    else partial(self.consumer.poll, timeout=CONFIG["POLL_TIMEOUT"])
                ),
                limiter=SINGLETONS["limiter"],
            )
            self._log(f"Message received: {msg.value()}")
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    raise KafkaException(msg.error())
            await anyio.to_thread.run_sync(
                self.consumer.commit, limiter=SINGLETONS["limiter"]
            )
            self.num_messages -= 1
            self._log(f"{self.num_messages} messages left.")


async def add_messages(topic_name):
    producer_config = {"bootstrap.servers": CONFIG["KAFKA_BOOTSTRAP_SERVERS"]}
    producer = Producer(producer_config)
    for i in range(CONFIG["NUM_MESSAGES"]):
        print(f"Producer({topic_name}) - producing message {i}.")
        await anyio.to_thread.run_sync(
            producer.produce, topic_name, f"{i}", limiter=SINGLETONS["limiter"]
        )
        await anyio.to_thread.run_sync(producer.poll, 0, limiter=SINGLETONS["limiter"])
    await anyio.to_thread.run_sync(producer.flush, limiter=SINGLETONS["limiter"])


async def check_assignments(consumer_handlers):
    assignments = []
    while not assignments:
        handler_assignments = await asyncio.gather(
            *[consumer_handler.assignments() for consumer_handler in consumer_handlers]
        )
        is_assigned = [len(assignment) > 0 for assignment in handler_assignments]
        print(f"Assignments: {is_assigned}")
        if sum(is_assigned) != len(consumer_handlers):
            print(f"Only {sum(is_assigned)} handlers have assignments, sleeping")
            await asyncio.sleep(1)
        else:
            print("All handlers have assignments.")
            return


async def check_messages_are_consumed(consumer_handlers):
    while any(
        consumer_handler.num_messages > 0 for consumer_handler in consumer_handlers
    ):
        await asyncio.sleep(0.1)


async def run_test_case(topic_names):
    print("--- Starting consumers. ---")
    consumer_handlers = [
        AsyncConsumer(topic_name, CONFIG["NUM_MESSAGES"]) for topic_name in topic_names
    ]
    await asyncio.gather(
        *[consumer_handler.start() for consumer_handler in consumer_handlers]
    )

    try:
        print("--- Waiting for assignments. ---")
        await asyncio.wait_for(
            check_assignments(consumer_handlers),
            timeout=CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"],
        )

        print("--- Adding messages. ---")
        await asyncio.gather(*[add_messages(topic_name) for topic_name in topic_names])
        print("--- Waiting for messages to be consumed. ---")
        await asyncio.wait_for(
            check_messages_are_consumed(consumer_handlers),
            timeout=CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"],
        )
    finally:
        print("--- Closing consumers. ---")
        await asyncio.gather(
            *[consumer_handler.close() for consumer_handler in consumer_handlers]
        )
        print("--- Test case done. ---")


async def run_experiments(topic_names):
    for i in range(CONFIG["NUM_EXPERIMENTS"]):
        print(f"------ Running experiment {i} ------")
        await run_test_case(topic_names)


async def create_topics():
    tasks = []
    topic_names = []
    for i in range(CONFIG["NUM_TOPICS"]):
        topic_name = f"{CONFIG['TOPIC_PREFIX']}-{i}"
        topic_names.append(topic_name)
        tasks.append(
            anyio.to_thread.run_sync(
                recreate_topic, topic_name, limiter=SINGLETONS["limiter"]
            )
        )
    await asyncio.gather(*tasks)
    return topic_names


async def run_test_application():
    topic_names = await create_topics()
    await run_experiments(topic_names)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-topics", type=int, default=CONFIG["NUM_TOPICS"])
    parser.add_argument("--num-messages", type=int, default=CONFIG["NUM_MESSAGES"])
    parser.add_argument(
        "--num-experiments", type=int, default=CONFIG["NUM_EXPERIMENTS"]
    )
    parser.add_argument("--topic-prefix", type=str, default=CONFIG["TOPIC_PREFIX"])
    parser.add_argument(
        "--max-num-threads", type=int, default=CONFIG["MAX_NUM_THREADS"]
    )
    parser.add_argument("--lock-consumer-operations", action="store_true")
    parser.add_argument("--poll-timeout", type=float, default=CONFIG["POLL_TIMEOUT"])
    parser.add_argument(
        "--check-assignments-timeout",
        type=float,
        default=CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"],
    )
    parser.add_argument(
        "--check-messages-are-consumed-timeout",
        type=float,
        default=CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"],
    )
    parser.add_argument(
        "--kafka-bootstrap-servers", type=str, default=CONFIG["KAFKA_BOOTSTRAP_SERVERS"]
    )
    args = parser.parse_args()
    CONFIG["NUM_TOPICS"] = args.num_topics
    CONFIG["NUM_MESSAGES"] = args.num_messages
    CONFIG["NUM_EXPERIMENTS"] = args.num_experiments
    CONFIG["TOPIC_PREFIX"] = args.topic_prefix
    CONFIG["KAFKA_BOOTSTRAP_SERVERS"] = args.kafka_bootstrap_servers
    CONFIG["MAX_NUM_THREADS"] = args.max_num_threads
    CONFIG["LOCK_CONSUMER_OPERATIONS"] = args.lock_consumer_operations
    CONFIG["POLL_TIMEOUT"] = args.poll_timeout
    CONFIG["CHECK_ASSIGNMENTS_TIMEOUT"] = args.check_assignments_timeout
    CONFIG["CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT"] = (
        args.check_messages_are_consumed_timeout
    )

    with ExitStack() as stack:
        if CONFIG["KAFKA_BOOTSTRAP_SERVERS"] is None:
            import testcontainers.kafka

            kafka = stack.enter_context(testcontainers.kafka.KafkaContainer())
            CONFIG["KAFKA_BOOTSTRAP_SERVERS"] = kafka.get_bootstrap_server()

        print(f"NUM_TOPICS: {CONFIG['NUM_TOPICS']}")
        print(f"NUM_MESSAGES: {CONFIG['NUM_MESSAGES']}")
        print(f"NUM_EXPERIMENTS: {CONFIG['NUM_EXPERIMENTS']}")
        print(f"TOPIC_PREFIX: {CONFIG['TOPIC_PREFIX']}")
        print(f"KAFKA_BOOTSTRAP_SERVERS: {CONFIG['KAFKA_BOOTSTRAP_SERVERS']}")
        print(f"MAX_NUM_THREADS: {CONFIG['MAX_NUM_THREADS']}")
        print(f"LOCK_CONSUMER_OPERATIONS: {CONFIG['LOCK_CONSUMER_OPERATIONS']}")
        print(f"POLL_TIMEOUT: {CONFIG['POLL_TIMEOUT']}")
        print(f"CHECK_ASSIGNMENTS_TIMEOUT: {CONFIG['CHECK_ASSIGNMENTS_TIMEOUT']}")
        print(
            f"CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT: {CONFIG['CHECK_MESSAGES_ARE_CONSUMED_TIMEOUT']}"
        )

        # This breaks wait_for_assignments() because the first call to poll() causes
        # a lock issue
        if CONFIG["LOCK_CONSUMER_OPERATIONS"] and CONFIG["POLL_TIMEOUT"] in (None, -1):
            print(
                "Warning, blocking poll() call will prevent another thread from acquiring the lock."
            )

        # For whatever reason, consumers don't get assigned partitions if this is
        # set to a value other than None. Even passing in None to poll() causes issues
        if CONFIG["POLL_TIMEOUT"] not in (None, -1):
            print(
                "Warning, setting POLL_TIMEOUT typically prevents assignments from ever happening."
            )

        SINGLETONS["limiter"] = anyio.CapacityLimiter(CONFIG["MAX_NUM_THREADS"])

        asyncio.run(run_test_application(), debug=True)

Assignments never happening when poll(1.0)

https://gist.github.com/andreaimprovised/6221cba7c0be98ee3189dd517998bda3

INTERNAL ERROR with 3 topics

https://gist.github.com/andreaimprovised/d80eedeea6ef7beb44fff228df1942da

segmentation fault with 12 topics

https://gist.github.com/andreaimprovised/5bc6acdc05fecb35d7cb7f20295c31f7

Segmentation fault with just 1 topic and 1 message per test case

https://gist.github.com/andreaimprovised/1fe7b9f8be0d34d8a6dc40827c802934

Additional requirements:

testcontainers==4.7.2
anyio==4.4.0

I'm currently using python 3.10.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):

In [2]: confluent_kafka.version()
Out[2]: ('2.5.0', 33882112)

  • Apache Kafka broker version:

confluentinc/cp-kafka:7.6.0

  • Client configuration: {...}

It's in the code.

  • Operating system:

I've seen this on darwin arm64 and linux x86_64.

  • Provide client logs (with 'debug': '..' as necessary)

Here is an example

  • Provide broker log excerpts

Hmmm, I'll try to figure out how to get these.

  • Critical issue

It's not critical, but it depends on how critical you think automated test suites are.

@ntr-switchdin
Copy link

ntr-switchdin commented Aug 20, 2024

I've seen similar in our testing setup.

We use async subscribers from FastStream but the end result seems to be the same as it uses confluent under the hood.
At first it appeared to be related to our pants+nix test setup on python 3.11 but I have reproduced it running pytest within a venv on 3.12 as well.

I also run our tests within a docker compose application, and have never seen it occur there.
The main difference between the environments is between linux_aarch64 and macos_amd64 I suppose but I have also noticed the async selector is different:

  • KqueueSelector locally
  • EpollSelector within the containers

A coworker not on macOS has reported the issue as well and it's also appeared in CI (GH Actions) so seemingly not macOS specific.

(venv) ~/org/components/gsf git:GSF-654-convert-threaded* -> python -m pytest
===================================================== test session starts =====================================================
platform darwin -- Python 3.12.5, pytest-7.4.4, pluggy-1.5.0
rootdir: /Users/ntr/org/components/gsf
configfile: pyproject.toml
testpaths: tests
plugins: asyncio-0.23.8, env-1.1.3, cov-4.1.0, anyio-3.7.1, docker-3.1.1
asyncio: mode=Mode.AUTO
collecting ... Variable Name: ENABLE_METRICS, Value: true
Variable Name: ENABLE_LOADTESTING, Value: None
Variable Name: SEND_NOTIFICATIONS, Value: true
Variable Name: ENABLE_SENTRY, Value: None
Variable Name: ENABLE_SENTRY_TRACING, Value: None
Variable Name: IEEE_LOGGING_CLOUDWATCH, Value: None
collected 108 items

tests/gsf_tests/test_adaptors.py ....                                                                [  3%]
tests/gsf_tests/test_config_manager.py ..                                                            [  5%]
tests/gsf_tests/test_control_responses.py ..Fatal Python error: Segmentation fault

Current thread 0x000000016ea73000 (most recent call first):
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 807 in run
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1075 in _bootstrap_inner
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1032 in _bootstrap

Thread 0x00000001f57a8f40 (most recent call first):
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 1289 in create_module
  File "<frozen importlib._bootstrap>", line 813 in module_from_spec
  File "<frozen importlib._bootstrap>", line 921 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1415 in _handle_fromlist
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/multidict/_compat.py", line 12 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/multidict/__init__.py", line 9 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/aiohttp/hdrs.py", line 7 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1415 in _handle_fromlist
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/aiohttp/__init__.py", line 5 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "/Users/ntr/org/components/gsf/src/gsf/data_plane/entrypoints/upstream.py", line 11 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "/Users/ntr/org/components/gsf/src/gsf/data_plane/entrypoints/control_responses_subscriber.py", line 8 in <module>
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 995 in exec_module
  File "<frozen importlib._bootstrap>", line 935 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1331 in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1360 in _find_and_load
  File "/Users/ntr/org/components/gsf/tests/gsf_tests/test_control_responses.py", line 86 in test_control_responses_subscriber
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/events.py", line 88 in _run
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 1986 in _run_once
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 641 in run_forever
  File "/opt/homebrew/Cellar/[email protected]/3.12.5/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 674 in run_until_complete
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py", line 906 in inner
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/python.py", line 194 in pytest_pyfunc_call
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/python.py", line 1792 in runtest
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py", line 440 in runtest
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 169 in pytest_runtest_call
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 262 in <lambda>
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 341 in from_call
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 261 in call_runtest_hook
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 222 in call_and_report
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 133 in runtestprotocol
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/runner.py", line 114 in pytest_runtest_protocol
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 350 in pytest_runtestloop
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 325 in _main
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 271 in wrap_session
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/main.py", line 318 in pytest_cmdline_main
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103 in _multicall
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120 in _hookexec
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513 in __call__
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 169 in main
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 192 in console_main
  File "/Users/ntr/org/components/gsf/venv/lib/python3.12/site-packages/pytest/__main__.py", line 5 in <module>
  File "<frozen runpy>", line 88 in _run_code
  File "<frozen runpy>", line 198 in _run_module_as_main

Extension modules: lxml._elementpath, lxml.etree, _cffi_backend, confluent_kafka.cimpl, charset_normalizer.md (total: 5)
[1]    14261 segmentation fault  python -m pytest

the test is basically this:

    message_received_event = Event()

    async def subscriber(event: CloudEvent):
        assert event.data
        if mrid in event.data.get("request_body"):
            message_received_event.set()

    broker.subscriber(CONTROL_RESPONSES_TOPIC["name"], group_id="test_event_router")(subscriber)

    # Start listening
    await broker.start()

    payload = get_control_response_payload(status=1, subject=mrid)
    res = await async_der_client.post(url=url, content=payload)
    assert res.status_code == 201

    # Wait for the event to be set with a timeout to avoid hanging indefinitely
    try:
        await asyncio.wait_for(message_received_event.wait(), timeout=5.0)
    except asyncio.TimeoutError:
        log.error("Timed out waiting for message received event")

    assert message_received_event.is_set()
    await broker.close()

@ntr-switchdin
Copy link

Reproduced it with lldb, looks pretty cut and dry:

tests/gsf_tests/test_config_manager.py ..             [  5%]
Process 23727 stopped
* thread #9, stop reason = EXC_BAD_ACCESS (code=1, address=0xae8)
    frame #0: 0x0000000105d9b5b8 librdkafka.dylib`rd_kafka_q_pop_serve + 888
librdkafka.dylib`rd_kafka_q_pop_serve:
->  0x105d9b5b8 <+888>: ldr    w8, [x19, #0xae8]
    0x105d9b5bc <+892>: cmp    w8, #0x1
    0x105d9b5c0 <+896>: b.ne   0x105d9b608               ; <+968>
    0x105d9b5c4 <+900>: add    x0, sp, #0x10
Target 0: (Python) stopped.
(lldb) bt
* thread #9, stop reason = EXC_BAD_ACCESS (code=1, address=0xae8)
  * frame #0: 0x0000000105d9b5b8 librdkafka.dylib`rd_kafka_q_pop_serve + 888
    frame #1: 0x0000000105d70488 librdkafka.dylib`rd_kafka_consume0 + 236
    frame #2: 0x0000000102959cb8 cimpl.cpython-312-darwin.so`Consumer_poll + 132
    frame #3: 0x0000000100ac26d0 Python`cfunction_call + 72
    frame #4: 0x0000000100a70c94 Python`_PyObject_MakeTpCall + 128
    frame #5: 0x0000000100b849f4 Python`context_run + 104
    frame #6: 0x0000000100ac1db8 Python`cfunction_vectorcall_FASTCALL_KEYWORDS + 92
    frame #7: 0x0000000100b66bf8 Python`_PyEval_EvalFrameDefault + 50272
    frame #8: 0x0000000100a73d5c Python`method_vectorcall + 372
    frame #9: 0x0000000100c36720 Python`thread_run + 144
    frame #10: 0x0000000100bcc850 Python`pythread_wrapper + 48
    frame #11: 0x000000018d959f94 libsystem_pthread.dylib`_pthread_start + 136
    ```

@joncourt
Copy link

joncourt commented Dec 12, 2024

our experience is that if you call close on the consumer while it's polling then segfaults or hangs occur. We've added some locking around the poll/consume and close calls so they can't be called together which seems to have resolved the problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants