From bc153ead462d2f222f0d8aeb859aff214f22b277 Mon Sep 17 00:00:00 2001 From: "Tom C (DLS)" <101418278+coretl@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:50:27 +0000 Subject: [PATCH] Yield in each loop of observe_value (#648) This helps in the very specific case of an observe_value directly or indirectly modifying the signal that is being updated. This creates a busy loop which will not be interrupted by wrapping in asyncio.wait_for. To demonstrate, added test_observe_value_times_out_with_no_external_task --- src/ophyd_async/core/_signal.py | 5 +- tests/core/test_observe.py | 90 +++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 tests/core/test_observe.py diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index cd6152ef0..d7cbb179d 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -502,7 +502,10 @@ def queue_value(value: SignalDatatypeT, signal=signal): try: while True: - item = await get_value() + # yield here in case something else is filling the queue + # like in test_observe_value_times_out_with_no_external_task() + await asyncio.sleep(0) + item = await asyncio.wait_for(q.get(), timeout) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/tests/core/test_observe.py b/tests/core/test_observe.py new file mode 100644 index 000000000..e68b08465 --- /dev/null +++ b/tests/core/test_observe.py @@ -0,0 +1,90 @@ +import asyncio +import time + +import pytest + +from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter + + +async def test_observe_value_working_correctly(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(2): + await asyncio.sleep(0.01) + setter(i + 1) + + recv = [] + status = AsyncStatus(tick()) + async for val in observe_value(sig, done_status=status): + recv.append(val) + assert recv == [0, 1, 2] + await status + + +async def test_observe_value_times_out(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(5): + await asyncio.sleep(0.1) + setter(i + 1) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + recv.append(val) + + t = asyncio.create_task(tick()) + start = time.time() + try: + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.2) + assert recv == [0, 1] + assert time.time() - start == pytest.approx(0.2, abs=0.05) + finally: + t.cancel() + + +async def test_observe_value_times_out_with_busy_sleep(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(5): + await asyncio.sleep(0.1) + setter(i + 1) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + time.sleep(0.15) + recv.append(val) + + t = asyncio.create_task(tick()) + start = time.time() + try: + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.2) + assert recv == [0, 1] + assert time.time() - start == pytest.approx(0.3, abs=0.05) + finally: + t.cancel() + + +async def test_observe_value_times_out_with_no_external_task(): + sig, setter = soft_signal_r_and_setter(float) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + recv.append(val) + setter(val + 1) + + start = time.time() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.1) + assert recv + assert time.time() - start == pytest.approx(0.1, abs=0.05)