Skip to content

Commit

Permalink
Merge pull request #505 from pipecat-ai/aleix/init-task-variables
Browse files Browse the repository at this point in the history
initialize task variables and add minor description
  • Loading branch information
aconchillo authored Sep 25, 2024
2 parents 8502c7c + e276dcb commit cf0ab85
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 28 deletions.
1 change: 0 additions & 1 deletion src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#

import asyncio
import time

from enum import Enum

Expand Down
26 changes: 17 additions & 9 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,18 +423,26 @@ async def _start(self, frame: StartFrame):
await self._maybe_send_bot_ready()

async def _stop(self, frame: EndFrame):
self._action_task.cancel()
await self._action_task
if self._action_task:
self._action_task.cancel()
await self._action_task
self._action_task = None

self._message_task.cancel()
await self._message_task
if self._message_task:
self._message_task.cancel()
await self._message_task
self._message_task = None

async def _cancel(self, frame: CancelFrame):
self._action_task.cancel()
await self._action_task

self._message_task.cancel()
await self._message_task
if self._action_task:
self._action_task.cancel()
await self._action_task
self._action_task = None

if self._message_task:
self._message_task.cancel()
await self._message_task
self._message_task = None

async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
frame = TransportMessageFrame(
Expand Down
1 change: 1 addition & 0 deletions src/pipecat/services/ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ async def _stop_words_task(self):
if self._words_task:
self._words_task.cancel()
await self._words_task
self._words_task = None

async def _words_task_handler(self):
while True:
Expand Down
13 changes: 9 additions & 4 deletions src/pipecat/transports/base_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def __init__(self, params: TransportParams, **kwargs):

self._executor = ThreadPoolExecutor(max_workers=5)

# Task to process incoming audio (VAD) and push audio frames downstream
# if passthrough is enabled.
self._audio_task = None

async def start(self, frame: StartFrame):
# Create audio input queue and task if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
Expand All @@ -45,16 +49,17 @@ async def start(self, frame: StartFrame):

async def stop(self, frame: EndFrame):
# Cancel and wait for the audio input task to finish.
if self._params.audio_in_enabled or self._params.vad_enabled:
if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_task.cancel()
await self._audio_task
self._audio_task = None

async def cancel(self, frame: CancelFrame):
# Cancel all the tasks and wait for them to finish.

if self._params.audio_in_enabled or self._params.vad_enabled:
# Cancel and wait for the audio input task to finish.
if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_task.cancel()
await self._audio_task
self._audio_task = None

def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer
Expand Down
59 changes: 45 additions & 14 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ def __init__(self, params: TransportParams, **kwargs):

self._params = params

# Task to process incoming frames so we don't block upstream elements.
self._sink_task = None

# Task to process incoming frames using a clock.
self._sink_clock_task = None

# Task to write/send audio frames.
self._audio_out_task = None

# Task to write/send image frames.
self._camera_out_task = None

# These are the images that we should send to the camera at our desired
# framerate.
self._camera_images = None
Expand Down Expand Up @@ -88,36 +100,53 @@ async def stop(self, frame: EndFrame):
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the camera and audio tasks below
# because they might be still rendering.
await self._sink_task
await self._sink_clock_task
if self._sink_task:
await self._sink_task
if self._sink_clock_task:
await self._sink_clock_task

# Cancel and wait for the camera output task to finish.
if self._params.camera_out_enabled:
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._camera_out_task = None

# Cancel and wait for the audio output task to finish.
if self._params.audio_out_enabled and self._params.audio_out_is_live:
if (
self._audio_out_task
and self._params.audio_out_enabled
and self._params.audio_out_is_live
):
self._audio_out_task.cancel()
await self._audio_out_task
self._audio_out_task = None

async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
# tasks first or not.
self._sink_task.cancel()
self._sink_clock_task.cancel()
await self._sink_task
await self._sink_clock_task
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
self._sink_task = None

if self._sink_clock_task:
self._sink_clock_task.cancel()
await self._sink_clock_task
self._sink_clock_task = None

# Cancel and wait for the camera output task to finish.
if self._params.camera_out_enabled:
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._camera_out_task = None

# Cancel and wait for the audio output task to finish.
if self._params.audio_out_enabled and self._params.audio_out_is_live:
if self._audio_out_task and (
self._params.audio_out_enabled and self._params.audio_out_is_live
):
self._audio_out_task.cancel()
await self._audio_out_task
self._audio_out_task = None

async def send_message(self, frame: TransportMessageFrame):
pass
Expand Down Expand Up @@ -183,11 +212,13 @@ async def _handle_interruptions(self, frame: Frame):

if isinstance(frame, StartInterruptionFrame):
# Stop sink tasks.
self._sink_task.cancel()
await self._sink_task
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
# Stop sink clock tasks.
self._sink_clock_task.cancel()
await self._sink_clock_task
if self._sink_clock_task:
self._sink_clock_task.cancel()
await self._sink_clock_task
# Create sink tasks.
self._create_sink_tasks()
# Let's send a bot stopped speaking if we have to.
Expand Down
5 changes: 5 additions & 0 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ def __init__(self, client: DailyTransportClient, params: DailyParams, **kwargs):
self._client = client

self._video_renderers = {}

# Task that gets audio data from a device or the network and queues it
# internally to be processed.
self._audio_in_task = None

self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
Expand Down Expand Up @@ -603,6 +606,7 @@ async def stop(self, frame: EndFrame):
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None

async def cancel(self, frame: CancelFrame):
# Parent stop.
Expand All @@ -613,6 +617,7 @@ async def cancel(self, frame: CancelFrame):
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
self._audio_in_task.cancel()
await self._audio_in_task
self._audio_in_task = None

async def cleanup(self):
await super().cleanup()
Expand Down

0 comments on commit cf0ab85

Please sign in to comment.