diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index a76812806..6a10f927c 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -44,10 +44,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(d, ProcessingMetricsData): print(f"!!! MetricsFrame: {frame}, processing: {d.value}") elif isinstance(d, LLMUsageMetricsData): + tokens = d.value print( f"!!! MetricsFrame: {frame}, tokens: { - d.prompt_tokens}, characters: { - d.completion_tokens}") + tokens.prompt_tokens}, characters: { + tokens.completion_tokens}") elif isinstance(d, TTSUsageMetricsData): print(f"!!! MetricsFrame: {frame}, characters: {d.value}") await self.push_frame(frame, direction) diff --git a/src/pipecat/metrics/metrics.py b/src/pipecat/metrics/metrics.py index 9baffcbcb..053708998 100644 --- a/src/pipecat/metrics/metrics.py +++ b/src/pipecat/metrics/metrics.py @@ -4,31 +4,28 @@ class MetricsData(BaseModel): processor: str + model: Optional[str] = None class TTFBMetricsData(MetricsData): value: float - model: Optional[str] = None class ProcessingMetricsData(MetricsData): value: float - model: Optional[str] = None -class LLMUsageMetricsData(MetricsData): - model: str +class LLMTokenUsage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int + cache_read_input_tokens: Optional[int] = None + cache_creation_input_tokens: Optional[int] = None -class CacheUsageMetricsData(LLMUsageMetricsData): - cache_read_input_tokens: int - cache_creation_input_tokens: int +class LLMUsageMetricsData(MetricsData): + value: LLMTokenUsage class TTSUsageMetricsData(MetricsData): - processor: str - model: Optional[str] = None value: int diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 051542486..b97129e96 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -19,7 +19,13 @@ StartInterruptionFrame, StopInterruptionFrame, SystemFrame) -from pipecat.metrics.metrics import LLMUsageMetricsData, ProcessingMetricsData, TTFBMetricsData, TTSUsageMetricsData +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -32,47 +38,68 @@ class FrameDirection(Enum): class FrameProcessorMetrics: def __init__(self, name: str): - self._name = name + self._core_metrics_data = MetricsData(processor=name) self._start_ttfb_time = 0 self._start_processing_time = 0 self._should_report_ttfb = True + def _processor_name(self): + return self._core_metrics_data.processor + + def _model_name(self): + return self._core_metrics_data.model + + def set_core_metrics_data(self, data: MetricsData): + self._core_metrics_data = data + async def start_ttfb_metrics(self, report_only_initial_ttfb): if self._should_report_ttfb: self._start_ttfb_time = time.time() self._should_report_ttfb = not report_only_initial_ttfb - async def stop_ttfb_metrics(self, model: str | None = None): + async def stop_ttfb_metrics(self): if self._start_ttfb_time == 0: return None value = time.time() - self._start_ttfb_time - logger.debug(f"{self._name} TTFB: {value}") - ttfb = TTFBMetricsData(processor=self._name, value=value, model=model) + logger.debug(f"{self._processor_name()} TTFB: {value}") + ttfb = TTFBMetricsData( + processor=self._processor_name(), + value=value, + model=self._model_name()) self._start_ttfb_time = 0 return MetricsFrame(data=[ttfb]) async def start_processing_metrics(self): self._start_processing_time = time.time() - async def stop_processing_metrics(self, model: str | None = None): + async def stop_processing_metrics(self): if self._start_processing_time == 0: return None value = time.time() - self._start_processing_time - logger.debug(f"{self._name} processing time: {value}") - processing = ProcessingMetricsData(processor=self._name, value=value, model=model) + logger.debug(f"{self._processor_name()} processing time: {value}") + processing = ProcessingMetricsData( + processor=self._processor_name(), value=value, model=self._model_name()) self._start_processing_time = 0 return MetricsFrame(data=[processing]) - async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData): - logger.debug(f"{self._name} prompt tokens: { - usage_params.prompt_tokens}, completion tokens: {usage_params.completion_tokens}") - return MetricsFrame(data=[usage_params]) + async def start_llm_usage_metrics(self, tokens: LLMTokenUsage): + logger.debug(f"{self._processor_name()} prompt tokens: { + tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}") + value = LLMUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=tokens) + return MetricsFrame(data=[value]) - async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData): - logger.debug(f"{self._name} usage characters: {usage_params.value}") - return MetricsFrame(data=[usage_params]) + async def start_tts_usage_metrics(self, text: str): + characters = TTSUsageMetricsData( + processor=self._processor_name(), + model=self._model_name(), + value=len(text)) + logger.debug(f"{self._processor_name()} usage characters: {characters.value}") + return MetricsFrame(data=[characters]) class FrameProcessor: @@ -131,13 +158,16 @@ def report_only_initial_ttfb(self): def can_generate_metrics(self) -> bool: return False + def set_core_metrics_data(self, data: MetricsData): + self._metrics.set_core_metrics_data(data) + async def start_ttfb_metrics(self): if self.can_generate_metrics() and self.metrics_enabled: await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb) - async def stop_ttfb_metrics(self, model: str | None = None): + async def stop_ttfb_metrics(self): if self.can_generate_metrics() and self.metrics_enabled: - frame = await self._metrics.stop_ttfb_metrics(model) + frame = await self._metrics.stop_ttfb_metrics() if frame: await self.push_frame(frame) @@ -145,27 +175,27 @@ async def start_processing_metrics(self): if self.can_generate_metrics() and self.metrics_enabled: await self._metrics.start_processing_metrics() - async def stop_processing_metrics(self, model: str | None = None): + async def stop_processing_metrics(self): if self.can_generate_metrics() and self.metrics_enabled: - frame = await self._metrics.stop_processing_metrics(model) + frame = await self._metrics.stop_processing_metrics() if frame: await self.push_frame(frame) - async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData): + async def start_llm_usage_metrics(self, tokens: LLMTokenUsage): if self.can_generate_metrics() and self.usage_metrics_enabled: - frame = await self._metrics.start_llm_usage_metrics(usage_params) + frame = await self._metrics.start_llm_usage_metrics(tokens) if frame: await self.push_frame(frame) - async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData): + async def start_tts_usage_metrics(self, text: str): if self.can_generate_metrics() and self.usage_metrics_enabled: - frame = await self._metrics.start_tts_usage_metrics(usage_params) + frame = await self._metrics.start_tts_usage_metrics(text) if frame: await self.push_frame(frame) - async def stop_all_metrics(self, model: str | None = None): - await self.stop_ttfb_metrics(model) - await self.stop_processing_metrics(model) + async def stop_all_metrics(self): + await self.stop_ttfb_metrics() + await self.stop_processing_metrics() async def cleanup(self): pass diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 441839a5a..7146a3b98 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -11,7 +11,6 @@ from abc import abstractmethod from typing import AsyncGenerator, List, Optional, Tuple -from attr import has from pipecat.frames.frames import ( AudioRawFrame, CancelFrame, @@ -33,6 +32,7 @@ UserImageRequestFrame, VisionImageRawFrame ) +from pipecat.metrics.metrics import MetricsData from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -47,6 +47,11 @@ class AIService(FrameProcessor): def __init__(self, **kwargs): super().__init__(**kwargs) + self.model_name: str = "" + + def set_model_name(self, model: str): + self.model_name = model + self.set_core_metrics_data(MetricsData(processor=self.name, model=self.model_name)) async def start(self, frame: StartFrame): pass @@ -159,7 +164,7 @@ def sample_rate(self) -> int: @abstractmethod async def set_model(self, model: str): - pass + self.set_model_name(model) @abstractmethod async def set_voice(self, voice: str): @@ -368,7 +373,7 @@ def __init__(self, **kwargs): @abstractmethod async def set_model(self, model: str): - pass + self.set_model_name(model) @abstractmethod async def set_language(self, language: Language): @@ -483,7 +488,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.push_frame(frame, direction) await self.start_processing_metrics() await self.process_generator(self.run_image_gen(frame.text)) - await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None) + await self.stop_processing_metrics() else: await self.push_frame(frame, direction) @@ -505,6 +510,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, VisionImageRawFrame): await self.start_processing_metrics() await self.process_generator(self.run_vision(frame)) - await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None) + await self.stop_processing_metrics() else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 754c1d7e2..7935691ce 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -29,7 +29,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) -from pipecat.metrics.metrics import CacheUsageMetricsData +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import ( @@ -85,7 +85,7 @@ def __init__( **kwargs): super().__init__(**kwargs) self._client = AsyncAnthropic(api_key=api_key) - self._model = model + self.set_model_name(model) self._max_tokens = max_tokens self._enable_prompt_caching_beta = enable_prompt_caching_beta @@ -138,11 +138,11 @@ async def _process_context(self, context: OpenAILLMContext): tools=context.tools or [], system=context.system, messages=messages, - model=self._model, + model=self.model_name, max_tokens=self._max_tokens, stream=True) - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() # Function calling tool_use_block = None @@ -206,7 +206,7 @@ async def _process_context(self, context: OpenAILLMContext): except Exception as e: logger.exception(f"{self} exception: {e}") finally: - await self.stop_processing_metrics(self._model) + await self.stop_processing_metrics() await self.push_frame(LLMFullResponseEndFrame()) comp_tokens = completion_tokens if not use_completion_tokens_estimate else completion_tokens_estimate await self._report_usage_metrics( @@ -232,7 +232,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = AnthropicLLMContext.from_image_frame(frame) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) elif isinstance(frame, LLMEnablePromptCachingFrame): logger.debug(f"Setting enable prompt caching to: [{frame.enable}]") self._enable_prompt_caching_beta = frame.enable @@ -252,9 +252,7 @@ async def _report_usage_metrics( cache_creation_input_tokens: int, cache_read_input_tokens: int): if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens: - tokens = CacheUsageMetricsData( - processor=self.name, - model=self._model, + tokens = LLMTokenUsage( prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, cache_creation_input_tokens=cache_creation_input_tokens, diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 07a75af2b..daac57119 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -113,7 +113,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml)) if result.reason == ResultReason.SynthesizingAudioCompleted: - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text))) + await self.start_tts_usage_metrics(text) await self.stop_ttfb_metrics() await self.push_frame(TTSStartedFrame()) # Azure always sends a 44-byte header. Strip it off. @@ -192,7 +192,7 @@ def __init__( self._api_key = api_key self._azure_endpoint = endpoint self._api_version = api_version - self._model = model + self.set_model_name(model) self._image_size = image_size self._aiohttp_session = aiohttp_session diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 97aac634f..ab6026052 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -23,7 +23,6 @@ TTSStoppedFrame, LLMFullResponseEndFrame ) -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.processors.frame_processor import FrameDirection from pipecat.transcriptions.language import Language from pipecat.services.ai_services import AsyncWordTTSService, TTSService @@ -90,7 +89,7 @@ def __init__( self._cartesia_version = cartesia_version self._url = url self._voice_id = voice_id - self._model_id = model_id + self.set_model_name(model_id) self._output_format = { "container": "raw", "encoding": encoding, @@ -106,8 +105,8 @@ def can_generate_metrics(self) -> bool: return True async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching TTS model to: [{model}]") - self._model_id = model async def set_voice(self, voice: str): logger.debug(f"Switching TTS voice to: [{voice}]") @@ -141,7 +140,7 @@ async def _connect(self): async def _disconnect(self): try: - await self.stop_all_metrics(self._model_id) + await self.stop_all_metrics() if self._websocket: await self._websocket.close() @@ -156,9 +155,14 @@ async def _disconnect(self): except Exception as e: logger.error(f"{self} error closing websocket: {e}") + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): await super()._handle_interruption(frame, direction) - await self.stop_all_metrics(self._model_id) + await self.stop_all_metrics() await self.push_frame(LLMFullResponseEndFrame()) self._context_id = None @@ -170,7 +174,7 @@ async def flush_audio(self): "transcript": "", "continue": False, "context_id": self._context_id, - "model_id": self._model_id, + "model_id": self.model_name, "voice": { "mode": "id", "id": self._voice_id @@ -183,12 +187,12 @@ async def flush_audio(self): async def _receive_task_handler(self): try: - async for message in self._websocket: + async for message in self._get_websocket(): msg = json.loads(message) if not msg or msg["context_id"] != self._context_id: continue if msg["type"] == "done": - await self.stop_ttfb_metrics(self._model_id) + await self.stop_ttfb_metrics() await self.push_frame(TTSStoppedFrame()) # Unset _context_id but not the _context_id_start_timestamp # because we are likely still playing out audio and need the @@ -200,7 +204,7 @@ async def _receive_task_handler(self): list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"])) ) elif msg["type"] == "chunk": - await self.stop_ttfb_metrics(self._model_id) + await self.stop_ttfb_metrics() self.start_word_timestamps() frame = AudioRawFrame( audio=base64.b64decode(msg["data"]), @@ -211,7 +215,7 @@ async def _receive_task_handler(self): elif msg["type"] == "error": logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) - await self.stop_all_metrics(self._model_id) + await self.stop_all_metrics() await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}')) else: logger.error(f"Cartesia error, unknown message type: {msg}") @@ -236,7 +240,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "transcript": text + " ", "continue": True, "context_id": self._context_id, - "model_id": self._model_id, + "model_id": self.model_name, "voice": { "mode": "id", "id": self._voice_id @@ -246,8 +250,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "add_timestamps": True, } try: - await self._websocket.send(json.dumps(msg)) - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, model=self._model_id, value=len(text))) + await self._get_websocket().send(json.dumps(msg)) + await self.start_tts_usage_metrics(text) except Exception as e: logger.error(f"{self} error sending message: {e}") await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 6dfc9d5cb..7e5e16d3e 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -19,7 +19,6 @@ TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame) -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.services.ai_services import STTService, TTSService from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 @@ -100,7 +99,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {response_text})") return - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text))) + await self.start_tts_usage_metrics(text) await self.push_frame(TTSStartedFrame()) async for data in r.content: @@ -139,6 +138,7 @@ def __init__(self, self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching STT model to: [{model}]") self._live_options.model = model await self._disconnect() @@ -165,7 +165,7 @@ async def cancel(self, frame: CancelFrame): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self.start_processing_metrics() await self._connection.send(audio) - await self.stop_processing_metrics(self._live_options.model) + await self.stop_processing_metrics() yield None async def _connect(self): diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 35af8b5e3..10015e791 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -20,7 +20,6 @@ StartInterruptionFrame, TTSStartedFrame, TTSStoppedFrame) -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import AsyncWordTTSService @@ -108,7 +107,7 @@ def __init__( self._api_key = api_key self._voice_id = voice_id - self._model = model + self.set_model_name(model) self._url = url self._params = params @@ -123,8 +122,8 @@ def can_generate_metrics(self) -> bool: return True async def set_model(self, model: str): + await super().set_model(model) logger.debug(f"Switching TTS model to: [{model}]") - self._model = model await self._disconnect() await self._connect() @@ -161,7 +160,7 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect async def _connect(self): try: voice_id = self._voice_id - model = self._model + model = self.model_name output_format = self._params.output_format url = f"{ self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}" @@ -181,7 +180,7 @@ async def _connect(self): async def _disconnect(self): try: - await self.stop_all_metrics(self._model) + await self.stop_all_metrics() if self._websocket: await self._websocket.send(json.dumps({"text": ""})) @@ -207,7 +206,7 @@ async def _receive_task_handler(self): async for message in self._websocket: msg = json.loads(message) if msg.get("audio"): - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() self.start_word_timestamps() audio = base64.b64decode(msg["audio"]) @@ -253,7 +252,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: self._cumulative_time = 0 await self._send_text(text) - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, model=self._model, value=len(text))) + await self.start_tts_usage_metrics(text) except Exception as e: logger.error(f"{self} error sending message: {e}") await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/services/fal.py b/src/pipecat/services/fal.py index 672135d02..58768180f 100644 --- a/src/pipecat/services/fal.py +++ b/src/pipecat/services/fal.py @@ -46,7 +46,7 @@ def __init__( **kwargs ): super().__init__(**kwargs) - self._model = model + self.set_model_name(model) self._params = params self._aiohttp_session = aiohttp_session if key: @@ -56,7 +56,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating image from prompt: {prompt}") response = await fal_client.run_async( - self._model, + self.model_name, arguments={"prompt": prompt, **self._params.model_dump(exclude_none=True)} ) diff --git a/src/pipecat/services/fireworks.py b/src/pipecat/services/fireworks.py index 7fa4d64e8..87fddd838 100644 --- a/src/pipecat/services/fireworks.py +++ b/src/pipecat/services/fireworks.py @@ -22,4 +22,4 @@ def __init__(self, *, model: str = "accounts/fireworks/models/firefunction-v1", base_url: str = "https://api.fireworks.ai/inference/v1"): - super().__init__(model, base_url) + super().__init__(model=model, base_url=base_url) diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index b542f247d..b72169b70 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -45,12 +45,12 @@ def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kw super().__init__(**kwargs) gai.configure(api_key=api_key) self._create_client(model) - self._model = model def can_generate_metrics(self) -> bool: return True def _create_client(self, model: str): + self.set_model_name(model) self._client = gai.GenerativeModel(model) def _get_messages_from_openai_context( @@ -93,7 +93,7 @@ async def _process_context(self, context: OpenAILLMContext): response = self._client.generate_content(messages, stream=True) - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() async for chunk in self._async_generator_wrapper(response): try: diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 2fec2d21f..60f0cb7df 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -20,7 +20,6 @@ TTSStartedFrame, TTSStoppedFrame, ) -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.services.ai_services import AsyncTTSService from loguru import logger @@ -155,7 +154,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: try: await self._connection.append_text(text) await self._connection.flush() - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text))) + await self.start_tts_usage_metrics(text) except Exception as e: logger.error(f"{self} error sending message: {e}") await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index 3441aeeb9..b6391cc93 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -54,6 +54,8 @@ def __init__( ): super().__init__(**kwargs) + self.set_model_name(model) + if not use_cpu: device, dtype = detect_device() else: @@ -73,7 +75,7 @@ def __init__( async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: if not self._model: - logger.error(f"{self} error: Moondream model not available") + logger.error(f"{self} error: Moondream model not available ({self.model_name})") yield ErrorFrame("Moondream model not available") return diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 7ccec379f..6fe710b5e 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -33,7 +33,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) -from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator from pipecat.processors.aggregators.openai_llm_context import ( @@ -84,7 +84,7 @@ class BaseOpenAILLMService(LLMService): def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs): super().__init__(**kwargs) - self._model: str = model + self.set_model_name(model) self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs) def create_client(self, api_key=None, base_url=None, **kwargs): @@ -105,7 +105,7 @@ async def get_chat_completions( context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]: chunks = await self._client.chat.completions.create( - model=self._model, + model=self.model_name, stream=True, messages=messages, tools=context.tools, @@ -149,9 +149,7 @@ async def _process_context(self, context: OpenAILLMContext): async for chunk in chunk_stream: if chunk.usage: - tokens = LLMUsageMetricsData( - processor=self.name, - model=self._model, + tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, completion_tokens=chunk.usage.completion_tokens, total_tokens=chunk.usage.total_tokens @@ -161,7 +159,7 @@ async def _process_context(self, context: OpenAILLMContext): if len(chunk.choices) == 0: continue - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() if chunk.choices[0].delta.tool_calls: # We're streaming the LLM response to enable the fastest response times. @@ -224,7 +222,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = OpenAILLMContext.from_image_frame(frame) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) else: await self.push_frame(frame, direction) @@ -232,7 +230,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.push_frame(LLMFullResponseStartFrame()) await self.start_processing_metrics() await self._process_context(context) - await self.stop_processing_metrics(self._model) + await self.stop_processing_metrics() await self.push_frame(LLMFullResponseEndFrame()) @@ -274,7 +272,7 @@ def __init__( model: str = "dall-e-3", ): super().__init__() - self._model = model + self.set_model_name(model) self._image_size = image_size self._client = AsyncOpenAI(api_key=api_key) self._aiohttp_session = aiohttp_session @@ -284,7 +282,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: image = await self._client.images.generate( prompt=prompt, - model=self._model, + model=self.model_name, n=1, size=self._image_size ) @@ -326,7 +324,7 @@ def __init__( super().__init__(sample_rate=sample_rate, **kwargs) self._voice: ValidVoice = VALID_VOICES.get(voice, "alloy") - self._model = model + self.set_model_name(model) self._sample_rate = sample_rate self._client = AsyncOpenAI(api_key=api_key) @@ -349,7 +347,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async with self._client.audio.speech.with_streaming_response.create( input=text, - model=self._model, + model=self.model_name, voice=self._voice, response_format="pcm", ) as r: @@ -360,12 +358,12 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield ErrorFrame(f"Error getting audio (status: {r.status_code}, error: {error})") return - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, model=self._model, value=len(text))) + await self.start_tts_usage_metrics(text) await self.push_frame(TTSStartedFrame()) async for chunk in r.iter_bytes(8192): if len(chunk) > 0: - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() frame = AudioRawFrame(chunk, self.sample_rate, 1) yield frame await self.push_frame(TTSStoppedFrame()) diff --git a/src/pipecat/services/openpipe.py b/src/pipecat/services/openpipe.py index ada7824fb..e4e14dc15 100644 --- a/src/pipecat/services/openpipe.py +++ b/src/pipecat/services/openpipe.py @@ -60,7 +60,7 @@ async def get_chat_completions( context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]: chunks = await self._client.chat.completions.create( - model=self._model, + model=self.model_name, stream=True, messages=messages, openpipe={ diff --git a/src/pipecat/services/playht.py b/src/pipecat/services/playht.py index 736d995c5..c3200fee9 100644 --- a/src/pipecat/services/playht.py +++ b/src/pipecat/services/playht.py @@ -10,7 +10,6 @@ from typing import AsyncGenerator from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.services.ai_services import TTSService from loguru import logger @@ -72,7 +71,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: voice_engine="PlayHT2.0-turbo", options=self._options) - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text))) + await self.start_tts_usage_metrics(text) await self.push_frame(TTSStartedFrame()) async for chunk in playht_gen: diff --git a/src/pipecat/services/together.py b/src/pipecat/services/together.py index 9bdad950b..004236ac8 100644 --- a/src/pipecat/services/together.py +++ b/src/pipecat/services/together.py @@ -26,7 +26,7 @@ FunctionCallInProgressFrame, StartInterruptionFrame ) -from pipecat.metrics.metrics import LLMUsageMetricsData +from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame @@ -68,7 +68,7 @@ def __init__( **kwargs): super().__init__(**kwargs) self._client = AsyncTogether(api_key=api_key) - self._model = model + self.set_model_name(model) self._max_tokens = max_tokens def can_generate_metrics(self) -> bool: @@ -94,7 +94,7 @@ async def _process_context(self, context: OpenAILLMContext): stream = await self._client.chat.completions.create( messages=context.messages, - model=self._model, + model=self.model_name, max_tokens=self._max_tokens, stream=True, ) @@ -107,9 +107,7 @@ async def _process_context(self, context: OpenAILLMContext): async for chunk in stream: # logger.debug(f"Together LLM event: {chunk}") if chunk.usage: - tokens = LLMUsageMetricsData( - processor=self.name, - model=self._model, + tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, completion_tokens=chunk.usage.completion_tokens, total_tokens=chunk.usage.total_tokens @@ -120,7 +118,7 @@ async def _process_context(self, context: OpenAILLMContext): continue if not got_first_chunk: - await self.stop_ttfb_metrics(self._model) + await self.stop_ttfb_metrics() if chunk.choices[0].delta.content: got_first_chunk = True if chunk.choices[0].delta.content[0] == "<": @@ -142,7 +140,7 @@ async def _process_context(self, context: OpenAILLMContext): except Exception as e: logger.exception(f"{self} exception: {e}") finally: - await self.stop_processing_metrics(self._model) + await self.stop_processing_metrics() await self.push_frame(LLMFullResponseEndFrame()) async def process_frame(self, frame: Frame, direction: FrameDirection): @@ -155,7 +153,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): context = TogetherLLMContext.from_messages(frame.messages) elif isinstance(frame, LLMModelUpdateFrame): logger.debug(f"Switching LLM model to: [{frame.model}]") - self._model = frame.model + self.set_model_name(frame.model) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py index 952c30416..9f54f9ca0 100644 --- a/src/pipecat/services/whisper.py +++ b/src/pipecat/services/whisper.py @@ -52,7 +52,7 @@ def __init__(self, super().__init__(**kwargs) self._device: str = device self._compute_type = compute_type - self._model_name: str | Model = model + self.set_model_name(model if isinstance(model, str) else model.value) self._no_speech_prob = no_speech_prob self._model: WhisperModel | None = None self._load() @@ -60,15 +60,12 @@ def __init__(self, def can_generate_metrics(self) -> bool: return True - def _get_model_name_as_str(self) -> str: - return self._model_name.value if isinstance(self._model_name, Enum) else self._model_name - def _load(self): """Loads the Whisper model. Note that if this is the first time this model is being run, it will take time to download.""" logger.debug("Loading Whisper model...") self._model = WhisperModel( - self._get_model_name_as_str(), + self.model_name, device=self._device, compute_type=self._compute_type) logger.debug("Loaded Whisper model") @@ -92,8 +89,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: if segment.no_speech_prob < self._no_speech_prob: text += f"{segment.text} " - await self.stop_ttfb_metrics(self._get_model_name_as_str()) - await self.stop_processing_metrics(self._get_model_name_as_str()) + await self.stop_ttfb_metrics() + await self.stop_processing_metrics() if text: logger.debug(f"Transcription: [{text}]") diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 47c398c6a..38f0f9a64 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -15,7 +15,6 @@ StartFrame, TTSStartedFrame, TTSStoppedFrame) -from pipecat.metrics.metrics import TTSUsageMetricsData from pipecat.services.ai_services import TTSService from loguru import logger @@ -104,7 +103,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") return - await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text))) + await self.start_tts_usage_metrics(text) await self.push_frame(TTSStartedFrame()) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 5a5baef31..e28fe6083 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -736,20 +736,19 @@ async def send_metrics(self, frame: MetricsFrame): if isinstance(d, TTFBMetricsData): if "ttfb" not in metrics: metrics["ttfb"] = [] - metrics["ttfb"].append(d.model_dump_json()) + metrics["ttfb"].append(d.model_dump()) elif isinstance(d, ProcessingMetricsData): if "processing" not in metrics: metrics["processing"] = [] - metrics["processing"].append(d.model_dump_json()) + metrics["processing"].append(d.model_dump()) elif isinstance(d, LLMUsageMetricsData): if "tokens" not in metrics: metrics["tokens"] = [] - metrics["tokens"].append(d.model_dump_json()) + metrics["tokens"].append(d.value.model_dump(exclude_none=True)) elif isinstance(d, TTSUsageMetricsData): if "characters" not in metrics: metrics["characters"] = [] - metrics["characters"].append(d.model_dump_json()) - print(f"Sending metrics: {metrics}") + metrics["characters"].append(d.model_dump()) message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics",