-
Notifications
You must be signed in to change notification settings - Fork 0
/
audio_dsp_sample.py
129 lines (103 loc) · 4.28 KB
/
audio_dsp_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
# !/usr/bin/env python
# This file is part of AudioLazy, the signal processing Python package.
# Copyright (C) 2012-2016 Danilo de Jesus da Silva Bellini
#
# AudioLazy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Pitch follower via DFT peak with Tkinter GUI
"""
# ------------------------
# AudioLazy pitch follower
# ------------------------
import sys
from audiolazy import (tostream, AudioIO, freq2str, freq2midi, sHz, chunks,
lowpass, envelope, pi, thub, Stream, maverage)
from numpy.fft import rfft
def limiter(sig, threshold=.1, size=256, env=envelope.rms, cutoff=pi / 2048):
sig = thub(sig, 2)
return sig * Stream(1. if el <= threshold else threshold / el
for el in maverage(size)(env(sig, cutoff=cutoff)))
@tostream
def dft_pitch(sig, size=2048, hop=None):
for blk in Stream(sig).blocks(size=size, hop=hop):
dft_data = rfft(blk) # 计算一维离散傅里叶变换以获得实际输入。
idx, vmax = max(enumerate(dft_data),
key=lambda el: abs(el[1]) / (2 * el[0] / size + 1)
)
yield 2 * pi * idx / size
def pitch_from_mic(upd_time_in_ms):
rate = 44100
s, Hz = sHz(rate)
api = sys.argv[1] if sys.argv[1:] else None # Choose API via command-line
chunks.size = 1 if api == "jack" else 16
with AudioIO(api=api) as recorder:
snd = recorder.record(rate=rate)
sndlow = lowpass(500 * Hz)(limiter(snd, cutoff=20 * Hz))
hop = int(upd_time_in_ms * 1e-3 * s)
for pitch in freq2str(dft_pitch(sndlow, size=2 * hop, hop=hop) / Hz):
yield pitch
def midi_pitch_from_mic(upd_time_in_ms):
rate = 44100
s, Hz = sHz(rate)
api = sys.argv[1] if sys.argv[1:] else None # Choose API via command-line
chunks.size = 1 if api == "jack" else 16
with AudioIO(api=api) as recorder:
snd = recorder.record(rate=rate)
sndlow = lowpass(500 * Hz)(limiter(snd, cutoff=20 * Hz))
hop = int(upd_time_in_ms * 1e-3 * s)
for pitch in freq2midi(dft_pitch(sndlow, size=2 * hop, hop=hop) / Hz):
yield pitch
# ----------------
# GUI with tkinter
# ----------------
if __name__ == "__main__":
try:
import tkinter
except ImportError:
import Tkinter as tkinter
import threading
import re
# Window (Tk init), text label and button
tk = tkinter.Tk()
tk.title(__doc__.strip().splitlines()[0])
lbldata = tkinter.StringVar(tk)
lbltext = tkinter.Label(tk, textvariable=lbldata, font=("Purisa", 72),
width=10)
lbltext.pack(expand=True, fill=tkinter.BOTH)
btnclose = tkinter.Button(tk, text="Close", command=tk.destroy,
default="active")
btnclose.pack(fill=tkinter.X)
# Needed data
regex_note = re.compile(r"^([A-Gb#]*-?[0-9]*)([?+-]?)(.*?%?)$")
upd_time_in_ms = 200
# Update functions for each thread
def upd_value(): # Recording thread
# pitches = iter(pitch_from_mic(upd_time_in_ms))
pitches = iter(pitch_from_mic(upd_time_in_ms))
while not tk.should_finish:
tk.value = next(pitches)
def upd_timer(): # GUI mainloop thread
lbldata.set("\n".join(regex_note.findall(str(tk.value))[0]))
tk.after(upd_time_in_ms, upd_timer)
# Multi-thread management initialization
tk.should_finish = False
tk.value = freq2str(0) # Starting value
lbldata.set(tk.value)
tk.upd_thread = threading.Thread(target=upd_value)
# Go
tk.upd_thread.start()
tk.after_idle(upd_timer)
tk.mainloop()
tk.should_finish = True
tk.upd_thread.join()