Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yield in each loop of observe_value #648

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,20 +449,17 @@ async def observe_value(
"""

q: asyncio.Queue[SignalDatatypeT | Status] = asyncio.Queue()
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

if done_status is not None:
done_status.add_callback(q.put_nowait)

signal.subscribe_value(q.put_nowait)
try:
while True:
item = await get_value()
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
90 changes: 90 additions & 0 deletions tests/core/test_observe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import time

import pytest

from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter


async def test_observe_value_working_correctly():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(2):
await asyncio.sleep(0.01)
setter(i + 1)

recv = []
status = AsyncStatus(tick())
async for val in observe_value(sig, done_status=status):
recv.append(val)
assert recv == [0, 1, 2]
await status


async def test_observe_value_times_out():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.2, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_busy_sleep():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
time.sleep(0.15)
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.3, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_no_external_task():
sig, setter = soft_signal_r_and_setter(float)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)
setter(val + 1)

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.1)
assert recv
assert time.time() - start == pytest.approx(0.1, abs=0.05)