How to reconnect inside the Client context manager #334

Open
mirko opened this issue Sep 8, 2024 · 9 comments

Comments

@mirko

mirko commented Sep 8, 2024

I've read the docs, examples, and tickets about reconnection.

My problem is: I don't want to wrap everything in a while loop and re-execute all of it just because of a connection issue. Instead, I'd like to be able to re-execute only a failed call, while the messages are being iterated over in their own asyncio task.

To start, this is what I gathered from the docs, and it essentially works:

import asyncio
import aiomqtt

async def msg_loop(mqtt_client):
    try:
        async for msg in mqtt_client.messages:
            print("[M]", msg)
    except aiomqtt.exceptions.MqttError as exc:
        print("[W]", "Connection issue while waiting on MQTT bus, waiting for reconnect..", exc)
        await asyncio.sleep(5)


async def main():
    print("[*]", "Connecting to MQTT broker", end='\r')
    mqtt_client = aiomqtt.Client("test.mosquitto.org", 1883)

    #asyncio.create_task(connect(mqtt_client))

    while True:
        try:
            async with mqtt_client:
                await mqtt_client.subscribe("tpc/sub")
                tsk_msg_loop = asyncio.create_task(msg_loop(mqtt_client))

                print("[>]")

                print("[1]", "Step 1..")
                await mqtt_client.publish("Foo")

                print("[2]", "Step 2..")
                await mqtt_client.publish("Bar")

                print("[N]", "Step N..")
                await mqtt_client.publish("End")

                print("[X]", "Done")

                tsk_msg_loop.cancel()

                break

        except aiomqtt.exceptions.MqttError as exc:
            print("[W]", "Connection error", exc)
            continue

asyncio.run(main())

The problem is: whenever there's a connection issue during a publish call, the whole code block gets executed again. This is not what I want. I want to be able to decide, for example, whether to try again, skip the call, or do something else.

As far as I understand, I have to do all of that within the same context, so I can't just wrap every publish call in a(nother) async with mqtt_client.

I also tried putting the (re)connect routine into its own task, which doesn't work either, because then the client context is no longer available in main(). I'm losing hair over this seemingly simple issue and would very much appreciate some pointers on how to achieve this, as I can't see the wood for the trees anymore.

Thanks in advance!

@empicano
Owner

empicano commented Sep 8, 2024

Hi there, thanks for opening this issue! 🙂

You already said it: reconnection is trickier than it looks. The connection can break at any time, and it's not always possible to reconnect. We have multiple options for what a reconnection interface could look like:

  1. "Magically" in the background: The client reconnects automatically on failure. On disconnection, outbound messages are queued until we can reconnect, and no exceptions (only log messages) are raised.
  2. Manually: Calls to publish/subscribe/etc. will fail while the client is disconnected. The client provides something like a reconnect() method that we can manually call inside the context manager.
  3. Reconnect by exiting and reentering the client (which is the current state).
  4. Others?
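To make option 1 a bit more concrete, here is a rough sketch of an outbound queue that holds messages while disconnected and flushes them once a connection is available. It is not aiomqtt code: `BufferedPublisher`, its `send` callable and its `connected` event are hypothetical names, and the transport is an arbitrary coroutine function so the sketch runs without a broker. A failed send clears `connected` and keeps the message at the head of the queue, so ordering is preserved.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class BufferedPublisher:
    """Hypothetical sketch of option 1 (not aiomqtt API): queue messages while disconnected."""

    def __init__(
        self,
        send: Callable[[str, str], Awaitable[None]],
        error_types: tuple[type[BaseException], ...],
    ) -> None:
        self._send = send  # assumption: wraps the real publish call
        self._errors = error_types  # exceptions that mean "connection lost"
        self._queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue()
        self.connected = asyncio.Event()  # set by whatever (re)establishes the connection

    def publish(self, topic: str, payload: str) -> None:
        # Never raises because of a disconnection; the message just waits in the queue
        self._queue.put_nowait((topic, payload))

    async def wait_sent(self) -> None:
        """Wait until every queued message has been sent."""
        await self._queue.join()

    async def run(self) -> None:
        """Background task: send queued messages in order, pausing while disconnected."""
        while True:
            topic, payload = await self._queue.get()
            while True:
                await self.connected.wait()
                try:
                    await self._send(topic, payload)
                    break
                except self._errors:
                    # Connection lost: pause and retry the same message after reconnecting
                    self.connected.clear()
            self._queue.task_done()


async def demo() -> list[tuple[str, str]]:
    sent: list[tuple[str, str]] = []
    failures_left = 1

    async def flaky_send(topic: str, payload: str) -> None:
        nonlocal failures_left
        if failures_left:
            failures_left -= 1
            raise ConnectionError("connection lost")
        sent.append((topic, payload))

    pub = BufferedPublisher(flaky_send, (ConnectionError,))
    runner = asyncio.create_task(pub.run())
    pub.publish("tpc/sub", "Foo")
    pub.publish("tpc/sub", "Bar")
    pub.connected.set()  # "connected": the first send fails and clears the event
    await asyncio.sleep(0.01)
    pub.connected.set()  # "reconnected": both messages go out in order
    await pub.wait_sent()
    runner.cancel()
    return sent


print(asyncio.run(demo()))  # [('tpc/sub', 'Foo'), ('tpc/sub', 'Bar')]
```

With aiomqtt, `send` might be something like `lambda t, p: client.publish(t, p)` with `error_types=(aiomqtt.MqttError,)`; that pairing is an assumption, and someone still has to set `connected` again after reconnecting.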

There's some additional discussion in #287; I would be very interested to hear what you think is best!


As to your issue, an async context manager provides two magic methods, __aenter__ and __aexit__, which get called when we enter and exit the async with block. As a workaround, you could call these methods manually to reconnect inside the context manager (you might have to catch an aiomqtt.MqttError somewhere):

await client.__aexit__(None, None, None)
await client.__aenter__()
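Both methods are coroutines, so they have to be awaited. A minimal sketch of the workaround as a reusable helper, assuming the client tolerates being exited and re-entered like this (which is exactly the unofficial part): `reconnect`, `interval` and `max_attempts` are hypothetical names, and `FlakyConnection` is a stand-in for the client so the sketch runs without a broker.

```python
from __future__ import annotations

import asyncio
import contextlib
from typing import Any


async def reconnect(
    client: Any,
    error_type: type[BaseException],
    interval: float = 1.0,
    max_attempts: int | None = None,
) -> bool:
    """Hypothetical helper: exit and re-enter `client` until entering succeeds.

    Returns False if `max_attempts` runs out first.
    """
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        attempt += 1
        # Exiting an already broken connection may raise the original error; ignore it
        with contextlib.suppress(error_type):
            await client.__aexit__(None, None, None)
        try:
            await client.__aenter__()
            return True
        except error_type:
            await asyncio.sleep(interval)  # limit the rate of reconnection attempts
    return False


class FlakyConnection:
    """Stand-in client whose first `failures` connection attempts fail."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.attempts = 0

    async def __aenter__(self) -> FlakyConnection:
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConnectionError("broker unreachable")
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        return None


conn = FlakyConnection(failures=2)
print(asyncio.run(reconnect(conn, ConnectionError, interval=0)), conn.attempts)  # True 3
```

With aiomqtt this would presumably be called as `await reconnect(client, aiomqtt.MqttError, interval=2)`; subscriptions may need to be renewed afterwards.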

Let me know if that helps!

@mirko
Author

mirko commented Sep 9, 2024

Thanks for your reply!

We have multiple options for what a reconnection interface could look like

Do you mean "we" as in users of aiomqtt, or "we" as the developers designing the interface?

Ideally, I would be able to call connect()/disconnect() (which, as far as I understand, existed but were removed) and/or check whether the MQTT connection is actually alive and working, and wait and give it some time if it isn't.

As to your issue

Looking at my example, I'm not sure I fully understand the approach. I guess this is /not/ what it would/should look like?

    while True:
        try:
            await mqtt_client.publish("Foo")
            break
        except aiomqtt.exceptions.MqttError:
            await mqtt_client.__aexit__(None, None, None)
            await mqtt_client.__aenter__()

Aside from looking very clumsy (to me), as far as I understand this wouldn't (a)wait for the connection to be re-established once there's a connection issue, but would (infinitely?) try to publish, fail, aexit and aenter.
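To avoid retrying forever, a single call can be retried a bounded number of times with a pause in between, leaving the decision of what to do afterwards (retry later, skip, abort) to the caller. A minimal sketch, assuming the reconnection itself happens elsewhere (for example in the message loop) while the call is being retried; `call_with_retry` and its parameters are hypothetical:

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    error_type: type[BaseException],
    attempts: int = 3,
    delay: float = 2.0,
) -> tuple[bool, T | None]:
    """Hypothetical helper: try a call up to `attempts` times, `delay` seconds apart.

    Returns (succeeded, result) so the caller can decide to retry later, skip, or abort.
    """
    for attempt in range(1, attempts + 1):
        try:
            # make_call builds a fresh coroutine each time: a coroutine can only be awaited once
            return True, await make_call()
        except error_type:
            if attempt < attempts:
                await asyncio.sleep(delay)
    return False, None


async def demo() -> None:
    calls = 0

    async def flaky_publish() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("disconnected")
        return "sent"

    ok, result = await call_with_retry(flaky_publish, ConnectionError, attempts=3, delay=0)
    print(ok, result, calls)  # True sent 3


asyncio.run(demo())
```

With aiomqtt this might look like `ok, _ = await call_with_retry(lambda: mqtt_client.publish("tpc/sub", "Foo"), aiomqtt.MqttError)`, followed by whatever the application wants to do when `ok` is false.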

If at all possible, I'd really appreciate it if you could modify the initial example to deal with connection issues in main(). Thanks a lot!

@empicano
Owner

To clarify, there's currently no official way to reconnect other than exiting the context manager and entering it again. That approach is lacking and I'd like to implement something better, but I don't yet know what I want it to look like.

So, anything else is going to be a workaround. Here's a self-contained example to reconnect inside the context manager:

import asyncio
import aiomqtt
import contextlib


async def main():
    async with aiomqtt.Client("localhost") as client:
        while True:
            await client.subscribe("test/+")
            try:
                async for message in client.messages:
                    print(message.payload)
            except aiomqtt.MqttError:
                print("Lost connection to the MQTT broker")
                while True:
                    print("Attempting to reconnect to the MQTT broker")
                    with contextlib.suppress(aiomqtt.MqttError):
                        await client.__aexit__(None, None, None)
                    with contextlib.suppress(aiomqtt.MqttError):
                        await client.__aenter__()
                        print("Successfully reconnected to the MQTT broker")
                        break
                    await asyncio.sleep(1)


asyncio.run(main())

The internal (again, a workaround that might break without notice) Client._disconnected Future might help if you want to reconnect in the background in a separate task.
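A rough sketch of that idea: a background task waits for a "disconnected" future to complete and then runs a reconnect routine. It assumes a fresh future is created for every connection, so the task asks for the current one on each cycle through a callable; with aiomqtt that would mean reading the internal, undocumented future mentioned above, which may change without notice. `watch_and_reconnect` and `FakeConnection` are hypothetical.

```python
from __future__ import annotations

import asyncio
import contextlib
from typing import Awaitable, Callable


async def watch_and_reconnect(
    current_disconnected: Callable[[], Awaitable[object]],
    reconnect: Callable[[], Awaitable[None]],
    max_cycles: int | None = None,
) -> None:
    """Hypothetical background task: wait for the connection to drop, reconnect, repeat."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        # The future may hold the error that caused the disconnect; we only need to know it finished
        with contextlib.suppress(Exception):
            await current_disconnected()
        await reconnect()
        cycles += 1


class FakeConnection:
    """Stand-in that creates a new 'disconnected' future on every (re)connect."""

    def __init__(self) -> None:
        self.disconnected: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self.reconnects = 0

    async def reconnect(self) -> None:
        self.reconnects += 1
        self.disconnected = asyncio.get_running_loop().create_future()


async def demo() -> int:
    conn = FakeConnection()
    watcher = asyncio.create_task(
        watch_and_reconnect(lambda: conn.disconnected, conn.reconnect, max_cycles=2)
    )
    for _ in range(2):
        await asyncio.sleep(0)  # let the watcher start waiting on the current future
        conn.disconnected.set_exception(ConnectionError("connection lost"))
    await watcher
    return conn.reconnects


print(asyncio.run(demo()))  # 2
```

Note that this alone does nothing about other tasks calling publish while the watcher is reconnecting; those calls would still fail and need their own handling.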

empicano changed the title from "Can't wrap my head about reconnection without everything wrapped in a loop and being executed again and again" to "How to reconnect inside the Client context manager" on Sep 10, 2024
@mirko
Author

mirko commented Sep 10, 2024

Thank you very much!

I wonder, though, what the example would look like if I had the iteration over messages in its own task and the publish calls in the main function. Can I aexit and aenter in both functions "concurrently"? I know asyncio is technically not parallel, but I wonder if the publish calls in main() would still succeed after I called aexit()/aenter() in a task.

I tried adding my thoughts to #287. I'm not sure my input helps, though, as it comes from a user's perspective that is (currently) focused only on this particular issue.

@mirko
Author

mirko commented Sep 10, 2024

I wonder, though, what the example would look like if I had the iteration over messages in its own task and the publish calls in the main function. Can I aexit and aenter in both functions "concurrently"? I know asyncio is technically not parallel, but I wonder if the publish calls in main() would still succeed after I called aexit()/aenter() in a task.

I just noticed that the subscription/iteration over messages isn't even guarded by a try block, so my question doesn't really make sense, at least not with reference to your example.

@empicano
Owner

... can I aexit and aenter in both functions "concurrently"?

That's a good point; I can see how that could lead to race conditions. A "correct" implementation of a reconnect method would probably not use __aenter__ and __aexit__ directly, and would internally avoid such race conditions with a lock. There is, however, the issue that it's easy to lose control (as a user) of the number and rate of reconnection attempts when we call publish/subscribe/etc. (and thus reconnect) from multiple tasks.
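A minimal sketch of that lock idea, independent of aiomqtt: every task remembers a "generation" counter before a call, and when the call fails it asks a shared guard to reconnect. The guard holds an `asyncio.Lock`, so only the first task actually reconnects; the others see that the generation has moved on and return. `ReconnectGuard`, `generation` and `do_reconnect` are hypothetical names. This bounds the number of concurrent attempts; the rate is still up to `do_reconnect` (for example, a sleep between tries).

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class ReconnectGuard:
    """Hypothetical sketch: let only one task reconnect at a time, and only once per outage."""

    def __init__(self, do_reconnect: Callable[[], Awaitable[None]]) -> None:
        self._do_reconnect = do_reconnect  # e.g. the __aexit__/__aenter__ workaround (assumption)
        self._lock = asyncio.Lock()
        self.generation = 0  # bumped after every successful reconnection

    async def reconnect(self, seen_generation: int) -> None:
        """Call with the generation observed *before* the call that failed."""
        async with self._lock:
            if self.generation != seen_generation:
                return  # another task already reconnected after this failure
            await self._do_reconnect()  # if this raises, the generation stays unchanged
            self.generation += 1


async def demo() -> int:
    reconnects = 0

    async def do_reconnect() -> None:
        nonlocal reconnects
        await asyncio.sleep(0.01)  # pretend the handshake takes a moment
        reconnects += 1

    guard = ReconnectGuard(do_reconnect)
    seen = guard.generation
    # Two tasks (say, the message loop and a publisher) hit the same outage at once
    await asyncio.gather(guard.reconnect(seen), guard.reconnect(seen))
    return reconnects


print(asyncio.run(demo()))  # 1
```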

Thanks so much for your comment on #287! It's super helpful to hear different opinions on this, and you raise some very good points 👍 At the moment, I think the best option is to fail on publish/subscribe/etc. when the client is disconnected and to do the reconnection internally in a background task (your option a). The tradeoff is that the user doesn't really know when to retry a failed call to publish, but all of the approaches have tradeoffs, and this one really seems to be the most intuitive and transparent 👍

I wonder if the publish calls in main() would still succeed after I called aexit()/aenter() in a task.

If you're not only publishing but also receiving messages, it should be enough to put the reconnection logic on the message iteration only:

import asyncio
import aiomqtt
import contextlib


async def publish(client):
    while True:
        with contextlib.suppress(aiomqtt.MqttError):
            await client.publish("test/example", 28.4)
        await asyncio.sleep(1)


async def main():
    async with aiomqtt.Client("localhost") as client, asyncio.TaskGroup() as group:
        group.create_task(publish(client))
        while True:
            await client.subscribe("test/+")
            try:
                async for message in client.messages:
                    print(message.payload)
            except aiomqtt.MqttError:
                print("Lost connection to the MQTT broker")
                while True:
                    print("Attempting to reconnect to the MQTT broker in 2 seconds")
                    await asyncio.sleep(2)
                    with contextlib.suppress(aiomqtt.MqttError):
                        await client.__aexit__(None, None, None)
                    with contextlib.suppress(aiomqtt.MqttError):
                        await client.__aenter__()
                        print("Successfully reconnected to the MQTT broker")
                        break


asyncio.run(main())

@mirko
Author

mirko commented Sep 16, 2024

I tried to adapt your example the way I'd like to use it (iterating over messages in a task):

import asyncio
import aiomqtt
import contextlib

#MQTT_HOST = "test.mosquitto.org"
MQTT_HOST = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_TOPIC = "foo/bar"

async def msg_loop(mqtt_client):
    while True:
        print("[*]", "Subscribing to MQTT topic", end='\r')
        await mqtt_client.subscribe(MQTT_TOPIC + "/test")
        print("[>]")
        try:
            async for msg in mqtt_client.messages:
                print("[<]", msg)
        except (aiomqtt.MqttError,aiomqtt.MqttCodeError) as exc:
            print("[W]", "Connection issue while waiting on MQTT bus, waiting for reconnect", exc)
            while True:
                await asyncio.sleep(2)
                print("[*]", "Attempting reconnect to MQTT bus")
                with contextlib.suppress(aiomqtt.MqttError):
                    print("[D]", 1)
                    await client.__aexit__(None, None, None)
                    print("[D]", 2)
                with contextlib.suppress(aiomqtt.MqttError):
                    print("[D]", 3)
                    await client.__aenter__()
                    print("[>]", "Successfully reconnected to MQTT bus")
                    break

async def main():
    print("[*]", "Connecting to MQTT broker", end='\r')
    mqtt_client = aiomqtt.Client(MQTT_HOST, MQTT_PORT, timeout=5)
    async with mqtt_client:
        tsk_msg_loop = asyncio.create_task(msg_loop(mqtt_client))
        print("[>]")
        while True:
            #await mqtt_client.publish(MQTT_TOPIC + "/test", "FOOBAR")
            await asyncio.sleep(5)

asyncio.run(main())

Once a connection issue is detected ("simulated" by ip route add 34.243.217.54/32 via 127.0.0.1) I see:

[>] Connecting to MQTT broker
[>] Subscribing to MQTT topic
[W] Connection issue while waiting on MQTT bus, waiting for reconnect Disconnected during message iteration
[*] Attempting reconnect to MQTT bus
[D] 1

However, even after restoring the routing, it does not reconnect. [D] 1 is the last line I see with the example above, which suggests it's stuck at await client.__aexit__(None, None, None).
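One detail that may be worth checking in the script above: inside `msg_loop`, the reconnect block calls `client.__aexit__` and `client.__aenter__`, but the function's parameter is `mqtt_client`, and no `client` is defined anywhere in the script. If that is the code that ran, the line after `[D] 1` raises a `NameError`, which `contextlib.suppress(aiomqtt.MqttError)` does not swallow. The task then ends, and since nothing awaits it, asyncio only reports the exception once the task object is garbage-collected, which could look just like a hang. This is an untested hypothesis; using `mqtt_client` in both places would rule it out. As a general safeguard, a done-callback can surface a background task's exception as soon as the task ends. A minimal sketch with a hypothetical `report_crash` helper and a stand-in worker:

```python
from __future__ import annotations

import asyncio

crash_log: list[str] = []  # collected messages; printing alone would also do


def report_crash(task: asyncio.Task) -> None:
    """Hypothetical done-callback: report a background task's exception as soon as it ends."""
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it also prevents the later "never retrieved" warning
    if exc is not None:
        message = f"[E] task {task.get_name()} crashed: {exc!r}"
        crash_log.append(message)
        print(message)


async def failing_worker() -> None:
    await asyncio.sleep(0)
    raise RuntimeError("simulated crash in msg_loop")  # stand-in for any unexpected error


async def main() -> None:
    task = asyncio.create_task(failing_worker(), name="msg_loop")
    task.add_done_callback(report_crash)
    await asyncio.sleep(0.01)  # main() keeps running, but the crash is reported right away


asyncio.run(main())
# prints: [E] task msg_loop crashed: RuntimeError('simulated crash in msg_loop')
```

In the script above, that would mean calling `tsk_msg_loop.add_done_callback(report_crash)` right after `asyncio.create_task(...)`.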

@mirko
Author

mirko commented Sep 24, 2024

@empicano Sorry to bother you about this once again, but any idea on how to handle reconnection when the iteration over messages happens in its own task would be tremendously appreciated, as I'm still losing my hair over this.

@empicano
Owner

Thanks for the detailed test! I experimented with this yesterday, but somehow can't reproduce it getting stuck on macOS. This is the __aexit__ method:

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Disconnect from the broker."""
        if self._disconnected.done():
            # Return early if the client is already disconnected
            if self._lock.locked():
                self._lock.release()
            if (exc := self._disconnected.exception()) is not None:
                # If the disconnect wasn't intentional, raise the error that caused it
                raise exc
            return
        # Try to gracefully disconnect from the broker
        rc = self._client.disconnect()
        if rc == mqtt.MQTT_ERR_SUCCESS:
            # Wait for acknowledgement
            await self._wait_for(self._disconnected, timeout=None)
            # Reset `_connected` if it's still in completed state after disconnecting
            if self._connected.done():
                self._connected = asyncio.Future()
        else:
            self._logger.warning(
                "Could not gracefully disconnect: %d. Forcing disconnection.", rc
            )
        # Force disconnection if we cannot gracefully disconnect
        if not self._disconnected.done():
            self._disconnected.set_result(None)
        # Release the reusability lock
        if self._lock.locked():
            self._lock.release()

The only place where I can imagine it getting stuck is the call to paho's self._client.disconnect(). Can you confirm that that's the case? If it is, we have to figure out whether it's a paho bug or whether we're calling the method the wrong way (or getting stuck in the callbacks). Also, just to be sure: you're not using timeout=math.inf?
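To confirm where it gets stuck, one option is to await the suspect call under a watchdog that prints the coroutine's stack if it is still pending after a timeout. A minimal sketch (`await_with_watchdog` is a hypothetical helper). It only catches hangs on an `await`, such as waiting for the `_disconnected` future. If paho's `disconnect()` blocked synchronously, the event loop itself would stall and this timeout could never fire; in that case the standard library's `faulthandler.dump_traceback_later()`, which dumps all threads' stacks from a watchdog thread, is the more suitable tool.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable


async def await_with_watchdog(aw: Awaitable[Any], timeout: float, label: str) -> Any:
    """Hypothetical helper: await `aw`; if still pending after `timeout` s, show where it hangs."""
    task = asyncio.ensure_future(aw)
    done, _ = await asyncio.wait({task}, timeout=timeout)
    if not done:
        print(f"[D] {label} still pending after {timeout}s, suspended at:")
        task.print_stack()  # goes to stderr by default
        task.cancel()
        await asyncio.wait({task})  # let the cancellation finish without raising here
        raise asyncio.TimeoutError(f"{label} did not finish within {timeout}s")
    return task.result()  # re-raises the task's own exception, if any


async def hangs_forever() -> None:
    await asyncio.Event().wait()  # stand-in for a future that is never resolved


async def main() -> None:
    try:
        await await_with_watchdog(hangs_forever(), timeout=0.1, label="__aexit__")
    except asyncio.TimeoutError as exc:
        print("[W]", exc)


asyncio.run(main())
```

In the script above, that would be something like `await await_with_watchdog(mqtt_client.__aexit__(None, None, None), 10, "__aexit__")` in place of the bare await.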

2 participants