-
Notifications
You must be signed in to change notification settings - Fork 0
/
chiaki_testclient.py
executable file
·361 lines (315 loc) · 13.2 KB
/
chiaki_testclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#!/usr/bin/env python3
from PyQt6.QtCore import QSettings
from chiaki_streamsession import ChiakiStreamSession, JoyButtons, JoyAxes
import argparse
from evdev import ecodes, list_devices, AbsInfo, InputDevice
import sys, select, tty, termios
import numpy as np
from numpy_ringbuffer import RingBuffer
import pyformulas as pf
import matplotlib.pyplot as plt
from matplotlib.ticker import (AutoMinorLocator, MultipleLocator)
from time import time, sleep
import scipy
import pygame
from eventtimer import EventTimer
def print_event(e):
    """Print a one-line, human-readable dump of an evdev input event.

    ``EV_SYN`` events are rendered as ruler lines ('+' for ``SYN_MT_REPORT``,
    '-' for every other sync code). All other events are printed with their
    numeric type and code, the symbolic names looked up in ``ecodes``, and
    the event value.
    """
    if e.type == ecodes.EV_SYN:
        ruler = (
            "time {:<16} +++++++++ {} ++++++++"
            if e.code == ecodes.SYN_MT_REPORT
            else "time {:<16} --------- {} --------"
        )
        print(ruler.format(e.timestamp(), ecodes.SYN[e.code]))
        return

    # Fall back to "?" when the event type has no code-name table.
    codename = ecodes.bytype[e.type][e.code] if e.type in ecodes.bytype else "?"
    line = "time {:<16} type {} ({}), code {:<4} ({}), value {}".format(
        e.timestamp(), e.type, ecodes.EV[e.type], e.code, codename, e.value
    )
    print(line)
def expo_stick_remap(inscale, outscale, str, dz, value):
    """Apply a dead zone and an expo (linear/cubic blend) curve to an axis value.

    Args:
        inscale: full-scale magnitude of the raw input.
        outscale: full-scale magnitude of the returned value.
        str: expo strength; 0 gives a purely linear response, 1 a purely
            cubic one. (The name shadows the builtin but is part of the
            signature, so it is kept.)
        dz: dead-zone half-width in input units; magnitudes at or below it
            map to 0.
        value: raw axis reading.

    Returns:
        The remapped reading as an ``int``, clipped to +/- ``outscale``.
    """
    # Travel that remains once the dead zone has been taken off the magnitude.
    live_travel = max(abs(value) - dz, 0)
    unit = np.sign(value) * live_travel / (inscale - dz)

    linear_part = (1 - str) * unit
    cubic_part = str * np.power(unit, 3.0)
    blended = linear_part + cubic_part

    limited = np.clip(blended, -1.0, 1.0)
    return int(limited * outscale)
