Skip to content

Commit

Permalink
Yield in each loop of observe_value (#648)
Browse files Browse the repository at this point in the history
This helps in the very specific case of an observe_value directly
or indirectly modifying the signal that is being updated. This
creates a busy loop which will not be interrupted by wrapping
in asyncio.wait_for.  To demonstrate, added
test_observe_value_times_out_with_no_external_task
  • Loading branch information
coretl authored and ZohebShaikh committed Nov 14, 2024
1 parent ad910fb commit bc153ea
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,10 @@ def queue_value(value: SignalDatatypeT, signal=signal):

try:
while True:
item = await get_value()
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
90 changes: 90 additions & 0 deletions tests/core/test_observe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import time

import pytest

from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter


async def test_observe_value_working_correctly():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(2):
await asyncio.sleep(0.01)
setter(i + 1)

recv = []
status = AsyncStatus(tick())
async for val in observe_value(sig, done_status=status):
recv.append(val)
assert recv == [0, 1, 2]
await status


async def test_observe_value_times_out():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.2, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_busy_sleep():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
time.sleep(0.15)
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.3, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_no_external_task():
sig, setter = soft_signal_r_and_setter(float)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)
setter(val + 1)

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.1)
assert recv
assert time.time() - start == pytest.approx(0.1, abs=0.05)

0 comments on commit bc153ea

Please sign in to comment.