diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index d4d4d7ffe..cf16ea798 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -449,12 +449,6 @@ async def observe_value( """ q: asyncio.Queue[SignalDatatypeT | Status] = asyncio.Queue() - if timeout is None: - get_value = q.get - else: - - async def get_value(): - return await asyncio.wait_for(q.get(), timeout) if done_status is not None: done_status.add_callback(q.put_nowait) @@ -462,7 +456,10 @@ async def get_value(): signal.subscribe_value(q.put_nowait) try: while True: - item = await get_value() + # yield here in case something else is filling the queue + # like in test_observe_value_times_out_with_no_external_task() + await asyncio.sleep(0) + item = await asyncio.wait_for(q.get(), timeout) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/tests/core/test_observe.py b/tests/core/test_observe.py new file mode 100644 index 000000000..e68b08465 --- /dev/null +++ b/tests/core/test_observe.py @@ -0,0 +1,90 @@ +import asyncio +import time + +import pytest + +from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter + + +async def test_observe_value_working_correctly(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(2): + await asyncio.sleep(0.01) + setter(i + 1) + + recv = [] + status = AsyncStatus(tick()) + async for val in observe_value(sig, done_status=status): + recv.append(val) + assert recv == [0, 1, 2] + await status + + +async def test_observe_value_times_out(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(5): + await asyncio.sleep(0.1) + setter(i + 1) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + recv.append(val) + + t = asyncio.create_task(tick()) + start = time.time() + try: + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.2) + assert recv == [0, 1] + assert time.time() - start == pytest.approx(0.2, abs=0.05) + finally: + t.cancel() + + +async def test_observe_value_times_out_with_busy_sleep(): + sig, setter = soft_signal_r_and_setter(float) + + async def tick(): + for i in range(5): + await asyncio.sleep(0.1) + setter(i + 1) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + time.sleep(0.15) + recv.append(val) + + t = asyncio.create_task(tick()) + start = time.time() + try: + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.2) + assert recv == [0, 1] + assert time.time() - start == pytest.approx(0.3, abs=0.05) + finally: + t.cancel() + + +async def test_observe_value_times_out_with_no_external_task(): + sig, setter = soft_signal_r_and_setter(float) + + recv = [] + + async def watch(): + async for val in observe_value(sig): + recv.append(val) + setter(val + 1) + + start = time.time() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.1) + assert recv + assert time.time() - start == pytest.approx(0.1, abs=0.05)