"""
Xbox Elite event mappings:
type = EV_ABS for analog, EV_KEY for buttons
Right stick: ABS_RX and ABS_RY, looks like -32767 to 32767
Left stick: ABS_X and ABS_Y
Left trigger: ABS_Z
Right trigger: ABS_RZ
Dpad: ABS_HAT0X/0Y, values are -1,0,+1
Xbox button: BTN_MODE
Share? : BTN_SELECT
Hamburger? : BTN_START
A: BTN_A
B: BTN_B
X: BTN_X
Y: BTN_Y
Bumper R: BTN_TR
Bumper L: BTN_TL
Left Thumb: BTN_THUMBL
Right Thumb: BTN_THUMBR
"""
def handle_evdev_event(ss, e):
    """Translate one evdev event into stream-session controller input.

    ``EV_KEY`` events (keyboard keys and gamepad buttons) are forwarded via
    ``ss.HandleButtonEvent`` with the raw event value. ``EV_ABS`` events are
    forwarded via ``ss.HandleAxisEvent`` for sticks and triggers, or as a
    pair of d-pad button updates for the hat axes. Unmapped codes and other
    event types are ignored.

    Args:
        ss: stream session exposing ``Stop``, ``HandleButtonEvent`` and
            ``HandleAxisEvent``.
        e: evdev input event (``type``, ``code``, ``value``).
    """
    code = e.code

    if e.type == ecodes.EV_KEY:
        if code == ecodes.KEY_Q:  # obsolete, leaving it here just in case
            ss.Stop()
            exit()
        elif code in (ecodes.KEY_A, ecodes.KEY_LEFT):
            ss.HandleButtonEvent(JoyButtons.DPAD_LEFT, e.value)
        elif code in (ecodes.KEY_D, ecodes.KEY_RIGHT):
            ss.HandleButtonEvent(JoyButtons.DPAD_RIGHT, e.value)
        elif code in (ecodes.KEY_W, ecodes.KEY_UP):
            ss.HandleButtonEvent(JoyButtons.DPAD_UP, e.value)
        elif code in (ecodes.KEY_S, ecodes.KEY_DOWN):
            ss.HandleButtonEvent(JoyButtons.DPAD_DOWN, e.value)
        elif code == ecodes.BTN_MODE:
            ss.HandleButtonEvent(JoyButtons.TOUCHPAD, e.value)
        elif code == ecodes.BTN_SELECT:
            ss.HandleButtonEvent(JoyButtons.PS, e.value)
        elif code == ecodes.BTN_START:
            ss.HandleButtonEvent(JoyButtons.OPTIONS, e.value)
        elif code == ecodes.BTN_THUMBL:
            ss.HandleButtonEvent(JoyButtons.L3, e.value)
        elif code == ecodes.BTN_THUMBR:
            ss.HandleButtonEvent(JoyButtons.R3, e.value)
        elif code == ecodes.BTN_TL:
            ss.HandleButtonEvent(JoyButtons.L1, e.value)
        elif code == ecodes.BTN_TR:
            ss.HandleButtonEvent(JoyButtons.R1, e.value)
        elif code == ecodes.BTN_A:
            ss.HandleButtonEvent(JoyButtons.CROSS, e.value)
        elif code == ecodes.BTN_B:
            ss.HandleButtonEvent(JoyButtons.MOON, e.value)
        elif code == ecodes.BTN_X:
            ss.HandleButtonEvent(JoyButtons.BOX, e.value)
        elif code == ecodes.BTN_Y:
            ss.HandleButtonEvent(JoyButtons.PYRAMID, e.value)

    if e.type == ecodes.EV_ABS:
        if code == ecodes.ABS_X:
            # Left stick: expo curve with a 1024-count dead zone.
            ss.HandleAxisEvent(JoyAxes.LX, expo_stick_remap(32767, 32767, 0.5, 1024, e.value))
        elif code == ecodes.ABS_Y:
            ss.HandleAxisEvent(JoyAxes.LY, expo_stick_remap(32767, 32767, 0.5, 1024, e.value))
        elif code == ecodes.ABS_Z:
            # Triggers are forwarded with the two low bits dropped.
            ss.HandleAxisEvent(JoyAxes.LZ, e.value >> 2)
        elif code == ecodes.ABS_RX:
            # Right stick: same expo curve, no dead zone.
            ss.HandleAxisEvent(JoyAxes.RX, expo_stick_remap(32767, 32767, 0.5, 0, e.value))
        elif code == ecodes.ABS_RY:
            ss.HandleAxisEvent(JoyAxes.RY, expo_stick_remap(32767, 32767, 0.5, 0, e.value))
        elif code == ecodes.ABS_RZ:
            ss.HandleAxisEvent(JoyAxes.RZ, e.value >> 2)
        elif code == ecodes.ABS_HAT0X:
            # Hat axes report -1/0/+1; update both opposing d-pad buttons so
            # a return to 0 releases whichever one was held.
            ss.HandleButtonEvent(JoyButtons.DPAD_LEFT, (e.value == -1))
            ss.HandleButtonEvent(JoyButtons.DPAD_RIGHT, (e.value == 1))
        elif code == ecodes.ABS_HAT0Y:
            ss.HandleButtonEvent(JoyButtons.DPAD_UP, (e.value == -1))
            ss.HandleButtonEvent(JoyButtons.DPAD_DOWN, (e.value == 1))
