diff --git a/CHANGES b/CHANGES index 8750128b05..021d41c2c4 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Fix #3416, prevent blocking in async Pipeline with many commands * Move doctests (doc code examples) to main branch * Update `ResponseT` type hint * Allow to control the minimum SSL version diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 9508849703..3a92ed9630 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1362,8 +1362,10 @@ async def _execute_transaction( # noqa: C901 pre: CommandT = (("MULTI",), {}) post: CommandT = (("EXEC",), {}) cmds = (pre, *commands, post) - all_cmds = connection.pack_commands( - args for args, options in cmds if EMPTY_RESPONSE not in options + # Run pack_commands in a thread to prevent blocking + all_cmds = await asyncio.to_thread( + connection.pack_commands, + (args for args, options in cmds if EMPTY_RESPONSE not in options) ) await connection.send_packed_command(all_cmds) errors = [] @@ -1387,6 +1389,9 @@ async def _execute_transaction( # noqa: C901 except ResponseError as err: self.annotate_exception(err, i + 1, command[0]) errors.append((i, err)) + # Release back to event loop to prevent blocking + if i % 100 == 0: + await asyncio.sleep(0) # parse the EXEC. try: @@ -1419,7 +1424,8 @@ async def _execute_transaction( # noqa: C901 # We have to run response callbacks manually data = [] - for r, cmd in zip(response, commands): + # Enumerate and then release back to event loop to prevent blocking + for i, (r, cmd) in enumerate(zip(response, commands)): if not isinstance(r, Exception): args, options = cmd command_name = args[0] @@ -1432,6 +1438,8 @@ async def _execute_transaction( # noqa: C901 if inspect.isawaitable(r): r = await r data.append(r) + if i % 100 == 0: + await asyncio.sleep(0) return data async def _execute_pipeline(