-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathingestAudio.py
73 lines (47 loc) · 1.84 KB
/
ingestAudio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import numpy as np
import pyaudio
import time
from multiprocessing import Queue, Process
from pydub import AudioSegment
from io import BytesIO
# Capture settings shared by the functions in this module.
sample_rate = 48_000  # default sampling rate (Hz) requested when opening the input stream
chunk_size = 960  # frames per read; 960 / 48000 = 20 ms of audio per chunk

# Single PyAudio instance, created at import time and shared module-wide.
p = pyaudio.PyAudio()
def find_audio_input_device_index(p, target_device_name="ROCCAT Khan AIMO"):
    """Return the index of the first matching input-capable audio device.

    Args:
        p: Object exposing ``get_device_count()`` and
            ``get_device_info_by_index(i)`` (this module passes its PyAudio
            instance).
        target_device_name: Substring searched for in each device's
            ``"name"`` entry.

    Returns:
        The index of the first device whose name contains
        *target_device_name* and whose ``"maxInputChannels"`` is greater
        than zero, or ``None`` when no device qualifies.
    """

    def _is_wanted_input(info):
        # Check the name first so "maxInputChannels" is only read for
        # devices whose name actually matches.
        name = info.get("name")
        if not name or target_device_name not in name:
            return False
        return info["maxInputChannels"] > 0

    # Lazy scan: devices are queried one at a time and the search stops at
    # the first hit.
    matching_indices = (
        idx
        for idx in range(p.get_device_count())
        if _is_wanted_input(p.get_device_info_by_index(idx))
    )
    return next(matching_indices, None)
def from_audio_stream(q_output, sample_rate=sample_rate, channels=1):
    """Record from the input device and push each chunk onto *q_output*.

    Opens a 16-bit input stream on the device located by
    ``find_audio_input_device_index`` and loops forever, reading
    ``chunk_size`` frames at a time and putting each chunk on the queue as
    a NumPy ``int16`` array. The loop only ends when an exception is raised
    (including ``KeyboardInterrupt``); the stream is always stopped and
    closed on the way out.

    Args:
        q_output: Queue-like object with a ``put()`` method that receives
            the captured chunks.
        sample_rate: Sampling rate in Hz. When ``None``, the device's
            ``"defaultSampleRate"`` is used instead.
        channels: Number of input channels to open.

    Returns:
        ``None``. Errors derived from ``Exception`` are reported on stdout
        rather than raised.
    """
    audio_device_index = find_audio_input_device_index(p)
    if audio_device_index is None:
        # Report the missing device explicitly instead of relying on a later
        # PyAudio call to fail on a None index.
        print("Error in from_audio_stream: no matching audio input device found")
        return None

    stream = None
    try:
        if sample_rate is None:
            # Only query the device when its default rate is actually needed.
            device_info = p.get_device_info_by_index(audio_device_index)
            sample_rate = int(device_info["defaultSampleRate"])

        print("Start recording!")
        stream = p.open(
            format=pyaudio.paInt16,
            channels=channels,
            rate=sample_rate,
            input=True,
            input_device_index=audio_device_index,
            frames_per_buffer=chunk_size,
        )
        while True:
            # exception_on_overflow=False: do not raise when input overflows.
            data = stream.read(chunk_size, exception_on_overflow=False)
            q_output.put(np.frombuffer(data, dtype=np.int16))
    except Exception as e:
        print(f"Error in from_audio_stream: {e}")
    finally:
        # Runs on errors and on KeyboardInterrupt alike, so the stream is
        # never leaked. The module-level PyAudio instance ``p`` is left
        # alive on purpose: it is shared, and terminating it here would
        # break any later call.
        if stream is not None:
            stream.stop_stream()
            stream.close()
            print("Finish recording!")
    return None