-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpfeiffer.py
executable file
·102 lines (76 loc) · 3.42 KB
/
pfeiffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python
import os
import sys
from daemon import SimpleFactory, SimpleProtocol
from twisted.internet.serialport import SerialPort
from twisted.internet.task import LoopingCall
from command import Command
from optparse import OptionParser
# Example code with server daemon and outgoing connection to hardware
class DaemonProtocol(SimpleProtocol):
# _debug = True # Display all traffic for debug purposes
def processMessage(self, string):
# It will handle some generic messages and return pre-parsed Command object
cmd = SimpleProtocol.processMessage(self, string)
if cmd is None:
return
obj = self.object # Object holding the state
hw = obj['hw'] # HW factory
if cmd.name == 'get_status':
self.message('status hw_connected=%s status=%d pressure=%g' %
(self.object['hw_connected'], self.object['status'], self.object['pressure']))
else:
if obj['hw_connected']:
# Pass all other commands directly to hardware
hw.protocol.message(string)
class HWProtocol(SimpleProtocol):
# _debug = True # Display all traffic for debug purposes
def connectionMade(self):
self.object['hw_connected'] = 1
# SimpleProtocol.connectionMade(self)
self._updateTimer = LoopingCall(self.update)
self._updateTimer.start(self._refresh)
def connectionLost(self, reason):
self.object['hw_connected'] = 0
self.object['status'] = -1
self.object['pressure'] = 0
# SimpleProtocol.connectionLost(self, reason)
self._updateTimer.stop()
def processMessage(self, string):
# Process the device reply
if self._debug:
print "hw > %s" % string
if len(string) and string[0] >= '0' and string[0] <= '6' and 'E' in string:
# b,sx.xxxxEsxx
self.object['status'] = int(string[0])
self.object['pressure'] = float(string[2:])
def message(self, string):
"""Sending outgoing message"""
if self._debug:
print ">> serial >>", string
self.transport.write(string)
self.transport.write("\r\n")
def update(self):
# Request the hardware state from the device
self.message('COM') # Start periodic reporting of status
if __name__ == '__main__':
parser = OptionParser(usage="usage: %prog [options] arg")
parser.add_option('-P', '--hw-port', help='Hardware port to connect', action='store', dest='hw_port', default='/dev/ttyUSB0')
parser.add_option('-p', '--port', help='Daemon port', action='store', dest='port', type='int', default=7023)
parser.add_option('-n', '--name', help='Daemon name', action='store', dest='name', default='pfeiffer')
(options, args) = parser.parse_args()
# Object holding actual state and work logic.
# May be anything that will be passed by reference - list, dict, object etc
obj = {'hw_connected': 0, 'status': -1, 'pressure': 0}
# Factories for daemon and hardware connections
# We need two different factories as the protocols are different
daemon = SimpleFactory(DaemonProtocol, obj)
proto = HWProtocol()
proto.object = obj
hw = SerialPort(proto, options.hw_port, daemon._reactor, baudrate=9600)
daemon.name = options.name
obj['daemon'] = daemon
obj['hw'] = hw
# Incoming connections
daemon.listen(options.port)
daemon._reactor.run()