-
Notifications
You must be signed in to change notification settings - Fork 0
/
noolite-receiver.py
118 lines (92 loc) · 3.29 KB
/
noolite-receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding=utf-8 -*-
from datetime import datetime, timedelta
import sys
import serial
from subprocess import call
import logging
logger = logging.getLogger(__name__)
LAST_COMMAND_TIME = datetime.now() - timedelta(days=1)
class EventHook(object):
def __init__(self):
self.__handlers = []
def __iadd__(self, handler):
self.__handlers.append(handler)
return self
def __isub__(self, handler):
self.__handlers.remove(handler)
return self
def fire(self, *args, **keywargs):
for handler in self.__handlers:
handler(*args, **keywargs)
def clearObjectHandlers(self, inObject):
for theHandler in self.__handlers:
if theHandler.im_self == inObject:
self -= theHandler
class SerialPort(object):
READ_MESSAGE_LENGTH = 17
CHECK_BIT_MAX_RANGE = 8
CHECK_BIT_MASK = 0xFF
CHECK_BIT_POSITION = 15
def __init__(self, port='/dev/ttyAMA0', baudrate=9600, timeout=1):
self.serial = serial.Serial(port, baudrate=baudrate, timeout=timeout)
self.serial.flushInput()
self.serial.flushOutput()
self.onMessage = EventHook()
def publish(self, command):
check_bit = 0
for i in range(0, self.CHECK_BIT_MAX_RANGE):
check_bit += command[i]
check_bit = check_bit & self.CHECK_BIT_MASK
command[self.CHECK_BIT_POSITION] = check_bit
buffer = bytearray(command)
self.serial.write(buffer)
logger.debug('Serial data written: %s', repr(buffer))
def consume(self):
while True:
data = self.serial.read(self.READ_MESSAGE_LENGTH)
data = list(map(ord,data))
if data:
logger.debug('Serial data received: %s', data)
self.onMessage.fire(data)
def close(self):
self.serial.close()
if __name__ == "__main__":
CMD_CLICK = 4
CMD_HOLD = 5
CMD_REALEASE = 10
CMD_DBL_CLICK = 101
logger.setLevel(logging.DEBUG)
log = logging.StreamHandler(sys.stdout)
log.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
logger.addHandler(log)
port = SerialPort()
def command_handler(command):
global LAST_COMMAND_TIME
cmd_num = command[4]
cmd_mode = command[5]
if (datetime.now() - LAST_COMMAND_TIME).total_seconds() < 1:
cmd_mode = CMD_DBL_CLICK
logger.info('DOWBLE-CLICK!')
LAST_COMMAND_TIME = datetime.now()
logger.info('Command received: %s', cmd_num)
if cmd_mode == CMD_DBL_CLICK:
if cmd_num == 1:
call(["mpc", "next"])
if cmd_mode == CMD_CLICK:
if cmd_num == 1:
call(["mpc", "play"])
if cmd_num == 2:
call(["mpc", "pause"])
if cmd_mode == CMD_HOLD:
if cmd_num == 1:
call(["mpc", "volume", "+5"])
if cmd_num == 2:
call(["mpc", "volume", "-5"])
def command_handler_plain(command):
call(["mpc", "toggle"])
port.onMessage += command_handler_plain
try:
port.consume()
except KeyboardInterrupt:
logger.error('Interrupted by keyboard')
port.close()