Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip(signal_proc): introduce basic agents for common signal processing #230

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions agentMET4FOF/signal_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy import fftpack

from agentMET4FOF.agents import AgentMET4FOF

matplotlib.use("Agg") # for sending plots as images

class FFT_Agent(AgentMET4FOF):
"""
Buffers the received signal and apply fft on the collected signal upon reaching the `buffer_size` limit.
The buffer then empties and the fft spectrum data and its plot are sent out on `default` and `plot` channel respectively.

Parameters
----------
s_freq : int
Actual sampling frequency for plotting the x-axis
"""

def init_parameters(self, s_freq=1):
self.s_freq = s_freq

def on_received_message(self, message):
single_measurement = message["data"]
agent_from = message["from"]
self.buffer.store(data=single_measurement, agent_from=agent_from)

if self.buffer.buffer_filled(agent_from):
# extract collected signal from buffer
buffered_sig = self.buffer[agent_from]

# apply fft and convert to amplitudes
sig_fft = fftpack.fft(buffered_sig)
power_amp = np.abs(sig_fft) ** 2

# get the corresponding frequencies
sample_freq = fftpack.fftfreq(len(buffered_sig), d=1/self.s_freq)
self.send_output(power_amp)
self.send_plot(self.plot_fft(power_amp, sample_freq))
self.buffer_clear(agent_from)

def plot_fft(self, amplitudes, sample_freq):
N = len(amplitudes)
fig = plt.figure()
plt.plot(sample_freq[1:N//2], amplitudes[1:N//2])
plt.xlabel("Frequency (Hz)")
plt.ylabel("Power Amplitude")
return fig
28 changes: 28 additions & 0 deletions agentMET4FOF_tutorials/signal_processing/fft_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from agentMET4FOF.agents import AgentNetwork, SineGeneratorAgent, MonitorAgent
from agentMET4FOF.signal_processing import FFT_Agent


def main():
# start agent network server
agentNetwork = AgentNetwork(backend="mesa")
# init agents
gen_agent = agentNetwork.add_agent(agentType=SineGeneratorAgent)
fft_agent = agentNetwork.add_agent(agentType=FFT_Agent, buffer_size=50, s_freq=100)
monitor_agent = agentNetwork.add_agent(agentType=MonitorAgent)

# connect agents : We can connect multiple agents to any particular agent
agentNetwork.bind_agents(gen_agent, fft_agent)

# connect
agentNetwork.bind_agents(gen_agent, monitor_agent)
agentNetwork.bind_agents(fft_agent, monitor_agent, channel=["plot"])

# set all agents states to "Running"
agentNetwork.set_running_state()

# allow for shutting down the network after execution
return agentNetwork


if __name__ == "__main__":
main()