'''
Xbox Elite mappings, pygame edition:
Axes:
0 : LX
1 : LY
2 : LZ (trigger)
3 : RX
4 : RY
5 : RZ (trigger)
Buttons:
0: A
1: B
2: X
3: Y
4: LB (left bumper)
5: RB (right bumper)
6: Windows???
7: Hamburger?
8: Xbox button
9: Left Thumbstick click
10: Right Thumbstick click
11: Upper right paddle
12: Lower right paddle
13: Upper Left paddle
14: Lower left paddle
Why does xpad give the right paddles lower values than left???
'''
def handle_pygame_event(ss,e):
global joysticks
if not hasattr(handle_pygame_event, "ievtimer"):
handle_pygame_event.ievtimer = EventTimer() # it doesn't exist yet, so initialize it
handle_pygame_event.ievtimer.start(0.004, periodic=True)
if e.type == pygame.JOYDEVICEADDED:
# This event will be generated when the program starts for every
# joystick, filling up the list without needing to create them manually.
joy = pygame.joystick.Joystick(e.device_index)
joysticks[joy.get_instance_id()] = joy
print(f"Joystick {joy.get_instance_id()} connencted")
if e.type == pygame.JOYDEVICEREMOVED:
del joysticks[e.instance_id]
print(f"Joystick {e.instance_id} disconnected")
if (e.type == pygame.JOYBUTTONDOWN) or (e.type == pygame.JOYBUTTONUP):
value = (e.type == pygame.JOYBUTTONDOWN)
match e.button:
case 8: #Xbox button
ss.HandleButtonEvent(JoyButtons.TOUCHPAD, value)
case 6: #Share? Select? Guide?
ss.HandleButtonEvent(JoyButtons.PS, value)
case 7: #Hamburger?
ss.HandleButtonEvent(JoyButtons.OPTIONS, value)
case k if k in [9, 14]: #Left thumb or lower left paddle
ss.HandleButtonEvent(JoyButtons.L3, value)
case k if k in [10, 12]: #Right thumb or lower right paddle
ss.HandleButtonEvent(JoyButtons.R3, value)
case k if k in [4, 13]: #LT or upper left paddle
ss.HandleButtonEvent(JoyButtons.L1, value)
case k if k in [5, 11]: #RT or upper right paddle
ss.HandleButtonEvent(JoyButtons.R1, value)
case 0: #A
ss.HandleButtonEvent(JoyButtons.CROSS, value)
case 1: #B
ss.HandleButtonEvent(JoyButtons.MOON, value)
case 2: #X
ss.HandleButtonEvent(JoyButtons.BOX, value)
case 3: #Y
ss.HandleButtonEvent(JoyButtons.PYRAMID, value)
if e.type == pygame.JOYAXISMOTION:
match e.axis:
case 0:
ss.HandleAxisEvent(JoyAxes.LX, int(expo_stick_remap(1.0, 32767, 0.5, 0.025, e.value)))
case 1:
ss.HandleAxisEvent(JoyAxes.LY, int(expo_stick_remap(1.0, 32767, 0.5, 0.025, e.value)))
case 2:
ss.HandleAxisEvent(JoyAxes.LZ, int(max(0,(e.value+1)*255/2)))
case 3:
ss.HandleAxisEvent(JoyAxes.RX, int(expo_stick_remap(1.0, 32767, 0.5, 0.0, e.value)))
case 4:
ss.HandleAxisEvent(JoyAxes.RY, int(expo_stick_remap(1.0, 32767, 0.5, 0.0, e.value)))
case 5:
ss.HandleAxisEvent(JoyAxes.RZ, int(max(0,(e.value+1)*255/2)))
if e.type == pygame.JOYHATMOTION:
if(e.hat == 0):
ss.HandleButtonEvent(JoyButtons.DPAD_LEFT, (e.value[0] == -1))
ss.HandleButtonEvent(JoyButtons.DPAD_RIGHT, (e.value[0] == 1))
ss.HandleButtonEvent(JoyButtons.DPAD_UP, (e.value[1] == 1))
ss.HandleButtonEvent(JoyButtons.DPAD_DOWN, (e.value[1] == -1))
if(handle_pygame_event.ievtimer.check()):
ss.SendFeedbackState()
def haptics_callback(data):
    """Append a haptics packet to the ring buffer and record its arrival time.

    ``data`` is divided by 32767 before being stored; the module-level
    ``last_haptic_time`` is then set to the current time.
    """
    global last_haptic_time

    # normalize to the equivalent of 1V p-p
    normalized = data / 32767
    # requires https://github.com/eric-wieser/numpy_ringbuffer/pull/18
    rbuf.extend(normalized)

    last_haptic_time = time()
