
Audio recording FrameProcessor #451

Merged · 12 commits · Oct 11, 2024

Conversation

adriancowham
Contributor

Adds a FrameProcessor that will record a conversation between a bot and a human. The new AudioBufferProcessor is meant to be used as a base class; it simply stores the conversation in a buffer. The intention is that developers can extend this class and do something interesting.

This PR also includes another FrameProcessor, CanonicalMetrics. This FrameProcessor is a concrete implementation of AudioBufferProcessor; when the end of the call is reached, CanonicalMetrics uploads the audio to the Canonical AI platform for evaluation and metric collection.

This PR adds two examples in the examples directory demonstrating how to use both of the new FrameProcessors; they are in the examples/chatbot-audio-recording and canonical-metrics directories.
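To make the "extend this class" idea concrete, here is a minimal, hypothetical sketch (not code from this PR): a subclass that writes the buffered audio to disk when the call ends. The EndFrame check, the _audio_buffer attribute, and the import path are assumptions based on the diff excerpts quoted later in this conversation; adapt them to whatever the merged AudioBufferProcessor actually exposes.

from pipecat.frames.frames import EndFrame, Frame
from pipecat.processors.frame_processor import FrameDirection
# Import path assumed from the review request below to move the class under
# src/pipecat/processors/audio.
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor


class SaveToDiskProcessor(AudioBufferProcessor):
    """Illustrative subclass: dump the buffered conversation audio to a file
    when the call ends."""

    def __init__(self, output_path: str = "conversation.pcm"):
        super().__init__()
        self._output_path = output_path

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # The base class keeps buffering (and forwarding) frames as usual.
        await super().process_frame(frame, direction)
        if isinstance(frame, EndFrame):
            # _audio_buffer is a stand-in for the raw-bytes buffer the base
            # class maintains.
            with open(self._output_path, "wb") as f:
                f.write(bytes(self._audio_buffer))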

… buffer and another frame processor to upload audio to Canonical for evaluation and metrics collection. Examples included
@kwindla
Contributor

kwindla commented Sep 10, 2024

@aconchillo, I talked to @adriancowham about this via email.

  1. There have been a number of questions in Discord about how to build a pipeline element that can record a conversation. I think it's really useful to have a standard recording capability in Pipecat that doesn't depend on a transport implementation.
  2. Canonical's new evals/observability platform for voice AI is really cool! Adrian's colleague Tom did a demo at the event last night in SF.

@aconchillo
Contributor

This is fantastic. I'll review as soon as I can!

self.num_channels = None
self.sample_rate = None
self.assistant_audio = False
self.user_audio = False
Contributor

Please, use private fields (single underscore) so it's clear we don't expose this. 🙏

Contributor

If we expose them, we could use properties instead.

Contributor Author

Done.

This class extends FrameProcessor and is used to mark the user's audio in the pipeline.
This FrameProcessor must be inserted after transport.input() so that the only
AudioRawFrames it receives are from the user.
"""
Contributor

In theory this is not needed. All the audio coming from transport.input() is from the user. I believe what you have done in AudioBufferProcessor by checking UserStartedSpeaking and UserStoppedSpeaking should be enough.

Contributor Author

@aconchillo I can get this to work by only using UserStartedSpeaking and UserStoppedSpeaking, but there is a longer-than-expected delay between when the assistant finishes and the human starts. This delay does not reflect what actually happened. I'm pretty sure it's because audio from the user's mic is coming in and, since there's no way to differentiate it, it gets added to the buffer while the assistant is talking. That is, the amount of extra delay added is the amount of time the assistant is speaking, because all that silence from the user's mic is coming in and getting added to the buffer. There's a small mention of this in the Discord, here: https://discord.com/channels/1239284677165056021/1278753324274814977/1279769874482204814

I'm happy to try an alternative implementation, but this is the best one I could come up with. The inflated delay is pretty bad; I don't think it would lead to a usable feature.

Let me know your thoughts.

Contributor

So weird.... I'll take a look again.

Contributor

Oh, duh! Yes, of course! I'm working on a PR that will make this easier. There will be no need for a UserMarkerProcessor; the incoming audio frames will have different types. Please, bear with me!

Contributor

Just for others who find this.... The reason this happens is because of this DailyParams:

                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,

The STT happens on the Daily server, so no audio would really be required in the pipeline. However, since we want to save the audio, we set vad_audio_passthrough, which continuously pushes audio frames.

raise e
self.audio_buffer = bytearray()

def get_output_filename(self):
Contributor

Nit: add underscore to all private functions. 🙏

Contributor Author

Done.

super().__init__()
if not os.environ.get("CANONICAL_API_KEY"):
raise ValueError(
"CANONICAL_API_KEY is not set, a Canonical API key is required to use this class")
Contributor

Could we pass api_key: str to the constructor instead? This is the approach we take in many other services. From outside you can do os.environ.get("CANONICAL_API_KEY") or get it from somewhere else.

Contributor Author

Done.

def request_headers(self):
return {
"Content-Type": "application/json",
"X-Canonical-Api-Key": os.environ.get("CANONICAL_API_KEY")
Contributor

If we follow my other comment here we would do self._api_key.

Contributor Author

Done.

return f"{self.output_dir}/{timestamp}-{uuid.uuid4().hex}.wav"

def canonical_api_url(self):
return os.environ.get("CANONICAL_API_URL", "https://voiceapp.canonical.chat/api/v1")
Contributor

Same as api_key, we could probably add this to the constructor, defaulting to "https://voiceapp.canonical.chat/api/v1".

Contributor Author

Done.

'assistantSpeaksFirst': self.assistant_speaks_first
}
print(f"Requesting presigned URLs for {numparts} parts")
async with aiohttp.ClientSession() as session:
Contributor

In many other services we pass aiohttp_session as an argument so we can provide an external session and we don't have to create it every time.

Contributor Author

Done.

json=params
) as response:
if not response.ok:
raise Exception(f"Failed to get presigned URLs: {await response.text()}")
Contributor

Here we could use logger.error and return so the bot doesn't stop working.

Contributor

Mmm... I actually think we capture the exception upstream, so maybe this is OK. I'll double-check

Contributor Author

Done.

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self.end_of_call:
return
aconchillo commented Sep 11, 2024

We should not get anything after an EndFrame or CancelFrame so we probably don't need end_of_call.

Contributor Author

Done. I thought I saw a case where I was getting both of them.

# mic is always coming in. if we include all the user's audio there will be a long latency before
# the user starts speaking because all of the user's silence during the assistant's speech will have been
# added to the buffer.
if isinstance(frame, UserAudioFrame) and self.user_audio:
aconchillo commented Sep 11, 2024

It seems this should be enough:

if isinstance(frame, AudioRawFrame) and self.user_audio:

Contributor Author

@aconchillo Gonna leave this one for now until we figure out the latency issue described above.

if (
isinstance(frame, AudioRawFrame)
and not isinstance(frame, UserAudioFrame)
):
Contributor

Maybe:

if isinstance(frame, AudioRawFrame) and self.assistant_audio:

Contributor Author

@aconchillo Gonna leave this one for now until we figure out the latency issue described above.

@@ -0,0 +1,75 @@
from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame,
Contributor

It'd be great if we could move this to src/pipecat/processors/audio.

Contributor Author

Done.

"""


class CanonicalMetrics(AudioBufferProcessor):
aconchillo commented Sep 11, 2024

I feel this could probably be an AIService instead, and we should then pass the AudioBufferProcessor as an argument.

So:

class CanonicalMetricsService(AIService):

    def __init__(self, audio_buffer_processor: AudioBufferProcessor, ...):
        ...

So CanonicalMetricsService would live in src/pipecat/services with this approach. Another benefit is that you could then use any other AudioBufferProcessor implementation.

The only thing is that you then need to put both processors in the pipeline. But it's probably fine. So,

Pipeline([transport.input(), audio_buffer_processor, canonical, ....])

Actually, that would also allow other processors in the pipeline to use the audio_buffer_processor.

WDYT?

Contributor Author

I like it. I'll make the change.

Contributor Author

Done.

Contributor

Awesome! Thank you. Looking great!

@adriancowham
Contributor Author

@aconchillo Thanks for the thoughtful feedback. I'll start working on this.

@adriancowham
Contributor Author

@aconchillo I pushed the changes from your feedback. I left you a comment, let me know what you think.

self._sample_rate = None
self._assistant_audio = False
self._user_audio = False
print(f"ctor::AudioBufferProcessor object memory address: {id(self)}")
Contributor

Use loguru logger if you need to log anything. 🙏

Contributor Author

My fault, I left a debug print statement in there. Fixed.

@aconchillo
Contributor

This looks great! I'll take a final look later today/tomorrow max.

@adriancowham
Contributor Author

No rush.

@aconchillo
Contributor

aconchillo commented Sep 20, 2024

This is the PR that I'll probably merge: #480

You will not need the additional UserMarkerProcessor and you will need to check for InputAudioRawFrame instead.

@adriancowham
Contributor Author

Ok, sounds good. That's a massive PR, nice work on that! I'll keep my eye out for the merge and then update my PR.

@aconchillo
Contributor

aconchillo commented Sep 20, 2024

This is now merged. I'll take a final look once you update. So excited to get this one in as well. Thank you for your patience!

@aconchillo
Contributor

Another massive PR went in to use Ruff as our formatter. Please, rebase and make sure the pipeline is green. I'll merge after that. Thank you! 🙏

@cyrilS-dev
Contributor

hey @aconchillo @adriancowham, I did some tests on my side using InputAudioRawFrame & OutputAudioRawFrame.
About the input:

async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStartedSpeakingFrame):
            self._human_speaking = True
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._human_speaking = False
        elif isinstance(frame, InputAudioRawFrame) and self._human_speaking:
            self.audio_buffer.extend(frame.audio)

        await self.push_frame(frame, direction)

is a wrong approach, because the VAD latency causes the beginning of the input audio to be missing for a few milliseconds.

It means the input audio has to be recorded continuously, while the output audio is only recorded when the bot is speaking.

This creates a problem when combining the input and output audio, making the process more complicated.

@aconchillo Do you have any insights? thanks

@aconchillo
Contributor

You are right, it would be a few milliseconds off. One solution is to keep, for example, a 1 second buffer. When you get the VAD event you just grab VADParams.start_secs from that buffer (maybe a bit more) and then start appending as we do now.
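To make the pre-roll suggestion above concrete, here is a hedged, self-contained sketch (not code from this PR). It keeps a short rolling window of incoming audio and, when the VAD reports that the user started speaking, back-fills roughly VADParams.start_secs of that window before appending live frames. The sample rate, sample width, constant names, and callback names are illustrative assumptions.

from collections import deque

SAMPLE_RATE = 16000       # assumed input sample rate
BYTES_PER_SAMPLE = 2      # signed 16-bit PCM
PRE_ROLL_SECS = 1.0       # keep ~1 second of recent input audio at all times
START_SECS = 0.2          # stand-in for VADParams.start_secs

pre_roll = deque(maxlen=int(PRE_ROLL_SECS * SAMPLE_RATE * BYTES_PER_SAMPLE))
audio_buffer = bytearray()
speaking = False


def on_input_audio(chunk: bytes):
    """Call for every InputAudioRawFrame payload."""
    pre_roll.extend(chunk)  # always keep the rolling window up to date
    if speaking:
        audio_buffer.extend(chunk)


def on_user_started_speaking():
    """Call on UserStartedSpeakingFrame: back-fill the audio the VAD needed to trigger."""
    global speaking
    keep = int(START_SECS * SAMPLE_RATE * BYTES_PER_SAMPLE)
    audio_buffer.extend(bytes(pre_roll)[-keep:])
    speaking = True


def on_user_stopped_speaking():
    """Call on UserStoppedSpeakingFrame."""
    global speaking
    speaking = False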

@cyrilS-dev
Contributor

cyrilS-dev commented Sep 30, 2024

Thanks for the pointer, @aconchillo! I'm not sure I'll use the pipeline to record the audio, since InputAudioRawFrames flow continuously through the pipeline while OutputAudioRawFrames are only present when the TTS is speaking. This means that silences would need to be inserted into the output audio to maintain sync with the input.
Even with the idea of recording the input audio based on VAD events, I would still need to reconstruct the silences by using timestamps on the frames to maintain proper alignment and sync.
Using LiveKit Egress seems like a more precise and simpler approach.
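For readers following along: the silence-reconstruction concern above is roughly what the PR ends up doing with separate per-speaker buffers (see the later commit note about moving to stereo with the human and bot on their own channel, and the _assistant_audio_buffer.extend(silence) excerpt below). A hedged sketch of that idea, assuming both tracks are 16-bit PCM at the same sample rate so that equal byte length means equal duration:

def pad_assistant_with_silence(user_track: bytearray, assistant_track: bytearray) -> None:
    """Append zero bytes (silence) to the assistant track until it is as long
    as the continuously recorded user track, keeping the two channels
    time-aligned before they are interleaved into a stereo file."""
    missing = len(user_track) - len(assistant_track)
    if missing > 0:
        assistant_track.extend(b"\x00" * missing)
    # Call this before appending a new chunk of assistant audio so the bot's
    # speech lands at the right offset relative to the user's audio.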

@aconchillo
Contributor

That sounds reasonable. We have recording support at Daily as well, and if you use a service transport it makes sense to use the provided recording.

@adriancowham
Contributor Author

@aconchillo @cyrilS-dev I just got back from out of town and I'm all caught up. I plan to jump on this tomorrow. I'll play around with VAD stuff while I'm in there. Thanks for the insights!

… also moving to stereo instead of mono with the human and bot on their own channel.
@adriancowham
Contributor Author

adriancowham commented Oct 3, 2024

@aconchillo I nuked UserMarkerProcessor and the code now uses InputAudioRawFrame. I also changed the audio post-processing code to save to stereo. Things work well; however, when the bot is interrupted the recording has the bot's full response but the live playback doesn't. That is, when I'm talking to the bot and interrupt it, its speech is cut off (which is good), but the recording still has the bot's full response. I'm not sure how to address this, any guidance would be appreciated.

Thanks!

@adriancowham
Contributor Author

@aconchillo Wanted to get this back on your radar, no rush, but curious to hear your thoughts on how best to handle interruptions. Thanks!

@aconchillo
Contributor

Sorry for the delay. I've been heads down on something else.

I believe all you have to do is put the audio processor after transport.output().

So, this:

            transport.output(),
            audiobuffer,  # used to buffer the audio in the pipeline

instead of

            audiobuffer,  # used to buffer the audio in the pipeline
            transport.output(),

@adriancowham
Contributor Author

@aconchillo Ah, makes perfect sense, I'll give it a go. Thanks!

@adriancowham
Contributor Author

@aconchillo It's a lot better now, thanks for the tip. Hopefully this PR is ready; let me know if there's anything else I can do.

Thank you!


class AudioBufferProcessor(FrameProcessor):

def __init__(self):
Contributor

pass **kwargs

The num_channels and sample_rate are set to None initially and will be
populated when the first audio frame is processed.
"""
super().__init__()
Contributor

pass **kwargs

- user_audio: A boolean flag to indicate if user audio is being processed.

The num_channels and sample_rate are set to None initially and will be
populated when the first audio frame is processed.
Contributor

Is the documentation accurate with the latest changes?

Contributor Author

Ha, no. Good catch.

with BytesIO() as buffer:
with wave.open(buffer, 'wb') as wf:
wf.setnchannels(2)
wf.setsampwidth(self._sample_rate // 8000)
Contributor

This should always be 2, because every sample is a signed 16-bit integer. If we change the sample rate to 44100 this would break.
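A minimal sketch of the fix the reviewer is pointing at, assuming interleaved 16-bit stereo PCM (the function name and wrapper are illustrative, not the PR's code):

import io
import wave


def pcm16_stereo_to_wav(pcm: bytes, sample_rate: int) -> bytes:
    """Wrap interleaved 16-bit stereo PCM samples in a WAV container."""
    with io.BytesIO() as buffer:
        with wave.open(buffer, "wb") as wf:
            wf.setnchannels(2)
            # Always 2 bytes: samples are signed 16-bit integers, independent
            # of the sample rate.
            wf.setsampwidth(2)
            wf.setframerate(sample_rate)
            wf.writeframes(pcm)
        return buffer.getvalue()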


async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if (isinstance(frame, AudioRawFrame) and self._sample_rate is None):
Contributor

nit: remove outer parentheses

self._assistant_audio_buffer.extend(silence)

# if the assistant is speaking, include all audio from the assistant,
if (isinstance(frame, OutputAudioRawFrame)):
Contributor

nit: remove outer parentheses


def __init__(
self,
aiohttp_session: aiohttp.ClientSession,
Contributor

force keyword-only arguments with

self, *, ...

api_key: str,
api_url: str = "https://voiceapp.canonical.chat/api/v1",
assistant_speaks_first: bool = True,
output_dir: str = "recordings"):
Contributor

pass **kwargs

api_url: str = "https://voiceapp.canonical.chat/api/v1",
assistant_speaks_first: bool = True,
output_dir: str = "recordings"):
super().__init__()
Contributor

pass **kwargs

@aconchillo
Contributor

Added some last minor comments. All looks great. I'll merge after those.

@adriancowham
Contributor Author

No problem at all, I appreciate all the feedback and the time you've put into this PR.

@aconchillo
Contributor

LGTM! Thank you so much for all the work!!!

aconchillo merged commit dc73b20 into pipecat-ai:main Oct 11, 2024
3 checks passed