Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel_pipeline: fix system frames again #820

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions src/pipecat/pipeline/parallel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

match direction:
case FrameDirection.UPSTREAM:
# We don't want to queue system frames as they would be
# processed by a separate task.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
# SystemFrames are pushed directly from ParallelPipeline.
if not isinstance(frame, SystemFrame):
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
Expand All @@ -49,11 +46,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
case FrameDirection.DOWNSTREAM:
# We don't want to queue system frames as they would be
# processed by a separate task.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
# SystemFrames are pushed directly from ParallelPipeline.
if not isinstance(frame, SystemFrame):
await self._down_queue.put(frame)


Expand Down Expand Up @@ -123,11 +117,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
# TODO(aleix): We are creating task for each frame. For real-time
# video/audio this might be too slow. We should use an already
# created task instead.
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])

# If we have a SystemFrame we will push it from this task. Note that the
# connected sinks and sources ignore SystemFrames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)

# If we get an EndFrame we stop our queue processing tasks and wait on
# all the pipelines to finish.
if isinstance(frame, (CancelFrame, EndFrame)):
Expand Down
Loading