-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBeatSaberMqttProxy.py
141 lines (98 loc) · 3.58 KB
/
BeatSaberMqttProxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python
import asyncio
import websockets
import json
import logging
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
    """Print the result code of an MQTT connection attempt.

    The signature matches what this file assigns to ``client.on_connect``;
    only ``rc`` is used.
    """
    print(f"Connected with result code {rc!s}")
def on_publish(client, userdata, mid):
    """Print the message id ``mid``; the other arguments are unused."""
    print(f"mid: {mid!s}")
def on_message(client, userdata, message):
    """Print the UTF-8 decoded payload of ``message``.

    ``message`` must expose a bytes-like ``payload`` attribute; ``client``
    and ``userdata`` are unused.
    """
    text = message.payload.decode("utf-8")
    print(f"received message: {text}")
def parse_json(input_json):
    """Decode one JSON message and route it by its ``"event"`` field.

    ``noteCut`` and ``noteFullyCut`` pass the ``"noteCut"`` member to
    ``event_note_cut``; ``bombCut`` calls ``event_bomb_cut``;
    ``beatmapEvent`` passes the ``"beatmapEvent"`` member to
    ``event_beat_map``. Any other event name is only printed.
    """
    message = json.loads(input_json)
    event_name = message["event"]

    if event_name in ("noteCut", "noteFullyCut"):
        event_note_cut(message["noteCut"])
        return
    if event_name == "bombCut":
        event_bomb_cut()
        return
    if event_name == "beatmapEvent":
        event_beat_map(message["beatmapEvent"])
        return

    # Unhandled events are reported but otherwise ignored.
    print("other event: " + event_name)
def event_note_cut(note_cut_object):
    """Print a "Note Cut" marker, then pass the payload to the parser."""
    print("Note Cut")
    event_note_cut_parse(note_cut_object)
def event_note_cut_parse(note_cut_object):
    """Turn a note-cut payload into a saber light trigger.

    Reads ``"saberType"`` and ``"noteType"`` from ``note_cut_object``. The
    light value is 7 for ``NoteA``, 3 for ``NoteB`` and 0 for anything else.
    It is passed to ``trigger_saber_a`` for ``SaberA`` or ``trigger_saber_b``
    for ``SaberB``; any other saber type is ignored.
    """
    saber = note_cut_object["saberType"]
    note = note_cut_object["noteType"]

    if note == "NoteA":
        level = 7
    else:
        level = 3 if note == "NoteB" else 0

    if saber == "SaberA":
        trigger_saber_a(level)
        return
    if saber == "SaberB":
        trigger_saber_b(level)
def trigger_saber_a(light_value):
    """Print the light value for saber A and publish it under ``saber/a``."""
    print(f"saber a: {light_value!s}")
    mqtt_publish_saber("saber/a", light_value)
def trigger_saber_b(light_value):
    """Print the light value for saber B and publish it under ``saber/b``."""
    print(f"saber b: {light_value!s}")
    mqtt_publish_saber("saber/b", light_value)
def mqtt_publish_saber(saber_type, light_value):
    """Publish ``light_value`` on ``beatsaber/<saber_type>``.

    The message is sent with ``qos=0`` and ``retain=True`` through the
    module-level ``client``, which this file assigns only in its
    ``__main__`` section.
    """
    topic = f"beatsaber/{saber_type!s}"
    client.publish(topic, light_value, qos=0, retain=True)
def event_bomb_cut():
    """Print a "Bomb Cut" marker; nothing is published for bombs."""
    marker = "Bomb Cut"
    print(marker)
def event_beat_map(event_object):
    """Forward a beatmap event payload to ``event_beat_map_parse``."""
    event_beat_map_parse(event_object)
def event_beat_map_parse(beatmap_event_object):
    """Route a beatmap lighting event to the matching light trigger.

    Reads ``"type"`` and ``"value"`` from ``beatmap_event_object`` and calls
    exactly one trigger with the value:

        0 -> trigger_light_small
        1 -> trigger_light_big
        2 -> trigger_light_left
        3 -> trigger_light_right
        4 -> trigger_light_center

    Any other type is ignored.

    Raises:
        KeyError: if ``"type"`` or ``"value"`` is missing from the payload.
    """
    # Named ``event_type`` so the builtin ``type`` is not shadowed.
    event_type = beatmap_event_object["type"]
    value = beatmap_event_object["value"]

    # The former ``0 < type < 5`` guard excluded 0, which made the
    # small-light branch unreachable. The equality chain below already limits
    # handling to 0..4, so no separate range check is needed; as a side
    # effect, a non-numeric type is now ignored instead of raising TypeError.
    if event_type == 0:
        trigger_light_small(value)
    elif event_type == 1:
        trigger_light_big(value)
    elif event_type == 2:
        trigger_light_left(value)
    elif event_type == 3:
        trigger_light_right(value)
    elif event_type == 4:
        trigger_light_center(value)
def mqtt_publish_light(light_type, light_value):
    """Publish ``light_value`` on ``beatsaber/light/<light_type>``.

    The message is sent with ``qos=0`` and ``retain=True`` through the
    module-level ``client``, which this file assigns only in its
    ``__main__`` section.
    """
    topic = f"beatsaber/light/{light_type!s}"
    client.publish(topic, light_value, qos=0, retain=True)
def trigger_light_small(value):
    """Publish ``value`` as text on the ``small`` light topic, then print it."""
    text = str(value)
    mqtt_publish_light('small', text)
    print("light small " + text)
def trigger_light_big(value):
    """Publish ``value`` as text on the ``big`` light topic, then print it."""
    text = str(value)
    mqtt_publish_light('big', text)
    print("light big " + text)
def trigger_light_center(value):
    """Publish ``value`` as text on the ``center`` light topic, then print it."""
    text = str(value)
    mqtt_publish_light('center', text)
    print("light center " + text)
def trigger_light_left(value):
    """Publish ``value`` as text on the ``left`` light topic, then print it."""
    text = str(value)
    mqtt_publish_light('left', text)
    print("light left " + text)
def trigger_light_right(value):
    """Publish ``value`` as text on the ``right`` light topic, then print it."""
    text = str(value)
    mqtt_publish_light('right', text)
    print("light right " + text)
async def loop_websocket():
    """Receive websocket messages forever and parse each one.

    Connects to ``ws://localhost:6557/socket`` and passes every received
    message to ``parse_json``. The loop has no exit condition of its own, so
    it ends only when ``recv`` or ``parse_json`` raises or the coroutine is
    cancelled.
    """
    async with websockets.connect('ws://localhost:6557/socket') as ws:
        while True:
            parse_json(await ws.recv())
if __name__ == '__main__':
    # Script entry point: connect to the MQTT broker, then relay websocket
    # events until the websocket loop ends or the process is interrupted.
    #
    # ``client`` must stay a module-level name: mqtt_publish_saber and
    # mqtt_publish_light look it up as a global.
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    # NOTE(review): on_publish is defined in this file but is not registered
    # here; confirm whether that is intentional.
    client.connect("10.42.0.244", 1883, 60)
    # Start paho's network loop so publishes are processed while asyncio
    # owns the main thread.
    client.loop_start()
    try:
        # asyncio.run creates and closes its own event loop, replacing the
        # deprecated implicit-loop behaviour of asyncio.get_event_loop().
        asyncio.run(loop_websocket())
    finally:
        # Always release the broker connection and stop the network loop,
        # including on Ctrl-C or a websocket error.
        client.disconnect()
        client.loop_stop()