def main():
    """Connect to the PS5 named by ``--host`` and run the input/rumble loop.

    The registration keys are read from the Chiaki ``QSettings`` store
    (``registered_hosts/1``). Once the stream session is started the loop:

    * reads single characters from stdin: ``q`` stops the session and
      returns, ``p`` pauses and ``r`` resumes plotting;
    * forwards every pending pygame event to ``handle_pygame_event``;
    * pads the haptics ring buffer with zeros when no haptics packet has
      arrived for 40 ms;
    * every ``buffer_time`` seconds computes the low/high band power of the
      ring buffer and rumbles every known joystick with it.

    Raises:
        RuntimeError: if the settings store holds no registration keys.
    """
    global plot_pause, last_haptic_time, last_power_time, last_plot_time

    # Both paths are kept for potential future use but are switched off.
    # NOTE: the plot path needs `fig`, `ln`, `fdata` and `screen`, which the
    # module-level start-up code only creates when its own plotting branch
    # is enabled as well.
    use_evdev = False
    show_spectrum = False

    ap = argparse.ArgumentParser()
    ap.add_argument('--host', required=True,
                    help='PS5 to connect to')
    args = vars(ap.parse_args())

    chiaki_settings = QSettings('Chiaki', 'Chiaki')
    regkey = chiaki_settings.value('registered_hosts/1/rp_regist_key')
    morning = chiaki_settings.value('registered_hosts/1/rp_key')
    if regkey is None or morning is None:
        # Without this check the failure is an opaque AttributeError on None.
        raise RuntimeError(
            "No registered host found in the Chiaki settings "
            "(registered_hosts/1); register the console with Chiaki first"
        )

    ss = ChiakiStreamSession(host=args['host'], regkey=regkey.data(), rpkey=morning.data())
    ss.set_haptics_callback(haptics_callback)

    if use_evdev:
        xonedev = InputDevice('/dev/input/event20')
        indevices = [xonedev]
        fd_to_device = {dev.fd: dev for dev in indevices}

    ss.Start()
    while True:
        if use_evdev:
            readable, _, _ = select.select(fd_to_device, [], [], 0)
            for fd in readable:
                for event in fd_to_device[fd].read():
                    #print_event(event)
                    handle_evdev_event(ss, event)

        # Using evdev for this results in the quit function being executed
        # any time we press q - even elsewhere. So read sys.stdin instead.
        # In the process, we've dropped support for all other keyboard inputs
        # since controller input is fully functional now.
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        if readable:  # sys.stdin is the only descriptor that was offered
            inchar = sys.stdin.read(1)
            if inchar == 'q':
                ss.Stop()
                return
            if inchar == 'p':
                plot_pause = True
            if inchar == 'r':
                plot_pause = False

        # Handle pygame joystick events
        for pgevent in pygame.event.get():
            handle_pygame_event(ss, pgevent)

        # No haptics packet for 40 ms: feed silence so stale samples age out
        # of the ring buffer. 60 samples is 20 ms at the 3000 Hz `fs`, and the
        # timestamp is back-dated by the same 20 ms.
        if (time() - last_haptic_time) >= 0.04:
            last_haptic_time = time() - 0.02
            rbuf.extend(np.zeros(60))

        if (time() - last_power_time) >= buffer_time:
            last_power_time = time()
            lpow, hpow = get_sigpower(np.array(rbuf))
            for joy in joysticks.values():
                joy.rumble(lpow, hpow, int(buffer_time*1000))

        if show_spectrum:
            if ((time() - last_plot_time) >= plot_time) and not plot_pause:
                last_plot_time = time()
                ln.set_data(fdata, np.abs(np.fft.rfft(np.array(rbuf))))
                fig.canvas.draw()
                image = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
                image = image.reshape(fig.canvas.get_width_height()[::-1] + (4,))
                image = np.flip(image[:,:,0:3], axis=2) #screen wants BGR, and we want to drop alpha
                screen.update(image)

        sleep(0)
