From add8d3cbaf4ada67610ef8c32afb4266f147227f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 16 May 2024 16:33:47 -0700 Subject: [PATCH] transport: create input transports push frame task --- CHANGELOG.md | 3 ++ src/pipecat/transports/base_input.py | 35 ++++++++++++++++++++---- src/pipecat/transports/local/tk.py | 6 ++-- src/pipecat/transports/services/daily.py | 8 ++++-- 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b8cee789..28a97c370 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `DailyTransport`: don't publish camera and audio tracks if not enabled. +- Fixed an issue in `BaseInputTransport` that was causing frames pushed + downstream not pushed in the right order. + ## [0.0.15] - 2024-05-15 ### Fixed diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 5345eda6f..265c8e6c4 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -35,6 +35,13 @@ def __init__(self, params: TransportParams): if self._params.audio_in_enabled or self._params.vad_enabled: self._audio_in_queue = queue.Queue() + # Start push frame task. This is the task that will push frames in + # order. So, a transport guarantees that all frames are pushed in the + # same task. + loop = self.get_event_loop() + self._push_frame_task = loop.create_task(self._push_frame_task_handler()) + self._push_queue = asyncio.Queue() + async def start(self): if self._running: return @@ -74,12 +81,30 @@ async def cleanup(self): async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, StartFrame): await self.start() - await self.push_frame(frame, direction) + await self._internal_push_frame(frame, direction) elif isinstance(frame, CancelFrame) or isinstance(frame, EndFrame): await self.stop() - await self.push_frame(frame, direction) + await self._internal_push_frame(frame, direction) else: - await self.push_frame(frame, direction) + await self._internal_push_frame(frame, direction) + + # + # Push frames task + # + + async def _internal_push_frame( + self, + frame: Frame, + direction: FrameDirection = FrameDirection.DOWNSTREAM): + await self._push_queue.put((frame, direction)) + + async def _push_frame_task_handler(self): + running = True + while running: + (frame, direction) = await self._push_queue.get() + if frame: + await self.push_frame(frame, direction) + running = frame is not None # # Audio input @@ -95,7 +120,7 @@ def _handle_vad(self, audio_frames: bytes, vad_state: VADState): frame = UserStoppedSpeakingFrame() if frame: future = asyncio.run_coroutine_threadsafe( - self.push_frame(frame), self.get_event_loop()) + self._internal_push_frame(frame), self.get_event_loop()) future.result() vad_state = new_vad_state return vad_state @@ -133,7 +158,7 @@ def _audio_out_thread_handler(self): # Push audio downstream if passthrough. if audio_passthrough: future = asyncio.run_coroutine_threadsafe( - self.push_frame(frame), self.get_event_loop()) + self._internal_push_frame(frame), self.get_event_loop()) future.result() except queue.Empty: pass diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 4b1d4e297..4165f941c 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -87,9 +87,7 @@ def write_raw_audio_frames(self, frames: bytes): self._out_stream.write(frames) def write_frame_to_camera(self, frame: ImageRawFrame): - future = asyncio.run_coroutine_threadsafe( - self._write_frame_to_tk(frame), self.get_event_loop()) - future.result() + self.get_event_loop().call_soon(self._write_frame_to_tk, frame) async def start(self): await super().start() @@ -107,7 +105,7 @@ async def cleanup(self): await super().cleanup() - async def _write_frame_to_tk(self, frame: ImageRawFrame): + def _write_frame_to_tk(self, frame: ImageRawFrame): width = frame.size[0] height = frame.size[1] data = f"P6 {width} {height} 255 ".encode() + frame.image diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7ee8b101e..cca69a284 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -478,12 +478,14 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): - future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) + future = asyncio.run_coroutine_threadsafe( + self._internal_push_frame(frame), self.get_event_loop()) future.result() def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) - future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) + future = asyncio.run_coroutine_threadsafe( + self._internal_push_frame(frame), self.get_event_loop()) future.result() # @@ -543,7 +545,7 @@ def _camera_in_thread_handler(self): try: frame = self._camera_in_queue.get(timeout=1) future = asyncio.run_coroutine_threadsafe( - self.push_frame(frame), self.get_event_loop()) + self._internal_push_frame(frame), self.get_event_loop()) future.result() except queue.Empty: pass