diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py
index 1bf42311d..b56846aa6 100644
--- a/src/pipecat/processors/frame_processor.py
+++ b/src/pipecat/processors/frame_processor.py
@@ -5,7 +5,6 @@
 #
 
 import asyncio
-import time
 
 from enum import Enum
 
diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py
index b75b65627..1458cc21d 100644
--- a/src/pipecat/processors/frameworks/rtvi.py
+++ b/src/pipecat/processors/frameworks/rtvi.py
@@ -423,18 +423,26 @@ async def _start(self, frame: StartFrame):
         await self._maybe_send_bot_ready()
 
     async def _stop(self, frame: EndFrame):
-        self._action_task.cancel()
-        await self._action_task
+        if self._action_task:
+            self._action_task.cancel()
+            await self._action_task
+            self._action_task = None
 
-        self._message_task.cancel()
-        await self._message_task
+        if self._message_task:
+            self._message_task.cancel()
+            await self._message_task
+            self._message_task = None
 
     async def _cancel(self, frame: CancelFrame):
-        self._action_task.cancel()
-        await self._action_task
-
-        self._message_task.cancel()
-        await self._message_task
+        if self._action_task:
+            self._action_task.cancel()
+            await self._action_task
+            self._action_task = None
+
+        if self._message_task:
+            self._message_task.cancel()
+            await self._message_task
+            self._message_task = None
 
     async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
         frame = TransportMessageFrame(
diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py
index cdad3de52..197067adc 100644
--- a/src/pipecat/services/ai_services.py
+++ b/src/pipecat/services/ai_services.py
@@ -350,6 +350,7 @@ async def _stop_words_task(self):
         if self._words_task:
             self._words_task.cancel()
             await self._words_task
+            self._words_task = None
 
     async def _words_task_handler(self):
         while True:
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index 73ad3f5e3..df7babff1 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -37,6 +37,10 @@ def __init__(self, params: TransportParams, **kwargs):
 
         self._executor = ThreadPoolExecutor(max_workers=5)
 
+        # Task to process incoming audio (VAD) and push audio frames downstream
+        # if passthrough is enabled.
+        self._audio_task = None
+
     async def start(self, frame: StartFrame):
         # Create audio input queue and task if needed.
         if self._params.audio_in_enabled or self._params.vad_enabled:
@@ -45,16 +49,17 @@ async def start(self, frame: StartFrame):
 
     async def stop(self, frame: EndFrame):
         # Cancel and wait for the audio input task to finish.
-        if self._params.audio_in_enabled or self._params.vad_enabled:
+        if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
             self._audio_task.cancel()
             await self._audio_task
+            self._audio_task = None
 
     async def cancel(self, frame: CancelFrame):
-        # Cancel all the tasks and wait for them to finish.
-
-        if self._params.audio_in_enabled or self._params.vad_enabled:
+        # Cancel and wait for the audio input task to finish.
+        if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
             self._audio_task.cancel()
             await self._audio_task
+            self._audio_task = None
 
     def vad_analyzer(self) -> VADAnalyzer | None:
         return self._params.vad_analyzer
diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py
index 5423b122f..941a3505a 100644
--- a/src/pipecat/transports/base_output.py
+++ b/src/pipecat/transports/base_output.py
@@ -47,6 +47,18 @@ def __init__(self, params: TransportParams, **kwargs):
 
         self._params = params
 
+        # Task to process incoming frames so we don't block upstream elements.
+        self._sink_task = None
+
+        # Task to process incoming frames using a clock.
+        self._sink_clock_task = None
+
+        # Task to write/send audio frames.
+        self._audio_out_task = None
+
+        # Task to write/send image frames.
+        self._camera_out_task = None
+
         # These are the images that we should send to the camera at our desired
         # framerate.
         self._camera_images = None
@@ -88,36 +100,53 @@ async def stop(self, frame: EndFrame):
         # that EndFrame to be processed by the sink tasks. We also need to wait
         # for these tasks before cancelling the camera and audio tasks below
         # because they might be still rendering.
-        await self._sink_task
-        await self._sink_clock_task
+        if self._sink_task:
+            await self._sink_task
+        if self._sink_clock_task:
+            await self._sink_clock_task
 
         # Cancel and wait for the camera output task to finish.
-        if self._params.camera_out_enabled:
+        if self._camera_out_task and self._params.camera_out_enabled:
             self._camera_out_task.cancel()
             await self._camera_out_task
+            self._camera_out_task = None
 
         # Cancel and wait for the audio output task to finish.
-        if self._params.audio_out_enabled and self._params.audio_out_is_live:
+        if (
+            self._audio_out_task
+            and self._params.audio_out_enabled
+            and self._params.audio_out_is_live
+        ):
             self._audio_out_task.cancel()
             await self._audio_out_task
+            self._audio_out_task = None
 
     async def cancel(self, frame: CancelFrame):
         # Since we are cancelling everything it doesn't matter if we cancel sink
         # tasks first or not.
-        self._sink_task.cancel()
-        self._sink_clock_task.cancel()
-        await self._sink_task
-        await self._sink_clock_task
+        if self._sink_task:
+            self._sink_task.cancel()
+            await self._sink_task
+            self._sink_task = None
+
+        if self._sink_clock_task:
+            self._sink_clock_task.cancel()
+            await self._sink_clock_task
+            self._sink_clock_task = None
 
         # Cancel and wait for the camera output task to finish.
-        if self._params.camera_out_enabled:
+        if self._camera_out_task and self._params.camera_out_enabled:
             self._camera_out_task.cancel()
             await self._camera_out_task
+            self._camera_out_task = None
 
         # Cancel and wait for the audio output task to finish.
-        if self._params.audio_out_enabled and self._params.audio_out_is_live:
+        if self._audio_out_task and (
+            self._params.audio_out_enabled and self._params.audio_out_is_live
+        ):
             self._audio_out_task.cancel()
             await self._audio_out_task
+            self._audio_out_task = None
 
     async def send_message(self, frame: TransportMessageFrame):
         pass
@@ -183,11 +212,13 @@ async def _handle_interruptions(self, frame: Frame):
 
         if isinstance(frame, StartInterruptionFrame):
             # Stop sink tasks.
-            self._sink_task.cancel()
-            await self._sink_task
+            if self._sink_task:
+                self._sink_task.cancel()
+                await self._sink_task
             # Stop sink clock tasks.
-            self._sink_clock_task.cancel()
-            await self._sink_clock_task
+            if self._sink_clock_task:
+                self._sink_clock_task.cancel()
+                await self._sink_clock_task
             # Create sink tasks.
             self._create_sink_tasks()
             # Let's send a bot stopped speaking if we have to.
diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index 48b59d8ff..50c2ae085 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -575,6 +575,9 @@ def __init__(self, client: DailyTransportClient, params: DailyParams, **kwargs):
         self._client = client
 
         self._video_renderers = {}
+
+        # Task that gets audio data from a device or the network and queues it
+        # internally to be processed.
         self._audio_in_task = None
 
         self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
@@ -603,6 +606,7 @@ async def stop(self, frame: EndFrame):
         if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
             self._audio_in_task.cancel()
             await self._audio_in_task
+            self._audio_in_task = None
 
     async def cancel(self, frame: CancelFrame):
         # Parent stop.
@@ -613,6 +617,7 @@ async def cancel(self, frame: CancelFrame):
         if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
             self._audio_in_task.cancel()
             await self._audio_in_task
+            self._audio_in_task = None
 
     async def cleanup(self):
         await super().cleanup()
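
For review context: every hunk applies the same pattern, i.e. guard the task reference before touching it, cancel it, await it so it fully unwinds, and reset the reference to None so a later stop()/cancel() becomes a no-op. Below is a minimal standalone sketch of that pattern, not code from this PR: the maybe_cancel_task helper and the Example class are hypothetical names, and unlike the Pipecat task handlers (which catch cancellation inside the task body), this sketch suppresses CancelledError at the call site.

import asyncio
from typing import Optional


async def maybe_cancel_task(task: Optional[asyncio.Task]) -> None:
    """Cancel `task` if it exists and wait until it has actually finished."""
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Raised here when the task body does not suppress cancellation itself.
        pass


class Example:
    """Illustrates the guard/cancel/await/reset-to-None lifecycle."""

    def __init__(self) -> None:
        self._audio_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        self._audio_task = asyncio.create_task(self._audio_handler())

    async def stop(self) -> None:
        # Guard, cancel, await, then clear the reference so repeated stop()
        # calls, or a cancel() after stop(), never hit a stale task object.
        await maybe_cancel_task(self._audio_task)
        self._audio_task = None

    async def _audio_handler(self) -> None:
        while True:
            await asyncio.sleep(0.01)

Clearing the attribute after awaiting is what makes the new `if self._..._task:` guards effective on repeated shutdown paths (e.g. an EndFrame followed by a CancelFrame), which is the failure mode this change addresses.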