diff --git a/examples/canonical-metrics/.gitignore b/examples/canonical-metrics/.gitignore new file mode 100644 index 000000000..50d9d205e --- /dev/null +++ b/examples/canonical-metrics/.gitignore @@ -0,0 +1,161 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +recordings/ +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. 
+# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ +runpod.toml diff --git a/examples/canonical-metrics/Dockerfile b/examples/canonical-metrics/Dockerfile new file mode 100644 index 000000000..704080eec --- /dev/null +++ b/examples/canonical-metrics/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-bullseye + +RUN mkdir /app +RUN mkdir /app/assets +RUN mkdir /app/utils +COPY *.py /app/ +COPY requirements.txt /app/ +copy assets/* /app/assets/ +copy utils/* /app/utils/ + +WORKDIR /app +RUN pip3 install -r requirements.txt + +EXPOSE 7860 + +CMD ["python3", "server.py"] \ No newline at end of file diff --git a/examples/canonical-metrics/README.md b/examples/canonical-metrics/README.md new file mode 100644 index 000000000..13c0b31e0 --- /dev/null +++ b/examples/canonical-metrics/README.md @@ -0,0 +1,37 @@ +# Simple Chatbot + + + +This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion. + +See a video of it in action: https://x.com/kwindla/status/1778628911817183509 + +And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416 + +ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded. + +## Get started + +```python +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp env.example .env # and add your credentials + +``` + +## Run the server + +```bash +python server.py +``` + +Then, visit `http://localhost:7860/start` in your browser to start a chatbot session. + +## Build and test the Docker image + +``` +docker build -t chatbot . 
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys
import uuid

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.canonical import CanonicalMetricsService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
    """Join a Daily room and run a voice chatbot whose call audio is sent to Canonical.

    Builds a transport -> LLM -> TTS pipeline, buffers all audio with an
    AudioBufferProcessor, and hands the buffer to CanonicalMetricsService for
    post-call analysis. Runs until the participant leaves or the call ends.
    """
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                audio_in_enabled=True,
                camera_out_enabled=False,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,
                # For Spanish transcription, pass transcription_settings with
                # language="es", tier="nova", model="2-general".
            ),
        )

        # English voice. For Spanish, use model="eleven_multilingual_v2" and a
        # Spanish voice id instead.
        tts = ElevenLabsTTSService(
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="cgSgspJ2msm6clMCkdW9",
            aiohttp_session=session,
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        # System prompt that seeds the conversation (English).
        messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your responses to 12 words or fewer.",
            },
        ]

        user_aggregator = LLMUserResponseAggregator()
        assistant_aggregator = LLMAssistantResponseAggregator()

        # CanonicalMetrics uses AudioBufferProcessor under the hood to buffer
        # the audio. On call completion, CanonicalMetrics sends the audio
        # buffer to Canonical for analysis. See https://voice.canonical.chat.
        audio_buffer = AudioBufferProcessor()
        canonical = CanonicalMetricsService(
            audio_buffer_processor=audio_buffer,
            aiohttp_session=session,
            api_key=os.getenv("CANONICAL_API_KEY"),
            api_url=os.getenv("CANONICAL_API_URL"),
            call_id=str(uuid.uuid4()),
            assistant="pipecat-chatbot",
            assistant_speaks_first=True,
        )

        pipeline = Pipeline(
            [
                transport.input(),  # microphone
                user_aggregator,
                llm,
                tts,
                transport.output(),
                audio_buffer,  # captures audio into a buffer
                canonical,  # uploads audio buffer to Canonical AI for metrics
                assistant_aggregator,
            ]
        )

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation by queueing the system prompt.
            await task.queue_frames([LLMMessagesFrame(messages)])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            print(f"Participant left: {participant}")
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
\ No newline at end of file diff --git a/examples/canonical-metrics/requirements.txt b/examples/canonical-metrics/requirements.txt new file mode 100644 index 000000000..7e53edc6b --- /dev/null +++ b/examples/canonical-metrics/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv +fastapi[all] +uvicorn +pipecat-ai[daily,openai,silero,elevenlabs,canonical] + diff --git a/examples/canonical-metrics/runner.py b/examples/canonical-metrics/runner.py new file mode 100644 index 000000000..a0b46ca36 --- /dev/null +++ b/examples/canonical-metrics/runner.py @@ -0,0 +1,56 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +import aiohttp + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper + + +async def configure(aiohttp_session: aiohttp.ClientSession): + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", "--url", type=str, required=False, help="URL of the Daily room to join" + ) + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + ) + + if not key: + raise Exception( + "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." 
+ ) + + daily_rest_helper = DailyRESTHelper( + daily_api_key=key, + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + + # Create a meeting token for the given room with an expiration 1 hour in + # the future. + expiry_time: float = 60 * 60 + + token = await daily_rest_helper.get_token(url, expiry_time) + + return (url, token) + return (url, token) diff --git a/examples/canonical-metrics/server.py b/examples/canonical-metrics/server.py new file mode 100644 index 000000000..62ce899be --- /dev/null +++ b/examples/canonical-metrics/server.py @@ -0,0 +1,139 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os +import subprocess +from contextlib import asynccontextmanager + +import aiohttp +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, RedirectResponse + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams + +MAX_BOTS_PER_ROOM = 1 + +# Bot sub-process dict for status reporting and concurrency control +bot_procs = {} + +daily_helpers = {} + +load_dotenv(override=True) + + +def cleanup(): + # Clean up function, just to be extra safe + for entry in bot_procs.values(): + proc = entry[0] + proc.terminate() + proc.wait() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + aiohttp_session = aiohttp.ClientSession() + daily_helpers["rest"] = DailyRESTHelper( + daily_api_key=os.getenv("DAILY_API_KEY", ""), + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + yield + await aiohttp_session.close() + cleanup() + + +app = FastAPI(lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/start") +async def 
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os
import subprocess
from contextlib import asynccontextmanager

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams

MAX_BOTS_PER_ROOM = 1

# Bot sub-process dict for status reporting and concurrency control.
# Maps pid -> (Popen, room_url).
bot_procs = {}

daily_helpers = {}

load_dotenv(override=True)


def cleanup():
    """Terminate and reap any bot subprocesses that are still running."""
    for entry in bot_procs.values():
        proc = entry[0]
        proc.terminate()
        proc.wait()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared aiohttp session and Daily REST helper; tear down on exit."""
    aiohttp_session = aiohttp.ClientSession()
    daily_helpers["rest"] = DailyRESTHelper(
        daily_api_key=os.getenv("DAILY_API_KEY", ""),
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )
    yield
    await aiohttp_session.close()
    cleanup()


app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/start")
async def start_agent(request: Request):
    """Create a Daily room, spawn a bot subprocess for it, and redirect there.

    Raises:
        HTTPException: if the room can't be created, the per-room bot limit is
            reached, the meeting token can't be obtained, or the subprocess
            fails to start.
    """
    print("!!! Creating room")
    room = await daily_helpers["rest"].create_room(DailyRoomParams())
    print(f"!!! Room URL: {room.url}")
    # Ensure the room property is present
    if not room.url:
        raise HTTPException(
            status_code=500,
            detail="Missing 'room' property in request data. Cannot start agent without a target room!",
        )

    # Check if there is already an existing process running in this room
    num_bots_in_room = sum(
        1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
    )
    if num_bots_in_room >= MAX_BOTS_PER_ROOM:
        raise HTTPException(status_code=500, detail=f"Max bot limit reached for room: {room.url}")

    # Get the token for the room
    token = await daily_helpers["rest"].get_token(room.url)

    if not token:
        raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")

    # Spawn a new agent, and join the user session
    # Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
    try:
        # Argument list with shell=False so the URL/token are never
        # interpreted by a shell (the original built a shell command string).
        proc = subprocess.Popen(
            ["python3", "-m", "bot", "-u", room.url, "-t", token],
            cwd=os.path.dirname(os.path.abspath(__file__)),
        )
        bot_procs[proc.pid] = (proc, room.url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")

    return RedirectResponse(room.url)


@app.get("/status/{pid}")
def get_status(pid: int):
    """Report whether the bot subprocess with the given pid is still running."""
    proc = bot_procs.get(pid)

    # If the subprocess doesn't exist, return an error
    if not proc:
        raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")

    # poll() returns None while the process is alive.
    status = "running" if proc[0].poll() is None else "finished"

    return JSONResponse({"bot_id": pid, "status": status})


if __name__ == "__main__":
    import uvicorn

    default_host = os.getenv("HOST", "0.0.0.0")
    default_port = int(os.getenv("FAST_API_PORT", "7860"))

    parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
    parser.add_argument("--host", type=str, default=default_host, help="Host address")
    parser.add_argument("--port", type=int, default=default_port, help="Port number")
    parser.add_argument("--reload", action="store_true", help="Reload code on change")

    config = parser.parse_args()

    uvicorn.run(
        "server:app",
        host=config.host,
        port=config.port,
        reload=config.reload,
    )
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ +runpod.toml diff --git a/examples/chatbot-audio-recording/Dockerfile b/examples/chatbot-audio-recording/Dockerfile new file mode 100644 index 000000000..704080eec --- /dev/null +++ b/examples/chatbot-audio-recording/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-bullseye + +RUN mkdir /app +RUN mkdir /app/assets +RUN mkdir /app/utils +COPY *.py /app/ +COPY requirements.txt /app/ +copy assets/* /app/assets/ +copy utils/* /app/utils/ + +WORKDIR /app +RUN pip3 install -r requirements.txt + +EXPOSE 7860 + +CMD ["python3", "server.py"] \ No newline at end of file diff --git a/examples/chatbot-audio-recording/README.md b/examples/chatbot-audio-recording/README.md new file mode 100644 index 000000000..13c0b31e0 --- /dev/null +++ b/examples/chatbot-audio-recording/README.md @@ -0,0 +1,37 @@ +# Simple Chatbot + + + +This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion. 
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
    """Join a Daily room and run a voice chatbot that records the call audio.

    Builds a transport -> LLM -> TTS pipeline with an AudioBufferProcessor in
    the output path so all audio is captured. Runs until the participant
    leaves or the call ends.
    """
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                audio_in_enabled=True,
                camera_out_enabled=False,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,
                # For Spanish transcription, pass transcription_settings with
                # language="es", tier="nova", model="2-general".
            ),
        )

        # English voice. For Spanish, use model="eleven_multilingual_v2" and a
        # Spanish voice id instead.
        tts = ElevenLabsTTSService(
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="cgSgspJ2msm6clMCkdW9",
            aiohttp_session=session,
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        # System prompt that seeds the conversation (English).
        messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your response to 12 words or fewer.",
            },
        ]

        user_aggregator = LLMUserResponseAggregator()
        assistant_aggregator = LLMAssistantResponseAggregator()

        audio_recorder = AudioBufferProcessor()
        pipeline = Pipeline(
            [
                transport.input(),  # microphone
                user_aggregator,
                llm,
                tts,
                transport.output(),
                audio_recorder,  # used to buffer the audio in the pipeline
                assistant_aggregator,
            ]
        )

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation by queueing the system prompt.
            await task.queue_frames([LLMMessagesFrame(messages)])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            print(f"Participant left: {participant}")
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
\ No newline at end of file diff --git a/examples/chatbot-audio-recording/requirements.txt b/examples/chatbot-audio-recording/requirements.txt new file mode 100644 index 000000000..9786b52de --- /dev/null +++ b/examples/chatbot-audio-recording/requirements.txt @@ -0,0 +1,4 @@ +python-dotenv +fastapi[all] +uvicorn +pipecat-ai[daily,openai,silero,elevenlabs] diff --git a/examples/chatbot-audio-recording/runner.py b/examples/chatbot-audio-recording/runner.py new file mode 100644 index 000000000..a0b46ca36 --- /dev/null +++ b/examples/chatbot-audio-recording/runner.py @@ -0,0 +1,56 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +import aiohttp + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper + + +async def configure(aiohttp_session: aiohttp.ClientSession): + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", "--url", type=str, required=False, help="URL of the Daily room to join" + ) + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + ) + + if not key: + raise Exception( + "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." 
+ ) + + daily_rest_helper = DailyRESTHelper( + daily_api_key=key, + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + + # Create a meeting token for the given room with an expiration 1 hour in + # the future. + expiry_time: float = 60 * 60 + + token = await daily_rest_helper.get_token(url, expiry_time) + + return (url, token) + return (url, token) diff --git a/examples/chatbot-audio-recording/server.py b/examples/chatbot-audio-recording/server.py new file mode 100644 index 000000000..62ce899be --- /dev/null +++ b/examples/chatbot-audio-recording/server.py @@ -0,0 +1,139 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os +import subprocess +from contextlib import asynccontextmanager + +import aiohttp +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, RedirectResponse + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams + +MAX_BOTS_PER_ROOM = 1 + +# Bot sub-process dict for status reporting and concurrency control +bot_procs = {} + +daily_helpers = {} + +load_dotenv(override=True) + + +def cleanup(): + # Clean up function, just to be extra safe + for entry in bot_procs.values(): + proc = entry[0] + proc.terminate() + proc.wait() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + aiohttp_session = aiohttp.ClientSession() + daily_helpers["rest"] = DailyRESTHelper( + daily_api_key=os.getenv("DAILY_API_KEY", ""), + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + yield + await aiohttp_session.close() + cleanup() + + +app = FastAPI(lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + 
+@app.get("/start") +async def start_agent(request: Request): + print(f"!!! Creating room") + room = await daily_helpers["rest"].create_room(DailyRoomParams()) + print(f"!!! Room URL: {room.url}") + # Ensure the room property is present + if not room.url: + raise HTTPException( + status_code=500, + detail="Missing 'room' property in request data. Cannot start agent without a target room!", + ) + + # Check if there is already an existing process running in this room + num_bots_in_room = sum( + 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None + ) + if num_bots_in_room >= MAX_BOTS_PER_ROOM: + raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}") + + # Get the token for the room + token = await daily_helpers["rest"].get_token(room.url) + + if not token: + raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}") + + # Spawn a new agent, and join the user session + # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) + try: + proc = subprocess.Popen( + [f"python3 -m bot -u {room.url} -t {token}"], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__)), + ) + bot_procs[proc.pid] = (proc, room.url) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") + + return RedirectResponse(room.url) + + +@app.get("/status/{pid}") +def get_status(pid: int): + # Look up the subprocess + proc = bot_procs.get(pid) + + # If the subprocess doesn't exist, return an error + if not proc: + raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found") + + # Check the status of the subprocess + if proc[0].poll() is None: + status = "running" + else: + status = "finished" + + return JSONResponse({"bot_id": pid, "status": status}) + + +if __name__ == "__main__": + import uvicorn + + default_host = os.getenv("HOST", "0.0.0.0") + default_port = 
int(os.getenv("FAST_API_PORT", "7860")) + + parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, default=default_host, help="Host address") + parser.add_argument("--port", type=int, default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", help="Reload code on change") + + config = parser.parse_args() + + uvicorn.run( + "server:app", + host=config.host, + port=config.port, + reload=config.reload, + ) diff --git a/pyproject.toml b/pyproject.toml index 5fd5d6790..1aaf0a5e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ Website = "https://pipecat.ai" anthropic = [ "anthropic~=0.34.0" ] aws = [ "boto3~=1.35.27" ] azure = [ "azure-cognitiveservices-speech~=1.40.0" ] +canonical = [ "aiofiles~=24.1.0" ] cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ] daily = [ "daily-python~=0.11.0" ] deepgram = [ "deepgram-sdk~=3.7.3" ] diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py new file mode 100644 index 000000000..0c07d6815 --- /dev/null +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -0,0 +1,101 @@ +import wave +from io import BytesIO + +from pipecat.frames.frames import ( + AudioRawFrame, + BotInterruptionFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + Frame, + InputAudioRawFrame, + OutputAudioRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class AudioBufferProcessor(FrameProcessor): + def __init__(self, **kwargs): + """ + Initialize the AudioBufferProcessor. + + This constructor sets up the initial state for audio processing: + - audio_buffer: A bytearray to store incoming audio data. + - num_channels: The number of audio channels (initialized as None). 
class AudioBufferProcessor(FrameProcessor):
    """Buffers user (input) and assistant (output) audio into two parallel tracks.

    The two tracks are kept time-aligned (silence is inserted on the assistant
    track while only the user is speaking) so they can later be merged into a
    single stereo WAV: user on the left channel, assistant on the right.
    """

    def __init__(self, **kwargs):
        """
        Initialize the AudioBufferProcessor.

        This constructor sets up the initial state for audio processing:
        - _user_audio_buffer / _assistant_audio_buffer: bytearrays storing
          incoming audio for each speaker.
        - _num_channels: The number of audio channels (initialized as None).
        - _sample_rate: The sample rate of the audio (initialized as None).

        The sample rate is populated when the first audio frame is processed.
        """
        super().__init__(**kwargs)
        self._user_audio_buffer = bytearray()
        self._assistant_audio_buffer = bytearray()
        self._num_channels = None
        self._sample_rate = None

    def _buffer_has_audio(self, buffer: bytearray) -> bool:
        """Return True if *buffer* exists and holds at least one byte."""
        return buffer is not None and len(buffer) > 0

    def _has_audio(self) -> bool:
        """Return True when both tracks have audio and the sample rate is known."""
        return (
            self._buffer_has_audio(self._user_audio_buffer)
            and self._buffer_has_audio(self._assistant_audio_buffer)
            and self._sample_rate is not None
        )

    def _reset_audio_buffer(self):
        """Discard all buffered audio for both tracks."""
        self._user_audio_buffer = bytearray()
        self._assistant_audio_buffer = bytearray()

    def _merge_audio_buffers(self) -> bytes:
        """Merge the two mono tracks into one stereo 16-bit WAV (user=left).

        Both tracks are padded with silence to a common even length before
        interleaving, so buffers of different (or odd) byte lengths are safe.
        """
        user = bytes(self._user_audio_buffer)
        assistant = bytes(self._assistant_audio_buffer)

        # Common length, rounded up to a whole 16-bit sample (2 bytes).
        max_length = max(len(user), len(assistant))
        if max_length % 2:
            max_length += 1
        user = user.ljust(max_length, b"\x00")
        assistant = assistant.ljust(max_length, b"\x00")

        # Interleave sample-by-sample: [user LSB, user MSB, asst LSB, asst MSB].
        interleaved = bytearray(max_length * 2)
        for i in range(0, max_length, 2):
            interleaved[i * 2 : i * 2 + 2] = user[i : i + 2]
            interleaved[i * 2 + 2 : i * 2 + 4] = assistant[i : i + 2]

        with BytesIO() as buffer:
            with wave.open(buffer, "wb") as wf:
                wf.setnchannels(2)
                wf.setsampwidth(2)
                wf.setframerate(self._sample_rate)
                wf.writeframes(interleaved)
            return buffer.getvalue()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        # Latch the sample rate from the first audio frame we see.
        if isinstance(frame, AudioRawFrame) and self._sample_rate is None:
            self._sample_rate = frame.sample_rate

        # Include all audio from the user.
        if isinstance(frame, InputAudioRawFrame):
            self._user_audio_buffer.extend(frame.audio)
            # Keep the assistant track time-aligned with the user track by
            # padding it with silence while the assistant is not speaking.
            if len(self._user_audio_buffer) > len(self._assistant_audio_buffer):
                silence_length = len(self._user_audio_buffer) - len(self._assistant_audio_buffer)
                self._assistant_audio_buffer.extend(b"\x00" * silence_length)

        # If the assistant is speaking, include all audio from the assistant.
        if isinstance(frame, OutputAudioRawFrame):
            self._assistant_audio_buffer.extend(frame.audio)

        # Do not push the user's audio frame; doing so would result in echo.
        if not isinstance(frame, InputAudioRawFrame):
            await self.push_frame(frame, direction)
class CanonicalMetricsService(AIService):
    """Uploads buffered call audio to the Canonical Voice API for metrics.

    When the call ends (EndFrame) or is cancelled (CancelFrame), the audio held
    by the associated AudioBufferProcessor is merged into a stereo WAV file and
    uploaded to Canonical via a presigned multipart upload.

    Args:
        aiohttp_session (aiohttp.ClientSession): Shared HTTP session for API calls.
        audio_buffer_processor (AudioBufferProcessor): Holds the buffered
            user/assistant audio for this call.
        call_id (str): Your unique identifier for the call. This is used to match
            the call in the Canonical Voice system to the call in your system.
        assistant (str): Identifier for the AI assistant. This can be whatever you
            want; it is intended for your convenience so you can distinguish
            between different assistants and as a grouping mechanism for calls.
        api_key (str): Canonical API key (commonly taken from the
            CANONICAL_API_KEY environment variable).
        api_url (str, optional): Base URL of the Canonical Voice API.
        assistant_speaks_first (bool, optional): Indicates if the assistant speaks
            first in the conversation. Defaults to True.
        output_dir (str, optional): Directory to save temporary audio files.
            Defaults to "recordings". Created on demand.
    """

    def __init__(
        self,
        *,
        aiohttp_session: aiohttp.ClientSession,
        audio_buffer_processor: AudioBufferProcessor,
        call_id: str,
        assistant: str,
        api_key: str,
        api_url: str = "https://voiceapp.canonical.chat/api/v1",
        assistant_speaks_first: bool = True,
        output_dir: str = "recordings",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self._aiohttp_session = aiohttp_session
        self._audio_buffer_processor = audio_buffer_processor
        self._api_key = api_key
        self._api_url = api_url
        self._call_id = call_id
        self._assistant = assistant
        self._assistant_speaks_first = assistant_speaks_first
        self._output_dir = output_dir

    async def stop(self, frame: EndFrame):
        # Normal end of call: persist and upload whatever audio was captured.
        await self._process_audio()

    async def cancel(self, frame: CancelFrame):
        # Cancelled call: still upload what was captured so far.
        await self._process_audio()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Pass-through: this service only reacts to stop()/cancel().
        await super().process_frame(frame, direction)
        await self.push_frame(frame, direction)

    async def _process_audio(self):
        """Write the merged call audio to a temporary WAV file and upload it."""
        pipeline = self._audio_buffer_processor
        if not pipeline._has_audio():
            return

        os.makedirs(self._output_dir, exist_ok=True)
        filename = self._get_output_filename()
        wave_data = pipeline._merge_audio_buffers()

        async with aiofiles.open(filename, "wb") as file:
            await file.write(wave_data)

        try:
            await self._multipart_upload(filename)
            pipeline._reset_audio_buffer()
            await aiofiles.os.remove(filename)
        except FileNotFoundError:
            # File already gone; nothing left to clean up.
            pass
        except Exception as e:
            logger.error(f"Failed to upload recording: {e}")

    def _get_output_filename(self) -> str:
        """Return a unique .wav path inside the output directory."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{self._output_dir}/{timestamp}-{uuid.uuid4().hex}.wav"

    def _request_headers(self) -> Dict:
        """Headers for Canonical API JSON requests, including the API key."""
        return {"Content-Type": "application/json", "X-Canonical-Api-Key": self._api_key}

    async def _multipart_upload(self, file_path: str):
        """Run the three-step upload: request URLs, upload parts, complete."""
        upload_request, upload_response = await self._request_upload(file_path)
        if upload_request is None or upload_response is None:
            return
        parts = await self._upload_parts(file_path, upload_response)
        if parts is None:
            return
        await self._upload_complete(parts, upload_request, upload_response)

    async def _request_upload(self, file_path: str) -> Tuple[Dict, Dict]:
        """Request presigned part-upload URLs.

        Returns (request_params, response_json), or (None, None) on failure.
        """
        filename = os.path.basename(file_path)
        filesize = os.path.getsize(file_path)
        # Integer ceiling division: number of PART_SIZE chunks needed.
        numparts = (filesize + PART_SIZE - 1) // PART_SIZE

        params = {
            "filename": filename,
            "parts": numparts,
            "callId": self._call_id,
            "assistant": {"id": self._assistant, "speaksFirst": self._assistant_speaks_first},
        }
        logger.debug(f"Requesting presigned URLs for {numparts} parts")
        # async with releases the connection back to the pool when done.
        async with self._aiohttp_session.post(
            f"{self._api_url}/recording/uploadRequest",
            headers=self._request_headers(),
            json=params,
        ) as response:
            if not response.ok:
                logger.error(f"Failed to get presigned URLs: {await response.text()}")
                return None, None
            return params, await response.json()

    async def _upload_parts(self, file_path: str, upload_response: Dict) -> List[Dict]:
        """Upload the file in PART_SIZE chunks to the presigned URLs.

        Returns the list of {"partnum", "etag"} dicts, or None if any part
        failed — a partial list must never be sent to uploadComplete.
        """
        urls = upload_response["urls"]
        parts = []
        try:
            async with aiofiles.open(file_path, "rb") as file:
                for partnum, upload_url in enumerate(urls, start=1):
                    data = await file.read(PART_SIZE)
                    if not data:
                        break

                    async with self._aiohttp_session.put(upload_url, data=data) as response:
                        if not response.ok:
                            logger.error(
                                f"Failed to upload part {partnum}: {await response.text()}"
                            )
                            return None
                        etag = response.headers["ETag"]
                    parts.append({"partnum": str(partnum), "etag": etag})

        except Exception as e:
            logger.error(f"Multipart upload aborted, an error occurred: {str(e)}")
            # Abort: returning the partial list would complete a corrupt upload.
            return None
        return parts

    async def _upload_complete(
        self, parts: List[Dict], upload_request: Dict, upload_response: Dict
    ):
        """Notify the API that all parts are uploaded so it can assemble the file."""
        params = {
            "filename": upload_request["filename"],
            "parts": parts,
            "slug": upload_response["slug"],
            "callId": self._call_id,
            "assistant": {"id": self._assistant, "speaksFirst": self._assistant_speaks_first},
        }
        logger.debug(f"Completing upload for {params['filename']}")
        logger.debug(f"Slug: {params['slug']}")
        async with self._aiohttp_session.post(
            f"{self._api_url}/recording/uploadComplete",
            headers=self._request_headers(),
            json=params,
        ) as response:
            if not response.ok:
                logger.error(f"Failed to complete upload: {await response.text()}")