From 059f1a71a83b269162e327976c6ddedf03bd2b82 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 19 Jan 2017 11:08:49 -0500 Subject: [PATCH] bringing bluesky development up to date --- startup/10-motors.py | 14 +- startup/11-motors_utilities.py | 129 ++ startup/19-exp_shutter.py | 60 + startup/22-area-detector-utilities.py | 13 + startup/25-scalers.py | 111 +- startup/45-olog.py | 14 + startup/81-beam.py | 1298 ++++++++++++++++++-- startup/90-bluesky.py | 0 startup/91-fit_scan.py | 825 +++++++++++++ startup/94-sample.py | 1601 +++++++++++++++++++++++++ startup/96-automation.py | 346 ++++++ startup/98-user.py | 63 + 12 files changed, 4366 insertions(+), 108 deletions(-) create mode 100644 startup/11-motors_utilities.py create mode 100755 startup/19-exp_shutter.py create mode 100755 startup/22-area-detector-utilities.py mode change 100644 => 100755 startup/25-scalers.py create mode 100644 startup/45-olog.py mode change 100644 => 100755 startup/90-bluesky.py create mode 100644 startup/91-fit_scan.py create mode 100644 startup/94-sample.py create mode 100644 startup/96-automation.py create mode 100644 startup/98-user.py diff --git a/startup/10-motors.py b/startup/10-motors.py index 76ec313..66577a1 100755 --- a/startup/10-motors.py +++ b/startup/10-motors.py @@ -87,16 +87,23 @@ class Blades(Device): sth = EpicsMotor('XF:11BMB-ES{Chm:Smpl-Ax:theta}Mtr', name='sth') schi = EpicsMotor('XF:11BMB-ES{Chm:Smpl-Ax:chi}Mtr', name='schi') sphi = EpicsMotor('XF:11BMB-ES{Chm:Smpl-Ax:phi}Mtr', name='sphi') +srot = EpicsMotor('XF:11BMB-ES{SM:1-Ax:Srot}Mtr', name='srot') +strans = EpicsMotor('XF:11BMB-ES{SM:1-Ax:Strans}Mtr', name='strans') -## stages for sample camera +## stages for on-axis sample camera mirror/lens camx = EpicsMotor('XF:11BMB-ES{Cam:OnAxis-Ax:X1}Mtr', name='camx') camy = EpicsMotor('XF:11BMB-ES{Cam:OnAxis-Ax:Y1}Mtr', name='camy') +## stages for off-axis sample camera +cam2x = EpicsMotor('XF:11BMB-ES{Cam:OnAxis-Ax:X2}Mtr', name='cam2x') +cam2z = EpicsMotor('XF:11BMB-ES{Cam:OnAxis-Ax:Y2}Mtr', name='cam2z') + ## stages for sample exchanger armz = EpicsMotor('XF:11BMB-ES{SM:1-Ax:Z}Mtr', name='armz') armx = EpicsMotor('XF:11BMB-ES{SM:1-Ax:X}Mtr', name='armx') armphi = EpicsMotor('XF:11BMB-ES{SM:1-Ax:Yaw}Mtr', name='armphi') army = EpicsMotor('XF:11BMB-ES{SM:1-Ax:Y}Mtr', name='army') +armr = EpicsMotor('XF:11BMB-ES{SM:1-Ax:ArmR}Mtr', name='armr') ## stages for detectors DETx = EpicsMotor('XF:11BMB-ES{Det:Stg-Ax:X}Mtr', name='DETx') @@ -106,4 +113,9 @@ class Blades(Device): SAXSx = EpicsMotor('XF:11BMB-ES{Det:SAXS-Ax:X}Mtr', name='SAXSx') SAXSy = EpicsMotor('XF:11BMB-ES{Det:SAXS-Ax:Y}Mtr', name='SAXSy') +## stages for beamstops +bsx = EpicsMotor('XF:11BMB-ES{BS:SAXS-Ax:X}Mtr', name='bsx') +bsy = EpicsMotor('XF:11BMB-ES{BS:SAXS-Ax:Y}Mtr', name='bsy') +bsphi = EpicsMotor('XF:11BMB-ES{BS:SAXS-Ax:phi}Mtr', name='bsphi') + diff --git a/startup/11-motors_utilities.py b/startup/11-motors_utilities.py new file mode 100644 index 0000000..a689c86 --- /dev/null +++ b/startup/11-motors_utilities.py @@ -0,0 +1,129 @@ +def wh_all(): + wh_pos([mono_bragg,mono_pitch2,mono_roll2,mono_perp2]) + wh_pos([mir_usx,mir_dsx,mir_usy,mir_dsyi,mir_dsyo,mir_bend]) + wh_pos(s0) + wh_pos(s1) + wh_pos(s2) + wh_pos(s3) + wh_pos(s4) + wh_pos(s5) + wh_pos([bim3y,fs3y,bim4y,bim5y]) + wh_pos([smx,smy,sth,schi,sphi,srot,strans]) + wh_pos([camx,camy]) + wh_pos([DETx,DETy,WAXSx,SAXSx,SAXSy]) + wh_pos([bsx,bsy,bsphi]) + wh_pos([armz,armx,armphi,army,armr]) + + +def wh_offsets(): + print('Direction: 0--Pos, 1--Neg\n') + + ## mono + wh_pos([mono_bragg,mono_pitch2,mono_roll2,mono_perp2]) + print('mono_bragg: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mono:DMM-Ax:Bragg}Mtr.OFF'),caget('XF:11BMA-OP{Mono:DMM-Ax:Bragg}Mtr.DIR'))) + print('mono_pitch2: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mono:DMM-Ax:P2}Mtr.OFF'),caget('XF:11BMA-OP{Mono:DMM-Ax:P2}Mtr.DIR'))) + print('mono_roll2: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mono:DMM-Ax:R2}Mtr.OFF'),caget('XF:11BMA-OP{Mono:DMM-Ax:R2}Mtr.DIR'))) + print('mono_perp2: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mono:DMM-Ax:Y2}Mtr.OFF'),caget('XF:11BMA-OP{Mono:DMM-Ax:Y2}Mtr.DIR'))) + + ## mirror + wh_pos([mir_usx,mir_dsx,mir_usy,mir_dsyi,mir_dsyo,mir_bend]) + print('mir_usx: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:XU}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:XU}Mtr.DIR'))) + print('mir_dsx: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:XD}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:XD}Mtr.DIR'))) + print('mir_usy: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:YU}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:YU}Mtr.DIR'))) + print('mir_dsyi: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:YDI}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:YDI}Mtr.DIR'))) + print('mir_dsyo: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:YDO}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:YDO}Mtr.DIR'))) + print('mir_bend: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Mir:Tor-Ax:UB}Mtr.OFF'),caget('XF:11BMA-OP{Mir:Tor-Ax:UB}Mtr.DIR'))) + + ## slits S0 + wh_pos(s0) + print('s0.tp: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Slt:0-Ax:T}Mtr.OFF'),caget('XF:11BMA-OP{Slt:0-Ax:T}Mtr.DIR'))) + print('s0.bt: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Slt:0-Ax:B}Mtr.OFF'),caget('XF:11BMA-OP{Slt:0-Ax:B}Mtr.DIR'))) + print('s0.ob: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Slt:0-Ax:O}Mtr.OFF'),caget('XF:11BMA-OP{Slt:0-Ax:O}Mtr.DIR'))) + print('s0.ib: offset = %f, direction = %d' % (caget('XF:11BMA-OP{Slt:0-Ax:I}Mtr.OFF'),caget('XF:11BMA-OP{Slt:0-Ax:I}Mtr.DIR'))) + + ## slits S1 + wh_pos(s1) + print('s1.xc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:1-Ax:XC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:1-Ax:XC}Mtr.DIR'))) + print('s1.xg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:1-Ax:XG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:1-Ax:XG}Mtr.DIR'))) + print('s1.yc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:1-Ax:YC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:1-Ax:YC}Mtr.DIR'))) + print('s1.yg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:1-Ax:YG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:1-Ax:YG}Mtr.DIR'))) + + ## slits S2 + wh_pos(s2) + print('s2.xc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:2-Ax:XC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:2-Ax:XC}Mtr.DIR'))) + print('s2.xg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:2-Ax:XG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:2-Ax:XG}Mtr.DIR'))) + print('s2.yc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:2-Ax:YC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:2-Ax:YC}Mtr.DIR'))) + print('s2.yg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:2-Ax:YG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:2-Ax:YG}Mtr.DIR'))) + + ## slits S3 + wh_pos(s3) + print('s3.xc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:3-Ax:XC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:3-Ax:XC}Mtr.DIR'))) + print('s3.xg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:3-Ax:XG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:3-Ax:XG}Mtr.DIR'))) + print('s3.yc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:3-Ax:YC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:3-Ax:YC}Mtr.DIR'))) + print('s3.yg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:3-Ax:YG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:3-Ax:YG}Mtr.DIR'))) + + ## slits S4 + wh_pos(s4) + print('s4.xc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:4-Ax:XC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:4-Ax:XC}Mtr.DIR'))) + print('s4.xg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:4-Ax:XG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:4-Ax:XG}Mtr.DIR'))) + print('s4.yc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:4-Ax:YC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:4-Ax:YC}Mtr.DIR'))) + print('s4.yg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:4-Ax:YG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:4-Ax:YG}Mtr.DIR'))) + + ## slits S5 + wh_pos(s5) + print('s5.xc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:5-Ax:XC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:5-Ax:XC}Mtr.DIR'))) + print('s5.xg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:5-Ax:XG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:5-Ax:XG}Mtr.DIR'))) + print('s5.yc: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:5-Ax:YC}Mtr.OFF'),caget('XF:11BMB-OP{Slt:5-Ax:YC}Mtr.DIR'))) + print('s5.yg: offset = %f, direction = %d' % (caget('XF:11BMB-OP{Slt:5-Ax:YG}Mtr.OFF'),caget('XF:11BMB-OP{Slt:5-Ax:YG}Mtr.DIR'))) + + + ## diagnostic stages + wh_pos([bim3y,fs3y,bim4y,bim5y]) + print('bim3y: offset = %f, direction = %d' % (caget('XF:11BMB-BI{IM:3-Ax:Y}Mtr.OFF'),caget('XF:11BMB-BI{IM:3-Ax:Y}Mtr.DIR'))) + print('fs3y: offset = %f, direction = %d' % (caget('XF:11BMB-BI{FS:3-Ax:Y}Mtr.OFF'),caget('XF:11BMB-BI{FS:3-Ax:Y}Mtr.DIR'))) + print('bim4y: offset = %f, direction = %d' % (caget('XF:11BMB-BI{IM:4-Ax:Y}Mtr.OFF'),caget('XF:11BMB-BI{IM:4-Ax:Y}Mtr.DIR'))) + print('bim5y: offset = %f, direction = %d' % (caget('XF:11BMB-BI{IM:5-Ax:Y}Mtr.OFF'),caget('XF:11BMB-BI{IM:5-Ax:Y}Mtr.DIR'))) + + + ## sample stages + wh_pos([smx,smy,sth,schi,sphi,srot,strans]) + print('smx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Chm:Smpl-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{Chm:Smpl-Ax:X}Mtr.DIR'))) + print('smy: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Chm:Smpl-Ax:Z}Mtr.OFF'),caget('XF:11BMB-ES{Chm:Smpl-Ax:Z}Mtr.DIR'))) + print('sth: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Chm:Smpl-Ax:theta}Mtr.OFF'),caget('XF:11BMB-ES{Chm:Smpl-Ax:theta}Mtr.DIR'))) + print('schi: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Chm:Smpl-Ax:chi}Mtr.OFF'),caget('XF:11BMB-ES{Chm:Smpl-Ax:chi}Mtr.DIR'))) + print('sphi: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Chm:Smpl-Ax:phi}Mtr.OFF'),caget('XF:11BMB-ES{Chm:Smpl-Ax:phi}Mtr.DIR'))) + print('srot: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:Srot}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:Srot}Mtr.DIR'))) + print('strans: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:Strans}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:Strans}Mtr.DIR'))) + + + ## camera + wh_pos([camx,camy]) + print('camx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Cam:OnAxis-Ax:X1}Mtr.OFF'),caget('XF:11BMB-ES{Cam:OnAxis-Ax:X1}Mtr.DIR'))) + print('camy: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Cam:OnAxis-Ax:Y1}Mtr.OFF'),caget('XF:11BMB-ES{Cam:OnAxis-Ax:Y1}Mtr.OFF'))) + + + ## detector stages + wh_pos([DETx,DETy,WAXSx,SAXSx,SAXSy]) + print('DETx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Det:Stg-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{Det:Stg-Ax:X}Mtr.DIR'))) + print('DETy: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Det:Stg-Ax:Y}Mtr.OFF'),caget('XF:11BMB-ES{Det:Stg-Ax:Y}Mtr.DIR'))) + print('WAXSx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Det:WAXS-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{Det:WAXS-Ax:X}Mtr.DIR'))) + print('SAXSx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Det:SAXS-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{Det:SAXS-Ax:X}Mtr.DIR'))) + print('SAXSy: offset = %f, direction = %d' % (caget('XF:11BMB-ES{Det:SAXS-Ax:Y}Mtr.OFF'),caget('XF:11BMB-ES{Det:SAXS-Ax:Y}Mtr.DIR'))) + + + ## beamstop + wh_pos([bsx,bsy,bsphi]) + print('bsx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{BS:SAXS-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{BS:SAXS-Ax:X}Mtr.DIR'))) + print('bsy: offset = %f, direction = %d' % (caget('XF:11BMB-ES{BS:SAXS-Ax:Y}Mtr.OFF'),caget('XF:11BMB-ES{BS:SAXS-Ax:Y}Mtr.DIR'))) + print('bsphi: offset = %f, direction = %d' % (caget('XF:11BMB-ES{BS:SAXS-Ax:phi}Mtr.OFF'),caget('XF:11BMB-ES{BS:SAXS-Ax:phi}Mtr.DIR'))) + + + ## sample exchanger + wh_pos([armz,armx,armphi,army,armr]) + print('armz: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:Z}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:Z}Mtr.DIR'))) + print('armx: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:X}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:X}Mtr.DIR'))) + print('armphi: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:Yaw}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:Yaw}Mtr.DIR'))) + print('army: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:Y}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:Y}Mtr.DIR'))) + print('armr: offset = %f, direction = %d' % (caget('XF:11BMB-ES{SM:1-Ax:ArmR}Mtr.OFF'),caget('XF:11BMB-ES{SM:1-Ax:ArmR}Mtr.DIR'))) + + diff --git a/startup/19-exp_shutter.py b/startup/19-exp_shutter.py new file mode 100755 index 0000000..b8fb4fc --- /dev/null +++ b/startup/19-exp_shutter.py @@ -0,0 +1,60 @@ + + +##### Experimental shutters ##### +# These shutters are controlled by sending a 5V pulse via QEM output on the Delta Tau controller MC06 +# (the same unit that controls slits S5). Both the opening and closing of the shutter are triggered +# by the rise of the pulse. +# +# Note: +# - PV for the QEM output on MC06 is: +# XF:11BMB-CT{MC:06}Asyn.AOUT +# - This PV is located under Slit 5/Asyn --> asynRecord/More... --> asynOctet interface I/O --> ASCII +# - 'M112=1' sets the state to high +# - 'M112=0' sets the state to low +# - 'M111=1' launches the change in state +# - A sleep time of ~2 ms between successive caput commands is needed to get proper response; 1 ms is too short +##### + +#global xshutter_state +xshutter_state=0 ## TODO: read the shutter state and set this accordingly + +## Open shutter +def xshutter_trigger(): + sleep_time = 0.005 + caput('XF:11BMB-CT{MC:06}Asyn.AOUT','M112=1') + sleep(sleep_time) + caput('XF:11BMB-CT{MC:06}Asyn.AOUT','M111=1') + sleep(sleep_time) + caput('XF:11BMB-CT{MC:06}Asyn.AOUT','M112=0') + sleep(sleep_time) + caput('XF:11BMB-CT{MC:06}Asyn.AOUT','M111=1') + + +def xshutter(inout,q=0): + global xshutter_state + + if inout=='o' or inout=='open' or inout==1: + if xshutter_state==0: + xshutter_trigger() + xshutter_state = 1 + if q==0: + print('Experimental shutter opened') + return(xshutter_state) + elif xshutter_state==1: + print('Experimental shutter is already open; no changes made') + else: + print('xshutter_state is neither 0 nor 1; no changes made') + + if inout=='c' or inout=='close' or inout==0: + if xshutter_state==1: + xshutter_trigger() + xshutter_state = 0 + if q==0: + print('Experimental shutter closed') + return(xshutter_state) + elif xshutter_state==0: + print('Experimental shutter is already closed; no changes made') + else: + print('xshutter_state is neither 0 nor 1; no changes made') + + diff --git a/startup/22-area-detector-utilities.py b/startup/22-area-detector-utilities.py new file mode 100755 index 0000000..3656828 --- /dev/null +++ b/startup/22-area-detector-utilities.py @@ -0,0 +1,13 @@ + +def xp_set(seconds): +# sleep_time=0.002 + caput('XF:11BMB-ES{Det:SAXS}:cam1:AcquireTime',seconds) +# sleep(sleep_time) + caput('XF:11BMB-ES{Det:SAXS}:cam1:AcquirePeriod',seconds+0.1) + +def xp(seconds): + sleep_time=0.1 + caput('XF:11BMB-ES{Det:SAXS}:cam1:Acquire',1) + sleep(seconds+sleep_time) + + diff --git a/startup/25-scalers.py b/startup/25-scalers.py old mode 100644 new mode 100755 index a6be0f4..ca54b86 --- a/startup/25-scalers.py +++ b/startup/25-scalers.py @@ -1,12 +1,115 @@ import ophyd -quad_electrometer1 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current1:MeanValue_RBV", name='quad_electrometer1') -quad_electrometer2 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current2:MeanValue_RBV", name='quad_electrometer2') -quad_electrometer3 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current3:MeanValue_RBV", name='quad_electrometer3') -quad_electrometer4 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current4:MeanValue_RBV", name='quad_electrometer4') +##### FOE ##### +quad_electrometer1_1 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current1:MeanValue_RBV", name='quad_electrometer1_1') +quad_electrometer1_2 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current2:MeanValue_RBV", name='quad_electrometer1_2') +quad_electrometer1_3 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current3:MeanValue_RBV", name='quad_electrometer1_3') +quad_electrometer1_4 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current4:MeanValue_RBV", name='quad_electrometer1_4') +bim1 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current1:MeanValue_RBV", name='bim1') +bim2 = ophyd.EpicsSignalRO("XF:11BMA-BI{IM:1}EM180:Current2:MeanValue_RBV", name='bim2') + + +##### Endstation ##### +## TODO: fix 'precision' and 'units' at EPICS level ion_chamber1 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:3}:IC1_MON", name='ion_chamber1') ion_chamber2 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:3}:IC2_MON", name='ion_chamber2') ion_chamber3 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:3}:IC3_MON", name='ion_chamber3') ion_chamber4 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:3}:IC4_MON", name='ion_chamber4') + +class ScaleSignal(ophyd.signal.DerivedSignal): + def __init__(self, *args, factor, **kwargs): + super().__init__(*args, **kwargs) + self._factor = factor + def inverse(self, value): + return self._factor * value + def forward(self, value): + return value / self._factor + def describe(self): + desc = super().describe() + wd = desc[self.name] + wd['derived_type'] = 'ScaleSignal' + wd['factor'] = self._factor + return desc + +scaled_ic1 = ScaleSignal(ion_chamber1, factor=1e9, name='scaled_ic1') +scaled_ic2 = ScaleSignal(ion_chamber2, factor=1e9, name='scaled_ic2') +scaled_ic3 = ScaleSignal(ion_chamber3, factor=1e9, name='scaled_ic3') +scaled_ic4 = ScaleSignal(ion_chamber4, factor=1e9, name='scaled_ic4') + + +quad_electrometer2_1 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", name='quad_electrometer2_1') +quad_electrometer2_2 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:2}EM180:Current2:MeanValue_RBV", name='quad_electrometer2_2') +quad_electrometer2_3 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:2}EM180:Current3:MeanValue_RBV", name='quad_electrometer2_3') +quad_electrometer2_4 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:2}EM180:Current4:MeanValue_RBV", name='quad_electrometer2_4') + + +# bim6 is the monitor after the sample (called dsmon on X9) +# The monitor sits on an arm on the DET system, so it can be moved with DETx and DETy +#bim6 = ophyd.EpicsSignalRO("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", name='bim6') +class EpicsSignalROWait(ophyd.EpicsSignalRO): + '''Customized version of EpicsSignal that has a 'wait_time' in the 'read()' + function. This can be used for signals that need some time to settle before + a value is read.''' + + def __init__(self, *args, wait_time=None, **kwargs): + + if wait_time is not None: + self._wait_time = wait_time + else: + self._wait_time = 0 + + super().__init__(*args, **kwargs) + + def read(self, *args, **kwargs): + + #print('waiting {} s'.format(self._wait_time)) + sleep(self._wait_time) + return super().read(*args, **kwargs) + + +bim6 = EpicsSignalROWait("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", wait_time=1, name='bim6') + +class EpicsSignalROIntegrate(ophyd.EpicsSignalRO): + '''Customized version of EpicsSignal that has manually integrates (averages + a few values). This can be used for signals that are otherwise too erratic.''' + + def __init__(self, *args, wait_time=None, integrate_num=1, integrate_delay=0.01, **kwargs): + + if wait_time is not None: + self._wait_time = wait_time + else: + self._wait_time = 0 + + self._integrate_num = integrate_num + self._integrate_delay = integrate_delay + + super().__init__(*args, **kwargs) + + + def read(self, *args, **kwargs): + + #print('waiting {} s'.format(self._wait_time)) + sleep(self._wait_time) + + value = 0.0 + num = 0.0 + for i in range(self._integrate_num): + value_current = super().read(*args, **kwargs)[self.name]['value'] + #print(value_current) + value += value_current + num += 1.0 + sleep(self._integrate_delay) + + value /= num + + ret = super().read(*args, **kwargs) + ret[self.name]['value'] = value + + return ret + + +bim6_integrating = EpicsSignalROIntegrate("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", wait_time=0.5, integrate_num=8, integrate_delay=0.1, name='bim6') + + diff --git a/startup/45-olog.py b/startup/45-olog.py new file mode 100644 index 0000000..ba8dd7b --- /dev/null +++ b/startup/45-olog.py @@ -0,0 +1,14 @@ +from functools import partial +from pyOlog import SimpleOlogClient +from bluesky.callbacks.olog import logbook_cb_factory + +# Set up the logbook. This configures bluesky's summaries of +# data acquisition (scan type, ID, etc.). + +LOGBOOKS = ['Data Acquisition'] # list of logbook names to publish to +simple_olog_client = SimpleOlogClient() +generic_logbook_func = simple_olog_client.log +configured_logbook_func = partial(generic_logbook_func, logbooks=LOGBOOKS) + +cb = logbook_cb_factory(configured_logbook_func) +RE.subscribe('start', cb) diff --git a/startup/81-beam.py b/startup/81-beam.py index 7cbeaa5..847b398 100644 --- a/startup/81-beam.py +++ b/startup/81-beam.py @@ -39,13 +39,20 @@ class BeamlineElement(object): + '''Defines a component of the beamline that (may) intersect the x-ray beam.''' - def __init__(self, name, zposition, description="", **args): + def __init__(self, name, zposition, description="", pv=None, **args): self.name = name self.zposition = zposition self.description = description + self.conversion_factor = 1 + + self._pv_main = pv + + self.has_flux = True + def state(self): """ @@ -59,14 +66,20 @@ def state(self): return "out" - def transmission(self): + def transmission(self, verbosity=0): """ Returns the predicted transmission of this beamline element, based on its current state. """ + tr_tot = 1.0 + + if verbosity>=2: + print('{:s} transmission = {:.6g}'.format(self.name, tr_tot)) + + # Assume a generic beamline element doesn't block/perturb the beam - return 1.0 + return tr_tot def flux(self, verbosity=3): @@ -75,38 +88,105 @@ def flux(self, verbosity=3): flux = self.conversion_factor*reading # ph/s if verbosity>=2: - print('flux = {:.4g} ph/s') + print('flux = {:.4g} ph/s'.format(flux)) - return flux + return flux + + class Shutter(BeamlineElement): - pass + # Example + # XF:11BMA-PPS{PSh}Enbl-Sts + # Status: XF:11BMA-PPS{PSh}Pos-Sts 0 for open, 1 for close + # Open: XF:11BMA-PPS{PSh}Cmd:Opn-Cmd + # Close: XF:11BMA-PPS{PSh}Cmd:Cls-Cmd + + def __init__(self, name, zposition, description="", pv=None, **args): + + super().__init__(name=name, zposition=zposition, description=description, pv=pv, **args) + self.has_flux = False + + + def state(self): + """ + Returns the current state of the beamline element. Common states: + out - Element is out of the way of the beam (and should not be blocking). + in - Element is in the beam (but should not be blocking). + block - Element is in the beam, and should be blocking the beam. + undefined - Element is in an unexpected state. + """ + + state_n = caget(self._pv_main+'Pos-Sts') + + if state_n is 0: + return "out" + elif state_n is 1: + return "block" + else: + return "undefined" + + + def open(self, verbosity=3): + + if verbosity>=3: + print('Opening {:s}...'.format(self.name)) + + # E.g. #XF:11BMB-VA{Slt:4-GV:1}Cmd:Opn-Cmd + pv = self._pv_main + 'Cmd:Opn-Cmd' + #caput(pv, 1) # TODO: Test this. + + def close(self, verbosity=3): + + if verbosity>=3: + print('Closing {:s}...'.format(self.name)) + + pv = self._pv_main + 'Cmd:Cls-Cmd' + #caput(pv, 1) # TODO: Test this. + -class Monitor(BeamlineElement): - - pass -class GateValve(BeamlineElement): +class GateValve(Shutter): - def open(self, verbosity=3): - pass + # Example + # Status: XF:11BMB-VA{Slt:4-GV:1}Pos-Sts 1 for open, 0 for close + # Open: XF:11BMB-VA{Slt:4-GV:1}Cmd:Opn-Cmd + # Close: XF:11BMB-VA{Slt:4-GV:1}Cmd:Cls-Cmd + + + def state(self): + """ + Returns the current state of the beamline element. Common states: + out - Element is out of the way of the beam (and should not be blocking). + in - Element is in the beam (but should not be blocking). + block - Element is in the beam, and should be blocking the beam. + undefined - Element is in an unexpected state. + """ + + state_n = caget(self._pv_main+'Pos-Sts') + + if state_n is 1: + return "out" + elif state_n is 0: + return "block" + else: + return "undefined" - def close(self, verbosity=3): - pass class ThreePoleWiggler(BeamlineElement): def __init__(self, name='3PW', zposition=0.0, description='Three-pole wiggler source of x-rays', **args): + + super().__init__(name=name, zposition=zposition, description=description, **args) + # TODO: Find out the right conversion factor self.conversion_factor = 3e18/500.0 #(ph/s)/mA - super().__init__(name=name, zposition=zposition, description=description, **args) def state(self): """ @@ -149,19 +229,301 @@ def reading(self, verbosity=3): return 0 +class Monitor(BeamlineElement): + + def quickReading(self, verbosity=3, delay=1.0): + """ + Puts the diagnostic into the beam, takes a reading, and removes the + diagnostic. + """ + + self.insert() + sleep(delay) + value = self.reading(verbosity=verbosity) + self.retract() + sleep(delay) + return value + + + +class DiagnosticScreen(Monitor): + + #XF:11BMB-BI{FS:4}Pos-Sts + + def __init__(self, name, zposition, description="", pv=None, epics_signal=None, **args): + + super().__init__(name=name, zposition=zposition, description=description, pv=pv, **args) + self.epics_signal = epics_signal + self.has_flux = False + + + def state(self): + """ + Returns the current state of the beamline element. Common states: + out - Element is out of the way of the beam (and should not be blocking). + in - Element is in the beam (but should not be blocking). + block - Element is in the beam, and should be blocking the beam. + undefined - Element is in an unexpected state. + """ + + state_n = caget(self._pv_main+'Pos-Sts') + + if state_n is 0: + return "out" + elif state_n is 1: + return "block" + else: + return "undefined" + + + def insert(self, verbosity=3): + + if verbosity>=3: + print('Inserting {:s}...'.format(self.name)) + + # E.g. #XF:11BMB-VA{Slt:4-GV:1}Cmd:Opn-Cmd + pv = self._pv_main + 'Cmd:In-Cmd' + caput(pv, 1) + + def retract(self, verbosity=3): + + if verbosity>=3: + print('Retracting {:s}...'.format(self.name)) + + pv = self._pv_main + 'Cmd:Out-Cmd' + caput(pv, 1) + + + def reading(self, verbosity=3): + + value = self.epics_signal.stats1.total.value + + if self.state() is 'block': + + ring_current = caget('SR:OPS-BI{DCCT:1}I:Real-I') + if verbosity>=2: + print('{:s} is inserted; reading = {:.4g}'.format(self.name, value)) + + return value + + else: + if verbosity>=2: + print('{:s} is not inserted.'.format(self.name)) + + return 0 + + + + +class PointDiode_CMS(Monitor): + + def __init__(self, name='bim6 point diode', zposition=58.3, description="Bar holding a point-diode, downstream of sample.", pv='XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV', epics_signal=None, **args): + + super().__init__(name=name, zposition=zposition, description=description, pv=pv, **args) + self.has_flux = True + + if epics_signal is None: + + #bim6 = EpicsSignalROWait("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", wait_time=1, name='bim6') + #bim6_integrating = EpicsSignalROIntegrate("XF:11BMB-BI{IM:2}EM180:Current1:MeanValue_RBV", wait_time=0.5, integrate_num=8, integrate_delay=0.1, name='bim6') + + self.epics_signal = bim6_integrating + + else: + self.epics_signal = epics_signal + + + # The beam (at the ion chamber) is roughly 0.50x0.50 mm. + # If we slit down to 0.20x0.05 mm, we are capturing 0.4*0.25 = 0.1 of the beam. + # bim6 reads 70000 cts (of course this depends on settings) when ion chamber reads 1.3e11 ph/s. + # (settings: trans = 5e-4) + # So conversion_factor is roughly: + self.conversion_factor = 1.3e11*0.1/70000. # (ph/s)/cts + + self.in_position_x = 0.0 + self.in_position_y = 0.0 + + self.out_position_x = 0.0 + self.out_position_y = -16.0 + + self.position_tolerance = 0.1 + + + def state(self): + """ + Returns the current state of the beamline element. Common states: + out - Element is out of the way of the beam (and should not be blocking). + in - Element is in the beam (but should not be blocking). + block - Element is in the beam, and should be blocking the beam. + undefined - Element is in an unexpected state. + """ + + position_x = DETx.user_readback.value + position_y = DETy.user_readback.value + + if abs(position_x-self.out_position_x)=3: + print('Inserting {:s}...'.format(self.name)) + + mov( [DETx, DETy], [self.in_position_x, self.in_position_y] ) + + + def retract(self, verbosity=3): + + if verbosity>=3: + print('Retracting {:s}...'.format(self.name)) + + mov( [DETx, DETy], [self.out_position_x, self.out_position_y] ) + + + def reading(self, verbosity=3): + + value = self.epics_signal.read()[self.epics_signal.name]['value'] + + if self.state() is 'block': + + if verbosity>=2: + print('{:s} is inserted; reading = {:.4g}'.format(self.name, value)) + + return value + + else: + if verbosity>=2: + print('{:s} is not inserted.'.format(self.name)) + + return value + + + +class IonChamber_CMS(Monitor): + + def __init__(self, name='bim3 ionchamber', zposition=49, description="Ion chamber (FMB Oxford I404) at start of endstation hutch", pv=None, beam=None, **args): + + super().__init__(name=name, zposition=zposition, description=description, pv=pv, **args) + self.has_flux = True + + self.beam = beam + + # PVs + import epics + self.v1 = epics.PV('XF:11BMB-BI{IM:3}:IC1_MON') + self.v2 = epics.PV('XF:11BMB-BI{IM:3}:IC2_MON') + self.h1 = epics.PV('XF:11BMB-BI{IM:3}:IC3_MON') + self.h2 = epics.PV('XF:11BMB-BI{IM:3}:IC4_MON') + + + def state(self): + + return "in" + + + def v_position(self): + + total = self.v1.value+self.v2.value + if total>0: + return (self.v1.value-self.v2.value)/(total) + else: + return 0 + + def h_position(self): + + total = self.h1.value+self.h2.value + if total>0: + return (self.h1.value-self.h2.value)/(total) + else: + return 0 + + def reading(self, verbosity=3): + + total = self.h1.value + self.h2.value + self.v1.value + self.v2.value + + if verbosity>=3: + print('Reading for {:s} ({:s})'.format(self.name, self.description)) + print(' Horizontal: {:9.4g} + {:9.4g} = {:9.4g}'.format(self.h1.value, self.h2.value, self.h1.value+self.h2.value)) + print(' position: {:.3f}'.format(self.h_position())) + print(' Vertical: {:9.4g} + {:9.4g} = {:9.4g}'.format(self.v1.value, self.v2.value, self.v1.value+self.v2.value)) + print(' position: {:.3f}'.format(self.v_position())) + + if verbosity>=2: + + print(' Total: {:9.4g}'.format(total)) + + return total + + + def current_to_flux(self, current): + + energy_keV = self.beam.energy(verbosity=0) + + V_ion = 0.036 ## ionization energy of N2 gas in [keV] + IC_len = 6.0 ## active length of Ion Chamber in [cm] + qe = 1.602e-19 ## electron charge in [C] + + ## Absorption length [cm] of gas N2 (1 atm, 1.131 g/L) vs E [keV] + # based on polynomial fit to the calculated abs length data from: henke.lbl.gov/optical_constants/atten2.html + # see /home/xf11bm/masa/atten_len_N2* + abs_len = 355.21 - 112.26*energy_keV + 11.200*np.square(energy_keV) - 0.10611*np.power(energy_keV,3.0) + + N_abs = current*V_ion/(qe*energy_keV) + flux = N_abs / (1.0 - np.exp(-IC_len/abs_len)) + + return flux + + + def flux(self, verbosity=3): + + if self.reading(verbosity=0) < 5e-10: + return 0.0 + + h1 = self.current_to_flux(self.h1.value) + h2 = self.current_to_flux(self.h2.value) + h_total = h1 + h2 + v1 = self.current_to_flux(self.v1.value) + v2 = self.current_to_flux(self.v2.value) + v_total = v1 + v2 + + total = h_total + v_total + avg = total*0.5 + + if verbosity>=3: + print('Flux for {:s} ({:s})'.format(self.name, self.description)) + print(' Horizontal: {:9.4g} + {:9.4g} = {:9.4g} ph/s'.format(h1, h2, h1+h2)) + print(' position: {:.3f}'.format(self.h_position())) + print(' Vertical: {:9.4g} + {:9.4g} = {:9.4g} ph/s'.format(v1, v2, v1+v2)) + print(' position: {:.3f}'.format(self.v_position())) + + if verbosity>=2: + + print(' Average: {:9.4g} ph/s'.format(avg)) + + return avg + + + +#ionchamber = IonChamber_CMS(beam=beam) + # CMSBeam ################################################################################ -# This class represents the 'beam' at the beamline. This collects together aspects -# of querying or changing beam properties, including the energy (or wavelength), -# the beam intensity (or measuring flux), and so forth. class CMSBeam(object): + """ + This class represents the 'beam' at the beamline. This collects together aspects + of querying or changing beam properties, including the energy (or wavelength), + the beam intensity (or measuring flux), and so forth. + """ def __init__(self): - self.mono_bragg_pv = 'XF:11BMA-OP{Mono:DMM-Ax:Bragg}Mtr.RBV' # (planck constant * speed of light)/(electronic charge) @@ -172,10 +534,84 @@ def __init__(self): self.dmm_dsp = 20.1 # Angstroms + + self.mono = BeamlineElement('monochromator', 27.0) + def transmission(verbosity=0): + return 1e-7 + self.mono.transmission = transmission + + + self.attenuator = BeamlineElement('attenuator', 50.0, description="Attenuator/filter box") + self.attenuator.has_flux = False + def reading(verbosity=0): + return self.transmission(verbosity=verbosity) + self.attenuator.reading = reading + self.attenuator.transmission = self.transmission + + if False: + self.fs1 = DiagnosticScreen( 'fs1', 27.5, pv='XF:11BMA-BI{FS:1}', epics_signal=StandardProsilica('XF:11BMA-BI{FS:1-Cam:1}', name='fs1') ) + #self.fs2 = DiagnosticScreen( 'fs2', 29.1, pv='XF:11BMA-BI{FS:2}', epics_signal=StandardProsilica('XF:11BMA-BI{FS:2-Cam:1}', name='fs2') ) + self.fs3 = DiagnosticScreen( 'fs3', 54.0, pv='XF:11BMB-BI{FS:3}', epics_signal=StandardProsilica('XF:11BMB-BI{FS:3-Cam:1}', name='fs3') ) + self.fs4 = DiagnosticScreen( 'fs4', 56.0, pv='XF:11BMB-BI{FS:4}', epics_signal=StandardProsilica('XF:11BMB-BI{FS:4-Cam:1}', name='fs4') ) + self.fs5 = DiagnosticScreen( 'fs5', 70.0, pv='XF:11BMB-BI{FS:Test-Cam:1}', epics_signal=StandardProsilica('XF:11BMB-BI{FS:4-Cam:1}', name='fs5') ) + else: + # Rely on the fact that these are defined in 20-area-detectors.py + self.fs1 = DiagnosticScreen( 'fs1', 27.5, pv='XF:11BMA-BI{FS:1}', epics_signal=fs1 ) + #self.fs2 = DiagnosticScreen( 'fs2', 29.1, pv='XF:11BMA-BI{FS:2}', epics_signal=fs2 ) + self.fs3 = DiagnosticScreen( 'fs3', 54.0, pv='XF:11BMB-BI{FS:3}', epics_signal=fs3 ) + self.fs4 = DiagnosticScreen( 'fs4', 56.0, pv='XF:11BMB-BI{FS:4}', epics_signal=fs4 ) + self.fs5 = DiagnosticScreen( 'fs5', 70.0, pv='XF:11BMB-BI{FS:Test-Cam:1}', epics_signal=fs5 ) + + + self.bim3 = IonChamber_CMS(beam=self) + self.beam_defining_slit = s4 + self.bim6 = PointDiode_CMS() + + self.GVdsbig = GateValve('GV ds big', 59.0, pv='XF:11BMB-VA{Chm:Det-GV:1}') + + + self.elements = [] + + # Front End self.elements.append(ThreePoleWiggler()) - self.elements.append(BeamlineElement('test02', 10.0)) - self.elements.append(BeamlineElement('test01', 5.0)) + #SR:C03-EPS{PLC:1}Sts:BM_BMPS_Opn-Sts BMPS + self.elements.append(GateValve('GV1', 20.0, pv='FE:C03A-VA{GV:1}DB:')) + self.elements.append(GateValve('GV2', 21.0, pv='FE:C03A-VA{GV:2}DB:')) + + + # FOE + self.elements.append(Shutter('FE shutter', 25.0, pv='XF:11BM-PPS{Sh:FE}')) + self.elements.append(GateValve('GV', 26.0, pv='FE:C11B-VA{GV:2}')) + self.elements.append(self.mono) + self.elements.append(self.fs1) + # bim1 + # slit0 + # bim2 + self.elements.append(GateValve('GV', 28.0, pv='XF:11BMA-VA{Slt:0-GV:1}')) + self.elements.append(BeamlineElement('mirror', 29.0)) + self.elements.append(GateValve('GV', 29.0, pv='XF:11BMA-VA{Mir:Tor-GV:1}')) + self.elements.append(BeamlineElement('fs2 (manual)', 29.1)) # self.elements.append(self.fs2) + self.elements.append(Shutter('photon shutter', 30.0, pv='XF:11BMA-PPS{PSh}')) + self.elements.append(GateValve('GV', 30.1, pv='XF:11BMA-VA{PSh:1-GV:1}')) + + # Endstation + self.elements.append(self.bim3) + # Experimental shutter 49.5 + self.elements.append(self.attenuator) + self.elements.append(self.fs3) + self.elements.append(BeamlineElement('KB mirrors', 55.0)) + self.elements.append(self.fs4) + # im4 + #self.elements.append(GateValve('GV us small', 57.0, pv='XF:11BMB-VA{Slt:4-GV:1}')) + + + self.elements.append(BeamlineElement('sample', 58.0)) + self.elements.append(self.bim6) # dsmon + self.elements.append(BeamlineElement('WAXS detector', 58.4)) + self.elements.append(self.GVdsbig) + self.elements.append(BeamlineElement('SAXS detector', 58+5)) + # Sort by position along the beam @@ -235,7 +671,7 @@ def wavelength(self, verbosity=3): return wavelength_A - def set_energy(self, energy_keV, verbosity=3): + def setEnergy(self, energy_keV, verbosity=3): """ Set the x-ray beam to the specified energy (by changing the monochromator angle. @@ -245,36 +681,237 @@ def set_energy(self, energy_keV, verbosity=3): wavelength_m = self.hc_over_e/energy_eV wavelength_A = wavelength_m*1.e10 - self.set_wavelength(wavelength_A, verbosity=verbosity) + self.setWavelength(wavelength_A, verbosity=verbosity) + + return self.energy(verbosity=0) + + + def setWavelength(self, wavelength_A, verbosity=3): + """ + Set the x-ray beam to the specified wavelength (by changing the + monochromator angle. + """ + + Bragg_deg_initial = caget(self.mono_bragg_pv) + wavelength_m = wavelength_A*1.e-10 + Bragg_rad = np.arcsin(wavelength_A/(2.*self.dmm_dsp)) + Bragg_deg = np.degrees(Bragg_rad) + + print('mono_bragg will move to {:.4f}g deg'.format(Bragg_deg)) + response = input(' Are you sure? (y/[n]) ') + if response is 'y' or response is 'Y': + + mov(mono_bragg, Bragg_deg) + + if verbosity>=1: + print('mono_bragg moved from {.4f} deg to {.4f} deg'.format(Bragg_deg_initial, Bragg_deg)) + + elif verbosity>=1: + print('No move was made.') + + return self.wavelength(verbosity=verbosity) + + + # Slits + ######################################## + + def size(self, verbosity=3): + """ + Returns the current beam size (rough estimate). + The return is (size_horizontal, size_vertical) (in mm). + """ + size_h = self.beam_defining_slit.xg.user_readback.value + size_v = self.beam_defining_slit.yg.user_readback.value + + if verbosity>=3: + print('Beam size:') + print(' horizontal = {:.3f} mm'.format(size_h)) + print(' vertical = {:.3f} mm'.format(size_v)) + + return size_h, size_v + + + def setSize(self, horizontal, vertical, verbosity=3): + """ + Sets the beam size. + """ + + h, v = self.size(verbosity=0) + + if verbosity>=3: + print('Changing horizontal beam size from {:.3f} mm to {:.3f} mm'.format(h, horizontal)) + self.beam_defining_slit.xg.user_setpoint.value = horizontal + + if verbosity>=3: + print('Changing vertical beam size from {:.3f} mm to {:.3f} mm'.format(v, vertical)) + + self.beam_defining_slit.yg.user_setpoint.value = vertical + + + def divergence(self, verbosity=3): + """ + Returns the beamline divergence. + This is based on the Front End (FE) slits. The return is + (horizontal, vertical) (in mrad). + """ + + distance_m = 10.0 # distance from source to slits + + horizontal_mm = caget('FE:C11B-OP{Slt:12-Ax:X}t2.C') + vertical_mm = caget('FE:C11B-OP{Slt:12-Ax:Y}t2.C') + + horizontal_mrad = horizontal_mm/distance_m + vertical_mrad = vertical_mm/distance_m + + if verbosity>=3: + print('Beam divergence:') + print(' horizontal = {:.3f} mrad'.format(horizontal_mrad)) + print(' vertical = {:.3f} mrad'.format(vertical_mrad)) + + return horizontal_mrad, vertical_mrad + + + def setDivergence(self, horizontal, vertical, verbosity=3): + """ + Set beamline divergence (in mrad). + This is controlled using the Front End (FE) slits. + """ + + h, v = self.divergence(verbosity=0) + + distance_m = 10.0 # distance from source to slits + + horizontal_mm = horizontal*distance_m + vertical_mm = vertical*distance_m + + if horizontal<0: + if verbosity>=1: + print("Horizontal divergence less than zero ({}) doesn't make sense.".format(horizontal)) + + elif horizontal>1.5: + if verbosity>=1: + print("Horizontal divergence should be less than 1.5 mrad.") + + else: + if verbosity>=3: + print('Changing horizontal divergence from {:.3f} mrad to {:.3f} mrad.'.format(h, horizontal)) + caput('FE:C11B-OP{Slt:12-Ax:X}size', horizontal_mm) + + + if vertical<0: + if verbosity>=1: + print("Vertical divergence less than zero ({}) doesn't make sense.".format(vertical)) + + elif vertical>0.15: + if verbosity>=1: + print("Vertical divergence should be less than 0.15 mrad.") + + else: + if verbosity>=3: + print('Changing vertical divergence from {:.3f} mrad to {:.3f} mrad.'.format(v, vertical)) + caput('FE:C11B-OP{Slt:12-Ax:Y}size', vertical_mm) - return self.energy(verbosity=0) + + # Experimental Shutter + ######################################## - def set_wavelength(self, wavelength_A, verbosity=3): - """ - Set the x-ray beam to the specified wavelength (by changing the - monochromator angle. - """ + def is_on(self, verbosity=3): + '''Returns true if the beam is on (experimental shutter open).''' - Bragg_deg_initial = caget(self.mono_bragg_pv) + blade1 = caget('XF:11BMB-OP{PSh:2}Pos:1-Sts') + blade2 = caget('XF:11BMB-OP{PSh:2}Pos:2-Sts') - wavelength_m = wavelength_A*1.e-10 - Bragg_rad = np.arcsin(wavelength_A/(2.*self.dmm_dsp)) - Bragg_deg = np.degrees(Bragg_rad) + if blade1==1 and blade2==1: + if verbosity>=4: + print('Beam on (shutter open).') + + return True - print('mono_bragg will move to {:.4f}g deg'.format(Bragg_deg)) - response = input(' Are you sure? (y/[n]) ') - if response is 'y' or response is 'Y': + else: + if verbosity>=4: + print('Beam off (shutter closed).') - mov(mono_bragg, Bragg_deg) + return False + + + def on(self, verbosity=3, wait_time=0.005, poling_period=0.10, retry_time=2.0, max_retries=5): + '''Turn on the beam (open experimental shutter).''' + + if self.is_on(verbosity=0): + if verbosity>=4: + print('Beam on (shutter already open.)') + + else: - if verbosity>=1: - print('mono_bragg moved from {.4f} deg to {.4f} deg'.format(Bragg_deg_initial, Bragg_deg)) + itry = 0 + while (not self.is_on(verbosity=0)) and itry=5: + print(' try {:d}, t = {:02.2f} s, state = {:s}'.format(itry+1, (time.time()-start_time), 'OPEN_____' if self.is_on(verbosity=0) else 'CLOSE====')) + sleep(poling_period) + + itry += 1 + + + if verbosity>=4: + if self.is_on(verbosity=0): + print('Beam on (shutter opened).') + else: + print("Beam off (shutter didn't open).") + + + def off(self, verbosity=3, wait_time=0.005, poling_period=0.10, retry_time=2.0, max_retries=5): + '''Turn off the beam (close experimental shutter).''' - elif verbosity>=1: - print('No move was made.') + if self.is_on(verbosity=0): + + itry = 0 + while self.is_on(verbosity=0) and itry=5: + print(' try {:d}, t = {:02.2f} s, state = {:s}'.format(itry+1, (time.time()-start_time), 'OPEN_____' if self.is_on(verbosity=0) else 'CLOSE====')) + sleep(poling_period) + + itry += 1 + + + + if verbosity>=4: + if self.is_on(verbosity=0): + print("Beam on (shutter didn't close).") + else: + print('Beam off (shutter closed).') + + else: + if verbosity>=4: + print('Beam off (shutter already closed).') - return self.wavelength(verbosity=verbosity) # Attenuator/Filter Box @@ -283,6 +920,7 @@ def set_wavelength(self, wavelength_A, verbosity=3): def transmission(self, verbosity=3): """ Returns the current beam transmission through the attenuator/filter box. + To change the transmission, use 'setTransmission'. """ energy_keV = self.energy(verbosity=0) @@ -315,7 +953,9 @@ def calc_transmission_filters(self, filter_settings, energy_keV=None, verbosity= If 'None', the current energy is used. If specified, the calculation is performed for the requested energy. - Returns + Returns + + ------- transmission : float The computed transmission value of the x-ray beam through the filter box. @@ -333,7 +973,8 @@ def calc_transmission_filters(self, filter_settings, energy_keV=None, verbosity= E = energy_keV E2 = np.square(E) E3 = np.power(E, 3) - + + # foil thickness blocking the beam N_Al = N[0] + 2*N[1] + 4*N[2] + 8*N[3] N_Nb = N[4] + 2*N[5] + 4*N[6] + 8*N[7] @@ -408,7 +1049,7 @@ def set_attenuation_filters(self, filter_settings, verbosity=3): print(' final: {} T = {:.6g}'.format(filters_final, self.calc_transmission_filters(filters_final, verbosity=0))) - def set_transmission(self, transmission, verbosity=3): + def setTransmission(self, transmission, verbosity=3): """ Sets the transmission through the attenuator/filter box. Because the filter box has a discrete set of foils, it is impossible to @@ -457,6 +1098,8 @@ def set_transmission(self, transmission, verbosity=3): if (dev_ij == min(dev)): N_Nb = i # number of Nb foils selected N_Al = j # number of Al foils selected + + N = [] @@ -476,51 +1119,7 @@ def set_transmission(self, transmission, verbosity=3): return self.transmission(verbosity=verbosity) - def __set_transmission_alternative_deprecated(self, transmission, verbosity=3): - """ - Sets the transmission through the attenuator/filter box. - Because the filter box has a discrete set of foils, it is impossible to - exactly match a given transmission value. A nearby value will be - selected. - """ - - energy_keV = self.energy(verbosity=0) - - if energy_keV < 6.0 or energy_keV > 18.0: - print('Transmission data not available at the current X-ray energy ({.2f} keV).'.format(energy_keV)) - - elif transmission > 1.0: - print('A transmission above 1.0 is not possible.') - - elif transmission < 1e-10: - print('A transmission this low ({:g}) cannot be reliably achieved.'.format(transmission)) - - else: - - num_foils = 8 - - min_deviation=1e300 - best_N = [] - - # Check every possible filter combination - # (This takes 0.17s (instead of 0.003s for other method); - # but either is small compared to the wait-time for the - # pneumatics on the filter box.) - for num in range(2**num_foils): - N = [int(d) for d in bin(num)[2:].zfill(num_foils)] # Convert (binary) number to array of 0/1 - trans_N = self.calc_transmission_filters(N, verbosity=0) - if verbosity>=5: - print('{} T = {:.6g}'.format(N, trans_N)) - - deviation = abs(transmission-trans_N) - if deviation 50 and last_z < 50: print('| Endstation | | | | |') last_z = element.zposition - + flux_expected if verbosity>=1: @@ -571,13 +1172,16 @@ def fluxes(self, verbosity=3): path = '(|)' else: path = '(-)' - elif state is 'out': + elif state is 'out': + + if beam: path = ' | ' else: path = '---' elif state is 'block': path = '[X]' + beam = False elif state is 'undefined': if beam: @@ -588,21 +1192,41 @@ def fluxes(self, verbosity=3): else: path = '???' + + + + + if flux_expected is None or not beam: + flux_expected_str = '' + else: + flux_expected_str = '{:11.3g}'.format(flux_expected) + flux_expected *= element.transmission(verbosity=0) + + - flux_expected = 1.23456789e12 # TODO: Fix if callable(getattr(element, 'reading', None)): reading_str = '{:11.3g}'.format(element.reading(verbosity=0)) - flux_str = '{:11.3g}'.format(element.flux(verbosity=0)) + state = element.state() + if element.has_flux and (state=='in' or state=='block'): + flux_cur = element.flux(verbosity=0) + flux_expected = flux_cur + flux_str = '{:11.3g}'.format(flux_cur) + else: + flux_str = '' + else: reading_str = '' flux_str = '' - flux_expected_str = '' + print('|{:5.1f} m | {:16.16} | {:s} | {:11.11} | {:11.11} | {:11.11} |'.format(element.zposition, element.name, path, reading_str, flux_str, flux_expected_str)) + #beam = True # For testing + + if verbosity>=1: print('+--------+------------------+-----+-------------+-------------+-------------+') @@ -615,19 +1239,487 @@ def fluxes(self, verbosity=3): +beam = CMSBeam() -beam = CMSBeam() +class Beamline(object): + '''Generic class that encapsulates different aspects of the beamline. + The intention for this object is to have methods that activate various 'standard' + protocols or sequences of actions.''' + + def __init__(self, **kwargs): + + self.md = {} + self.current_mode = 'undefined' + + + def mode(self, new_mode): + '''Tells the instrument to switch into the requested mode. This may involve + moving detectors, moving the sample, enabling/disabling detectors, and so + on.''' + + getattr(self, 'mode'+new_mode)() + + + def get_md(self, prefix=None, **md): + '''Returns a dictionary of the current metadata. + The 'prefix' argument is prepended to all the md keys, which allows the + metadata to be grouped with other metadata in a clear way. (Especially, + to make it explicit that this metadata came from the beamline.)''' + + # Update internal md + #self.md['key'] = value + + md_return = self.md.copy() + + # Add md that may change + md_return['mode'] = self.current_mode + + # Include the user-specified metadata + md_return.update(md) + # Add an optional prefix + if prefix is not None: + md_return = { '{:s}{:s}'.format(prefix, key) : value for key, value in md_return.items() } + + return md_return + + + def comment(self, text, logbooks=None, tags=None, append_md=True, **md): + + text += '\n\n[comment for beamline: {}]'.format(self.__class__.__name__) + + if append_md: + + # Global md + md_current = { k : v for k, v in RE.md.items() } + + # Beamline md + md_current.update(self.get_md()) + + # Specified md + md_current.update(md) + + text += '\n\n\nMetadata\n----------------------------------------' + for key, value in sorted(md_current.items()): + text += '\n{}: {}'.format(key, value) + + logbook.log(text, logbooks=logbooks, tags=tags) + + + def log_motors(self, motors, verbosity=3, **md): + + log_text = 'Motors\n----------------------------------------\nname | position | offset | direction |\n' + + for motor in motors: + offset = float(caget(motor.prefix+'.OFF')) + direction = int(caget(motor.prefix+'.DIR')) + log_text += '{} | {} | {} | {} |\n'.format(motor.name, motor.user_readback.value, offset, direction) + + + md_current = { k : v for k, v in RE.md.items() } + md_current.update(md) + log_text += '\nMetadata\n----------------------------------------\n' + for k, v in sorted(md_current.items()): + log_text += '{}: {}\n'.format(k, v) + + if verbosity>=3: + print(log_text) + + self.comment(log_text) + + -# To test: -# cd /opt/ipython_profiles/profile_collection/startup -# ipython -#from ophyd import EpicsMotor, Device, Component as Cpt -#armx = EpicsMotor('XF:11BMB-ES{SM:1-Ax:X}Mtr', name='armx') -#armx.user_setpoint.set(-3) -#armx.user_readback.value + +class CMS_Beamline(Beamline): + '''This object collects together various standard protocols and sequences + of action used on the CMS (11-BM) beamline at NSLS-II.''' + + + def __init__(self, **kwargs): + + super().__init__(**kwargs) + + self.beam = beam + + from epics import PV + + self._chamber_pressure_pv = PV('XF:11BMB-VA{Chm:Det-TCG:1}P-I') + + + + def modeAlignment(self, verbosity=3): + + self.current_mode = 'undefined' + + # TODO: Check what mode (TSAXS, GISAXS) and respond accordingly + # TODO: Check if gate valves are open and flux is okay (warn user) + + + self.beam.off() + self.beam.setTransmission(5e-4) + + #mov( [DETx, DETy], [0, 0] ) + self.beam.bim6.insert() + + caput('XF:11BMB-BI{IM:2}EM180:Acquire', 1) # Turn on bim6 + + self.current_mode = 'alignment' + + self.beam.bim6.reading() + + + + def modeMeasurement(self, verbosity=3): + + self.current_mode = 'undefined' + + self.beam.off() + self.beam.setTransmission(1) + + #mov(DETy, -16) + self.beam.bim6.retract() + + caput('XF:11BMB-BI{IM:2}EM180:Acquire', 0) # Turn off bim6 + + self.current_mode = 'measurement' + + # Check if gate valves are open + if self.beam.GVdsbig.state() is not 'out' and verbosity>=1: + print('Warning: Sample chamber gate valve (large, downstream) is not open.') + + + + + def modeBeamstopAlignment(self, verbosity=3): + '''Places bim6 (dsmon) as a temporary beamstop.''' + + mov(DETy, -6.1) + + + + def ventChamber(self, verbosity=3): + + + # Close large gate valve (downstream side of sample chamber) + caput('XF:11BMB-VA{Chm:Det-GV:1}Cmd:Cls-Cmd',1) + + # Close small gate valve (upstream side of sample chamber) + #caput('XF:11BMB-VA{Slt:4-GV:1}Cmd:Cls-Cmd',1) + + # Close valve connecting sample chamber to vacuum pump + caput('XF:11BMB-VA{Chm:Det-IV:1}Cmd:Cls-Cmd',1) + + sleep(0.5) + + # Soft-open the upstream vent-valve + caput('XF:11BMB-VA{Chm:Smpl-VV:1}Cmd:Cls-Cmd', 1) + sleep(1.0) + caput('XF:11BMB-VA{Chm:Smpl-VV:1_Soft}Cmd:Opn-Cmd', 1) + + + + self.chamberPressure(range_high=100) + + # Fully open the upstream vent-vale + caput('XF:11BMB-VA{Chm:Smpl-VV:1_Soft}Cmd:Cls-Cmd', 1) + sleep(1.0) + caput('XF:11BMB-VA{Chm:Smpl-VV:1}Cmd:Opn-Cmd', 1) + + # Fully open the downstream vent-vale + caput('XF:11BMB-VA{Chm:Det-VV:1_Soft}Cmd:Cls-Cmd', 1) + sleep(1.0) + caput('XF:11BMB-VA{Chm:Det-VV:1}Cmd:Opn-Cmd', 1) + + self.chamberPressure(range_high=1000) + + if verbosity>=1: + print('Sample chamber is ready to be opened.') + + + + def chamberPressure(self, range_low=None, range_high=None, readout_period=1.0, verbosity=3): + '''Monitors the pressure in the sample chamber, printing the current value. + If range arguments are provided, the monitoring will end once the pressure + is outside the range. + ''' + + monitor = True + while monitor: + + try: + + if range_low is not None and self._chamber_pressure_pv.get()range_high: + monitor = False + + P_mbar = self._chamber_pressure_pv.get() + P_atm = P_mbar*0.000986923 + P_torr = P_mbar*0.750062 + P_kPa = P_mbar*0.1 + P_psi = P_mbar = 0.0145038 + + if verbosity>=4: + print('Sample chamber pressure: {:8.2f} mbar = {:5.3f} atm = {:7.3f} torr = {:4.1g} kPa \r'.format(P_mbar, P_atm, P_torr, P_kPa), end='', flush=True) + elif verbosity>=2: + print('Sample chamber pressure: {:8.2f} mbar ({:5.3f} atm) \r'.format(P_mbar, P_atm), end='', flush=True) + + sleep(readout_period) + + + except KeyboardInterrupt: + monitor = False + + + + def pumpChamber(self, readout_delay=0.2): + + + # Close vent-valves + caput('XF:11BMB-VA{Chm:Smpl-VV:1_Soft}Cmd:Cls-Cmd', 1) + sleep(0.5) + caput('XF:11BMB-VA{Chm:Smpl-VV:1}Cmd:Cls-Cmd', 1) + sleep(0.5) + caput('XF:11BMB-VA{Chm:Det-VV:1_Soft}Cmd:Cls-Cmd', 1) + sleep(0.5) + caput('XF:11BMB-VA{Chm:Det-VV:1}Cmd:Cls-Cmd', 1) + sleep(0.2) + + # Turn on pump (if necessary) + if caget('XF:11BMB-VA{Chm:Det-Pmp:1}Sts:Enbl-Sts')==0: + caput('XF:11BMB-VA{Chm:Det-Pmp:1}Cmd:Enbl-Cmd', 0) + sleep(0.2) + caput('XF:11BMB-VA{Chm:Det-Pmp:1}Cmd:Enbl-Cmd', 1) + + # Soft-open valve to pump + caput('XF:11BMB-VA{Chm:Det-IV:1}Cmd:Cls-Cmd', 1) + sleep(1.0) + caput('XF:11BMB-VA{Chm:Det-IV:1_Soft}Cmd:Opn-Cmd', 1) + sleep(0.2) + + sleep(5.0) + # Check pump again + if caget('XF:11BMB-VA{Chm:Det-Pmp:1}Sts:Enbl-Sts')==0: + caput('XF:11BMB-VA{Chm:Det-Pmp:1}Cmd:Enbl-Cmd', 0) + sleep(0.2) + caput('XF:11BMB-VA{Chm:Det-Pmp:1}Cmd:Enbl-Cmd', 1) + + + self.chamberPressure(range_low=500) + + # Fully open valve to pump + caput('XF:11BMB-VA{Chm:Det-IV:1_Soft}Cmd:Cls-Cmd', 1) + sleep(1.0) + caput('XF:11BMB-VA{Chm:Det-IV:1}Cmd:Opn-Cmd', 1) + sleep(0.2) + + self.chamberPressure(range_low=200) + + + def openChamberGateValve(self): + + caput('XF:11BMB-VA{Chm:Det-GV:1}Cmd:Opn-Cmd', 1) # Large (downstream) + #caput('XF:11BMB-VA{Slt:4-GV:1}Cmd:Opn-Cmd',1) # Small (upstream) + + + def closeChamberGateValve(self): + + caput('XF:11BMB-VA{Chm:Det-GV:1}Cmd:Cls-Cmd', 1) # Large (downstream) + #caput('XF:11BMB-VA{Slt:4-GV:1}Cmd:Cls-Cmd',1) # Small (upstream) + + + # Metatdata methods + ######################################## + + def get_md(self, prefix=None, **md): + + md_current = self.md.copy() + md_current['calibration_energy_keV'] = self.beam.energy(verbosity=0) + md_current['calibration_wavelength_A'] = self.beam.wavelength(verbosity=0) + + h, v = self.beam.size(verbosity=0) + md_current['beam_size_x_mm'] = h + md_current['beam_size_y_mm'] = v + h, v = self.beam.divergence(verbosity=0) + md_current['beam_divergence_x_mrad'] = h + md_current['beam_divergence_y_mrad'] = v + + md_current['beamline_mode'] = self.current_mode + + md_current.update(md) + + # Add an optional prefix + if prefix is not None: + md_current = { '{:s}{:s}'.format(prefix, key) : value for key, value in md_current.items() } + + return md_current + + + def setMetadata(self, verbosity=3): + '''Guides the user through setting some of the required and recommended + meta-data fields.''' + + if verbosity>=3: + print('This will guide you through adding some meta-data for the upcoming experiment.') + if verbosity>=4: + print('You can accept default values (shown in square [] brackets) by pressing enter. You can leave a value blank (or put a space) to skip that entry.') + + + # Set some values automatically + month = int(time.strftime('%m')) + if month<=4: + cycle = 1 + elif month<=8: + cycle = 2 + else: + cycle = 3 + RE.md['experiment_cycle'] = '{:s}-{:d}'.format( time.strftime('%Y'), cycle ) + + RE.md['calibration_energy_keV'] = self.beam.energy(verbosity=0) + RE.md['calibration_wavelength_A'] = self.beam.wavelength(verbosity=0) + + # TODO: + # RE.md['calibration_detector_distance_m'] = + # RE.md['calibration_detector_x0'] = + # RE.md['calibration_detector_y0'] = + + + + # Ask the user some questions + + questions = [ + ['experiment_proposal_number', 'Proposal number'] , + ['experiment_SAF_number', 'SAF number'] , + ['experiment_group', 'User group (e.g. PI)'] , + ['experiment_user', 'The specific user/person running the experiment'] , + ['experiment_project', 'Project name/code'] , + ['experiment_type', 'Type of experiments/measurements (SAXS, GIWAXS, etc.)'] , + ] + + + # TBD: + # Path where data will be stored? + + self._dialog_total_questions = len(questions) + self._dialog_question_number = 1 + + for key, text in questions: + try: + self._ask_question(key, text) + except KeyboardInterrupt: + return + + if verbosity>=4: + print('You can also add/edit metadata directly using the RE.md object.') + + + + def _ask_question(self, key, text, default=None): + + if default is None and key in RE.md: + default = RE.md[key] + + if default is None: + ret = input(' Q{:d}/{:d}. {:s}: '.format(self._dialog_question_number, self._dialog_total_questions, text) ) + + else: + ret = input(' Q{:d}/{:d}. {:s} [{}]: '.format(self._dialog_question_number, self._dialog_total_questions, text, default) ) + if ret=='': + ret = default + + + if ret!='' and ret!=' ': + RE.md[key] = ret + + self._dialog_question_number += 1 + + + # Logging methods + ######################################## + + def logAllMotors(self, verbosity=3, **md): + log_pos() + + motor_list = [ + mono_bragg , + mono_pitch2 , + mono_roll2 , + mono_perp2 , + mir_usx , + mir_dsx , + mir_usy , + mir_dsyi , + mir_dsyo , + mir_bend , + s0.tp , + s0.bt , + s0.ob , + s0.ib , + s1.xc , + s1.xg , + s1.yc , + s1.yg , + s2.xc , + s2.xg , + s2.yc , + s2.yg , + s3.xc , + s3.xg , + s3.yc , + s3.yg , + s4.xc , + s4.xg , + s4.yc , + s4.yg , + s5.xc , + s5.xg , + s5.yc , + s5.yg , + bim3y , + fs3y , + bim4y , + bim5y , + smx , + smy , + sth , + schi , + sphi , + srot , + strans , + camx , + camy , + cam2x , + cam2z , + DETx , + DETy , + WAXSx , + SAXSx , + SAXSy , + bsx , + bsy , + bsphi , + armz , + armx , + armphi , + army , + armr , + ] + + self.log_motors(motor_list, verbosity=verbosity, **md) + + + # End class CMSBeam(object) + ######################################## + + + +cms = CMS_Beamline() + +def get_beamline(): + return cms diff --git a/startup/90-bluesky.py b/startup/90-bluesky.py old mode 100644 new mode 100755 diff --git a/startup/91-fit_scan.py b/startup/91-fit_scan.py new file mode 100644 index 0000000..1920083 --- /dev/null +++ b/startup/91-fit_scan.py @@ -0,0 +1,825 @@ + +# Classes and functions to make it easy to do a dscan with realtime fitting to +# a custom function. + + + +if False: + #%run -i /opt/ipython_profiles/profile_collection/startup/91-fit_scan.py + + # Define a 'fake' detector, for testing purposes + from bluesky.examples import Reader + def fake_detector_response_peak(): + pos = armz.user_readback.value + A = 1000.0 + x0 = -40.0 + sigma = 0.1 + I = A*np.exp(-(pos - x0)**2/(2 * sigma**2)) + 10.0 + + return np.random.poisson(I) + def fake_detector_response_edge(): + pos = armz.user_readback.value + A = 1000.0 + x0 = -17.0 + sigma = 0.05 + I = A/( 1 + np.exp( -(pos-x0)/(-sigma) ) ) + 10.0 + + return np.random.poisson(I) + + #det = Reader( 'det', {'intensity': lambda: 1.0*( (DETx.user_readback.value - (-40.0))**2 )/(2.*(0.1)**2) } ) + det = Reader( 'intensity', {'intensity': fake_detector_response_edge} ) + detselect(det) + #detselect(det, suffix='') + + #fit_scan(DETx, 1, 3, detector_suffix='') + #fit_scan(armz, [-5,0], 5, detector_suffix='') + + + + + +class LiveStat(CallbackBase): + """ + Calculate simple statistics for an (x,y) curve. + """ + + # Note: Follows the style/naming of class LiveFit(CallbackBase), + # where possible, so that it can be used in similar contexts. + + def __init__(self, stat, y_name, x_name, update_every=1): + + self.stat = stat + self.y_name = y_name + self.x_name = x_name + self.update_every = update_every + + self.ydata = [] + self.xdata = [] + + class Result(object): + pass + self.result = Result() # Dummy object to replicate the hiearchy expected for LiveFit + self.result.values = {} + + + + def event(self, doc): + + if self.y_name not in doc['data']: + return + + y = doc['data'][self.y_name] + x = doc['data'][self.x_name] + + self.ydata.append(y) + self.xdata.append(x) + + if self.update_every is not None: + i = doc['seq_num'] + if ((i - 1) % self.update_every == 0): + + if type(self.stat) is list: + for stat in self.stat: + self.update_fit(stat) + else: + self.update_fit(self.stat) + + super().event(doc) + + + def update_fit(self, stat): + + xs = np.asarray(self.xdata) + ys = np.asarray(self.ydata) + + if stat is 'max': + idx = np.argmax(ys) + x0 = xs[idx] + y0 = ys[idx] + + self.result.values['x_max'] = x0 + self.result.values['y_max'] = y0 + + elif stat is 'min': + idx = np.argmin(ys) + x0 = xs[idx] + y0 = ys[idx] + + self.result.values['x_min'] = x0 + self.result.values['y_min'] = y0 + + elif stat is 'COM': + x0 = np.sum(xs*ys)/np.sum(ys) + y0 = np.interp(x0, xs, ys) + + self.result.values['x_COM'] = x0 + self.result.values['y_COM'] = y0 + + elif stat is 'HM': + idx_max = np.argmax(ys) + half_max = 0.5*ys[idx_max] + + l = None + r = None + + left = ys[:idx_max] + right = ys[idx_max:] + + if len(left)>0 and left.min()0 and right.min()=0 and yspan>0 and ymin/yspan<0.25: + self.ax.set_ylim([0, ymax*1.2]) + + super().update_plot() + + + def scroll_event(self, event): + '''Gets called when the mousewheel/scroll-wheel is used. This activates + zooming.''' + + if event.inaxes!=self.ax: + return + + current_plot_limits = self.ax.axis() + x = event.xdata + y = event.ydata + + + # The following function converts from the wheel-mouse steps + # into a zoom-percentage. Using a base of 4 and a divisor of 2 + # means that each wheel-click is a 50% zoom. However, the speed + # of zooming can be altered by changing these numbers. + + # 50% zoom: + step_percent = 4.0**( -event.step/2.0 ) + # Fast zoom: + #step_percent = 100.0**( -event.step/2.0 ) + # Slow zoom: + #step_percent = 2.0**( -event.step/4.0 ) + + xi = x - step_percent*(x-current_plot_limits[0]) + xf = x + step_percent*(current_plot_limits[1]-x) + yi = y - step_percent*(y-current_plot_limits[2]) + yf = y + step_percent*(current_plot_limits[3]-y) + + self.ax.axis( (xi, xf, yi, yf) ) + + self.ax.figure.canvas.draw() + + + + +class LiveFitPlot_Custom(LiveFitPlot): + """ + Add a plot to an instance of LiveFit. + + Note: If your figure blocks the main thread when you are trying to + scan with this callback, call `plt.ion()` in your IPython session. + + Parameters + ---------- + livefit : LiveFit + an instance of ``LiveFit`` + legend_keys : list, optional + The list of keys to extract from the RunStart document and format + in the legend of the plot. The legend will always show the + scan_id followed by a colon ("1: "). Each + xlim : tuple, optional + passed to Axes.set_xlim + ylim : tuple, optional + passed to Axes.set_ylim + ax : Axes, optional + matplotib Axes; if none specified, new figure and axes are made. + All additional keyword arguments are passed through to ``Axes.plot``. + """ + + def __init__(self, livefit, *, legend_keys=None, xlim=None, ylim=None, + ax=None, scan_range=None, **kwargs): + + + kwargs_update = { + 'color' : 'b' , + 'linewidth' : 2.5 , + } + kwargs_update.update(kwargs) + + + super().__init__(livefit, legend_keys=legend_keys, xlim=xlim, ylim=ylim, ax=ax, **kwargs_update) + + self.scan_range = scan_range + + + def get_scan_range(self, overscan=0.0): + if self.scan_range is None: + x_start = np.min(self.livefit.independent_vars_data[self.__x_key]) + x_stop = np.max(self.livefit.independent_vars_data[self.__x_key]) + else: + x_start = np.min(self.scan_range) + x_stop = np.max(self.scan_range) + + span = abs(x_stop-x_start) + + x_start -= span*overscan + x_stop += span*overscan + + return x_start, x_stop, span + + + def event(self, doc): + + # Slight kludge (to over-ride possible 'greying out' from LivePlot_Custom.start) + self.current_line.set_alpha(1.0) + self.current_line.set_linewidth(2.5) + self.x0_line.set_alpha(0.5) + self.x0_line.set_linewidth(2.0) + + self.livefit.event(doc) + if self.livefit.result is not None: + #self.y_data = self.livefit.result.best_fit + #self.x_data = self.livefit.independent_vars_data[self.__x_key] + + x_start, x_stop, span = self.get_scan_range(overscan=0.25) + + self.x_data = np.linspace(x_start, x_stop, num=200, endpoint=True, retstep=False) + self.y_data = self.livefit.result.eval(x=self.x_data) + + self.update_plot() + + + # Intentionally override LivePlot.event. Do not call super(). + + + def start(self, doc): + + super().start(doc) + + for line in self.ax.lines: + if hasattr(line, 'custom_tag_x0') and line.custom_tag_x0: + line.remove() + + # A line that denotes the current fit position for x0 (e.g. center of gaussian) + x_start, x_stop, span = self.get_scan_range(overscan=0.0) + self.x0_line = self.ax.axvline( (x_start+x_stop)*0.5, color='b', alpha=0.5, dashes=[5,5], linewidth=2.0 ) + self.x0_line.custom_tag_x0 = True + + + + def update_plot(self): + + x0 = self.livefit.result.values['x0'] + self.x0_line.set_xdata([x0]) + super().update_plot() + + + + +class LiveFit_Custom(LiveFit): + """ + Fit a model to data using nonlinear least-squares minimization. + + Parameters + ---------- + model_name : string + The name of the model to be used in fitting + y : string + name of the field in the Event document that is the dependent variable + independent_vars : dict + map the independent variable name(s) in the model to the field(s) + in the Event document; e.g., ``{'x': 'motor'}`` + init_guess : dict, optional + initial guesses for other values, if expected by model; + e.g., ``{'sigma': 1}`` + update_every : int or None, optional + How often to recompute the fit. If `None`, do not compute until the + end. Default is 1 (recompute after each new point). + + Attributes + ---------- + result : lmfit.ModelResult + """ + def __init__(self, model_name, y, independent_vars, scan_range, update_every=1, background=None): + + + self.x_start = min(scan_range) + self.x_stop = max(scan_range) + self.x_span = abs(self.x_stop-self.x_start) + + substitutions = { 'gaussian': 'gauss', 'lorentzian': 'lorentz', 'squarewave': 'square', 'tophat': 'square', 'rectangular': 'square', 'errorfunction': 'erf' } + if model_name in substitutions.keys(): + model_name = substitutions[model_name] + + + lm_model = self.get_model(model_name) + init_guess = self.get_initial_guess(model_name) + + # Add additional models (if any) + if background is not None: + if type(background) is list: + for back in background: + lm_model += self.get_model(back) + init_guess.update(self.get_initial_guess(back)) + else: + lm_model += self.get_model(background) + init_guess.update(self.get_initial_guess(background)) + + super().__init__(lm_model, y, independent_vars, init_guess=init_guess, update_every=update_every) + + + + def get_model(self, model_name): + + if model_name is 'gauss': + def model_function(x, x0, prefactor, sigma): + return prefactor*np.exp(-(x - x0)**2/(2 * sigma**2)) + + elif model_name is 'lorentz': + def model_function(x, x0, prefactor, gamma): + return prefactor* (gamma**2) / ( (x-x0)**2 + (gamma**2) ) + + elif model_name is 'doublesigmoid': + def model_function(x, x0, prefactor, sigma, fwhm): + left = prefactor/( 1 + np.exp( -(x-(x0-fwhm*0.5))/sigma ) ) + right = prefactor/( 1 + np.exp( -(x-(x0+fwhm*0.5))/sigma ) ) + return prefactor*( left - right ) + + elif model_name is 'square': + def model_function(x, x0, prefactor, fwhm): + sigma = fwhm*0.02 + left = prefactor/( 1 + np.exp( -(x-(x0-fwhm*0.5))/sigma ) ) + right = prefactor/( 1 + np.exp( -(x-(x0+fwhm*0.5))/sigma ) ) + return prefactor*( left - right ) + + + elif model_name is 'sigmoid': + def model_function(x, x0, prefactor, sigma): + return prefactor/( 1 + np.exp( -(x-x0)/sigma ) ) + + elif model_name is 'sigmoid_r': + def model_function(x, x0, prefactor, sigma): + return prefactor/( 1 + np.exp( +(x-x0)/sigma ) ) + + elif model_name is 'step': + def model_function(x, x0, prefactor, sigma): + return prefactor/( 1 + np.exp( -(x-x0)/sigma ) ) + + elif model_name is 'step_r': + def model_function(x, x0, prefactor, sigma): + return prefactor/( 1 + np.exp( +(x-x0)/sigma ) ) + + elif model_name is 'tanh': + def model_function(x, x0, prefactor, sigma): + return prefactor*0.5*( np.tanh((x-x0)/sigma) + 1.0 ) + + elif model_name is 'tanh_r': + def model_function(x, x0, prefactor, sigma): + return prefactor*0.5*( np.tanh(-(x-x0)/sigma) + 1.0 ) + + elif model_name is 'erf': + import scipy + def model_function(x, x0, prefactor, sigma): + return prefactor*0.5*( scipy.special.erf((x-x0)/sigma) + 1.0 ) + + elif model_name is 'erf_r': + import scipy + def model_function(x, x0, prefactor, sigma): + return prefactor*0.5*( scipy.special.erf(-(x-x0)/sigma) + 1.0 ) + + + elif model_name is 'constant': + def model_function(x, offset): + return x*0 + offset + + elif model_name is 'linear': + def model_function(x, m, b): + return m*x + b + + else: + print('ERROR: Model {:s} unknown.'.format(model_name)) + + lm_model = lmfit.Model(model_function) + + return lm_model + + + def get_initial_guess(self, model_name): + return getattr(self, 'initial_guess_{:s}'.format(model_name))() + + + def initial_guess_gauss(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 1000, min=0) , + 'sigma': lmfit.Parameter('sigma', self.x_span*0.25, min=0, max=self.x_span*4) , + } + return init_guess + + def initial_guess_lorentz(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 1, min=0) , + 'gamma': lmfit.Parameter('gamma', self.x_span*0.25, min=0, max=self.x_span*4) , + } + return init_guess + + def initial_guess_doublesigmoid(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 100, min=0) , + 'sigma': lmfit.Parameter('sigma', self.x_span*0.25, min=0, max=self.x_span) , + 'fwhm': lmfit.Parameter('fwhm', self.x_span*0.25, min=0, max=self.x_span) , + } + return init_guess + + def initial_guess_square(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 100, min=0) , + 'fwhm': lmfit.Parameter('fwhm', self.x_span*0.25, min=0, max=self.x_span) , + } + return init_guess + + def initial_guess_sigmoid(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 100, min=0) , + 'sigma': lmfit.Parameter('sigma', self.x_span*0.25, min=0, max=self.x_span*4) , + } + return init_guess + + def initial_guess_sigmoid_r(self): + return self.initial_guess_sigmoid() + + def initial_guess_step(self): + init_guess = { + 'x0': lmfit.Parameter('x0', (self.x_start+self.x_stop)*0.5, min=self.x_start-self.x_span*0.1, max=self.x_stop+self.x_span*0.1) , + 'prefactor': lmfit.Parameter('prefactor', 100, min=0) , + 'sigma': lmfit.Parameter('sigma', self.x_span*0.002, min=0, max=self.x_span*0.005) , + } + return init_guess + + def initial_guess_step_r(self): + return self.initial_guess_step() + + def initial_guess_tanh(self): + return self.initial_guess_sigmoid() + + def initial_guess_tanh_r(self): + return self.initial_guess_tanh() + + def initial_guess_erf(self): + return self.initial_guess_sigmoid() + + def initial_guess_erf_r(self): + return self.initial_guess_erf() + + + def initial_guess_linear(self): + init_guess = {'m' : 0, 'b' : 0 } + return init_guess + + def initial_guess_constant(self): + init_guess = {'offset' : 0} + return init_guess + + + + + +import lmfit + +def fit_scan(motor, span, num=11, detectors=None, detector_suffix='', fit='HM', background=None, per_step=None, wait_time=None, md={}): + """ + Scans the specified motor, and attempts to fit the data as requested. + + Parameters + ---------- + motor : motor + The axis/stage/motor that you want to move. + span : float + The total size of the scan range (centered about the current position). + If a two-element list is instead specified, this is interpreted as the + distances relative to the current position for the start and end. + num : int + The number of scan points. + fit : None or string + If None, then fitting is not done. Otherwise, the model specified by the + supplied string is used. + peaks: gauss, lorentz, doublesigmoid, square + edges: sigmoid, step + stats: max, min, COM (center-of-mass), HM (half-max) + background : None or string + A baseline/background underlying the fit function can be specified. + (In fact, a sequence of summed background functions can be supplied.) + constant, linear + md : dict, optional + metadata + """ + + # TODO: Normalize per ROI pixel and per count_time? + + initial_position = motor.user_readback.value + + if type(span) is list: + start = initial_position+span[0] + stop = initial_position+span[1] + else: + start = initial_position-span/2. + stop = initial_position+span/2. + span = abs(stop-start) + #positions, dp = np.linspace(start, stop, num, endpoint=True, retstep=True) + + if detectors is None: + detectors = gs.DETS + + + + # Get axes for plotting + title = 'fit_scan: {} vs. {}'.format(detectors[0].name, motor.name) + #if len(plt.get_fignums())>0: + # Try to use existing figure + #fig = plt.gcf() # Most recent figure + + fig = None + for i in plt.get_fignums(): + title_cur = plt.figure(i).canvas.manager.window.windowTitle() + if title_cur==title: + fig = plt.figure(i) + break + + if fig is None: + # New figure + #fig, ax = plt.subplots() + fig = plt.figure(figsize=(11,7), facecolor='white') + fig.canvas.manager.toolbar.pan() + + fig.canvas.set_window_title(title) + ax = fig.gca() + + + subs = [] + + livetable = LiveTable([motor] + list(detectors)) + subs.append(livetable) + liveplot = LivePlot_Custom('{}{}'.format(detectors[0].name, detector_suffix), motor.name, ax=ax) + subs.append(liveplot) + + + + if fit in ['max', 'min', 'COM', 'HM', 'HMi'] or type(fit) is list: + + livefit = LiveStat(fit, '{}{}'.format(detectors[0].name, detector_suffix), motor.name) + + livefitplot = LiveStatPlot(livefit, ax=ax, scan_range=[start, stop]) + + subs.append(livefitplot) + + + elif fit is not None: + + # Perform a fit + + #livefit = LiveFit(lm_model, '{}{}'.format(detectors[0].name, detector_suffix), {'x': motor.name}, init_guess) + livefit = LiveFit_Custom(fit, '{}{}'.format(detectors[0].name, detector_suffix), {'x': motor.name}, scan_range=[start, stop], background=background) + + #livefitplot = LiveFitPlot(livefit, color='k') + livefitplot = LiveFitPlot_Custom(livefit, ax=ax, scan_range=[start, stop]) + + subs.append(livefitplot) + + + md['plan_header_override'] = 'fit_scan' + md['scan'] = 'fit_scan' + md['fit_function'] = fit + md['fit_background'] = background + + # Perform the scan + RE(scan(list(detectors), motor, start, stop, num, per_step=per_step, md=md), subs ) + + + + if fit is None: + # Return to start position + #motor.user_setpoint.set(initial_position) + mov(motor, initial_position) + + else: + + print( livefit.result.values ) + x0 = livefit.result.values['x0'] + mov(motor, x0) + return livefit.result + + + + + + + + + + +# TODO: +# - Fit parameters on graph +# - Correctly guess orientation of sigmoids/etc. +# - Have an 'auto' mode that just picks the right fit-function? +# - Do it silently (for autonomous modes); maybe save to disk +# - Allow fit to be 'redone' (with a different function) at the end. +## Maybe need a global pointer to the last fit? (Or contained within beamline?) + +# HMi + +# TODO: +# Binary search to find half-height, peak, etc. + +# TODO: +# version of fit_scan for use in scripts (only fits at end, does lots of double-checks for sanity...) + + diff --git a/startup/94-sample.py b/startup/94-sample.py new file mode 100644 index 0000000..be340cd --- /dev/null +++ b/startup/94-sample.py @@ -0,0 +1,1601 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# vi: ts=4 sw=4 + + + + +################################################################################ +# Code for defining a 'Sample' object, which keeps track of its state, and +# simplifies the task of aligning, measuring, etc. +################################################################################ +# Known Bugs: +# N/A +################################################################################ +# TODO: +# - Search for "TODO" below. +# - Ability to have a collection of simultaneous motions? (E.g. build up a set +# of deferred motions?) +################################################################################ + + +import time +import re + + + +class CoordinateSystem(object): + """ + A generic class defining a coordinate system. Several coordinate systems + can be layered on top of one another (with a reference to the underlying + coordinate system given by the 'base_stage' pointer). When motion of a given + CoordinateSystem is requested, the motion is passed (with coordinate + conversion) to the underlying stage. + """ + + hint_replacements = { 'positive': 'negative', + 'up': 'down', + 'left': 'right', + 'towards': 'away from', + 'downstream': 'upstream', + 'inboard': 'outboard', + 'clockwise': 'counterclockwise', + 'CW': 'CCW', + } + + + # Core methods + ######################################## + + def __init__(self, name='', base=None, **kwargs): + '''Create a new CoordinateSystem (e.g. a stage or a sample). + + Parameters + ---------- + name : str + Name for this stage/sample. + base : Stage + The stage on which this stage/sample sits. + ''' + + self.name = name + self.base_stage = base + + + self.enabled = True + + self.md = {} + self._marks = {} + + self._set_axes_definitions() + self._init_axes(self._axes_definitions) + + + def _set_axes_definitions(self): + '''Internal function which defines the axes for this stage. This is kept + as a separate function so that it can be over-ridden easily.''' + + # The _axes_definitions array holds a list of dicts, each defining an axis + self._axes_definitions = [] + + + def _init_axes(self, axes): + '''Internal method that generates method names to control the various axes.''' + + # Note: Instead of defining CoordinateSystem() having methods '.x', '.xr', + # '.y', '.yr', etc., we programmatically generate these methods when the + # class (and subclasses) are instantiated. + # Thus, the Axis() class has generic versions of these methods, which are + # appropriated renamed (bound, actually) when a class is instantiated. + + self._axes = {} + + for axis in axes: + + axis_object = Axis(axis['name'], axis['motor'], axis['enabled'], axis['scaling'], axis['units'], axis['hint'], self.base_stage, stage=self) + self._axes[axis['name']] = axis_object + + # Bind the methods of axis_object to appropriately-named methods of + # the CoordinateSystem() class. + setattr(self, axis['name'], axis_object.get_position ) + setattr(self, axis['name']+'abs', axis_object.move_absolute ) + setattr(self, axis['name']+'r', axis_object.move_relative ) + setattr(self, axis['name']+'pos', axis_object.get_position ) + setattr(self, axis['name']+'posMotor', axis_object.get_motor_position ) + + + setattr(self, axis['name']+'units', axis_object.get_units ) + setattr(self, axis['name']+'hint', axis_object.get_hint ) + setattr(self, axis['name']+'info', axis_object.get_info ) + + setattr(self, axis['name']+'set', axis_object.set_current_position ) + setattr(self, axis['name']+'o', axis_object.goto_origin ) + setattr(self, axis['name']+'setOrigin', axis_object.set_origin ) + setattr(self, axis['name']+'mark', axis_object.mark ) + + setattr(self, axis['name']+'scan', axis_object.scan ) + setattr(self, axis['name']+'c', axis_object.center ) + + + def comment(self, text, logbooks=None, tags=None, append_md=True, **md): + '''Add a comment related to this CoordinateSystem.''' + + text += '\n\n[comment for CoordinateSystem: {} ({})].'.format(self.name, self.__class__.__name__) + + if append_md: + + md_current = { k : v for k, v in RE.md.items() } # Global md + md_current.update(get_beamline().get_md()) # Beamline md + + # Self md + #md_current.update(self.get_md()) + + # Specified md + md_current.update(md) + + text += '\n\n\nMetadata\n----------------------------------------' + for key, value in sorted(md_current.items()): + text += '\n{}: {}'.format(key, value) + + + logbook.log(text, logbooks=logbooks, tags=tags) + + + def set_base_stage(self, base): + + self.base_stage = base + self._init_axes(self._axes_definitions) + + + # Convenience/helper methods + ######################################## + + + def multiple_string_replacements(self, text, replacements, word_boundaries=False): + '''Peform multiple string replacements simultaneously. Matching is case-insensitive. + + Parameters + ---------- + text : str + Text to return modified + replacements : dictionary + Replacement pairs + word_boundaries : bool, optional + Decides whether replacements only occur for words. + ''' + + # Code inspired from: + # http://stackoverflow.com/questions/6116978/python-replace-multiple-strings + # Note inclusion of r'\b' sequences forces the regex-match to occur at word-boundaries. + + if word_boundaries: + replacements = dict((r'\b'+re.escape(k.lower())+r'\b', v) for k, v in replacements.items()) + pattern = re.compile("|".join(replacements.keys()), re.IGNORECASE) + text = pattern.sub(lambda m: replacements[r'\b'+re.escape(m.group(0).lower())+r'\b'], text) + + else: + replacements = dict((re.escape(k.lower()), v) for k, v in replacements.items()) + pattern = re.compile("|".join(replacements.keys()), re.IGNORECASE) + text = pattern.sub(lambda m: rep[re.escape(m.group(0))], text) + + return text + + + def _hint_replacements(self, text): + '''Convert a motor-hint into its logical inverse.''' + + # Generates all the inverse replacements + replacements = dict((v, k) for k, v in self.hint_replacements.items()) + replacements.update(self.hint_replacements) + + return self.multiple_string_replacements(text, replacements, word_boundaries=True) + + + # Motion methods + ######################################## + + + def enable(self): + self.enabled = True + + + def disable(self): + self.enabled = False + + + def is_enabled(self): + return self.enabled + + + def pos(self, verbosity=3): + '''Return (and print) the positions of all axes associated with this + stage/sample.''' + + out = {} + for axis_name, axis_object in sorted(self._axes.items()): + out[axis_name] = axis_object.get_position(verbosity=verbosity) + #if verbosity>=2: print('') # \n + + return out + + def hints(self, verbosity=3): + '''Return (and print) the hints of all axes associated with this + stage/sample.''' + + out = {} + for axis_name, axis_object in sorted(self._axes.items()): + if verbosity>=2: print('{:s}'.format(axis_name)) + out[axis_name] = axis_object.get_hint(verbosity=verbosity) + if verbosity>=2: print('') # \n + + return out + + + def origin(self, verbosity=3): + '''Returns the origin for axes.''' + + out = {} + for axis_name, axis_object in sorted(self._axes.items()): + origin = axis_object.get_origin() + if verbosity>=2: print('{:s} origin = {:.3f} {:s}'.format(axis_name, origin, axis_object.get_units())) + out[axis_name] = origin + + return out + + + def gotoOrigin(self, axes=None): + '''Go to the origin (zero-point) for this stage. All axes are zeroed, + unless one specifies the axes to move.''' + + # TODO: Guard against possibly buggy behavior if 'axes' is a string instead of a list. + # (Python will happily iterate over the characters in a string.) + + if axes is None: + axes_to_move = self._axes.values() + + else: + axes_to_move = [self._axes[axis_name] for axis_name in axes] + + for axis in axes_to_move: + axis.goto_origin() + + + def setOrigin(self, axes, positions=None): + '''Define the current position as the zero-point (origin) for this stage/ + sample. The axes to be considered in this redefinition must be supplied + as a list. + + If the optional positions parameter is passed, then those positions are + used to define the origins for the axes.''' + + if positions is None: + + for axis in axes: + getattr(self, axis+'setOrigin')() + + else: + for axis, pos in zip(axes, positions): + getattr(self, axis+'setOrigin')(pos) + + + def gotoAlignedPosition(self): + '''Goes to the currently-defined 'aligned' position for this stage. If + no specific aligned position is defined, then the zero-point for the stage + is used instead.''' + + # TODO: Optional offsets? (Like goto mark?) + + if 'aligned_position' in self.md and self.md['aligned_position'] is not None: + for axis_name, position in self.md['aligned_position'].items(): + self._axes[axis_name].move_absolute(position) + + else: + self.gotoOrigin() + + + + # Motion logging + ######################################## + + def setAlignedPosition(self, axes): + '''Saves the current position as the 'aligned' position for this stage/ + sample. This allows one to return to this position later. One must + specify the axes to be considered. + + WARNING: Currently this position data is not saved persistently. E.g. it will + be lost if you close and reopen the console. + ''' + + positions = {} + for axis_name in axes: + positions[axis_name] = self._axes[axis_name].get_position(verbosity=0) + + self.attributes['aligned_position'] = positions + + + def mark(self, label, *axes, **axes_positions): + '''Set a mark for the stage/sample/etc. + + 'Marks' are locations that have been labelled, which is useful for + later going to a labelled position (using goto), or just to keep track + of sample information (metadata). + + By default, the mark is set at the current position. If no 'axes' are + specified, all motors are logged. Alternately, axes (as strings) can + be specified. If axes_positions are given as keyword arguments, then + positions other than the current position can be specified. + ''' + + positions = {} + + if len(axes)==0 and len(axes_positions)==0: + + for axis_name in self._axes: + positions[axis_name] = self._axes[axis_name].get_position(verbosity=0) + + else: + for axis_name in axes: + positions[axis_name] = self._axes[axis_name].get_position(verbosity=0) + + for axis_name, position in axes_positions.items(): + positions[axis_name] = position + + self._marks[label] = positions + + + def marks(self, verbosity=3): + '''Get a list of the current marks on the stage/sample/etc. 'Marks' + are locations that have been labelled, which is useful for later + going to a labelled position (using goto), or just to keep track + of sample information (metadata).''' + + if verbosity>=3: + print('Marks for {:s} (class {:s}):'.format(self.name, self.__class__.__name__)) + + if verbosity>=2: + for label, positions in self._marks.items(): + print(label) + for axis_name, position in sorted(positions.items()): + print(' {:s} = {:.4f} {:s}'.format(axis_name, position, self._axes[axis_name].get_units())) + + return self._marks + + + def goto(self, label, verbosity=3, **additional): + '''Move the stage/sample to the location given by the label. For this + to work, the specified label must have been 'marked' at some point. + + Additional keyword arguments can be provided. For instance, to move + 3 mm from the left edge: + sam.goto('left edge', xr=+3.0) + ''' + + if label not in self._marks: + if verbosity>=1: + print("Label '{:s}' not recognized. Use '.marks()' for the list of marked positions.".format(label)) + return + + for axis_name, position in sorted(self._marks[label].items()): + + if axis_name+'abs' in additional: + # Override the marked value for this position + position = additional[axis_name+'abs'] + del(additional[axis_name+'abs']) + + + #relative = 0.0 if axis_name+'r' not in additional else additional[axis_name+'r'] + if axis_name+'r' in additional: + relative = additional[axis_name+'r'] + del(additional[axis_name+'r']) + else: + relative = 0.0 + + self._axes[axis_name].move_absolute(position+relative, verbosity=verbosity) + + + # Handle any optional motions not already covered + for command, amount in additional.items(): + if command[-1]=='r': + getattr(self, command)(amount, verbosity=verbosity) + elif command[-3:]=='abs': + getattr(self, command)(amount, verbosity=verbosity) + else: + print("Keyword argument '{}' not understood (should be 'r' or 'abs').".format(command)) + + + + # End class CoordinateSystem(object) + ######################################## + + + + +class Axis(object): + '''Generic motor axis. + + Meant to be used within a CoordinateSystem() or Stage() object. + ''' + + def __init__(self, name, motor, enabled, scaling, units, hint, base, stage=None, origin=0.0): + + self.name = name + self.motor = motor + self.enabled = enabled + self.scaling = scaling + self.units = units + self.hint = hint + + self.base_stage = base + self.stage = stage + + self.origin = 0.0 + + + + self._move_settle_max_time = 10.0 + self._move_settle_period = 0.05 + self._move_settle_tolerance = 0.01 + + + def cur_to_base(self, position): + + base_position = self.get_origin() + self.scaling*position + + return base_position + + + def base_to_cur(self, base_position): + + position = (base_position - self.get_origin())/self.scaling + + return position + + + # Programmatically-defined methods + ######################################## + # Note: Instead of defining CoordinateSystem() having methods '.x', '.xr', + # '.xp', etc., we programmatically generate these methods when the class + # (and subclasses) are instantiated. + # Thus, the Axis() class has generic versions of these methods, which are + # appropriated renamed (bound, actually) when a class is instantiated. + def get_position(self, verbosity=3): + '''Return the current position of this axis (in its coordinate system). + By default, this also prints out the current position.''' + + + if self.motor is not None: + base_position = self.motor.position + + else: + verbosity_c = verbosity if verbosity>=4 else 0 + base_position = getattr(self.base_stage, self.name+'pos')(verbosity=verbosity_c) + + position = self.base_to_cur(base_position) + + + if verbosity>=2: + if self.stage: + stg = self.stage.name + else: + stg = '?' + + if verbosity>=5 and self.motor is not None: + print( '{:s} = {:.3f} {:s}'.format(self.motor.name, base_position, self.get_units()) ) + + print( '{:s}.{:s} = {:.3f} {:s} (origin = {:.3f})'.format(stg, self.name, position, self.get_units(), self.get_origin()) ) + + + return position + + + def get_motor_position(self, verbosity=3): + '''Returns the position of this axis, traced back to the underlying + motor.''' + + if self.motor is not None: + return self.motor.position + + else: + return getattr(self.base_stage, self.name+'posMotor')(verbosity=verbosity) + #return self.base_stage._axes[self.name].get_motor_position(verbosity=verbosity) + + + def move_absolute(self, position=None, wait=True, verbosity=3): + '''Move axis to the specified absolute position. The position is given + in terms of this axis' current coordinate system. The "defer" argument + can be used to defer motions until "move" is called.''' + + + if position is None: + # If called without any argument, just print the current position + return self.get_position(verbosity=verbosity) + + # Account for coordinate transformation + base_position = self.cur_to_base(position) + + if self.is_enabled(): + + if self.motor: + #mov( self.motor, base_position ) + self.motor.user_setpoint.value = base_position + + else: + # Call self.base_stage.xabs(base_position) + getattr(self.base_stage, self.name+'abs')(base_position, verbosity=0) + + + if verbosity>=2: + if self.stage: + stg = self.stage.name + else: + stg = '?' + + # Show a realtime output of position + start_time = time.time() + current_position = self.get_position(verbosity=0) + while abs(current_position-position)>self._move_settle_tolerance and (time.time()-start_time)=1: + print( '{:s}.{:s} = {:5.3f} {:s} '.format(stg, self.name, current_position, self.get_units())) + + + + elif verbosity>=1: + print( 'Axis %s disabled (stage %s).' % (self.name, self.stage.name) ) + + + + def move_relative(self, move_amount=None, verbosity=3): + '''Move axis relative to the current position.''' + + if move_amount is None: + # If called without any argument, just print the current position + return self.get_position(verbosity=verbosity) + + target_position = self.get_position(verbosity=0) + move_amount + + return self.move_absolute(target_position) + + + def goto_origin(self): + '''Move axis to the currently-defined origin (zero-point).''' + + self.move_absolute(0) + + + def set_origin(self, origin=None): + '''Sets the origin (zero-point) for this axis. If no origin is supplied, + the current position is redefined as zero. Alternatively, you can supply + a position (in the current coordinate system of the axis) that should + henceforth be considered zero.''' + + if origin is None: + # Use current position + if self.motor is not None: + self.origin = self.motor.position + + else: + if self.base_stage is None: + print("Error: %s %s has 'base_stage' and 'motor' set to 'None'." % (self.__class__.__name__, self.name)) + else: + self.origin = getattr(self.base_stage, self.name+'pos')(verbosity=0) + + else: + # Use supplied value (in the current coordinate system) + base_position = self.cur_to_base(origin) + self.origin = base_position + + + def set_current_position(self, new_position): + '''Redefines the position value of the current position.''' + current_position = self.get_position(verbosity=0) + self.origin = self.get_origin() + (current_position - new_position)*self.scaling + + def scan(self): + print('todo') + + def center(self): + print('todo') + + def mark(self, label, position=None, verbosity=3): + '''Set a mark for this axis. (By default, the current position is + used.)''' + + if position is None: + position = self.get_position(verbosity=0) + + axes_positions = { self.name : position } + self.stage.mark(label, **axes_positions) + + + + + + # Book-keeping + ######################################## + + def enable(self): + self.enabled = True + + + def disable(self): + self.enabled = False + + + def is_enabled(self): + + return self.enabled and self.stage.is_enabled() + + + def get_origin(self): + + return self.origin + + + def get_units(self): + + if self.units is not None: + return self.units + + else: + return getattr(self.base_stage, self.name+'units')() + + + def get_hint(self, verbosity=3): + '''Return (and print) the "motion hint" associated with this axis. This + hint gives information about the expected directionality of the motion.''' + + if self.hint is not None: + s = '%s\n%s' % (self.hint, self.stage._hint_replacements(self.hint)) + if verbosity>=2: + print(s) + return s + + else: + return getattr(self.base_stage, self.name+'hint')(verbosity=verbosity) + + + def get_info(self, verbosity=3): + '''Returns information about this axis.''' + + self.get_position(verbosity=verbosity) + self.get_hint(verbosity=verbosity) + + + def check_base(self): + if self.base_stage is None: + print("Error: %s %s has 'base_stage' set to 'None'." % (self.__class__.__name__, self.name)) + + + + + + +class Sample_Generic(CoordinateSystem): + """ + The Sample() classes are used to define a single, individual sample. Each + sample is created with a particular name, which is recorded during measurements. + Logging of comments also includes the sample name. Different Sample() classes + can define different defaults for alignment, measurement, etc. + """ + + + # Core methods + ######################################## + def __init__(self, name, base=None, **md): + '''Create a new Sample object. + + Parameters + ---------- + name : str + Name for this sample. + base : Stage + The stage/holder on which this sample sits. + ''' + + if base is None: + base = get_default_stage() + #print("Note: No base/stage/holder specified for sample '{:s}'. Assuming '{:s}' (class {:s})".format(name, base.name, base.__class__.__name__)) + + + super().__init__(name=name, base=base) + + self.name = name + + + self.md = { + 'exposure_time' : 1.0 , + 'measurement_ID' : 1 , + } + self.md.update(md) + + self.naming_scheme = ['name', 'extra', 'exposure_time'] + self.naming_delimeter = '_' + + + # TODO + #if base is not None: + #base.addSample(self) + + + self.reset_clock() + + + def _set_axes_definitions(self): + '''Internal function which defines the axes for this stage. This is kept + as a separate function so that it can be over-ridden easily.''' + + # The _axes_definitions array holds a list of dicts, each defining an axis + self._axes_definitions = [ {'name': 'x', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': None, + 'hint': None, + }, + {'name': 'y', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': None, + }, + #{'name': 'z', + #'motor': None, + #'enabled': False, + #'scaling': +1.0, + #'units': 'mm', + #'hint': None, + #}, + {'name': 'th', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': 'deg', + 'hint': None, + }, + #{'name': 'chi', + #'motor': None, + #'enabled': True, + #'scaling': +1.0, + #'units': 'deg', + #'hint': None, + #}, + #{'name': 'phi', + #'motor': None, + #'enabled': True, + #'scaling': +1.0, + #'units': 'deg', + #'hint': None, + #}, + ] + + + + # Metadata methods + ######################################## + # These involve setting or getting values associated with this sample. + + def clock(self): + '''Return the current value of the "clock" variable. This provides a + way to set a clock/timer for a sample. For instance, you can call + "reset_clock" when you initiate some change to the sample. Thereafter, + the "clock" method lets you check how long it has been since that + event.''' + + clock_delta = time.time() - self.clock_zero + return clock_delta + + + def reset_clock(self): + '''Resets the sample's internal clock/timer to zero.''' + + self.clock_zero = time.time() + + return self.clock() + + + + def get_attribute(self, attribute): + '''Return the value of the requested md.''' + + if attribute in self._axes: + return self._axes[attribute].get_position(verbosity=0) + + if attribute=='name': + return self.name + + if attribute=='clock': + return self.clock() + + + if attribute in self.md: + return self.md[attribute] + + + replacements = { + 'id' : 'measurement_ID' , + 'ID' : 'measurement_ID' , + 'extra' : 'savename_extra' , + } + + if attribute in replacements: + return self.md[replacements[attribute]] + + return None + + + def set_attribute(self, attribute, value): + '''Arbitrary attributes can be set and retrieved. You can use this to + store additional meta-data about the sample. + + WARNING: Currently this meta-data is not saved anywhere. You can opt + to store the information in the sample filename (using "naming"). + ''' + + self.md[attribute] = value + + + def set_md(self, **md): + + self.md.update(md) + + + + def get_md(self, prefix='sample_', include_marks=True, **md): + '''Returns a dictionary of the current metadata. + The 'prefix' argument is prepended to all the md keys, which allows the + metadata to be grouped with other metadata in a clear way. (Especially, + to make it explicit that this metadata came from the sample.)''' + + # Update internal md + #self.md['key'] = value + + + md_return = self.md.copy() + md_return['name'] = self.name + + + if include_marks: + for label, positions in self._marks.items(): + md_return['mark_'+label] = positions + + + # Add md that varies over time + md_return['clock'] = self.clock() + + for axis_name, axis in self._axes.items(): + md_return[axis_name] = axis.get_position(verbosity=0) + md_return['motor_'+axis_name] = axis.get_motor_position(verbosity=0) + + + md_return['savename'] = self.get_savename() # This should be over-ridden by 'measure' + + + # Include the user-specified metadata + md_return.update(md) + + + # Add an optional prefix + if prefix is not None: + md_return = { '{:s}{:s}'.format(prefix, key) : value for key, value in md_return.items() } + + return md_return + + + + + + + # Naming scheme methods + ######################################## + # These allow the user to control how data is named. + + def naming(self, scheme=['name', 'extra', 'exposure_time'], delimeter='_'): + '''This method allows one to define the naming convention that will be + used when storing data for this sample. The "scheme" variable is an array + that lists the various elements one wants to store in the filename. + + Each entry in "scheme" is a string referring to a particular element/ + value. For instance, motor names can be stored ("x", "y", etc.), the + measurement time can be stored, etc.''' + + self.naming_scheme = scheme + self.naming_delimeter = delimeter + + + def get_naming_string(self, attribute): + + # Handle special cases of formatting the text + + if attribute in self._axes: + return '{:s}{:.3f}'.format(attribute, self._axes[attribute].get_position(verbosity=0)) + + if attribute=='clock': + return '{:.1f}s'.format(self.get_attribute(attribute)) + + if attribute=='exposure_time': + return '{:.2f}s'.format(self.get_attribute(attribute)) + + if attribute=='extra': + # Note: Don't eliminate this check; it will not be properly handled + # by the generic call below. When 'extra' is None, we should + # return None, so that it gets skipped entirely. + return self.get_attribute('savename_extra') + + if attribute=='spot_number': + return 'spot{:d}'.format(self.get_attribute(attribute)) + + + # Generically: lookup the attribute and convert to string + + att = self.get_attribute(attribute) + if att is None: + # If the attribute is not found, simply return the text. + # This allows the user to insert arbitrary text info into the + # naming scheme. + return attribute + + else: + return str(att) + + + def get_savename(self, savename_extra=None): + '''Return the filename that will be used to store data for the upcoming + measurement. The method "naming" lets one control what gets stored in + the filename.''' + + if savename_extra is not None: + self.set_attribute('savename_extra', savename_extra) + + attribute_strings = [] + for attribute in self.naming_scheme: + s = self.get_naming_string(attribute) + if s is not None: + attribute_strings.append(s) + + self.set_attribute('savename_extra', None) + + savename = self.naming_delimeter.join(attribute_strings) + + # Avoid 'dangerous' characters + savename = savename.replace(' ', '_') + #savename = savename.replace('.', 'p') + savename = savename.replace('/', '-slash-') + + return savename + + + + # Logging methods + ######################################## + + def comment(self, text, logbooks=None, tags=None, append_md=True, **md): + '''Add a comment related to this sample.''' + + text += '\n\n[comment for sample: {} ({})].'.format(self.name, self.__class__.__name__) + + if append_md: + + md_current = { k : v for k, v in RE.md.items() } # Global md + md_current.update(get_beamline().get_md()) # Beamline md + + # Sample md + md_current.update(self.get_md()) + + # Specified md + md_current.update(md) + + text += '\n\n\nMetadata\n----------------------------------------' + for key, value in sorted(md_current.items()): + text += '\n{}: {}'.format(key, value) + + + logbook.log(text, logbooks=logbooks, tags=tags) + + + def log(self, text, logbooks=None, tags=None, append_md=True, **md): + + if append_md: + + text += '\n\n\nMetadata\n----------------------------------------' + for key, value in sorted(md.items()): + text += '\n{}: {}'.format(key, value) + + logbook.log(text, logbooks=logbooks, tags=tags) + + + + # Measurement methods + ######################################## + + def get_measurement_md(self, prefix=None, **md): + + #md_current = {} + md_current = { k : v for k, v in RE.md.items() } # Global md + md_current['detector_sequence_ID'] = caget('XF:11BMB-ES{Det:SAXS}:cam1:FileNumber_RBV') + + md_current.update(get_beamline().get_md()) + + md_current.update(md) + + # Add an optional prefix + if prefix is not None: + md_return = { '{:s}{:s}'.format(prefix, key) : value for key, value in md_return.items() } + + return md_current + + + def expose(self, exposure_time=None, verbosity=3, poling_period=0.1, **md): + '''Internal function that is called to actually trigger a measurement.''' + + # TODO: Improve this (switch to Bluesky methods) + # TODO: Store metadata + + if 'measure_type' not in md: + md['measure_type'] = 'expose' + self.log('{} for {}.'.format(md['measure_type'], self.name), **md) + + if exposure_time is not None: + # Prep detector + caput('XF:11BMB-ES{Det:SAXS}:cam1:AcquireTime', exposure_time) + caput('XF:11BMB-ES{Det:SAXS}:cam1:AcquirePeriod', exposure_time+0.1) + + + get_beamline().beam.on() + + caput('XF:11BMB-ES{Det:SAXS}:cam1:Acquire', 1) + + if verbosity>=2: + start_time = time.time() + while caget('XF:11BMB-ES{Det:SAXS}:cam1:Acquire')==1 and (time.time()-start_time)<(exposure_time+20): + percentage = 100*(time.time()-start_time)/exposure_time + print( 'Exposing {:6.2f} s ({:3.0f}%) \r'.format((time.time()-start_time), percentage), end='') + time.sleep(poling_period) + else: + time.sleep(exposure_time) + + if verbosity>=3 and caget('XF:11BMB-ES{Det:SAXS}:cam1:Acquire')==1: + print('Warning: Detector still not done acquiring.') + + get_beamline().beam.off() + + + def snap(self, exposure_time=None, extra=None, measure_type='snap', verbosity=3, **md): + '''Take a quick exposure (without saving data).''' + + # TODO: Disable data saving when using 'snap'. + self.measure(exposure_time=exposure_time, extra=extra, measure_type=measure_type, verbosity=verbosity, **md) + + + def measure(self, exposure_time=None, extra=None, measure_type='measure', verbosity=3, **md): + '''Measure data by triggering the area detectors. + + Parameters + ---------- + exposure_time : float + How long to collect data + extra : string, optional + Extra information about this particular measurement (which is typically + included in the savename/filename). + ''' + + if exposure_time is not None: + self.set_attribute('exposure_time', exposure_time) + else: + exposure_time = self.get_attribute('exposure_time') + + savename = self.get_savename(savename_extra=extra) + + caput('XF:11BMB-ES{Det:SAXS}:cam1:FileName', savename) + + if verbosity>=2 and (get_beamline().current_mode != 'measurement'): + print("WARNING: Beamline is not in measurement mode (mode is '{}')".format(get_beamline().current_mode)) + + md_current = self.get_md() + md_current['sample_savename'] = savename + md_current['measure_type'] = measure_type + + md_current.update(self.get_measurement_md()) + md_current['filename'] = '{:s}_{:04d}.tiff'.format(savename, md_current['detector_sequence_ID']) + md_current.update(md) + + self.expose(exposure_time, verbosity=verbosity, **md_current) + + + def measureSpots(self, num_spots=4, translation_amount=0.2, axis='y', exposure_time=None, extra=None, measure_type='measureSpots', **md): + '''Measure multiple spots on the sample.''' + + if 'spot_number' not in self.md: + self.md['spot_number'] = 1 + + + for spot_num in range(num_spots): + + self.measure(exposure_time=exposure_time, extra=extra, measure_type=measure_type, **md) + + getattr(self, axis+'r')(translation_amount) + self.md['spot_number'] += 1 + + + + + def do(self, step=0): + '''Performs the "default action" for this sample. This usually means + aligning the sample, and taking data. + + The 'step' argument can optionally be given to jump to a particular + step in the sequence.''' + + if step<=1: + self.xo() # goto origin + #self.gotoAlignedPosition() + + #if step<=5: + #self.align() + + if step<=10: + self.measure() + + + + +class SampleTSAXS_Generic(Sample_Generic): + + pass + + +class SampleGISAXS_Generic(Sample_Generic): + + def __init__(self, name, base=None, **md): + + super().__init__(name=name, base=base, **md) + self.naming_scheme = ['name', 'extra', 'th', 'exposure_time'] + self.incident_angles_default = [0.08, 0.10, 0.12, 0.15, 0.20] + + + def measureSpots(self, num_spots=2, translation_amount=0.1, axis='x', exposure_time=None, extra=None, measure_type='measureSpots', **md): + super().measureSpots(num_spots=num_spots, translation_amount=translation_amount, axis=axis, exposure_time=exposure_time, extra=extra, measure_type=measure_type, **md) + + + def measureIncidentAngle(self, angle, exposure_time=None, extra=None, **md): + + self.thabs(angle) + + self.measure(exposure_time=exposure_time, extra=extra, **md) + + + def measureIncidentAngles(self, angles=None, exposure_time=None, extra=None, **md): + + if angles is None: + angles = self.incident_angles_default + + for angle in angles: + self.measureIncidentAngle(angle, exposure_time=exposure_time, extra=extra, **md) + + + def align(self): + + # TODO: Check what mode we are in, change if necessary... + # cms.modeAlignment() + + fit_scan(smy, 0.3, 17, fit='sigmoid_r') + fit_scan(sth, 1.2, 21, fit='gauss') + + fit_scan(smy, 0.2, 17, fit='sigmoid_r') + fit_scan(sth, 0.8, 21, fit='gauss') + + + def do(self, step=0): + + if step<=1: + self.xo() # goto origin + + if step<=5: + self.align() + + if step<=10: + self.set_attribute('exposure_time', 5.0) + self.measureIncidentAngles(self.incident_angles_default) + + + + + + + +class Stage(CoordinateSystem): + + pass + + +class SampleStage(Stage): + + def __init__(self, name='SampleStage', base=None, **kwargs): + + super().__init__(name=name, base=base, **kwargs) + + def _set_axes_definitions(self): + '''Internal function which defines the axes for this stage. This is kept + as a separate function so that it can be over-ridden easily.''' + + # The _axes_definitions array holds a list of dicts, each defining an axis + self._axes_definitions = [ {'name': 'x', + 'motor': smx, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves stage left/outboard (beam moves right on sample)', + }, + {'name': 'y', + 'motor': smy, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves stage up (beam moves down on sample)', + }, + {'name': 'th', + 'motor': sth, + 'enabled': True, + 'scaling': +1.0, + 'units': 'deg', + 'hint': 'positive tilts clockwise (positive incident angle)', + }, + ] + + + + +class Holder(Stage): + '''The Holder() classes are used to define bars/stages that hold one or more + samples. This class can thus help to keep track of coordinate conversions, + to store the positions of multiple samples, and to automate the measurement + of multiple samples.''' + + # Core methods + ######################################## + + def __init__(self, name='Holder', base=None, **kwargs): + + super().__init__(name=name, base=base, **kwargs) + + self._samples = {} + + def _set_axes_definitions(self): + '''Internal function which defines the axes for this stage. This is kept + as a separate function so that it can be over-ridden easily.''' + + # The _axes_definitions array holds a list of dicts, each defining an axis + self._axes_definitions = [ {'name': 'x', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves stage left/outboard (beam moves right on sample)', + }, + {'name': 'y', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves stage up (beam moves down on sample)', + }, + {'name': 'th', + 'motor': None, + 'enabled': True, + 'scaling': +1.0, + 'units': 'deg', + 'hint': 'positive tilts clockwise (positive incident angle)', + }, + ] + + # Sample management + ######################################## + + def addSample(self, sample, sample_number=None): + '''Add a sample to this holder/bar.''' + + if sample_number is None: + if len(self._samples)==0: + sample_number = 1 + else: + ki = [ int(key) for key in self._samples.keys() ] + sample_number = np.max(ki) + 1 + + + if sample_number in self._samples.keys(): + print('Warning: Sample number {} is already defined on holder "{:s}". Use "replaceSample" if you are sure you want to eliminate the existing sample from the holder.'.format(sample_number, self.name) ) + + else: + self._samples[sample_number] = sample + + self._samples[sample_number] = sample + + sample.set_base_stage(self) + sample.md['holder_sample_number'] = sample_number + + + def removeSample(self, sample_number): + '''Remove a particular sample from this holder/bar.''' + + del self._samples[sample_number] + + + def removeSamplesAll(self): + + self._samples = {} + + + def replaceSample(self, sample, sample_number): + '''Replace a given sample on this holder/bar with a different sample.''' + + self.removeSample(sample_number) + self.addSample(sample, sample_number) + + + def getSample(self, sample_number, verbosity=3): + '''Return the requested sample object from this holder/bar. + + One can provide an integer, in which case the corresponding sample + (from the holder's inventory) is returned. If a string is provided, + the closest-matching sample (by name) is returned.''' + + if type(sample_number) is int: + if sample_number not in self._samples: + if verbosity>=1: + print('Error: Sample {} not defined.'.format(sample_number)) + return None + + sample_match = self._samples[sample_number] + + if verbosity>=3: + print('{}: {:s}'.format(sample_number, sample_match.name)) + + return sample_match + + + elif type(sample_number) is str: + + # First search for an exact name match + matches = 0 + sample_match = None + sample_i_match = None + for sample_i, sample in sorted(self._samples.items()): + if sample.name==sample_number: + matches += 1 + if sample_match is None: + sample_match = sample + sample_i_match = sample_i + + if matches==1: + if verbosity>=3: + print('{}: {:s}'.format(sample_i_match, sample_match.name)) + return sample_match + + elif matches>1: + if verbosity>=2: + print('{:d} exact matches for "{:s}", returning sample {}: {:s}'.format(matches, sample_number, sample_i_match, sample_match.name)) + return sample_match + + + # Try to find a 'start of name' match + for sample_i, sample in sorted(self._samples.items()): + if sample.name.startswith(sample_number): + matches += 1 + if sample_match is None: + sample_match = sample + sample_i_match = sample_i + + if matches==1: + if verbosity>=3: + print('Beginning-name match: {}: {:s}'.format(sample_i_match, sample_match.name)) + return sample_match + + elif matches>1: + if verbosity>=2: + print('{:d} beginning-name matches for "{:s}", returning sample {}: {:s}'.format(matches, sample_number, sample_i_match, sample_match.name)) + return sample_match + + # Try to find a substring match + for sample_i, sample in sorted(self._samples.items()): + if sample_number in sample.name: + matches += 1 + if sample_match is None: + sample_match = sample + sample_i_match = sample_i + + if matches==1: + if verbosity>=3: + print('Substring match: {}: {:s}'.format(sample_i_match, sample_match.name)) + return sample_match + + elif matches>1: + if verbosity>=2: + print('{:d} substring matches for "{:s}", returning sample {}: {:s}'.format(matches, sample_number, sample_i_match, sample_match.name)) + return sample_match + + if verbosity>=1: + print('No sample has a name matching "{:s}"'.format(sample_number)) + return None + + + else: + + print('Error: Sample designation "{}" not understood.'.format(sample_number)) + return None + + + def getSamples(self, range=None, verbosity=3): + '''Get the list of samples associated with this holder. + + If the optional range argument is provided (2-tuple), then only sample + numbers within that range (inclusive) are run. If range is instead a + string, then all samples with names that match are returned.''' + + samples = [] + + if range is None: + for sample_number in sorted(self._samples): + samples.append(self._samples[sample_number]) + + elif type(range) is list: + start, stop = range + for sample_number in sorted(self._samples): + if sample_number>=start and sample_number<=stop: + samples.append(self._samples[sample_number]) + + elif type(range) is str: + for sample_number, sample in sorted(self._samples.items()): + if range in sample.name: + samples.append(sample) + + else: + if verbosity>=1: + print('Range argument "{}" not understood.'.format(range)) + + + return samples + + + def listSamples(self): + '''Print a list of the current samples associated with this holder/ + bar.''' + + for sample_number, sample in sorted(self._samples.items()): + print( '{}: {:s}'.format(sample_number, sample.name) ) + + + def gotoSample(self, sample_number): + + sample = self.getSample(sample_number, verbosity=0) + sample.gotoAlignedPosition() + + return sample + + + # Action (measurement) methods + ######################################## + + def doSamples(self, range=None): + '''Activate the default action (typically measurement) for all the samples. + + If the optional range argument is provided (2-tuple), then only sample + numbers within that range (inclusive) are run. If range is instead a + string, then all samples with names that match are returned.''' + + for sample in self.getSamples(range=range): + sample.do() + + + + +class PositionalHolder(Holder): + '''This class is a sample holder that is one-dimensional. E.g. a bar with a + set of samples lined up, or a holder with a set number of slots for holding + samples. This class thus helps to associate each sample with its position + on the bar.''' + + # Core methods + ######################################## + + def __init__(self, name='PositionalHolder', base=None, **kwargs): + + super().__init__(name=name, base=base, **kwargs) + + self._positional_axis = 'x' + + + # Sample management + ######################################## + + def slot(self, sample_number): + '''Moves to the selected slot in the holder.''' + + getattr(self, self._positional_axis+'abs')( self.get_slot_position(sample_number) ) + + + def get_slot_position(self, slot): + '''Return the motor position for the requested slot number.''' + # This method should be over-ridden in sub-classes, so as to properly + # implement the positioning appropriate for that holder. + + position = 0.0 + slot*1.0 + + return position + + + def addSampleSlot(self, sample, slot): + '''Adds a sample to the specified "slot" (defined/numbered sample + holding spot on this holder).''' + + self.addSample(sample, sample_number=slot) + sample.setOrigin( [self._positional_axis], [self.get_slot_position(slot)] ) + + + def listSamplesPositions(self): + '''Print a list of the current samples associated with this holder/ + bar.''' + + for sample_number, sample in self._samples.items(): + pos = getattr(sample, self._positional_axis+'pos')(verbosity=0) + print( '%s: %s (%s = %.3f)' % (str(sample_number), sample.name, self._positional_axis, pos) ) + + + +class CapillaryHolder(PositionalHolder): + '''This class is a sample holder that has 15 slots for capillaries.''' + + # Core methods + ######################################## + + def __init__(self, name='CapillaryHolder', base=None, **kwargs): + + super().__init__(name=name, base=base, **kwargs) + + self._positional_axis = 'x' + + self.x_spacing = 6.342 # 3.5 inches / 14 spaces + + # Set the x and y origin to be the center of slot 8 + self.xsetOrigin(0.00) + self.ysetOrigin(0.00) + + + def get_slot_position(self, slot): + '''Return the motor position for the requested slot number.''' + + return self.x_spacing*(slot-8) + + +stg = SampleStage() +def get_default_stage(): + return stg + + + + +if False: + # For testing: + # %run -i /opt/ipython_profiles/profile_collection/startup/94-sample.py + sam = SampleGISAXS_Generic('testing_of_code') + sam.mark('here') + #sam.mark('XY_field', 'x', 'y') + #sam.mark('specified', x=1, th=0.1) + #sam.naming(['name', 'extra', 'clock', 'th', 'exposure_time', 'id']) + #sam.thsetOrigin(0.5) + #sam.marks() + + + hol = CapillaryHolder(base=stg) + hol.addSampleSlot( SampleGISAXS_Generic('test_sample_01'), 1.0 ) + hol.addSampleSlot( SampleGISAXS_Generic('test_sample_02'), 3.0 ) + hol.addSampleSlot( SampleGISAXS_Generic('test_sample_03'), 5.0 ) + + sam = hol.getSample(1) + + + diff --git a/startup/96-automation.py b/startup/96-automation.py new file mode 100644 index 0000000..5013e3e --- /dev/null +++ b/startup/96-automation.py @@ -0,0 +1,346 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# vi: ts=4 sw=4 + + + + +################################################################################ +# Classes for controlling the robotics and automation on the beamline. +################################################################################ +# Known Bugs: +# N/A +################################################################################ +# TODO: +# Search for "TODO" below. +################################################################################ + + + + + +class SampleExchangeRobot(Stage): + + def __init__(self, name='SampleExchangeRobot', base=None, **kwargs): + + super().__init__(name=name, base=base, **kwargs) + + self._sample = None + + # The region can be: + # 'safe' : arm won't collid with anything, it is near the (+,+,+) limit of its travel. + # 'parking' : arm is close to the parking lot (movement may hit a sample) + # 'stage' : arm is close to the sample stage/stack (movement may collide with stack, on-axis camera, or downstream window) + # 'undefined' : position is unknown (do not assume it is safe to move!) + self._region = 'undefined' + + + + # self.yabs(-82.0) # Good height for 'slotted approach' + # self.yabs(-77.0) # Good height for 'grip' (grip-screws sitting at bottom of wells) + # self.yabs(-67.0) # Good height for 'hover' (sample held above stage) + self._delta_y_hover = 10.0 + self._delta_y_slot = 5.0 + + self._position_safe = [0, -82.0, -94.8, 0.0, -90] # x, y, z, r, phi + self._position_safe_rotate = [0, -82.0, -15.0, 0.0, -90] # x, y, z, r, phi + self._position_sample_gripped = [-101, -77, -94.8, 18.0, -90] # x, y, z, r, phi + + + self._position_stg_exchange = [] # smx, smy + self._position_stg_measure = [] # smx, smy + + + + def _set_axes_definitions(self): + '''Internal function which defines the axes for this stage. This is kept + as a separate function so that it can be over-ridden easily.''' + + # The _axes_definitions array holds a list of dicts, each defining an axis + self._axes_definitions = [ {'name': 'x', + 'motor': armx, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves left/outboard', + }, + {'name': 'r', + 'motor': armr, + #'motor': strans, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves radial arm outwards', + }, + {'name': 'y', + 'motor': army, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves arm up', + }, + {'name': 'z', + 'motor': armz, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves arm downstream', + }, + {'name': 'phi', + 'motor': armphi, + 'enabled': True, + 'scaling': +1.0, + 'units': 'mm', + 'hint': 'positive moves arm downstream', + }, + + ] + + + def guessRegion(self): + '''Determine where the arm is located.''' + + # TODO + pass + + + def home(self, verbosity=3, delays=0.5): + '''Home the axes, so that one can now trust the position values.''' + + if not self.checkSafe(): + return + + + # army to positive limit (moves arm to top of vertical range); set this to be zero + caput('XF:11BMB-ES{SM:1-Ax:Y}Start:Home-Cmd', 1) + sleep(delays) + while army.moving: + sleep(delays) + + # armx to positive limit; set this to be zero + caput('XF:11BMB-ES{SM:1-Ax:X}Start:Home-Cmd', 1) + sleep(delays) + while armx.moving: + sleep(delays) + + # Rotate arm so that it doesn't collide when doing a +z scan + self.phiabs(-90) # gripper pointing -x (towards sample stack) + sleep(delays) + while armphi.moving: + sleep(delays) + + + # armz to positive limit (moves arm to downstream of range); set this to be zero + caput('XF:11BMB-ES{SM:1-Ax:Z}Start:Home-Cmd', 1) + sleep(delays) + while armz.moving: + sleep(delays) + + + #caput('XF:11BMB-ES{SM:1-Ax:ArmR}Mtr.HOMF',1) # armr home forward + #caput('XF:11BMB-ES{SM:1-Ax:ArmR}Mtr.HOMR',1) # armr home reverse + sleep(delays) + while self._axes['r'].motor.moving: + sleep(delays) + + + + self.zabs(-15, verbosity=verbosity) # This is far enough that the arm can rotate without colliding with the downstream wall + self.phiabs(-180, verbosity=verbosity) # gripper pointing -z (towards parking lot; upstream) + + self._region = 'safe' + + + def checkSafe(self): + + if self._region is not 'safe': + print("ERROR: Robot arm must start in the 'safe' region of the chamber (current region is '{}'). Move the robot to the safe region (and/or set _region to 'safe').".format(self._region)) + return False + + return True + + + def sequenceArmSafe(self, verbosity=3): + + if not self.checkSafe(): + return + + self.zabs(-25, verbosity=verbosity) # self._position_safe_rotate[2] + self.phiabs(-180) + self.yabs(-1) + self.xabs(-1) + + + + def sequencePutSampleOntoStage(self, verbosity=3): + + if self._sample is None: + print("ERROR: No sample currently being gripped by robot arm.") + return + + if not self.checkSafe(): + return + + + x, y, z, r, phi = self._position_sample_gripped + + # TODO: Move sample stage + + # Pre-align the arm in (y,z) + if abs(self.phipos(verbosity=0)-phi)>0.5: + self.zabs(-25, verbosity=verbosity) # self._position_safe_rotate[2] + self.phiabs(phi, verbosity=verbosity) + self.zabs(z, verbosity=verbosity) + self.yabs(y+self._delta_y_hover, verbosity=verbosity) + + self._region = 'stage' + # Push the sample out so that it is hovering above the stage + mov([armx, self._axes['r'].motor], [x, r]) + + # Move sample down (-y) + self.yr(-self._delta_y_hover, verbosity=verbosity) # Now in contact with stage + + # De-grip + self.yr(-self._delta_y_slot, verbosity=verbosity) + self._sample = None + + # Move away from stage + x, y, z, r, phi = self._position_safe + mov([armx, self._axes['r'].motor], [x, r]) + self._region = 'safe' + + + def sequenceGetSampleFromStage(self, verbosity=3): + + if self._sample is not None: + print("ERROR: There is already a sample being gripped by robot arm (sample {}.".format(self._sample.name)) + return + + if not self.checkSafe(): + return + + + # TODO: Move sample stage + + x, y, z, r, phi = self._position_sample_gripped + + # Pre-align the arm in (y,z) + if abs(self.phipos(verbosity=0)-phi)>0.5: + self.zabs(-25, verbosity=verbosity) # self._position_safe_rotate[2] + self.phiabs(phi, verbosity=verbosity) + self.zabs(z, verbosity=verbosity) + self.yabs(y-self._delta_y_slot, verbosity=verbosity) + + self._region = 'stage' + # Move arm so that it is slotted over the sample + self.xabs(x, verbosity=verbosity) + self.rabs(r, verbosity=verbosity) + + # Grip sample + self.yr(+self._delta_y_slot, verbosity=verbosity) + # Pick sample up (+y) + self.yr(+self._delta_y_hover, verbosity=verbosity) + + # Move away from stage + x, y, z, r, phi = self._position_safe + mov([armx, self._axes['r'].motor], [x, r]) + self._region = 'safe' + + + + def sequencePrepPark(self, verbosity=3): + '''Rotate the arm so that it is 'pointed' at the parking lot.''' + + if not self.checkSafe(): + return + + phi_park = -180 + + if abs(self.phipos(verbosity=0)-(phi_park))>0.5: + x, y, z, r, phi = self._position_safe_rotate + self.zabs(z, verbosity=verbosity) + self.xabs(x, verbosity=verbosity) + self.rabs(r, verbosity=verbosity) + + self.phiabs(phi_park, verbosity=verbosity) + + + def sequenceParkSample(self, parking_spot, verbosity=3): + + if self._sample is None: + print("ERROR: No sample currently being gripped by robot arm.") + return + + if not self.checkSafe(): + return + + + # TODO: Implement moving to correct parking spot in (x,y) + + + self._region = 'parking' + self.zabs(-80, verbosity=verbosity) # TODO: Fix + + # Move sample down (-y) + self.yr(-self._delta_y_hover, verbosity=verbosity) # Now in contact with parking spot + + # De-grip + self.yr(-self._delta_y_slot, verbosity=verbosity) + self._sample = None + + # Move away + self.zabs(-25, verbosity=verbosity) + self._region = 'safe' + + + def sequenceUnparkSample(self, parking_spot, verbosity=3): + + if self._sample is not None: + print("ERROR: There is already a sample being gripped by robot arm (sample {}.".format(self._sample.name)) + return + + if not self.checkSafe(): + return + + + # TODO: Check parking spot is free + + + # TODO: Implement moving to correct parking spot in (x,y) + + + self._region = 'parking' + self.zabs(-80, verbosity=verbosity) # TODO: Fix + + # Grip sample + self.yr(+self._delta_y_slot, verbosity=verbosity) + # Pick sample up (+y) + self.yr(+self._delta_y_hover, verbosity=verbosity) + # self._sample = TODO + + + # Move away + self.zabs(-25, verbosity=verbosity) + self._region = 'safe' + + + + + + + + + +class Queue(object): + """ + Holds the current state of the sample queue, allowing samples settings + to be 'extracted'; or even allowing a particular sample to be physically + loaded. + """ + + pass + + + + +robot = SampleExchangeRobot() diff --git a/startup/98-user.py b/startup/98-user.py new file mode 100644 index 0000000..a97c3b4 --- /dev/null +++ b/startup/98-user.py @@ -0,0 +1,63 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# vi: ts=4 sw=4 + + + + +################################################################################ +# Short-term settings (specific to a particular user/experiment) can +# be placed in this file. You may instead wish to make a copy of this file in +# the user's data directory, and use that as a working copy. +################################################################################ + + +#logbooks_default = ['User Experiments'] +#tags_default = ['CFN Soft-Bio'] + + +if False: + # The following shortcuts can be used for unit conversions. For instance, + # for a motor operating in 'mm' units, one could instead do: + # sam.xr( 10*um ) + # To move it by 10 micrometers. HOWEVER, one must be careful if using + # these conversion parameters, since they make implicit assumptions. + # For instance, they assume linear axes are all using 'mm' units. Conversely, + # you will not receive an error if you try to use 'um' for a rotation axis! + m = 1e3 + cm = 10.0 + mm = 1.0 + um = 1e-3 + nm = 1e-6 + + inch = 25.4 + + deg = 1.0 + rad = np.degrees(1.0) + mrad = np.degrees(1e-3) + urad = np.degrees(1e-6) + + +def get_default_stage(): + return stg + + +class SampleTSAXS(SampleTSAXS_Generic): + + def __init__(self, name, base=None, **md): + super().__init__(name=name, base=base, **md) + self.naming_scheme = ['name', 'extra', 'exposure_time'] + + +class SampleGISAXS(SampleGISAXS_Generic): + + def __init__(self, name, base=None, **md): + super().__init__(name=name, base=base, **md) + self.naming_scheme = ['name', 'extra', 'th', 'exposure_time'] + + +class Sample(SampleTSAXS): + + def goto(self, label, verbosity=3, **additional): + super().goto(label, verbosity=verbosity, **additional) + # You can add customized 'goto' behavior here \ No newline at end of file