From 6d11911d835be7fe2c144d9fa38f845f644660f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 12 Dec 2024 17:03:40 -0800 Subject: [PATCH] Revert "no longer necessary to call super().process_frame(frame, direction)" --- CHANGELOG.md | 6 --- .../foundational/05-sync-speech-and-image.py | 2 + .../05a-local-sync-speech-and-image.py | 6 +++ examples/foundational/06a-image-sync.py | 2 + .../07s-interruptible-google-audio-in.py | 5 ++ examples/foundational/09-mirror.py | 2 + examples/foundational/09a-local-mirror.py | 2 + examples/foundational/11-sound-effects.py | 4 ++ examples/foundational/12-describe-video.py | 2 + .../12a-describe-video-gemini-flash.py | 2 + .../foundational/12b-describe-video-gpt-4o.py | 2 + .../12c-describe-video-anthropic.py | 2 + .../foundational/13-whisper-transcription.py | 2 + examples/foundational/13a-whisper-local.py | 2 + .../13b-deepgram-transcription.py | 2 + .../foundational/13c-gladia-transcription.py | 2 + .../13d-assemblyai-transcription.py | 2 + .../22b-natural-conversation-proposal.py | 4 ++ .../22c-natural-conversation-mixed-llms.py | 41 +++++++------- .../22d-natural-conversation-gemini-audio.py | 6 +++ examples/foundational/25-google-audio-in.py | 8 +++ examples/moondream-chatbot/bot.py | 8 +++ examples/simple-chatbot/server/bot-gemini.py | 2 + examples/simple-chatbot/server/bot-openai.py | 2 + .../storytelling-chatbot/src/processors.py | 4 ++ examples/translation-chatbot/bot.py | 4 ++ src/pipecat/pipeline/parallel_pipeline.py | 6 +++ src/pipecat/pipeline/pipeline.py | 6 +++ .../pipeline/sync_parallel_pipeline.py | 6 +++ src/pipecat/pipeline/task.py | 4 ++ src/pipecat/processors/aggregators/gated.py | 2 + .../aggregators/gated_openai_llm_context.py | 2 + .../processors/aggregators/llm_response.py | 4 ++ .../processors/aggregators/sentence.py | 2 + .../processors/aggregators/user_response.py | 2 + .../aggregators/vision_image_frame.py | 2 + src/pipecat/processors/async_generator.py | 2 + .../audio/audio_buffer_processor.py | 2 + src/pipecat/processors/audio/vad/silero.py | 2 + .../processors/filters/frame_filter.py | 2 + .../processors/filters/function_filter.py | 2 + .../processors/filters/identity_filter.py | 1 + .../processors/filters/wake_check_filter.py | 2 + .../filters/wake_notifier_filter.py | 2 + src/pipecat/processors/frame_processor.py | 54 ++++++++----------- .../processors/frameworks/langchain.py | 2 + src/pipecat/processors/frameworks/rtvi.py | 16 ++++++ .../processors/gstreamer/pipeline_source.py | 2 + .../processors/idle_frame_processor.py | 2 + src/pipecat/processors/text_transformer.py | 2 + src/pipecat/processors/user_idle_processor.py | 2 + src/pipecat/services/ai_services.py | 2 + src/pipecat/services/simli.py | 1 + src/pipecat/transports/base_input.py | 2 + src/pipecat/transports/base_output.py | 2 + src/pipecat/utils/test_frame_processor.py | 2 + tests/test_langchain.py | 2 + 57 files changed, 212 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4a005280..5ca41b8a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,12 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, Galician, Hebrew, Mandarin, Serbian, Tagalog, Urdu, Xhosa). -### Changed - -- It's no longer necessary to call `super().process_frame(frame, direction)` if - you subclass and implement `FrameProcessor.process_frame()`. This is all now - done internally and will avoid possible issues if you forget to add it. - ### Deprecated - `AWSTTSService` is now deprecated, use `PollyTTSService` instead. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 8d5790ac7..64f85930b 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -56,6 +56,8 @@ def __init__(self): self.prepend_to_next_text_frame = False async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, MonthFrame): self.most_recent_month = frame.month elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame): diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index f6e5f0ce6..4a561c073 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -62,6 +62,8 @@ def __init__(self): self.text = "" async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): self.text = frame.text await self.push_frame(frame, direction) @@ -73,6 +75,8 @@ def __init__(self): self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TTSAudioRawFrame): self.audio.extend(frame.audio) self.frame = OutputAudioRawFrame( @@ -86,6 +90,8 @@ def __init__(self): self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, URLImageRawFrame): self.frame = frame await self.push_frame(frame, direction) diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 11c894478..eda3c61df 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -47,6 +47,8 @@ def __init__(self, speaking_path: str, waiting_path: str): self._waiting_image_bytes = self._waiting_image.tobytes() async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM: await self.push_frame( OutputImageRawFrame( diff --git a/examples/foundational/07s-interruptible-google-audio-in.py b/examples/foundational/07s-interruptible-google-audio-in.py index 1db8b0d36..1778e0c62 100644 --- a/examples/foundational/07s-interruptible-google-audio-in.py +++ b/examples/foundational/07s-interruptible-google-audio-in.py @@ -82,6 +82,8 @@ def __init__(self, context, user_context_aggregator): self._user_speaking = False async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): # We could gracefully handle both audio input and text/transcription input ... # but let's leave that as an exercise to the reader. :-) @@ -124,6 +126,7 @@ def reset(self): self._accumulating_transcript = False async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) if isinstance(frame, LLMFullResponseStartFrame): self._processing_llm_response = True self._accumulating_transcript = True @@ -177,6 +180,8 @@ def add_transcript_back_to_inference_output(self): self._context.messages[-1].parts[-1].text += f"\n\n{marker}\n{self._transcript}\n" async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, MagicDemoTranscriptionFrame): self._transcript = frame.text elif isinstance(frame, LLMFullResponseEndFrame) or isinstance( diff --git a/examples/foundational/09-mirror.py b/examples/foundational/09-mirror.py index 8eaee5750..a719d54f6 100644 --- a/examples/foundational/09-mirror.py +++ b/examples/foundational/09-mirror.py @@ -35,6 +35,8 @@ class MirrorProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, InputAudioRawFrame): await self.push_frame( OutputAudioRawFrame( diff --git a/examples/foundational/09a-local-mirror.py b/examples/foundational/09a-local-mirror.py index 4a4a1fee1..539cca600 100644 --- a/examples/foundational/09a-local-mirror.py +++ b/examples/foundational/09a-local-mirror.py @@ -39,6 +39,8 @@ class MirrorProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, InputAudioRawFrame): await self.push_frame( OutputAudioRawFrame( diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 50d3f9e33..d8692a7f1 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -60,6 +60,8 @@ class OutboundSoundEffectWrapper(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, LLMFullResponseEndFrame): await self.push_frame(sounds["ding1.wav"]) # In case anything else downstream needs it @@ -70,6 +72,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class InboundSoundEffectWrapper(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, OpenAILLMContextFrame): await self.push_frame(sounds["ding2.wav"]) # In case anything else downstream needs it diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py index 3f00bafc9..b5bb577aa 100644 --- a/examples/foundational/12-describe-video.py +++ b/examples/foundational/12-describe-video.py @@ -42,6 +42,8 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/12a-describe-video-gemini-flash.py index 52ddc6e43..bc76afc73 100644 --- a/examples/foundational/12a-describe-video-gemini-flash.py +++ b/examples/foundational/12a-describe-video-gemini-flash.py @@ -42,6 +42,8 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12b-describe-video-gpt-4o.py index 1840d7117..d8474b568 100644 --- a/examples/foundational/12b-describe-video-gpt-4o.py +++ b/examples/foundational/12b-describe-video-gpt-4o.py @@ -42,6 +42,8 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12c-describe-video-anthropic.py b/examples/foundational/12c-describe-video-anthropic.py index f3690b277..bc6f5a4ea 100644 --- a/examples/foundational/12c-describe-video-anthropic.py +++ b/examples/foundational/12c-describe-video-anthropic.py @@ -42,6 +42,8 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/13-whisper-transcription.py b/examples/foundational/13-whisper-transcription.py index 7a1657df7..c895cb944 100644 --- a/examples/foundational/13-whisper-transcription.py +++ b/examples/foundational/13-whisper-transcription.py @@ -30,6 +30,8 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13a-whisper-local.py b/examples/foundational/13a-whisper-local.py index 2d0b0f9d7..c1ba37ca9 100644 --- a/examples/foundational/13a-whisper-local.py +++ b/examples/foundational/13a-whisper-local.py @@ -28,6 +28,8 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13b-deepgram-transcription.py b/examples/foundational/13b-deepgram-transcription.py index c915f9b42..7b3a25316 100644 --- a/examples/foundational/13b-deepgram-transcription.py +++ b/examples/foundational/13b-deepgram-transcription.py @@ -31,6 +31,8 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13c-gladia-transcription.py b/examples/foundational/13c-gladia-transcription.py index 13ef5556d..acc21b6c2 100644 --- a/examples/foundational/13c-gladia-transcription.py +++ b/examples/foundational/13c-gladia-transcription.py @@ -29,6 +29,8 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13d-assemblyai-transcription.py b/examples/foundational/13d-assemblyai-transcription.py index ea112b184..d10a80274 100644 --- a/examples/foundational/13d-assemblyai-transcription.py +++ b/examples/foundational/13d-assemblyai-transcription.py @@ -29,6 +29,8 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/22b-natural-conversation-proposal.py b/examples/foundational/22b-natural-conversation-proposal.py index e00714a75..2deeb3da4 100644 --- a/examples/foundational/22b-natural-conversation-proposal.py +++ b/examples/foundational/22b-natural-conversation-proposal.py @@ -64,6 +64,7 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -117,6 +118,7 @@ def __init__(self, notifier: BaseNotifier): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) if isinstance(frame, TextFrame) and frame.text == "YES": logger.debug("Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -139,6 +141,8 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/22c-natural-conversation-mixed-llms.py b/examples/foundational/22c-natural-conversation-mixed-llms.py index 3f12e5f34..97bc57ec1 100644 --- a/examples/foundational/22c-natural-conversation-mixed-llms.py +++ b/examples/foundational/22c-natural-conversation-mixed-llms.py @@ -101,12 +101,12 @@ Examples: # Complete Wh-question -[{"role": "assistant", "content": "I can help you learn."}, +[{"role": "assistant", "content": "I can help you learn."}, {"role": "user", "content": "What's the fastest way to learn Spanish"}] Output: YES # Complete Yes/No question despite STT error -[{"role": "assistant", "content": "I know about planets."}, +[{"role": "assistant", "content": "I know about planets."}, {"role": "user", "content": "Is is Jupiter the biggest planet"}] Output: YES @@ -118,12 +118,12 @@ Examples: # Direct instruction -[{"role": "assistant", "content": "I can explain many topics."}, +[{"role": "assistant", "content": "I can explain many topics."}, {"role": "user", "content": "Tell me about black holes"}] Output: YES # Action demand -[{"role": "assistant", "content": "I can help with math."}, +[{"role": "assistant", "content": "I can help with math."}, {"role": "user", "content": "Solve this equation x plus 5 equals 12"}] Output: YES @@ -134,12 +134,12 @@ Examples: # Specific answer -[{"role": "assistant", "content": "What's your favorite color?"}, +[{"role": "assistant", "content": "What's your favorite color?"}, {"role": "user", "content": "I really like blue"}] Output: YES # Option selection -[{"role": "assistant", "content": "Would you prefer morning or evening?"}, +[{"role": "assistant", "content": "Would you prefer morning or evening?"}, {"role": "user", "content": "Morning"}] Output: YES @@ -153,17 +153,17 @@ Examples: # Self-correction reaching completion -[{"role": "assistant", "content": "What would you like to know?"}, +[{"role": "assistant", "content": "What would you like to know?"}, {"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}] Output: YES # Topic change with complete thought -[{"role": "assistant", "content": "The weather is nice today."}, +[{"role": "assistant", "content": "The weather is nice today."}, {"role": "user", "content": "Actually can you tell me who invented the telephone"}] Output: YES # Mid-sentence completion -[{"role": "assistant", "content": "Hello I'm ready."}, +[{"role": "assistant", "content": "Hello I'm ready."}, {"role": "user", "content": "What's the capital of? France"}] Output: YES @@ -175,12 +175,12 @@ Examples: # Acknowledgment -[{"role": "assistant", "content": "Should we talk about history?"}, +[{"role": "assistant", "content": "Should we talk about history?"}, {"role": "user", "content": "Sure"}] Output: YES # Disagreement with completion -[{"role": "assistant", "content": "Is that what you meant?"}, +[{"role": "assistant", "content": "Is that what you meant?"}, {"role": "user", "content": "No not really"}] Output: YES @@ -194,12 +194,12 @@ Examples: # Word repetition but complete -[{"role": "assistant", "content": "I can help with that."}, +[{"role": "assistant", "content": "I can help with that."}, {"role": "user", "content": "What what is the time right now"}] Output: YES # Missing punctuation but complete -[{"role": "assistant", "content": "I can explain that."}, +[{"role": "assistant", "content": "I can explain that."}, {"role": "user", "content": "Please tell me how computers work"}] Output: YES @@ -211,12 +211,12 @@ Examples: # Filler words but complete -[{"role": "assistant", "content": "What would you like to know?"}, +[{"role": "assistant", "content": "What would you like to know?"}, {"role": "user", "content": "Um uh how do airplanes fly"}] Output: YES # Thinking pause but incomplete -[{"role": "assistant", "content": "I can explain anything."}, +[{"role": "assistant", "content": "I can explain anything."}, {"role": "user", "content": "Well um I want to know about the"}] Output: NO @@ -241,17 +241,17 @@ Examples: # Incomplete despite corrections -[{"role": "assistant", "content": "What would you like to know about?"}, +[{"role": "assistant", "content": "What would you like to know about?"}, {"role": "user", "content": "Can you tell me about"}] Output: NO # Complete despite multiple artifacts -[{"role": "assistant", "content": "I can help you learn."}, +[{"role": "assistant", "content": "I can help you learn."}, {"role": "user", "content": "How do you I mean what's the best way to learn programming"}] Output: YES # Trailing off incomplete -[{"role": "assistant", "content": "I can explain anything."}, +[{"role": "assistant", "content": "I can explain anything."}, {"role": "user", "content": "I was wondering if you could tell me why"}] Output: NO """ @@ -268,6 +268,7 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -319,6 +320,8 @@ def __init__(self, notifier: BaseNotifier): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame) and frame.text == "YES": logger.debug("!!! Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -341,6 +344,8 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/22d-natural-conversation-gemini-audio.py b/examples/foundational/22d-natural-conversation-gemini-audio.py index e506372a5..1ff8aa23e 100644 --- a/examples/foundational/22d-natural-conversation-gemini-audio.py +++ b/examples/foundational/22d-natural-conversation-gemini-audio.py @@ -90,6 +90,8 @@ async def reset(self): self._user_speaking = False async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # ignore context frame if isinstance(frame, OpenAILLMContextFrame): return @@ -131,6 +133,8 @@ def __init__( self._audio_accumulator = audio_accumulator async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame) and frame.text.startswith("YES"): logger.debug("Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -155,6 +159,8 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/25-google-audio-in.py b/examples/foundational/25-google-audio-in.py index abeb62043..843d24e1f 100644 --- a/examples/foundational/25-google-audio-in.py +++ b/examples/foundational/25-google-audio-in.py @@ -95,6 +95,8 @@ def __init__(self, context, user_context_aggregator): self._user_speaking = False async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): # We could gracefully handle both audio input and text/transcription input ... # but let's leave that as an exercise to the reader. :-) @@ -133,6 +135,8 @@ class InputTranscriptionContextFilter(FrameProcessor): """ async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, SystemFrame): # We don't want to block system frames. await self.push_frame(frame, direction) @@ -206,6 +210,8 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): self._aggregation += frame.text elif isinstance(frame, LLMFullResponseEndFrame): @@ -256,6 +262,8 @@ def swap_user_audio(self): audio_part.text = self._transcript async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, LLMDemoTranscriptionFrame): logger.info(f"Transcription from Gemini: {frame.text}") self._transcript = frame.text diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 1c412e88a..54c2013b4 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -81,6 +81,8 @@ def __init__(self): self._is_talking = False async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: await self.push_frame(talking_frame) @@ -101,6 +103,8 @@ def set_participant_id(self, participant_id: str): self.participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self.participant_id and isinstance(frame, TextFrame): if frame.text == user_request_answer: await self.push_frame( @@ -117,6 +121,8 @@ def __init__(self, text: str): self.text = text async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): if frame.text != self.text: await self.push_frame(frame) @@ -126,6 +132,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class ImageFilterProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if not isinstance(frame, ImageRawFrame): await self.push_frame(frame, direction) diff --git a/examples/simple-chatbot/server/bot-gemini.py b/examples/simple-chatbot/server/bot-gemini.py index 0ce46e50e..991df1cd1 100644 --- a/examples/simple-chatbot/server/bot-gemini.py +++ b/examples/simple-chatbot/server/bot-gemini.py @@ -95,6 +95,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): frame: The incoming frame to process direction: The direction of frame flow in the pipeline """ + await super().process_frame(frame, direction) + # Switch to talking animation when bot starts speaking if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: diff --git a/examples/simple-chatbot/server/bot-openai.py b/examples/simple-chatbot/server/bot-openai.py index 02685a99b..a3a68c839 100644 --- a/examples/simple-chatbot/server/bot-openai.py +++ b/examples/simple-chatbot/server/bot-openai.py @@ -95,6 +95,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): frame: The incoming frame to process direction: The direction of frame flow in the pipeline """ + await super().process_frame(frame, direction) + # Switch to talking animation when bot starts speaking if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: diff --git a/examples/storytelling-chatbot/src/processors.py b/examples/storytelling-chatbot/src/processors.py index dd46f9c82..6aa9ad7ab 100644 --- a/examples/storytelling-chatbot/src/processors.py +++ b/examples/storytelling-chatbot/src/processors.py @@ -54,6 +54,8 @@ def __init__(self, fal_service): self._fal_service = fal_service async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, StoryImageFrame): try: async with timeout(7): @@ -88,6 +90,8 @@ def __init__(self, messages, story): self._story = story async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, UserStoppedSpeakingFrame): # Send an app message to the UI await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py index 59f495360..e654c0159 100644 --- a/examples/translation-chatbot/bot.py +++ b/examples/translation-chatbot/bot.py @@ -51,6 +51,8 @@ def __init__(self, language): self._language = language async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): context = [ { @@ -76,6 +78,8 @@ def __init__(self, language): # subtitles. # async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): message = {"language": self._language, "text": frame.text} await self.push_frame(DailyTransportMessageFrame(message)) diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 7499192fb..40bfea90d 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -28,6 +28,8 @@ def __init__( self._push_frame_func = push_frame_func async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: if isinstance(frame, SystemFrame): @@ -49,6 +51,8 @@ def __init__( self._push_frame_func = push_frame_func async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -116,6 +120,8 @@ async def _start_tasks(self): self._down_task = loop.create_task(self._process_down_queue()) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): await self._start_tasks() diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 457b70cab..703f911fe 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -17,6 +17,8 @@ def __init__(self, upstream_push_frame: Callable[[Frame, FrameDirection], Corout self._upstream_push_frame = upstream_push_frame async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self._upstream_push_frame(frame, direction) @@ -30,6 +32,8 @@ def __init__(self, downstream_push_frame: Callable[[Frame, FrameDirection], Coro self._downstream_push_frame = downstream_push_frame async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -70,6 +74,8 @@ async def cleanup(self): await self._cleanup_processors() async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if direction == FrameDirection.DOWNSTREAM: await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM) elif direction == FrameDirection.UPSTREAM: diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 4dcf190de..20f4275e4 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -31,6 +31,8 @@ def __init__(self, upstream_queue: asyncio.Queue): self._up_queue = upstream_queue async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self._up_queue.put(frame) @@ -44,6 +46,8 @@ def __init__(self, downstream_queue: asyncio.Queue): self._down_queue = downstream_queue async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -99,6 +103,8 @@ def processors_with_metrics(self) -> List[FrameProcessor]: # async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # The last processor of each pipeline needs to be synchronous otherwise # this element won't work. Since, we know it should be synchronous we # push a SyncFrame. Since frames are ordered we know this frame will be diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index d8bada663..f09013a58 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -45,6 +45,8 @@ def __init__(self, up_queue: asyncio.Queue): self._up_queue = up_queue async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + match direction: case FrameDirection.UPSTREAM: await self._handle_upstream_frame(frame) @@ -73,6 +75,8 @@ def __init__(self, down_queue: asyncio.Queue): self._down_queue = down_queue async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We really just want to know when the EndFrame reached the sink. if isinstance(frame, EndFrame): await self._down_queue.put(frame) diff --git a/src/pipecat/processors/aggregators/gated.py b/src/pipecat/processors/aggregators/gated.py index 763dc456c..c39a35c82 100644 --- a/src/pipecat/processors/aggregators/gated.py +++ b/src/pipecat/processors/aggregators/gated.py @@ -56,6 +56,8 @@ def __init__( self._accumulator: List[Tuple[Frame, FrameDirection]] = [] async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/aggregators/gated_openai_llm_context.py b/src/pipecat/processors/aggregators/gated_openai_llm_context.py index 9b0d77d32..71a540dd4 100644 --- a/src/pipecat/processors/aggregators/gated_openai_llm_context.py +++ b/src/pipecat/processors/aggregators/gated_openai_llm_context.py @@ -24,6 +24,8 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._last_context_frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): await self.push_frame(frame) await self._start() diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 544b49dda..479746471 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -86,6 +86,8 @@ def role(self): # and T2 would be dropped. async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + send_aggregation = False if isinstance(frame, self._start_frame): @@ -238,6 +240,8 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): self._aggregation += frame.text elif isinstance(frame, LLMFullResponseEndFrame): diff --git a/src/pipecat/processors/aggregators/sentence.py b/src/pipecat/processors/aggregators/sentence.py index ab400b2a0..d0c593a83 100644 --- a/src/pipecat/processors/aggregators/sentence.py +++ b/src/pipecat/processors/aggregators/sentence.py @@ -33,6 +33,8 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # We ignore interim description at this point. if isinstance(frame, InterimTranscriptionFrame): return diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index 78287127f..903019059 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -85,6 +85,8 @@ def __init__( # and T2 would be dropped. async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + send_aggregation = False if isinstance(frame, self._start_frame): diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index 3a4eda330..d07337f06 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -31,6 +31,8 @@ def __init__(self): self._describe_text = None async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): self._describe_text = frame.text elif isinstance(frame, InputImageRawFrame): diff --git a/src/pipecat/processors/async_generator.py b/src/pipecat/processors/async_generator.py index 356ef4388..4f9bc85d0 100644 --- a/src/pipecat/processors/async_generator.py +++ b/src/pipecat/processors/async_generator.py @@ -24,6 +24,8 @@ def __init__(self, *, serializer: FrameSerializer, **kwargs): self._data_queue = asyncio.Queue() async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, (CancelFrame, EndFrame)): diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index c7bb36736..488a251f0 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -68,6 +68,8 @@ def reset_audio_buffers(self): self._bot_audio_buffer = bytearray() async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # Include all audio from the user. if isinstance(frame, InputAudioRawFrame): resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate) diff --git a/src/pipecat/processors/audio/vad/silero.py b/src/pipecat/processors/audio/vad/silero.py index 1db510f24..4aa32a163 100644 --- a/src/pipecat/processors/audio/vad/silero.py +++ b/src/pipecat/processors/audio/vad/silero.py @@ -39,6 +39,8 @@ def __init__( # async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, AudioRawFrame): await self._analyze_audio(frame) if self._audio_passthrough: diff --git a/src/pipecat/processors/filters/frame_filter.py b/src/pipecat/processors/filters/frame_filter.py index e87034a1a..11f2e601a 100644 --- a/src/pipecat/processors/filters/frame_filter.py +++ b/src/pipecat/processors/filters/frame_filter.py @@ -26,5 +26,7 @@ def _should_passthrough_frame(self, frame): return isinstance(frame, ControlFrame) or isinstance(frame, SystemFrame) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if self._should_passthrough_frame(frame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/filters/function_filter.py b/src/pipecat/processors/filters/function_filter.py index 522a89324..e38cea3e0 100644 --- a/src/pipecat/processors/filters/function_filter.py +++ b/src/pipecat/processors/filters/function_filter.py @@ -29,6 +29,8 @@ def _should_passthrough_frame(self, frame, direction): return isinstance(frame, SystemFrame) or direction != self._direction async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + passthrough = self._should_passthrough_frame(frame, direction) allowed = await self._filter(frame) if passthrough or allowed: diff --git a/src/pipecat/processors/filters/identity_filter.py b/src/pipecat/processors/filters/identity_filter.py index c837e02a7..d6f896b73 100644 --- a/src/pipecat/processors/filters/identity_filter.py +++ b/src/pipecat/processors/filters/identity_filter.py @@ -26,4 +26,5 @@ def __init__(self, **kwargs): async def process_frame(self, frame: Frame, direction: FrameDirection): """Process an incoming frame by passing it through unchanged.""" + await super().process_frame(frame, direction) await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/filters/wake_check_filter.py b/src/pipecat/processors/filters/wake_check_filter.py index 441f32fb9..f1a7afbef 100644 --- a/src/pipecat/processors/filters/wake_check_filter.py +++ b/src/pipecat/processors/filters/wake_check_filter.py @@ -45,6 +45,8 @@ def __init__(self, wake_phrases: list[str], keepalive_timeout: float = 3): self._wake_patterns.append(pattern) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + try: if isinstance(frame, TranscriptionFrame): p = self._participant_states.get(frame.user_id) diff --git a/src/pipecat/processors/filters/wake_notifier_filter.py b/src/pipecat/processors/filters/wake_notifier_filter.py index 7623b6da8..a7f074ccb 100644 --- a/src/pipecat/processors/filters/wake_notifier_filter.py +++ b/src/pipecat/processors/filters/wake_notifier_filter.py @@ -32,6 +32,8 @@ def __init__( self._filter = filter async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, self._types) and await self._filter(frame): await self._notifier.notify() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index e0806a0bf..52066b4f4 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -161,13 +161,6 @@ def get_parent(self) -> "FrameProcessor": def get_clock(self) -> BaseClock: return self._clock - async def pause_processing_frames(self): - self.__should_block_frames = True - - async def resume_processing_frames(self): - self.__input_event.set() - self.__should_block_frames = False - async def queue_frame( self, frame: Frame, @@ -182,13 +175,32 @@ async def queue_frame( if isinstance(frame, SystemFrame): # We don't want to queue system frames. - await self._process_frame(frame, direction) + await self.process_frame(frame, direction) else: # We queue everything else. await self.__input_queue.put((frame, direction, callback)) + async def pause_processing_frames(self): + self.__should_block_frames = True + + async def resume_processing_frames(self): + self.__input_event.set() + self.__should_block_frames = False + async def process_frame(self, frame: Frame, direction: FrameDirection): - pass + if isinstance(frame, StartFrame): + self._clock = frame.clock + self._allow_interruptions = frame.allow_interruptions + self._enable_metrics = frame.enable_metrics + self._enable_usage_metrics = frame.enable_usage_metrics + self._report_only_initial_ttfb = frame.report_only_initial_ttfb + elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() + await self.stop_all_metrics() + elif isinstance(frame, StopInterruptionFrame): + self._should_report_ttfb = True + elif isinstance(frame, CancelFrame): + self._cancelling = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) @@ -216,28 +228,6 @@ def _register_event_handler(self, event_name: str): raise Exception(f"Event handler {event_name} already registered") self._event_handlers[event_name] = [] - # - # Frame processing - # - - async def _process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, StartFrame): - self._clock = frame.clock - self._allow_interruptions = frame.allow_interruptions - self._enable_metrics = frame.enable_metrics - self._enable_usage_metrics = frame.enable_usage_metrics - self._report_only_initial_ttfb = frame.report_only_initial_ttfb - elif isinstance(frame, StartInterruptionFrame): - await self._start_interruption() - await self.stop_all_metrics() - elif isinstance(frame, StopInterruptionFrame): - self._should_report_ttfb = True - elif isinstance(frame, CancelFrame): - self._cancelling = True - - # Call subclass. - await self.process_frame(frame, direction) - # # Handle interruptions # @@ -299,7 +289,7 @@ async def __input_frame_task_handler(self): (frame, direction, callback) = await self.__input_queue.get() # Process the frame. - await self._process_frame(frame, direction) + await self.process_frame(frame, direction) # If this frame has an associated callback, call it now. if callback: diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py index 25de11070..c0b657244 100644 --- a/src/pipecat/processors/frameworks/langchain.py +++ b/src/pipecat/processors/frameworks/langchain.py @@ -36,6 +36,8 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, LLMMessagesFrame): # Messages are accumulated on the context as a list of messages. # The last one by the human is the one we want to send to the LLM. diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index b91abb181..471bdbb88 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -380,6 +380,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame)): @@ -413,6 +415,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)): @@ -442,6 +446,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, OpenAILLMContextFrame): @@ -467,6 +473,8 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, UserStartedSpeakingFrame): @@ -488,6 +496,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, LLMFullResponseStartFrame): @@ -504,6 +514,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, TTSStartedFrame): @@ -520,6 +532,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) if isinstance(frame, MetricsFrame): @@ -628,6 +642,8 @@ async def handle_function_call_start( await self._push_transport_message(message, exclude_none=False) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 09456f12e..649a2c529 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -66,6 +66,8 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), bus.connect("message", self._on_gstreamer_message) async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 80902ee59..e674b6b84 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -35,6 +35,8 @@ def __init__( self._create_idle_task() async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise diff --git a/src/pipecat/processors/text_transformer.py b/src/pipecat/processors/text_transformer.py index 90ef6b8bc..79e9b885e 100644 --- a/src/pipecat/processors/text_transformer.py +++ b/src/pipecat/processors/text_transformer.py @@ -27,6 +27,8 @@ def __init__(self, transform_fn): self._transform_fn = transform_fn async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): result = self._transform_fn(frame.text) if isinstance(result, Coroutine): diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 91cbd2334..160c49908 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -43,6 +43,8 @@ async def _stop(self): await self._idle_task async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): await self._stop() diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index e324d413c..e0f16e220 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -110,6 +110,8 @@ async def _update_settings(self, settings: Dict[str, Any]): logger.warning(f"Unknown setting for {self.name} service: {key}") async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): await self.start(frame) elif isinstance(frame, CancelFrame): diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py index e61fb394c..bfae861dc 100644 --- a/src/pipecat/services/simli.py +++ b/src/pipecat/services/simli.py @@ -92,6 +92,7 @@ async def _consume_and_process_video(self): pass async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) if isinstance(frame, StartFrame): await self.push_frame(frame, direction) await self._start_connection() diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 64583c758..025a5bed2 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -79,6 +79,8 @@ async def push_audio_frame(self, frame: InputAudioRawFrame): # async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 56c86076a..573b67717 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -120,6 +120,8 @@ async def send_image(self, frame: OutputImageRawFrame | SpriteFrame): # async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + # # System frames (like StartInterruptionFrame) are pushed # immediately. Other frames require order so they are put in the sink diff --git a/src/pipecat/utils/test_frame_processor.py b/src/pipecat/utils/test_frame_processor.py index ec37efd80..e46bae7ad 100644 --- a/src/pipecat/utils/test_frame_processor.py +++ b/src/pipecat/utils/test_frame_processor.py @@ -13,6 +13,8 @@ def __init__(self, test_frames): super().__init__() async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if not self.test_frames[ 0 ]: # then we've run out of required frames but the generator is still going? diff --git a/tests/test_langchain.py b/tests/test_langchain.py index b1f8f618d..d30d213bd 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -42,6 +42,8 @@ def __str__(self): return self.name async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + if isinstance(frame, LLMFullResponseStartFrame): self.start_collecting = True elif isinstance(frame, TextFrame) and self.start_collecting: