Connection pooling and cancellation fixes #117
Conversation
The previous implementation was very racy. Suppose there are two messages waiting in the Redis asgi message queue:

- the first receiver creates the receiver loop,
- the loop gets the first message and puts it in the appropriate queue,
- the loop is then free to loop again to get the next message,
- the receiver pops the first message off the queue,
- after processing the message, the loop is cancelled;
- if it had already sent a `BLPOP` command, the response will be lost.

The problem can arise when using connection pools: because a pre-existing connection won't have the initial handshake overhead, it'll be able to fire off a `BLPOP` command to Redis immediately after being taken out of the pool. The `receive()` code, on the other hand, won't be aware of this and will, having processed the message it wanted, proceed to cancel the receiver loop mid-command. If the Redis server had already sent its response by the time the client-side connection was dropped by the cancellation, the message is now lost.

The root problem is the race condition between the loop proceeding to the next message (in cases where there aren't any other immediately running receivers) and the final receiver cancelling it. To avoid this, the loop has to be synchronized with its receivers. Merely counting the currently running receivers is insufficient, however, because the loop might get messages from Redis for channels which aren't being waited on. Even if that could somehow be overcome, there is still the problem of initial startup: between starting the loop and the point where the initial receiver can signal to the loop that it needs a message for a certain channel, the loop might already have reached the point where it checks its terminating conditions and exits prematurely.

The solution implemented in this commit approaches the problem from a different point of view. Each receiver is itself responsible for getting messages out of Redis; if the message is for a different channel, the old `receive_buffer` is used, letting any other receivers still get the message that was just ignored. To avoid having multiple simultaneous receivers contacting Redis independently, a token is shared between competing receivers through an `asyncio.Queue`, ensuring they all get a chance to pull from Redis. Because there is only one token, these chances are serialized. As previously, the last receiver to exit also shuts down the queue; this must happen in order to stay compatible with the channel layer being used from multiple event loops, since an `asyncio.Queue` binds to an event loop at construction time.
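For illustration, the single-token scheme might look roughly like this (a minimal sketch; the class, `receive_buffer`, and `_pop_from_redis` are placeholders rather than the actual channels_redis code, and shutdown of the queue is omitted):

```python
import asyncio

class TokenReceiveSketch:
    """Illustrative only: one token, passed through an asyncio.Queue,
    decides which receiver may talk to Redis at any given moment."""

    def __init__(self):
        self.receive_buffer = {}               # channel name -> asyncio.Queue
        self.token_queue = asyncio.Queue()
        self.token_queue.put_nowait(object())  # the single token

    async def receive(self, channel):
        queue = self.receive_buffer.setdefault(channel, asyncio.Queue())
        while True:
            token = await self.token_queue.get()    # wait for our turn
            try:
                # Another token holder may have buffered a message for us.
                if not queue.empty():
                    return queue.get_nowait()
                # Hypothetical helper that issues the blocking Redis read.
                got_channel, message = await self._pop_from_redis()
                if got_channel == channel:
                    return message
                # Message is for a different channel: buffer it for that receiver.
                self.receive_buffer.setdefault(got_channel, asyncio.Queue()).put_nowait(message)
            finally:
                self.token_queue.put_nowait(token)  # give the next receiver a chance
```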
Previously, while you were blocked on getting the token, you could not make progress even if something arrived in your queue. This changes it so that we block on either acquiring the lock or getting a new message from the queue, whichever comes first. Also, instead of using a queue of tokens, this changes it to use the Lock objects provided by asyncio.
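Something along these lines (illustrative sketch only; `lock` guards access to Redis and `queue` is this receiver's buffer):

```python
import asyncio

async def lock_or_message(lock: asyncio.Lock, queue: asyncio.Queue):
    """Block until we either acquire the lock or a message lands in our
    buffer, whichever happens first. Illustrative sketch only."""
    lock_task = asyncio.ensure_future(lock.acquire())
    get_task = asyncio.ensure_future(queue.get())
    done, _pending = await asyncio.wait(
        {lock_task, get_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if get_task in done:
        # A buffered message arrived; we don't need Redis access after all.
        if lock_task in done:
            lock.release()      # we were granted the lock too - hand it back
        else:
            lock_task.cancel()  # real code must not lose a just-granted lock here
        return "message", get_task.result()
    # We hold the lock and may talk to Redis ourselves.
    get_task.cancel()
    return "lock", None
```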
The one remaining loophole is Redis `BLPOP` cancellation, which was also the original problem when changing from per-request connections to connection pools. Once a `BLPOP` command is started, Python will switch to some other task while waiting for the server to respond. If that other task decides to cancel the `receive`, then one of two things will happen:

- the connection is dropped soon enough for the server to notice, so that it won't send the response at all, or
- the response has already reached Python's process, but the task processing it is flagged for cancellation.

In the latter scenario, the message will be lost. To solve this, `BLPOP` could be shielded from cancellation, but that would lead to problematic timeout handling, where a read command to Redis could easily outlive its event loop, causing all sorts of problems for shutdown code or for environments where the global channels layer instance is used from multiple event loops.

The Redis documentation suggests using `BRPOPLPUSH` instead, where messages are atomically returned and copied to an auxiliary list. This makes the Python implementation easier inasmuch as cancellation during the `BRPOPLPUSH` no longer needs to be handled; but the message must still be removed from the auxiliary list once processed - if and only if it is processed.

One option for implementing this is to make the auxiliary removal completely uninterruptible from the point of view of the code path executing it. In a usual single-threaded asyncio program, the `BRPOPLPUSH` would be the last interruptible operation before returning a valid message; the code following this command is essentially atomic with respect to any other task attached to the event loop. Any other task might then reasonably expect that either:

- the receive succeeded completely, will return a valid message and has cleaned the auxiliary list, or
- the `BRPOPLPUSH` was interrupted, there will be no message to speak of, and there is a backup of it in the auxiliary list.

Making the removal atomic violates the second expectation and also a fairly basic principle of single-threaded asyncio code. Because it is still an interruptible operation, other tasks will run during this "atomic" piece of code, meaning it could be cancelled. If it is cancelled, that cancellation must become visible outside, otherwise we've just "run" two independent code paths at the same time in a single thread, and also made life difficult for anyone wanting to make sure tasks complete in a timely manner - the receive cancellation would essentially be gobbled up and the task would need to be cancelled again at its next interruption point. If the cancellation must proceed, however, we could be left in the following situation: the message has been unpacked, but the cancellation occurred too late to stop the removal operation, so we're left with a message that we somehow have to return, because there's no backup of it in Redis. This could be solved, e.g., by subclassing the cancellation exception and having it carry the message as a payload, but that would be messy.

A second option is to create a detached removal task. This lets us keep a properly atomic code path from when the message is unpacked to when the function returns, and the removal will happen at some point "soon", because there are no delays in it.
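The detached-removal option might look roughly like this (a sketch under assumptions: `redis` stands in for an async client with `brpoplpush`/`lrem` methods, the key names are made up, and `loads` unpacks a serialized message):

```python
import asyncio

async def receive_once(redis, key, loads):
    """Illustrative sketch of the reliable-queue receive with a detached
    removal task; not the actual channels_redis implementation."""
    backup = key + ".backup"
    # If we are cancelled while blocked here, the message is either still in
    # the main list or sits safely in the backup list - it is never lost.
    raw = await redis.brpoplpush(key, backup, timeout=0)
    message = loads(raw)
    # No awaits between unpacking and returning, so this stretch is atomic
    # with respect to other tasks; the backup copy gets removed "soon".
    asyncio.ensure_future(redis.lrem(backup, 1, raw))
    return message
```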
Notably, ordering is not a problem: if two close receives both fire off removals for their own messages, the end result will still be that two messages are deleted from a list into which two messages were put earlier by Redis. On the other hand, it is a problem if a receive commences after a previous receive but before its cleanup: the message backup would be moved back into the regular message queue, and the second receive would get it again even though the earlier receive processed it successfully.

The solution to this is rather heavy-handed but easily reasoned about: the stretch of time between starting a removal task and finishing it can be protected by a per-channel lock. Acquiring the lock is interruptible but doesn't change the cancellation semantics of the design, since cancelling lock acquisition has the same effect as cancelling `BRPOPLPUSH` before the command reaches Redis - no message is received and Redis state is not corrupted. A sketch of this follows below.

*NOTE*: It is useful to observe that `BRPOPLPUSH` is also the only Redis command that requires this level of special handling. The others used by `channels_redis` are either read-only, such as `LLEN`, or are destructive without expecting a return value, such as `ZREM`. `BRPOPLPUSH` is at the same time destructive (an element is removed server-side from the list) and has an important return value.
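Extending the earlier sketch with that per-channel lock (again illustrative, not the actual channels_redis code; the lock is held from before the blocking pop until the detached cleanup finishes, and any previously backed-up message is first moved back into the main list):

```python
import asyncio
import collections

# One lock per channel; created lazily, so each binds to the running loop.
cleanup_locks = collections.defaultdict(asyncio.Lock)

async def receive_once_locked(redis, key, loads):
    """Illustrative sketch only - key names and client methods are assumed."""
    backup = key + ".backup"
    lock = cleanup_locks[key]
    await lock.acquire()
    try:
        # Recover a message that an earlier, interrupted receive backed up.
        await redis.rpoplpush(backup, key)
        # Atomically pop the next message and copy it into the backup list.
        raw = await redis.brpoplpush(key, backup, timeout=0)
    except BaseException:
        lock.release()
        raise
    message = loads(raw)

    async def cleanup():
        try:
            await redis.lrem(backup, 1, raw)   # drop the backup copy
        finally:
            lock.release()  # only now may the next receive on this channel run

    asyncio.ensure_future(cleanup())
    return message
```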
Honestly this all looks pretty good except that I'm not quite sure why the switch happened from `BLPOP` to `BRPOPLPUSH`.
The switch was needed for the reliable queue implementation: Redis only has a `BRPOPLPUSH`; there is no left-popping equivalent.
Aha! That was the key thing I was missing, thanks. That does mean that this will technically become a backwards-incompatible deploy, but honestly the worst thing it will do is reverse message priority on deploy, so I'm alright with that and will merge it in. Thanks for your work on this!
Does it mean that we finally have a persistent connection, and that #114 and #104 are not needed? We at @Quarticai have been following these PRs for a long time, as this is perhaps the most awaited patch (at least for us).
@pr4n I suggest you try the code and see if it works for you.
NB: connection pooling (#117) does not work when using async_to_sync() around group_send(). See: #223 (comment)
This pull request contains everything we at @genialis had to do to make connection pooling work. The commits aren't squashed so that the commit logs are preserved; they describe the problems we stumbled upon.
The changes implement event loop-local connection pooling and introduce some cleanup hooks needed to avoid coroutine leaks and the like.
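For illustration, "event loop-local" pooling means roughly the following (a sketch under assumptions: `create_pool` is a placeholder for whatever factory the Redis client provides, and the pool's `close()`/`wait_closed()` are assumed methods):

```python
import asyncio

class LoopLocalPools:
    """Illustrative sketch: one connection pool per running event loop, so a
    pool created under one loop is never reused under another."""

    def __init__(self, create_pool):
        self._create_pool = create_pool
        self._pools = {}                    # event loop -> pool

    async def get_pool(self):
        loop = asyncio.get_event_loop()
        if loop not in self._pools:
            self._pools[loop] = await self._create_pool()
        return self._pools[loop]

    async def close_all(self):
        # Cleanup hook: close every pool so connections and coroutines
        # aren't leaked when the layer is torn down.
        for pool in self._pools.values():
            pool.close()
            await pool.wait_closed()
        self._pools.clear()
```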
Message receiving also needed some work because of cancellation problems. We've seen locally that some environments cancel requests rather aggressively, which led to massive message loss with the old code. The reworked handling utilises Redis' concept of reliable queues.
Some of the original problems were due to bugs in the asyncio locking implementation, which were only fixed recently. At the time of this writing, the fixes are in Python 3.6.5 but haven't been backported further, so this code requires at least Python 3.6.5.
All tests pass and we also haven't seen any problems internally that could be attributed to this in a while. Please review.