-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update Pilatus Controller, Driver to match development of ADAravis, T…
…etrAMM (#191) * Update PilatusDriver, Controller to match Aravis patterns * Add facility generic PilatusDetector
- Loading branch information
1 parent
a2d9dfb
commit 87f89b8
Showing
7 changed files
with
215 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 36 additions & 24 deletions
60
src/ophyd_async/epics/areadetector/controllers/pilatus_controller.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,49 +1,61 @@ | ||
import asyncio | ||
from typing import Optional, Set | ||
from typing import Optional | ||
|
||
from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger | ||
from ophyd_async.core.async_status import AsyncStatus | ||
from ophyd_async.core.detector import DetectorControl, DetectorTrigger | ||
from ophyd_async.epics.areadetector.drivers.ad_base import ( | ||
DEFAULT_GOOD_STATES, | ||
DetectorState, | ||
start_acquiring_driver_and_ensure_status, | ||
) | ||
|
||
from ..drivers.pilatus_driver import PilatusDriver, TriggerMode | ||
from ..utils import ImageMode, stop_busy_record | ||
|
||
TRIGGER_MODE = { | ||
DetectorTrigger.internal: TriggerMode.internal, | ||
DetectorTrigger.constant_gate: TriggerMode.ext_enable, | ||
DetectorTrigger.variable_gate: TriggerMode.ext_enable, | ||
} | ||
from ophyd_async.epics.areadetector.drivers.pilatus_driver import ( | ||
PilatusDriver, | ||
PilatusTriggerMode, | ||
) | ||
from ophyd_async.epics.areadetector.utils import ImageMode, stop_busy_record | ||
|
||
|
||
class PilatusController(DetectorControl): | ||
_supported_trigger_types = { | ||
DetectorTrigger.internal: PilatusTriggerMode.internal, | ||
DetectorTrigger.constant_gate: PilatusTriggerMode.ext_enable, | ||
DetectorTrigger.variable_gate: PilatusTriggerMode.ext_enable, | ||
} | ||
|
||
def __init__( | ||
self, | ||
driver: PilatusDriver, | ||
good_states: Set[DetectorState] = set(DEFAULT_GOOD_STATES), | ||
) -> None: | ||
self.driver = driver | ||
self.good_states = good_states | ||
self._drv = driver | ||
|
||
def get_deadtime(self, exposure: float) -> float: | ||
return 0.001 | ||
# Cite: https://media.dectris.com/User_Manual-PILATUS2-V1_4.pdf | ||
"""The required minimum time difference between ExpPeriod and ExpTime | ||
(readout time) is 2.28 ms""" | ||
return 2.28e-3 | ||
|
||
async def arm( | ||
self, | ||
num: int, | ||
trigger: DetectorTrigger = DetectorTrigger.internal, | ||
exposure: Optional[float] = None, | ||
) -> AsyncStatus: | ||
if exposure is not None: | ||
await self._drv.acquire_time.set(exposure) | ||
await asyncio.gather( | ||
self.driver.trigger_mode.set(TRIGGER_MODE[trigger]), | ||
self.driver.num_images.set(999_999 if num == 0 else num), | ||
self.driver.image_mode.set(ImageMode.multiple), | ||
) | ||
return await start_acquiring_driver_and_ensure_status( | ||
self.driver, good_states=self.good_states | ||
self._drv.trigger_mode.set(self._get_trigger_mode(trigger)), | ||
self._drv.num_images.set(999_999 if num == 0 else num), | ||
self._drv.image_mode.set(ImageMode.multiple), | ||
) | ||
return await start_acquiring_driver_and_ensure_status(self._drv) | ||
|
||
@classmethod | ||
def _get_trigger_mode(cls, trigger: DetectorTrigger) -> PilatusTriggerMode: | ||
if trigger not in cls._supported_trigger_types.keys(): | ||
raise ValueError( | ||
f"{cls.__name__} only supports the following trigger " | ||
f"types: {cls._supported_trigger_types.keys()} but was asked to " | ||
f"use {trigger}" | ||
) | ||
return cls._supported_trigger_types[trigger] | ||
|
||
async def disarm(self): | ||
await stop_busy_record(self.driver.acquire, False, timeout=1) | ||
await stop_busy_record(self._drv.acquire, False, timeout=1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from typing import Optional, Sequence | ||
|
||
from bluesky.protocols import Hints | ||
|
||
from ophyd_async.core import DirectoryProvider | ||
from ophyd_async.core.detector import StandardDetector | ||
from ophyd_async.core.signal import SignalR | ||
from ophyd_async.epics.areadetector.controllers.pilatus_controller import ( | ||
PilatusController, | ||
) | ||
from ophyd_async.epics.areadetector.drivers.ad_base import ADBaseShapeProvider | ||
from ophyd_async.epics.areadetector.drivers.pilatus_driver import PilatusDriver | ||
from ophyd_async.epics.areadetector.writers.hdf_writer import HDFWriter | ||
from ophyd_async.epics.areadetector.writers.nd_file_hdf import NDFileHDF | ||
|
||
|
||
class PilatusDetector(StandardDetector): | ||
"""A Pilatus StandardDetector writing HDF files""" | ||
|
||
_controller: PilatusController | ||
_writer: HDFWriter | ||
|
||
def __init__( | ||
self, | ||
prefix: str, | ||
name: str, | ||
directory_provider: DirectoryProvider, | ||
driver: PilatusDriver, | ||
hdf: NDFileHDF, | ||
config_sigs: Optional[Sequence[SignalR]] = None, | ||
**scalar_sigs: str, | ||
): | ||
self.drv = driver | ||
self.hdf = hdf | ||
|
||
super().__init__( | ||
PilatusController(self.drv), | ||
HDFWriter( | ||
self.hdf, | ||
directory_provider, | ||
lambda: self.name, | ||
ADBaseShapeProvider(self.drv), | ||
**scalar_sigs, | ||
), | ||
config_sigs=config_sigs or (self.drv.acquire_time,), | ||
name=name, | ||
) | ||
|
||
@property | ||
def hints(self) -> Hints: | ||
return self._writer.hints |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import pytest | ||
from bluesky.run_engine import RunEngine | ||
|
||
from ophyd_async.core import ( | ||
DetectorTrigger, | ||
DeviceCollector, | ||
DirectoryProvider, | ||
TriggerInfo, | ||
set_sim_value, | ||
) | ||
from ophyd_async.epics.areadetector.controllers.pilatus_controller import ( | ||
PilatusController, | ||
) | ||
from ophyd_async.epics.areadetector.drivers.pilatus_driver import ( | ||
PilatusDriver, | ||
PilatusTriggerMode, | ||
) | ||
from ophyd_async.epics.areadetector.pilatus import PilatusDetector | ||
from ophyd_async.epics.areadetector.writers.nd_file_hdf import NDFileHDF | ||
|
||
|
||
@pytest.fixture | ||
async def pilatus_driver(RE: RunEngine) -> PilatusDriver: | ||
async with DeviceCollector(sim=True): | ||
driver = PilatusDriver("DRV:") | ||
|
||
return driver | ||
|
||
|
||
@pytest.fixture | ||
async def pilatus_controller( | ||
RE: RunEngine, pilatus_driver: PilatusDriver | ||
) -> PilatusController: | ||
async with DeviceCollector(sim=True): | ||
controller = PilatusController(pilatus_driver) | ||
|
||
return controller | ||
|
||
|
||
@pytest.fixture | ||
async def hdf(RE: RunEngine) -> NDFileHDF: | ||
async with DeviceCollector(sim=True): | ||
hdf = NDFileHDF("HDF:") | ||
|
||
return hdf | ||
|
||
|
||
@pytest.fixture | ||
async def pilatus( | ||
RE: RunEngine, | ||
static_directory_provider: DirectoryProvider, | ||
pilatus_driver: PilatusDriver, | ||
hdf: NDFileHDF, | ||
) -> PilatusDetector: | ||
async with DeviceCollector(sim=True): | ||
pilatus = PilatusDetector( | ||
"PILATUS:", | ||
"pilatus", | ||
static_directory_provider, | ||
driver=pilatus_driver, | ||
hdf=hdf, | ||
) | ||
|
||
return pilatus | ||
|
||
|
||
async def test_deadtime_invariant( | ||
pilatus_controller: PilatusController, | ||
): | ||
# deadtime invariant with exposure time | ||
assert pilatus_controller.get_deadtime(0) == 2.28e-3 | ||
assert pilatus_controller.get_deadtime(500) == 2.28e-3 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"detector_trigger,expected_trigger_mode", | ||
[ | ||
(DetectorTrigger.internal, PilatusTriggerMode.internal), | ||
(DetectorTrigger.internal, PilatusTriggerMode.internal), | ||
(DetectorTrigger.internal, PilatusTriggerMode.internal), | ||
], | ||
) | ||
async def test_trigger_mode_set( | ||
pilatus: PilatusDetector, | ||
detector_trigger: DetectorTrigger, | ||
expected_trigger_mode: PilatusTriggerMode, | ||
): | ||
async def trigger_and_complete(): | ||
await pilatus.controller.arm(num=1, trigger=detector_trigger) | ||
# Prevent timeouts | ||
set_sim_value(pilatus.controller._drv.acquire, True) | ||
|
||
# Default TriggerMode | ||
assert (await pilatus.drv.trigger_mode.get_value()) == PilatusTriggerMode.internal | ||
|
||
await trigger_and_complete() | ||
|
||
# TriggerSource changes | ||
assert (await pilatus.drv.trigger_mode.get_value()) == expected_trigger_mode | ||
|
||
|
||
async def test_hints_from_hdf_writer(pilatus: PilatusDetector): | ||
assert pilatus.hints == {"fields": ["pilatus"]} | ||
|
||
|
||
async def test_unsupported_trigger_excepts(pilatus: PilatusDetector): | ||
with pytest.raises( | ||
ValueError, | ||
# str(EnumClass.value) handling changed in Python 3.11 | ||
match=r"PilatusController only supports the following trigger types: .* but", | ||
): | ||
await pilatus.prepare(TriggerInfo(1, DetectorTrigger.edge_trigger, 1, 1)) |