Skip to content

Commit

Permalink
Code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
markbackman committed Dec 18, 2024
1 parent c4d8572 commit 963059f
Showing 1 changed file with 46 additions and 29 deletions.
75 changes: 46 additions & 29 deletions src/pipecat/processors/transcript_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from abc import ABC, abstractmethod
from typing import List

from loguru import logger

from pipecat.frames.frames import (
ErrorFrame,
Frame,
OpenAILLMContextAssistantTimestampFrame,
TranscriptionFrame,
Expand All @@ -21,7 +17,7 @@
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class BaseTranscriptProcessor(FrameProcessor, ABC):
class BaseTranscriptProcessor(FrameProcessor):
"""Base class for processing conversation transcripts.
Provides common functionality for handling transcript messages and updates.
Expand All @@ -45,16 +41,6 @@ async def _emit_update(self, messages: List[TranscriptionMessage]):
await self._call_event_handler("on_transcript_update", update_frame)
await self.push_frame(update_frame)

@abstractmethod
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames to build conversation transcript.
Args:
frame: Input frame to process
direction: Frame processing direction
"""
await super().process_frame(frame, direction)


class UserTranscriptProcessor(BaseTranscriptProcessor):
"""Processes user transcription frames into timestamped conversation messages."""
Expand Down Expand Up @@ -195,18 +181,44 @@ async def handle_update(processor, frame):
```
"""

def __init__(self, **kwargs):
"""Initialize factory with user and assistant processors."""
self._user_processor = UserTranscriptProcessor(**kwargs)
self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
def __init__(self):
"""Initialize factory."""
self._user_processor = None
self._assistant_processor = None
self._event_handlers = {}

def user(self) -> UserTranscriptProcessor:
"""Get the user transcript processor."""
def user(self, **kwargs) -> UserTranscriptProcessor:
"""Get the user transcript processor.
Args:
**kwargs: Arguments specific to UserTranscriptProcessor
"""
if self._user_processor is None:
self._user_processor = UserTranscriptProcessor(**kwargs)
# Apply any registered event handlers
for event_name, handler in self._event_handlers.items():

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)

return self._user_processor

def assistant(self) -> AssistantTranscriptProcessor:
"""Get the assistant transcript processor."""
def assistant(self, **kwargs) -> AssistantTranscriptProcessor:
"""Get the assistant transcript processor.
Args:
**kwargs: Arguments specific to AssistantTranscriptProcessor
"""
if self._assistant_processor is None:
self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
# Apply any registered event handlers
for event_name, handler in self._event_handlers.items():

@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)

return self._assistant_processor

def event_handler(self, event_name: str):
Expand All @@ -222,13 +234,18 @@ def event_handler(self, event_name: str):
def decorator(handler):
self._event_handlers[event_name] = handler

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)
# Apply handler to existing processors if they exist
if self._user_processor:

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)

if self._assistant_processor:

@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)
@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)

return handler

Expand Down

0 comments on commit 963059f

Please sign in to comment.