# https://stackoverflow.com/questions/42169348/calculating-bandpower-of-a-signal-in-python
def get_sigpower(data, sample_rate=None, low_cut=85, high_cut=350):
    """Return the square root of the band power in a low and a high band.

    The power spectral density of ``data`` is estimated with a periodogram
    and integrated (trapezoid rule) over two adjacent index ranges: from the
    first bin up to the bins below ``low_cut``, and from there up to the bins
    below ``high_cut``.

    Args:
        data: 1-D sequence of samples.
        sample_rate: sampling frequency in Hz. Defaults to the module-level
            ``fs``.
        low_cut: frequency in Hz separating the low band from the high band.
        high_cut: upper frequency limit in Hz of the high band.

    Returns:
        ``(lowpow, highpow)``; ``(0.0, 0.0)`` for empty input.
    """
    # Import the submodules explicitly: a bare `import scipy` is not
    # guaranteed to expose them on every SciPy version.
    import scipy.integrate
    import scipy.signal

    if sample_rate is None:
        sample_rate = fs

    samples = np.asarray(data)
    if samples.size == 0:
        # argmax on an empty spectrum would raise; no signal means no power.
        return 0.0, 0.0

    f, Pxx = scipy.signal.periodogram(samples, fs=sample_rate, scaling='density')
    # argmax on a boolean array gives the first True index; step back one
    # bin so each slice stops below its cut-off frequency.
    ind_lowcut = np.argmax(f > low_cut) - 1
    ind_highcut = np.argmax(f >= high_cut) - 1 #Do we just want to take all high frequency content???
    lowpow = np.sqrt(scipy.integrate.trapezoid(Pxx[:ind_lowcut], f[:ind_lowcut]))
    highpow = np.sqrt(scipy.integrate.trapezoid(Pxx[ind_lowcut:ind_highcut], f[ind_lowcut:ind_highcut]))
    return lowpow, highpow
# Shared state read by the functions above. It is set up before the `try`
# so that a failure here is reported as itself instead of being masked by a
# NameError from the `finally` clause.
buffer_time = 0.06 #3 haptics packets
fs = 3000
nyq = fs/2
plot_pause = False
buf_cap = int(buffer_time*fs)
rbuf = RingBuffer(capacity=buf_cap, dtype=np.float32)
rbuf.extend(np.zeros(buf_cap))
joysticks = {}

# Live spectrum plot; kept for potential future use but switched off.
enable_spectrum_plot = False

# Capture the terminal state before it is modified, and outside the `try`,
# so the `finally` below only runs when there is something to restore.
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
    # cbreak mode lets main() read single key presses without waiting for Enter.
    tty.setcbreak(sys.stdin)
    if enable_spectrum_plot:
        fig,ax = plt.subplots()
        fig.set_figwidth(15)
        fig.set_figheight(6)
        fig.set_dpi(100)
        # https://stackoverflow.com/a/49594258/4434513
        canvas = np.zeros((480,1080,3))
        screen = pf.screen(canvas, 'Audio Data')
        xdata = np.arange(buf_cap)/3000
        # One frequency per rfft output bin, so x and y have equal length.
        fdata = np.fft.rfftfreq(buf_cap, d=1/fs)
        maxfreq = 150
        ax.set_xlim(0,maxfreq)
        ax.set_ylim(0, 32767)
        ax.xaxis.set_major_locator(MultipleLocator(maxfreq/10))
        ax.yaxis.set_major_locator(MultipleLocator(10000))
        ax.xaxis.set_minor_locator(AutoMinorLocator(5))
        ax.yaxis.set_minor_locator(AutoMinorLocator(5))
        ax.grid(which='major', color='#CCCCCC', linestyle='--')
        ax.grid(which='minor', color='#CCCCCC', linestyle=':')
        ln = ax.plot(fdata, np.abs(np.fft.rfft(np.array(rbuf))), color='blue')[0]
    last_plot_time = time()
    last_power_time = time()
    last_haptic_time = time()
    pygame.init()
    main()
finally:
    # Always put the terminal back, even if main() raised.
    termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    pygame.quit()