From 49ce3dcb27c4a6db1703ec8ece083cba49f37443 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Tue, 3 Dec 2024 23:03:58 -0600
Subject: [PATCH 01/13] add nvidia riva - fastpitch

---
 examples/foundational/01c-fastpitch.py       |  56 ++++
 .../foundational/07r-interruptible-riva.py   |  94 +++++++
 src/pipecat/services/riva.py                 | 244 ++++++++++++++++++
 3 files changed, 394 insertions(+)
 create mode 100644 examples/foundational/01c-fastpitch.py
 create mode 100644 examples/foundational/07r-interruptible-riva.py
 create mode 100644 src/pipecat/services/riva.py

diff --git a/examples/foundational/01c-fastpitch.py b/examples/foundational/01c-fastpitch.py
new file mode 100644
index 000000000..49499cdfc
--- /dev/null
+++ b/examples/foundational/01c-fastpitch.py
@@ -0,0 +1,56 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import aiohttp
+import os
+import sys
+
+from pipecat.frames.frames import EndFrame, TTSSpeakFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.task import PipelineTask
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.services.riva import FastpitchTTSService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+from runner import configure
+
+from loguru import logger
+
+from dotenv import load_dotenv
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, _) = await configure(session)
+
+        transport = DailyTransport(
+            room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
+        )
+
+        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
+
+        runner = PipelineRunner()
+
+        task = PipelineTask(Pipeline([tts, transport.output()]))
+
+        # Register an event handler so we can play the audio when the
+        # participant joins.
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            participant_name = participant.get("info", {}).get("userName", "")
+            await task.queue_frames([TTSSpeakFrame(f"Aloha, {participant_name}!"), EndFrame()])
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/foundational/07r-interruptible-riva.py b/examples/foundational/07r-interruptible-riva.py
new file mode 100644
index 000000000..e65257dc4
--- /dev/null
+++ b/examples/foundational/07r-interruptible-riva.py
@@ -0,0 +1,94 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+
+from pipecat.services.deepgram import DeepgramSTTService
+from pipecat.services.riva import FastpitchTTSService, ParakeetSTTService
+
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, _) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            None,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+                vad_audio_passthrough=True,
+            ),
+        )
+
+        # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+        stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
+
+        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                stt,  # STT
+                context_aggregator.user(),  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                transport.output(),  # Transport bot output
+                context_aggregator.assistant(),  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            # Kick off the conversation.
+            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
new file mode 100644
index 000000000..d5365df47
--- /dev/null
+++ b/src/pipecat/services/riva.py
@@ -0,0 +1,244 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+from typing import AsyncGenerator, List, Optional, Union, Iterator
+
+from loguru import logger
+from pydantic.main import BaseModel
+
+from pipecat.frames.frames import (
+    CancelFrame,
+    EndFrame,
+    ErrorFrame,
+    Frame,
+    InterimTranscriptionFrame,
+    StartFrame,
+    TranscriptionFrame,
+    TTSAudioRawFrame,
+    TTSStartedFrame,
+    TTSStoppedFrame,
+)
+from pipecat.services.ai_services import STTService, TTSService
+from pipecat.transcriptions.language import Language
+from pipecat.utils.time import time_now_iso8601
+
+try:
+    import riva.client
+
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error(
+        "In order to use NVIDIA Riva TTS or STT, you need to `pip install pipecat-ai[riva]`. Also, set the `NVIDIA_API_KEY` environment variable."
+    )
+    raise Exception(f"Missing module: {e}")
+
+
+class FastpitchTTSService(TTSService):
+    class InputParams(BaseModel):
+        language: Optional[str] = "en-US"
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        server: str = "grpc.nvcf.nvidia.com:443",
+        voice_id: str = "English-US.Female-1",
+        sample_rate_hz: int = 24000,
+        # nvidia riva calls this 'function-id'
+        model: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
+        params: InputParams = InputParams(),
+        **kwargs,
+    ):
+        super().__init__(sample_rate=sample_rate_hz, **kwargs)
+        self._api_key = api_key
+
+        self.set_model_name("fastpitch-hifigan-tts")
+        self.set_voice(voice_id)
+
+        self.voice_id = voice_id
+        self.sample_rate_hz = sample_rate_hz
+        self.language_code = params.language
+        self.nchannels = 1
+        self.sampwidth = 2
+        self.quality = None
+
+        metadata = [
+            ["function-id", model],
+            ["authorization", f"Bearer {api_key}"],
+        ]
+        auth = riva.client.Auth(None, True, server, metadata)
+
+        self.service = riva.client.SpeechSynthesisService(auth)
+
+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+
+    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
+        logger.debug(f"Generating TTS: [{text}]")
+
+        await self.start_ttfb_metrics()
+        yield TTSStartedFrame()
+
+        try:
+            custom_dictionary_input = {}
+            responses = self.service.synthesize_online(
+                text,
+                self.voice_id,
+                self.language_code,
+                sample_rate_hz=self.sample_rate_hz,
+                audio_prompt_file=None,
+                quality=20 if self.quality is None else self.quality,
+                custom_dictionary=custom_dictionary_input,
+            )
+
+            for resp in responses:
+                await self.stop_ttfb_metrics()
+
+                frame = TTSAudioRawFrame(
+                    audio=resp.audio,
+                    sample_rate=self.sample_rate_hz,
+                    num_channels=self.nchannels,
+                )
+                yield frame
+
+        except Exception as e:
+            logger.error(f"{self} exception: {e}")
+
+        await self.start_tts_usage_metrics(text)
+        yield TTSStoppedFrame()
+
+
+class ParakeetSTTService(STTService):
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        server: str = "grpc.nvcf.nvidia.com:443",
+        # nvidia calls this 'function-id'
+        model: str = "1598d209-5e27-4d3c-8079-4751568b1081",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._api_key = api_key
+
+        self.set_model_name("parakeet-ctc-1.1b-asr")
+
+        input_device = 0
+        list_devices = False
+        profanity_filter = False
+        automatic_punctuation = False
+        no_verbatim_transcripts = False
+        language_code = "en-US"
+        model_name = ""
+        boosted_lm_words = None
+        boosted_lm_score = 4.0
+        speaker_diarization = False
+        diarization_max_speakers = 3
+        start_history = -1
+        start_threshold = -1.0
+        stop_history = -1
+        stop_threshold = -1.0
+        stop_history_eou = -1
+        stop_threshold_eou = -1.0
+        custom_configuration = ""
+        ssl_cert = None
+        use_ssl = True
+        sample_rate_hz: int = 16000
+        file_streaming_chunk = 1600
+
+        metadata = [
+            ["function-id", model],
+            ["authorization", f"Bearer {api_key}"],
+        ]
+        auth = riva.client.Auth(None, True, server, metadata)
+
+        self.asr_service = riva.client.ASRService(auth)
+
+        config = riva.client.StreamingRecognitionConfig(
+            config=riva.client.RecognitionConfig(
+                encoding=riva.client.AudioEncoding.LINEAR_PCM,
+                language_code=language_code,
+                model="",
+                max_alternatives=1,
+                profanity_filter=profanity_filter,
+                enable_automatic_punctuation=automatic_punctuation,
+                verbatim_transcripts=not no_verbatim_transcripts,
+                sample_rate_hertz=sample_rate_hz,
+                audio_channel_count=1,
+            ),
+            interim_results=True,
+        )
+        self.config = config
+        riva.client.add_word_boosting_to_config(config, boosted_lm_words, boosted_lm_score)
+        riva.client.add_endpoint_parameters_to_config(
+            config,
+            start_history,
+            start_threshold,
+            stop_history,
+            stop_history_eou,
+            stop_threshold,
+            stop_threshold_eou,
+        )
+        riva.client.add_custom_configuration_to_config(config, custom_configuration)
+
+        # this doesn't work, but something like this perhaps? part 1
+        self.audio = []
+        self.responses = self.asr_service.streaming_response_generator(
+            audio_chunks=[self.audio],
+            streaming_config=self.config,
+        )
+
+    def can_generate_metrics(self) -> bool:
+        return False
+
+    async def start(self, frame: StartFrame):
+        await super().start(frame)
+
+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+
+    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
+        # this doesn't work, but something like this perhaps? part 2
+        self.audio.append(audio)
+
+        # need to start to run this generator only once somewhere...
+        # 'start' function doesn't work...
+        # something about the event loop...
+        # maybe an audio buffer... though my attempt at that didn't work either
+        for response in self.responses:
+            if not response.results:
+                continue
+            partial_transcript = ""
+            for result in response.results:
+                if result:
+                    if not result.alternatives:
+                        continue
+                    transcript = result.alternatives[0].transcript
+                    if transcript:
+                        language = None
+                        if len(transcript) > 0:
+                            await self.stop_ttfb_metrics()
+                            if result.is_final:
+                                await self.stop_processing_metrics()
+                                yield TranscriptionFrame(
+                                    transcript, "", time_now_iso8601(), language
+                                )
+                            else:
+                                yield InterimTranscriptionFrame(
+                                    transcript, "", time_now_iso8601(), language
+                                )
+        yield None
+
+    async def _on_speech_started(self, *args, **kwargs):
+        await self.start_ttfb_metrics()
+        await self.start_processing_metrics()

From ce94421c90b1628af3d690366f8b3c80b7aeb3d0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Mon, 9 Dec 2024 12:55:22 -0800
Subject: [PATCH 02/13] pyproject: add riva option and update protobuf and playht

---
 dev-requirements.txt | 2 +-
 pyproject.toml       | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/dev-requirements.txt b/dev-requirements.txt
index c706d8fe6..92b6ec4d3 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,5 +1,5 @@
 build~=1.2.1
-grpcio-tools~=1.62.2
+grpcio-tools~=1.65.4
 pip-tools~=7.4.1
 pyright~=1.1.376
 pytest~=8.3.2
diff --git a/pyproject.toml b/pyproject.toml
index b2bd0c48f..f88770ee0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,7 +25,7 @@ dependencies = [
     "Markdown~=3.7",
     "numpy~=1.26.4",
     "Pillow~=10.4.0",
-    "protobuf~=4.25.4",
+    "protobuf~=5.26.1",
     "pydantic~=2.8.2",
     "pyloudnorm~=0.1.1",
     "resampy~=0.4.3",
@@ -63,7 +63,8 @@ nim = [ "openai~=1.50.2" ]
 noisereduce = [ "noisereduce~=3.0.3" ]
 openai = [ "openai~=1.50.2", "websockets~=13.1", "python-deepcompare~=1.0.1" ]
 openpipe = [ "openpipe~=4.38.0" ]
-playht = [ "pyht~=0.1.4", "websockets~=13.1" ]
+playht = [ "pyht~=0.1.8", "websockets~=13.1" ]
+riva = [ "nvidia-riva-client~=2.17.0" ]
 silero = [ "onnxruntime~=1.19.2" ]
 soundfile = [ "soundfile~=0.12.1" ]
 together = [ "openai~=1.50.2" ]

From 7e407e5548ab03442c2e153f0e11beaa9dd2281b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Mon, 9 Dec 2024 12:56:26 -0800
Subject: [PATCH 03/13] services(riva): first working version of ParakeetSTTService

---
 src/pipecat/services/riva.py | 120 +++++++++++++++++++----------------
 1 file changed, 67 insertions(+), 53 deletions(-)

diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index d5365df47..d6c5f85f5 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -5,7 +5,7 @@
 #
 
 import asyncio
-from typing import AsyncGenerator, List, Optional, Union, Iterator
+from typing import AsyncGenerator, Optional
 
 from loguru import logger
 from pydantic.main import BaseModel
@@ -13,7 +13,6 @@
 from pipecat.frames.frames import (
     CancelFrame,
     EndFrame,
-    ErrorFrame,
     Frame,
     InterimTranscriptionFrame,
     StartFrame,
@@ -23,7 +22,6 @@
     TTSStoppedFrame,
 )
 from pipecat.services.ai_services import STTService, TTSService
-from pipecat.transcriptions.language import Language
 from pipecat.utils.time import time_now_iso8601
 
 try:
@@ -74,12 +72,6 @@ def __init__(
 
         self.service = riva.client.SpeechSynthesisService(auth)
 
-    async def stop(self, frame: EndFrame):
-        await super().stop(frame)
-
-    async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
-
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         logger.debug(f"Generating TTS: [{text}]")
@@ -130,17 +122,12 @@ def __init__(
 
         self.set_model_name("parakeet-ctc-1.1b-asr")
 
-        input_device = 0
-        list_devices = False
         profanity_filter = False
         automatic_punctuation = False
         no_verbatim_transcripts = False
         language_code = "en-US"
-        model_name = ""
         boosted_lm_words = None
         boosted_lm_score = 4.0
-        speaker_diarization = False
-        diarization_max_speakers = 3
         start_history = -1
         start_threshold = -1.0
         stop_history = -1
@@ -148,10 +135,7 @@ def __init__(
         stop_history_eou = -1
         stop_threshold_eou = -1.0
         custom_configuration = ""
-        ssl_cert = None
-        use_ssl = True
         sample_rate_hz: int = 16000
-        file_streaming_chunk = 1600
 
         metadata = [
             ["function-id", model],
@@ -189,56 +173,86 @@ def __init__(
         riva.client.add_custom_configuration_to_config(config, custom_configuration)
 
         # this doesn't work, but something like this perhaps? part 1
-        self.audio = []
-        self.responses = self.asr_service.streaming_response_generator(
-            audio_chunks=[self.audio],
-            streaming_config=self.config,
-        )
+        self._queue = asyncio.Queue()
 
     def can_generate_metrics(self) -> bool:
         return False
 
     async def start(self, frame: StartFrame):
         await super().start(frame)
+        self._thread_task = self.get_event_loop().create_task(self._thread_task_handler())
+        self._response_task = self.get_event_loop().create_task(self._response_task_handler())
+        self._response_queue = asyncio.Queue()
 
     async def stop(self, frame: EndFrame):
         await super().stop(frame)
+        await self._stop_tasks()
 
     async def cancel(self, frame: CancelFrame):
         await super().cancel(frame)
+        await self._stop_tasks()
 
-    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
-        # this doesn't work, but something like this perhaps? part 2
-        self.audio.append(audio)
-
-        # need to start to run this generator only once somewhere...
-        # 'start' function doesn't work...
-        # something about the event loop...
-        # maybe an audio buffer... though my attempt at that didn't work either
-        for response in self.responses:
+    async def _stop_tasks(self):
+        self._thread_task.cancel()
+        await self._thread_task
+        self._response_task.cancel()
+        await self._response_task
+
+    def _response_handler(self):
+        responses = self.asr_service.streaming_response_generator(
+            audio_chunks=self,
+            streaming_config=self.config,
+        )
+        for response in responses:
             if not response.results:
                 continue
-            partial_transcript = ""
-            for result in response.results:
-                if result:
-                    if not result.alternatives:
-                        continue
-                    transcript = result.alternatives[0].transcript
-                    if transcript:
-                        language = None
-                        if len(transcript) > 0:
-                            await self.stop_ttfb_metrics()
-                            if result.is_final:
-                                await self.stop_processing_metrics()
-                                yield TranscriptionFrame(
-                                    transcript, "", time_now_iso8601(), language
-                                )
-                            else:
-                                yield InterimTranscriptionFrame(
-                                    transcript, "", time_now_iso8601(), language
-                                )
+            asyncio.run_coroutine_threadsafe(
+                self._response_queue.put(response), self.get_event_loop()
+            )
+
+    async def _thread_task_handler(self):
+        try:
+            self._thread_running = True
+            await asyncio.to_thread(self._response_handler)
+        except asyncio.CancelledError:
+            self._thread_running = False
+
+    async def _handle_response(self, response):
+        for result in response.results:
+            if not result or not result.alternatives:
+                continue
+
+            transcript = result.alternatives[0].transcript
+            if transcript and len(transcript) > 0:
+                await self.stop_ttfb_metrics()
+                if result.is_final:
+                    await self.stop_processing_metrics()
+                    await self.push_frame(
+                        TranscriptionFrame(transcript, "", time_now_iso8601(), None)
+                    )
+                else:
+                    await self.push_frame(
+                        InterimTranscriptionFrame(transcript, "", time_now_iso8601(), None)
+                    )
+
+    async def _response_task_handler(self):
+        while True:
+            try:
+                response = await self._response_queue.get()
+                await self._handle_response(response)
+            except asyncio.CancelledError:
+                break
+
+    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
+        await self._queue.put(audio)
         yield None
 
-    async def _on_speech_started(self, *args, **kwargs):
-        await self.start_ttfb_metrics()
-        await self.start_processing_metrics()
+    def __next__(self) -> bytes:
+        if not self._thread_running:
+            raise StopIteration
+        future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop())
+        return future.result()
+
+    def __iter__(self):
+        return self

From 4b55c73fbefb509c451aaf0f1b94d6570c611dc4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Mon, 9 Dec 2024 13:03:19 -0800
Subject: [PATCH 04/13] services(riva): make FastpitchTTSService asyncio

---
 src/pipecat/services/riva.py | 50 +++++++++++++++++++-----------------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index d6c5f85f5..38920927e 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -60,8 +60,6 @@ def __init__(
         self.voice_id = voice_id
         self.sample_rate_hz = sample_rate_hz
         self.language_code = params.language
-        self.nchannels = 1
-        self.sampwidth = 2
         self.quality = None
 
         metadata = [
@@ -73,35 +71,39 @@ def __init__(
 
         self.service = riva.client.SpeechSynthesisService(auth)
 
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
+        def read_audio_responses():
+            try:
+                custom_dictionary_input = {}
+                responses = self.service.synthesize_online(
+                    text,
+                    self.voice_id,
+                    self.language_code,
+                    sample_rate_hz=self.sample_rate_hz,
+                    audio_prompt_file=None,
+                    quality=20 if self.quality is None else self.quality,
+                    custom_dictionary=custom_dictionary_input,
+                )
+                return responses
+            except Exception as e:
+                logger.error(f"{self} exception: {e}")
+                return []
+
         logger.debug(f"Generating TTS: [{text}]")
 
         await self.start_ttfb_metrics()
         yield TTSStartedFrame()
 
-        try:
-            custom_dictionary_input = {}
-            responses = self.service.synthesize_online(
-                text,
-                self.voice_id,
-                self.language_code,
-                sample_rate_hz=self.sample_rate_hz,
-                audio_prompt_file=None,
-                quality=20 if self.quality is None else self.quality,
-                custom_dictionary=custom_dictionary_input,
-            )
+        responses = await asyncio.to_thread(read_audio_responses)
 
-            for resp in responses:
-                await self.stop_ttfb_metrics()
-
-                frame = TTSAudioRawFrame(
-                    audio=resp.audio,
-                    sample_rate=self.sample_rate_hz,
-                    num_channels=self.nchannels,
-                )
-                yield frame
+        for resp in responses:
+            await self.stop_ttfb_metrics()
 
-        except Exception as e:
-            logger.error(f"{self} exception: {e}")
+            frame = TTSAudioRawFrame(
+                audio=resp.audio,
+                sample_rate=self.sample_rate_hz,
+                num_channels=1,
+            )
+            yield frame
 
         await self.start_tts_usage_metrics(text)
         yield TTSStoppedFrame()

From 8a9fdaf441351008608b6dd8e7d1afb990ef07f7 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 15:40:29 -0600
Subject: [PATCH 05/13] services(riva): cleanup

---
 ...-riva.py => 07r-interruptible-riva-nim.py} | 10 +--
 src/pipecat/services/riva.py                  | 90 ++++++++++---------
 2 files changed, 51 insertions(+), 49 deletions(-)
 rename examples/foundational/{07r-interruptible-riva.py => 07r-interruptible-riva-nim.py} (91%)

diff --git a/examples/foundational/07r-interruptible-riva.py b/examples/foundational/07r-interruptible-riva-nim.py
similarity index 91%
rename from examples/foundational/07r-interruptible-riva.py
rename to examples/foundational/07r-interruptible-riva-nim.py
index e65257dc4..d6e6481fa 100644
--- a/examples/foundational/07r-interruptible-riva.py
+++ b/examples/foundational/07r-interruptible-riva-nim.py
@@ -19,11 +19,8 @@
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
-
-from pipecat.services.deepgram import DeepgramSTTService
+from pipecat.services.nim import NimLLMService
 from pipecat.services.riva import FastpitchTTSService, ParakeetSTTService
-
-from pipecat.services.openai import OpenAILLMService
 from pipecat.transports.services.daily import DailyParams, DailyTransport
 
 load_dotenv(override=True)
@@ -48,12 +45,13 @@ async def main():
             ),
         )
 
-        # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
         stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
 
         tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
 
-        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+        llm = NimLLMService(
+            api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct"
+        )
 
         messages = [
             {
diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 38920927e..3a4ae9fe8 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -22,6 +22,7 @@
     TTSStoppedFrame,
 )
 from pipecat.services.ai_services import STTService, TTSService
+from pipecat.transcriptions.language import Language
 from pipecat.utils.time import time_now_iso8601
 
 try:
@@ -37,7 +38,7 @@
 
 class FastpitchTTSService(TTSService):
     class InputParams(BaseModel):
-        language: Optional[str] = "en-US"
+        language: Optional[Language] = Language.EN_US
 
     def __init__(
         self,
@@ -49,19 +50,19 @@ def __init__(
         # nvidia riva calls this 'function-id'
         model: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
         params: InputParams = InputParams(),
+        quality: int = 20,
         **kwargs,
     ):
         super().__init__(sample_rate=sample_rate_hz, **kwargs)
         self._api_key = api_key
+        self._voice_id = voice_id
+        self._sample_rate_hz = sample_rate_hz
+        self._language_code = params.language
+        self.quality = quality
 
         self.set_model_name("fastpitch-hifigan-tts")
         self.set_voice(voice_id)
 
-        self.voice_id = voice_id
-        self.sample_rate_hz = sample_rate_hz
-        self.language_code = params.language
-        self.quality = None
-
         metadata = [
             ["function-id", model],
             ["authorization", f"Bearer {api_key}"],
@@ -73,15 +74,14 @@ def __init__(
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         def read_audio_responses():
             try:
-                custom_dictionary_input = {}
                 responses = self.service.synthesize_online(
                     text,
-                    self.voice_id,
-                    self.language_code,
-                    sample_rate_hz=self.sample_rate_hz,
+                    self._voice_id,
+                    self._language_code,
+                    sample_rate_hz=self._sample_rate_hz,
                     audio_prompt_file=None,
-                    quality=20 if self.quality is None else self.quality,
-                    custom_dictionary=custom_dictionary_input,
+                    quality=self.quality,
+                    custom_dictionary={},
                 )
                 return responses
             except Exception as e:
@@ -100,7 +100,7 @@ def read_audio_responses():
 
             frame = TTSAudioRawFrame(
                 audio=resp.audio,
-                sample_rate=self.sample_rate_hz,
+                sample_rate=self._sample_rate_hz,
                 num_channels=1,
             )
             yield frame
@@ -110,6 +110,9 @@ def read_audio_responses():
 
 
 class ParakeetSTTService(STTService):
+    class InputParams(BaseModel):
+        language: Optional[Language] = Language.EN_US
+
     def __init__(
         self,
         *,
@@ -117,28 +120,28 @@ def __init__(
         server: str = "grpc.nvcf.nvidia.com:443",
         # nvidia calls this 'function-id'
         model: str = "1598d209-5e27-4d3c-8079-4751568b1081",
+        params: InputParams = InputParams(),
         **kwargs,
     ):
         super().__init__(**kwargs)
         self._api_key = api_key
+        self._profanity_filter = False
+        self._automatic_punctuation = False
+        self._no_verbatim_transcripts = False
+        self._language_code = params.language
+        self._boosted_lm_words = None
+        self._boosted_lm_score = 4.0
+        self._start_history = -1
+        self._start_threshold = -1.0
+        self._stop_history = -1
+        self._stop_threshold = -1.0
+        self._stop_history_eou = -1
+        self._stop_threshold_eou = -1.0
+        self._custom_configuration = ""
+        self._sample_rate_hz: int = 16000
 
         self.set_model_name("parakeet-ctc-1.1b-asr")
 
-        profanity_filter = False
-        automatic_punctuation = False
-        no_verbatim_transcripts = False
-        language_code = "en-US"
-        boosted_lm_words = None
-        boosted_lm_score = 4.0
-        start_history = -1
-        start_threshold = -1.0
-        stop_history = -1
-        stop_threshold = -1.0
-        stop_history_eou = -1
-        stop_threshold_eou = -1.0
-        custom_configuration = ""
-        sample_rate_hz: int = 16000
-
         metadata = [
             ["function-id", model],
             ["authorization", f"Bearer {api_key}"],
@@ -150,31 +153,32 @@ def __init__(
         config = riva.client.StreamingRecognitionConfig(
             config=riva.client.RecognitionConfig(
                 encoding=riva.client.AudioEncoding.LINEAR_PCM,
-                language_code=language_code,
+                language_code=self._language_code,
                 model="",
                 max_alternatives=1,
-                profanity_filter=profanity_filter,
-                enable_automatic_punctuation=automatic_punctuation,
-                verbatim_transcripts=not no_verbatim_transcripts,
-                sample_rate_hertz=sample_rate_hz,
+                profanity_filter=self._profanity_filter,
+                enable_automatic_punctuation=self._automatic_punctuation,
+                verbatim_transcripts=not self._no_verbatim_transcripts,
+                sample_rate_hertz=self._sample_rate_hz,
                 audio_channel_count=1,
             ),
             interim_results=True,
         )
-        self.config = config
-        riva.client.add_word_boosting_to_config(config, boosted_lm_words, boosted_lm_score)
+        riva.client.add_word_boosting_to_config(
+            config, self._boosted_lm_words, self._boosted_lm_score
+        )
         riva.client.add_endpoint_parameters_to_config(
             config,
-            start_history,
-            start_threshold,
-            stop_history,
-            stop_history_eou,
-            stop_threshold,
-            stop_threshold_eou,
+            self._start_history,
+            self._start_threshold,
+            self._stop_history,
+            self._stop_history_eou,
+            self._stop_threshold,
+            self._stop_threshold_eou,
         )
-        riva.client.add_custom_configuration_to_config(config, custom_configuration)
+        riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
+        self.config = config
 
-        # this doesn't work, but something like this perhaps? part 1
         self._queue = asyncio.Queue()
 
     def can_generate_metrics(self) -> bool:

From d74e7283328debb61636bd8c861d1a9be402cd00 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Mon, 9 Dec 2024 14:00:30 -0800
Subject: [PATCH 06/13] pyproject: update google-cloud-texttospeech to 2.21.1

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index f88770ee0..549ecece8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ elevenlabs = [ "websockets~=13.1" ]
 examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
 fal = [ "fal-client~=0.4.1" ]
 gladia = [ "websockets~=13.1" ]
-google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
+google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.21.1" ]
 grok = [ "openai~=1.50.2" ]
 groq = [ "openai~=1.50.2" ]
 gstreamer = [ "pygobject~=3.48.2" ]

From 5a467a30a3c865a3418666a22d2d0a82a0b9dbda Mon Sep 17 00:00:00 2001
From: vipyne
Date: Tue, 3 Dec 2024 23:03:58 -0600
Subject: [PATCH 07/13] add nvidia riva - fastpitch

---
 .../foundational/07r-interruptible-riva.py    | 94 +++++++++++++++++++
 src/pipecat/services/riva.py                  | 12 +++
 2 files changed, 106 insertions(+)
 create mode 100644 examples/foundational/07r-interruptible-riva.py

diff --git a/examples/foundational/07r-interruptible-riva.py b/examples/foundational/07r-interruptible-riva.py
new file mode 100644
index 000000000..e65257dc4
--- /dev/null
+++ b/examples/foundational/07r-interruptible-riva.py
@@ -0,0 +1,94 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+
+from pipecat.services.deepgram import DeepgramSTTService
+from pipecat.services.riva import FastpitchTTSService, ParakeetSTTService
+
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, _) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            None,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+                vad_audio_passthrough=True,
+            ),
+        )
+
+        # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+        stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
+
+        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                stt,  # STT
+                context_aggregator.user(),  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                transport.output(),  # Transport bot output
+                context_aggregator.assistant(),  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            # Kick off the conversation.
+            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 3a4ae9fe8..5d62666b5 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -5,7 +5,11 @@
 #
 
 import asyncio
+<<<<<<< HEAD
 from typing import AsyncGenerator, Optional
+=======
+from typing import AsyncGenerator, List, Optional, Union, Iterator
+>>>>>>> 07e3942e (add nvidia riva - fastpitch)
 
 from loguru import logger
 from pydantic.main import BaseModel
@@ -13,6 +17,10 @@
 from pipecat.frames.frames import (
     CancelFrame,
     EndFrame,
+<<<<<<< HEAD
+=======
+    ErrorFrame,
+>>>>>>> 07e3942e (add nvidia riva - fastpitch)
     Frame,
     InterimTranscriptionFrame,
     StartFrame,
@@ -38,7 +46,11 @@
 
 class FastpitchTTSService(TTSService):
     class InputParams(BaseModel):
+<<<<<<< HEAD
         language: Optional[Language] = Language.EN_US
+=======
+        language: Optional[str] = "en-US"
+>>>>>>> 07e3942e (add nvidia riva - fastpitch)
 
     def __init__(
         self,

From 9222d9f721baa039c1efbacf04584ed2e45aba06 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 15:40:29 -0600
Subject: [PATCH 08/13] services(riva): cleanup

---
 .../foundational/07r-interruptible-riva.py    | 94 -------------------
 src/pipecat/services/riva.py                  | 34 +++----
 2 files changed, 11 insertions(+), 117 deletions(-)
 delete mode 100644 examples/foundational/07r-interruptible-riva.py

diff --git a/examples/foundational/07r-interruptible-riva.py b/examples/foundational/07r-interruptible-riva.py
deleted file mode 100644
index e65257dc4..000000000
--- a/examples/foundational/07r-interruptible-riva.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Copyright (c) 2024, Daily
-#
-# SPDX-License-Identifier: BSD 2-Clause License
-#
-
-import asyncio
-import os
-import sys
-
-import aiohttp
-from dotenv import load_dotenv
-from loguru import logger
-from runner import configure
-
-from pipecat.audio.vad.silero import SileroVADAnalyzer
-from pipecat.frames.frames import LLMMessagesFrame
-from pipecat.pipeline.pipeline import Pipeline
-from pipecat.pipeline.runner import PipelineRunner
-from pipecat.pipeline.task import PipelineParams, PipelineTask
-from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
-
-from pipecat.services.deepgram import DeepgramSTTService
-from pipecat.services.riva import FastpitchTTSService, ParakeetSTTService
-
-from pipecat.services.openai import OpenAILLMService
-from pipecat.transports.services.daily import DailyParams, DailyTransport
-
-load_dotenv(override=True)
-
-logger.remove(0)
-logger.add(sys.stderr, level="DEBUG")
-
-
-async def main():
-    async with aiohttp.ClientSession() as session:
-        (room_url, _) = await configure(session)
-
-        transport = DailyTransport(
-            room_url,
-            None,
-            "Respond bot",
-            DailyParams(
-                audio_out_enabled=True,
-                vad_enabled=True,
-                vad_analyzer=SileroVADAnalyzer(),
-                vad_audio_passthrough=True,
-            ),
-        )
-
-        # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
-        stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
-
-        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
-
-        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
-
-        messages = [
-            {
-                "role": "system",
-                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
-            },
-        ]
-
-        context = OpenAILLMContext(messages)
-        context_aggregator = llm.create_context_aggregator(context)
-
-        pipeline = Pipeline(
-            [
-                transport.input(),  # Transport user input
-                stt,  # STT
-                context_aggregator.user(),  # User responses
-                llm,  # LLM
-                tts,  # TTS
-                transport.output(),  # Transport bot output
-                context_aggregator.assistant(),  # Assistant spoken responses
-            ]
-        )
-
-        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
-
-        @transport.event_handler("on_first_participant_joined")
-        async def on_first_participant_joined(transport, participant):
-            # Kick off the conversation.
-            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
-            await task.queue_frames([LLMMessagesFrame(messages)])
-
-        runner = PipelineRunner()
-
-        await runner.run(task)
-
-
-if __name__ == "__main__":
-    asyncio.run(main())
diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 5d62666b5..23c6b83a4 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -5,11 +5,7 @@
 #
 
 import asyncio
-<<<<<<< HEAD
 from typing import AsyncGenerator, Optional
-=======
-from typing import AsyncGenerator, List, Optional, Union, Iterator
->>>>>>> 07e3942e (add nvidia riva - fastpitch)
 
 from loguru import logger
 from pydantic.main import BaseModel
@@ -17,10 +13,6 @@
 from pipecat.frames.frames import (
     CancelFrame,
     EndFrame,
-<<<<<<< HEAD
-=======
-    ErrorFrame,
->>>>>>> 07e3942e (add nvidia riva - fastpitch)
     Frame,
     InterimTranscriptionFrame,
     StartFrame,
@@ -46,11 +38,8 @@
 
 class FastpitchTTSService(TTSService):
     class InputParams(BaseModel):
-<<<<<<< HEAD
         language: Optional[Language] = Language.EN_US
-=======
-        language: Optional[str] = "en-US"
->>>>>>> 07e3942e (add nvidia riva - fastpitch)
+        quality: Optional[int] = 20
 
     def __init__(
         self,
@@ -58,19 +47,18 @@ def __init__(
         api_key: str,
         server: str = "grpc.nvcf.nvidia.com:443",
         voice_id: str = "English-US.Female-1",
-        sample_rate_hz: int = 24000,
+        sample_rate: int = 24000,
         # nvidia riva calls this 'function-id'
         model: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
         params: InputParams = InputParams(),
-        quality: int = 20,
         **kwargs,
     ):
-        super().__init__(sample_rate=sample_rate_hz, **kwargs)
+        super().__init__(sample_rate=sample_rate, **kwargs)
         self._api_key = api_key
         self._voice_id = voice_id
-        self._sample_rate_hz = sample_rate_hz
+        self._sample_rate = sample_rate
        self._language_code = params.language
-        self.quality = quality
+        self.quality = params.quality
 
         self.set_model_name("fastpitch-hifigan-tts")
         self.set_voice(voice_id)
@@ -81,16 +69,16 @@ def __init__(
         ]
         auth = riva.client.Auth(None, True, server, metadata)
 
-        self.service = riva.client.SpeechSynthesisService(auth)
+        self._service = riva.client.SpeechSynthesisService(auth)
 
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         def read_audio_responses():
             try:
-                responses = self.service.synthesize_online(
+                responses = self._service.synthesize_online(
                     text,
                     self._voice_id,
                     self._language_code,
-                    sample_rate_hz=self._sample_rate_hz,
+                    sample_rate_hz=self._sample_rate,
                     audio_prompt_file=None,
                     quality=self.quality,
                     custom_dictionary={},
@@ -112,7 +100,7 @@ def read_audio_responses():
 
             frame = TTSAudioRawFrame(
                 audio=resp.audio,
-                sample_rate=self._sample_rate_hz,
+                sample_rate=self._sample_rate,
                 num_channels=1,
             )
             yield frame
@@ -150,7 +138,7 @@ def __init__(
         self._stop_history_eou = -1
         self._stop_threshold_eou = -1.0
         self._custom_configuration = ""
-        self._sample_rate_hz: int = 16000
+        self._sample_rate: int = 16000
 
         self.set_model_name("parakeet-ctc-1.1b-asr")
 
@@ -171,7 +159,7 @@ def __init__(
                 profanity_filter=self._profanity_filter,
                 enable_automatic_punctuation=self._automatic_punctuation,
                 verbatim_transcripts=not self._no_verbatim_transcripts,
-                sample_rate_hertz=self._sample_rate_hz,
+                sample_rate_hertz=self._sample_rate,
                 audio_channel_count=1,
             ),
             interim_results=True,

From 8caad15e9ba105473e1b98d0732d02701f7f6146 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 20:14:10 -0600
Subject: [PATCH 09/13] examples trivial update

---
 examples/foundational/07r-interruptible-riva-nim.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/foundational/07r-interruptible-riva-nim.py b/examples/foundational/07r-interruptible-riva-nim.py
index d6e6481fa..09d261bc3 100644
--- a/examples/foundational/07r-interruptible-riva-nim.py
+++ b/examples/foundational/07r-interruptible-riva-nim.py
@@ -47,12 +47,12 @@ async def main():
 
         stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
 
-        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
-
         llm = NimLLMService(
             api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct"
         )
 
+        tts = FastpitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
+
         messages = [
             {
                 "role": "system",

From 5a98ae63807281ac2f9c4ceef43f88ea27dc84f1 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 20:24:33 -0600
Subject: [PATCH 10/13] chore: update test-requirements

---
 test-requirements.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test-requirements.txt b/test-requirements.txt
index 62be00385..f974121b0 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -7,8 +7,8 @@ deepgram-sdk~=3.5.0
 fal-client~=0.4.1
 fastapi~=0.115.0
 faster-whisper~=1.0.3
-google-cloud-texttospeech~=2.17.2
-google-generativeai~=0.7.2
+google-cloud-texttospeech~=2.21.1
+google-generativeai~=0.8.3
 langchain~=0.2.14
 livekit~=0.13.1
 lmnt~=1.1.4

From 3f9d39329cb7945c0be49674e79a99a60ffb82d4 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 20:57:16 -0600
Subject: [PATCH 11/13] services(riva): model -> function_id

---
 src/pipecat/services/riva.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 23c6b83a4..6652b5298 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -48,8 +48,7 @@ def __init__(
         server: str = "grpc.nvcf.nvidia.com:443",
         voice_id: str = "English-US.Female-1",
         sample_rate: int = 24000,
-        # nvidia riva calls this 'function-id'
-        model: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
+        function_id: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
         params: InputParams = InputParams(),
         **kwargs,
     ):
@@ -64,7 +63,7 @@ def __init__(
         self.set_voice(voice_id)
 
         metadata = [
-            ["function-id", model],
+            ["function-id", function_id],
             ["authorization", f"Bearer {api_key}"],
         ]
         auth = riva.client.Auth(None, True, server, metadata)
@@ -118,8 +117,7 @@ def __init__(
         *,
         api_key: str,
         server: str = "grpc.nvcf.nvidia.com:443",
-        # nvidia calls this 'function-id'
-        model: str = "1598d209-5e27-4d3c-8079-4751568b1081",
+        function_id: str = "1598d209-5e27-4d3c-8079-4751568b1081",
         params: InputParams = InputParams(),
         **kwargs,
     ):
@@ -143,7 +141,7 @@ def __init__(
         self.set_model_name("parakeet-ctc-1.1b-asr")
 
         metadata = [
-            ["function-id", model],
+            ["function-id", function_id],
             ["authorization", f"Bearer {api_key}"],
         ]
         auth = riva.client.Auth(None, True, server, metadata)

From 9211a37efcfd138e3c314242b8d115fab8825195 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Mon, 9 Dec 2024 21:01:43 -0600
Subject: [PATCH 12/13] services(riva): convention tweaks

---
 src/pipecat/services/riva.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 6652b5298..2b4c5ccc1 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -57,7 +57,7 @@ def __init__(
         self._voice_id = voice_id
         self._sample_rate = sample_rate
         self._language_code = params.language
-        self.quality = params.quality
+        self._quality = params.quality
 
         self.set_model_name("fastpitch-hifigan-tts")
         self.set_voice(voice_id)
@@ -79,7 +79,7 @@ def read_audio_responses():
                     self._language_code,
                     sample_rate_hz=self._sample_rate,
                     audio_prompt_file=None,
-                    quality=self.quality,
+                    quality=self._quality,
                     custom_dictionary={},
                 )
                 return responses
@@ -146,7 +146,7 @@ def __init__(
         ]
         auth = riva.client.Auth(None, True, server, metadata)
 
-        self.asr_service = riva.client.ASRService(auth)
+        self._asr_service = riva.client.ASRService(auth)
 
         config = riva.client.StreamingRecognitionConfig(
             config=riva.client.RecognitionConfig(
@@ -175,7 +175,7 @@ def __init__(
             self._stop_threshold_eou,
         )
         riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
-        self.config = config
+        self._config = config
 
         self._queue = asyncio.Queue()
 
@@ -203,9 +203,9 @@ async def _stop_tasks(self):
         await self._response_task
 
     def _response_handler(self):
-        responses = self.asr_service.streaming_response_generator(
+        responses = self._asr_service.streaming_response_generator(
             audio_chunks=self,
-            streaming_config=self.config,
+            streaming_config=self._config,
         )

From a49b4e450b115521f414de1ae7a4ba78e152c6b0 Mon Sep 17 00:00:00 2001
From: vipyne
Date: Tue, 10 Dec 2024 15:15:13 -0600
Subject: [PATCH 13/13] services(riva): check service config before running tts

---
 src/pipecat/services/riva.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/pipecat/services/riva.py b/src/pipecat/services/riva.py
index 2b4c5ccc1..0bbaee0ef 100644
--- a/src/pipecat/services/riva.py
+++ b/src/pipecat/services/riva.py
@@ -70,6 +70,11 @@ def __init__(
 
         self._service = riva.client.SpeechSynthesisService(auth)
 
+        # warm up the service
+        config_response = self._service.stub.GetRivaSynthesisConfig(
+            riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
+        )
+
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         def read_audio_responses():
             try:
@@ -87,11 +92,10 @@ def read_audio_responses():
 
-        logger.debug(f"Generating TTS: [{text}]")
-
         await self.start_ttfb_metrics()
         yield TTSStartedFrame()
 
+        logger.debug(f"Generating TTS: [{text}]")
         responses = await asyncio.to_thread(read_audio_responses)
 
         for resp in responses:
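
Reviewer's note, appended after the series (not part of any patch): the subtlest change above is PATCH 03's bridge between riva.client's blocking, iterator-driven streaming API and pipecat's asyncio pipeline. The service hands itself to streaming_response_generator() as the audio-chunk iterable; __next__() runs on a worker thread and blocks on an asyncio.Queue via asyncio.run_coroutine_threadsafe(), and responses are shipped back to the event loop the same way. Below is a minimal, self-contained sketch of that pattern with the Riva client replaced by a trivial stand-in. It is an illustration under stated assumptions: the BlockingStreamBridge class, its method names, and the fake consumer are invented for this note and appear nowhere in the patches, and where the real service stops its worker via task cancellation and a running flag, the sketch uses a sentinel value to keep the shutdown path short.

import asyncio


class BlockingStreamBridge:
    """Feed chunks from the event loop into a blocking, iterator-driven consumer."""

    _SENTINEL = object()

    def __init__(self):
        self._chunks = None   # created in start(), once the loop is running
        self._results = None
        self._loop = None
        self._task = None

    def __iter__(self):
        # The bridge itself is the iterable handed to the blocking consumer,
        # just like ParakeetSTTService passes audio_chunks=self in PATCH 03.
        return self

    def __next__(self):
        # Runs on the worker thread: block until the event loop supplies a chunk.
        future = asyncio.run_coroutine_threadsafe(self._chunks.get(), self._loop)
        chunk = future.result()
        if chunk is self._SENTINEL:
            raise StopIteration
        return chunk

    def _consume(self):
        # Stand-in for the blocking client call; any loop that iterates over
        # `self` (as streaming_response_generator does) behaves the same way.
        for chunk in self:
            result = chunk.upper()  # pretend this is speech recognition
            asyncio.run_coroutine_threadsafe(self._results.put(result), self._loop)

    async def start(self):
        self._loop = asyncio.get_running_loop()
        self._chunks = asyncio.Queue()
        self._results = asyncio.Queue()
        self._task = self._loop.create_task(asyncio.to_thread(self._consume))

    async def stop(self):
        await self._chunks.put(self._SENTINEL)  # unblock __next__ and end the thread
        await self._task

    async def push(self, chunk: str):
        await self._chunks.put(chunk)

    async def next_result(self) -> str:
        return await self._results.get()


async def main():
    bridge = BlockingStreamBridge()
    await bridge.start()
    await bridge.push("hello")
    print(await bridge.next_result())  # -> HELLO
    await bridge.stop()


if __name__ == "__main__":
    asyncio.run(main())

The design point mirrored from the patch is that the worker thread never touches the event loop directly: every crossing, in either direction, goes through asyncio.run_coroutine_threadsafe().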