Skip to content

Commit

Permalink
Allow soft_signal methods to set initial value of SimSignalBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
jsouter authored and James Souter committed Apr 17, 2024
1 parent b02299f commit 7ddac6c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
16 changes: 12 additions & 4 deletions src/ophyd_async/core/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,20 +254,28 @@ def set_sim_callback(signal: Signal[T], callback: ReadingValueCallback[T]) -> No


def soft_signal_rw(
datatype: Optional[Type[T]], name: str, source_prefix: str
datatype: Optional[Type[T]],
name: str,
source_prefix: str,
initial_value: Optional[T] = None,
) -> SignalRW[T]:
"""Creates a read-writable Signal with a SimSignalBackend"""
return SignalRW(SimSignalBackend(datatype, f"sim://{source_prefix}:{name}"))
return SignalRW(
SimSignalBackend(datatype, f"sim://{source_prefix}:{name}", initial_value)
)


def soft_signal_r_and_backend(
datatype: Optional[Type[T]], name: str, source_prefix: str
datatype: Optional[Type[T]],
name: str,
source_prefix: str,
initial_value: Optional[T] = None,
) -> Tuple[SignalR[T], SimSignalBackend]:
"""Returns a tuple of a read-only Signal and its SimSignalBackend through
which the signal can be internally modified within the device. Use
soft_signal_rw if you want a device that is externally modifiable
"""
backend = SimSignalBackend(datatype, f"sim://{source_prefix}:{name}")
backend = SimSignalBackend(datatype, f"sim://{source_prefix}:{name}", initial_value)
signal = SignalR(backend)
return (signal, backend)

Expand Down
14 changes: 12 additions & 2 deletions src/ophyd_async/core/sim_signal_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,29 @@ class SimSignalBackend(SignalBackend[T]):
_timestamp: float
_severity: int

def __init__(self, datatype: Optional[Type[T]], source: str) -> None:
def __init__(
self,
datatype: Optional[Type[T]],
source: str,
initial_value: Optional[T] = None,
) -> None:
pv = re.split(r"://", source)[-1]
self.source = f"sim://{pv}"
self.datatype = datatype
self.pv = source
self.converter: SimConverter = DisconnectedSimConverter()
self._initial_value = initial_value
self.put_proceeds = asyncio.Event()
self.put_proceeds.set()
self.callback: Optional[ReadingValueCallback[T]] = None

async def connect(self, timeout: float = DEFAULT_TIMEOUT) -> None:
self.converter = make_converter(self.datatype)
self._initial_value = self.converter.make_initial_value(self.datatype)
if self._initial_value is None:
self._initial_value = self.converter.make_initial_value(self.datatype)
else:
# convert potentially unconverted initial value passed to init method
self._initial_value = self.converter.write_value(self._initial_value)
self._severity = 0

await self.put(None)
Expand Down
6 changes: 4 additions & 2 deletions tests/core/test_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ async def test_set_and_wait_for_value():
async def test_create_soft_signal(signal_method, signal_class):
TEST_PREFIX = "TEST-PREFIX"
SIGNAL_NAME = "SIGNAL"
INITIAL_VALUE = "INITIAL"
if signal_method == soft_signal_r_and_backend:
signal, backend = signal_method(str, SIGNAL_NAME, TEST_PREFIX)
signal, backend = signal_method(str, SIGNAL_NAME, TEST_PREFIX, INITIAL_VALUE)
elif signal_method == soft_signal_rw:
signal = signal_method(str, SIGNAL_NAME, TEST_PREFIX)
signal = signal_method(str, SIGNAL_NAME, TEST_PREFIX, INITIAL_VALUE)
backend = signal._backend
assert signal._backend.source == f"sim://{TEST_PREFIX}:{SIGNAL_NAME}"
assert isinstance(signal, signal_class)
assert isinstance(signal._backend, SimSignalBackend)
await signal.connect()
assert (await signal.get_value()) == INITIAL_VALUE
# connecting with sim=False uses existing SimSignalBackend
assert signal._backend is backend

0 comments on commit 7ddac6c

Please sign in to comment.