-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
361 lines (334 loc) · 15.6 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import asyncio
import os
import sys
from loguru import logger
import json
from dotenv import load_dotenv
import firebase_admin
from firebase_admin import credentials, firestore
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import StartFrame, EndFrame
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.logger import FrameLogger
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService, OpenAILLMContextFrame
from websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.audio.vad.silero import SileroVADAnalyzer
from noisereduce_filter import NoisereduceFilter
#from groqstt import GroqSTTService
from groqSTT import GroqVADSTTService
# Load environment variables from a .env file before anything below reads
# os.environ. NOTE(review): override=True presumably lets .env values win
# over variables already set in the process environment - confirm.
load_dotenv(override=True)
# Swap the logger handler with id 0 (presumably loguru's default sink) for a
# DEBUG-level sink on stderr.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Firebase credentials are read as a JSON string from the FIREBASE_CREDENTIALS
# environment variable; when it is unset the fallback is an empty JSON object.
# NOTE(review): an empty dict is presumably rejected by credentials.Certificate,
# so a missing variable will surface as an error on the next line - confirm.
firebase_creds = json.loads(os.environ.get('FIREBASE_CREDENTIALS', '{}'))
cred = credentials.Certificate(firebase_creds)
firebase_admin.initialize_app(cred)
# Module-level Firestore client; IntakeProcessor.save_data writes through it.
db = firestore.client()
class IntakeProcessor:
    """Drive the scripted patient-intake conversation through LLM tool calls.

    The conversation is a fixed sequence of steps. At each step exactly one
    tool is exposed to the LLM; when the LLM calls it, the next tool and a new
    system instruction are installed:

        verify_birthday -> list_prescriptions -> list_allergies
        -> list_conditions -> list_visit_reasons -> goodbye

    ``verify_birthday`` is a regular function-call handler. The ``start_*``
    methods are meant to be registered as the *start callback* of the tool
    they are named after (``start_prescriptions`` fires when
    ``list_prescriptions`` is called and moves the conversation on to
    allergies, and so on). ``save_data`` is the shared function-call handler
    that persists the collected answers to Firestore.

    Instances keep no per-conversation state; everything lives in the
    ``context`` object passed to each method.
    """

    # Birthday (YYYY-MM-DD) that the caller must state to pass verification.
    _EXPECTED_BIRTHDAY = "1990-01-01"

    # Tool name -> key that is both the tool's argument name and the field
    # written on the user's Firestore document.
    _SAVE_FIELDS = {
        "list_prescriptions": "prescriptions",
        "list_allergies": "allergies",
        "list_conditions": "conditions",
        "list_visit_reasons": "visit_reasons",
    }

    def __init__(self, context: "OpenAILLMContext"):
        """Seed *context* with the system prompt and the first tool.

        Args:
            context: LLM context to initialise. It is mutated in place: one
                system message is appended and the tool list is replaced with
                the single ``verify_birthday`` tool.
        """
        logger.debug("Initializing context from IntakeProcessor")
        context.add_message(
            {
                "role": "system",
                "content": "You are Jessica, an telephone call agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. To insert pauses, insert “-” where you need the pause.Use two question marks to emphasize questions. For example, “Are you here??” vs. “Are you here?”Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
            }
        )
        context.set_tools(
            [
                self._function_tool(
                    "verify_birthday",
                    "Use this function to verify the user has provided their correct birthday.",
                    {
                        "birthday": {
                            "type": "string",
                            "description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
                        }
                    },
                )
            ]
        )

    @staticmethod
    def _function_tool(name, description, properties):
        """Return a function-tool definition taking an object with *properties*."""
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                },
            },
        }

    @staticmethod
    def _object_list(item_properties):
        """Return a JSON-schema fragment for an array of objects."""
        return {
            "type": "array",
            "items": {
                "type": "object",
                "properties": item_properties,
            },
        }

    async def _advance(self, llm, context, tools, prompt):
        """Install *tools*, append *prompt* as a system message, re-run the LLM."""
        context.set_tools(tools)
        context.add_message({"role": "system", "content": prompt})
        await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)

    async def verify_birthday(
        self, function_name, tool_call_id, args, llm, context, result_callback
    ):
        """Handle the ``verify_birthday`` tool call.

        Args:
            function_name: Name of the tool that was called (unused).
            tool_call_id: Identifier of the tool call (unused).
            args: Tool arguments; expected to carry ``"birthday"`` as a
                ``YYYY-MM-DD`` string.
            llm: The LLM service that issued the call (unused).
            context: LLM context; on success its tools are replaced with the
                ``list_prescriptions`` tool.
            result_callback: Awaitable callback that receives a list with one
                system message telling the LLM what to do next.
        """
        # The tool schema does not mark "birthday" as required, so the model
        # may omit it. Treat a missing value like a wrong one instead of
        # raising KeyError in the middle of a call.
        if (args or {}).get("birthday") != self._EXPECTED_BIRTHDAY:
            # The user provided an incorrect birthday; ask them to try again.
            await result_callback(
                [
                    {
                        "role": "system",
                        "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
                    }
                ]
            )
            return

        context.set_tools(
            [
                self._function_tool(
                    "list_prescriptions",
                    "Once the user has provided a list of their prescription medications, call this function.",
                    {
                        "prescriptions": self._object_list(
                            {
                                "medication": {
                                    "type": "string",
                                    "description": "The medication's name",
                                },
                                "dosage": {
                                    "type": "string",
                                    "description": "The prescription's dosage",
                                },
                            }
                        )
                    },
                )
            ]
        )
        # We don't need the function call in the context, so just return a new
        # system message and let the framework re-prompt.
        await result_callback(
            [
                {
                    "role": "system",
                    "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
                }
            ]
        )

    async def start_prescriptions(self, function_name, llm, context):
        """Start callback for ``list_prescriptions``: move on to allergies."""
        logger.debug("!!! doing start prescriptions")
        await self._advance(
            llm,
            context,
            [
                self._function_tool(
                    "list_allergies",
                    "Once the user has provided a list of their allergies, call this function.",
                    {
                        "allergies": self._object_list(
                            {
                                "name": {
                                    "type": "string",
                                    "description": "What the user is allergic to",
                                }
                            }
                        )
                    },
                )
            ],
            "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
        )
        logger.debug("!!! past await process frame in start prescriptions")

    async def start_allergies(self, function_name, llm, context):
        """Start callback for ``list_allergies``: move on to conditions."""
        logger.debug("!!! doing start allergies")
        await self._advance(
            llm,
            context,
            [
                self._function_tool(
                    "list_conditions",
                    "Once the user has provided a list of their medical conditions, call this function.",
                    {
                        "conditions": self._object_list(
                            {
                                "name": {
                                    "type": "string",
                                    "description": "The user's medical condition",
                                }
                            }
                        )
                    },
                )
            ],
            "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question or confirmed they dont have any, call the list_conditions function.",
        )

    async def start_conditions(self, function_name, llm, context):
        """Start callback for ``list_conditions``: move on to visit reasons."""
        logger.debug("!!! doing start conditions")
        await self._advance(
            llm,
            context,
            [
                self._function_tool(
                    "list_visit_reasons",
                    "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
                    {
                        "visit_reasons": self._object_list(
                            {
                                "name": {
                                    "type": "string",
                                    "description": "The user's reason for visiting the doctor",
                                }
                            }
                        )
                    },
                )
            ],
            "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
        )

    async def start_visit_reasons(self, function_name, llm, context):
        """Start callback for ``list_visit_reasons``: wrap up the call."""
        logger.debug("!!! doing start visit reasons")
        # No tools remain: the only thing left is to say goodbye.
        await self._advance(
            llm,
            context,
            [],
            "Now, thank the user and end the conversation.",
        )

    async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
        """Persist the payload of a ``list_*`` tool call to Firestore.

        The argument named in ``_SAVE_FIELDS`` for *function_name* is written
        to the same-named field of the user's document. Unknown tool names and
        calls that lack the expected argument are logged and skipped.

        Args:
            function_name: Name of the tool that was called.
            tool_call_id: Identifier of the tool call (unused).
            args: Tool arguments as produced by the LLM.
            llm: The LLM service that issued the call (unused).
            context: LLM context (unused).
            result_callback: Awaitable callback; always invoked with ``None``
                once the write has been attempted or skipped.
        """
        logger.info(f"Saving data: {args}")
        field = self._SAVE_FIELDS.get(function_name)
        if field is None:
            logger.warning(f"No Firestore field mapped for function: {function_name}")
        elif field not in (args or {}):
            logger.warning(f"Function {function_name} was called without '{field}'; nothing saved")
        else:
            # TODO: derive the document id from the caller instead of
            # hard-coding a single user.
            user_ref = db.collection('users').document('chad_bailey')
            payload = {field: args[field]}
            # The Firestore client is synchronous; run the write in the
            # default executor so it does not stall the event loop that is
            # also streaming audio. set(merge=True) overwrites only this
            # field and, unlike update(), also works when the document does
            # not exist yet.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: user_ref.set(payload, merge=True))
            logger.info(f"Data saved to Firebase for function: {function_name}")
        await result_callback(None)
async def main():
    """Build the voice pipeline, wire up the intake tools, and run the server.

    The pipeline is: websocket audio in -> speech-to-text -> user context
    aggregation -> LLM -> frame logger -> text-to-speech -> websocket audio
    out -> assistant context aggregation. The listening port is read from the
    ``PORT`` environment variable (a missing value raises ``KeyError``); API
    keys are read from ``OPENAI_API_KEY``, ``DEEPGRAM_API_KEY`` and
    ``CARTESIA_API_KEY``.
    """
    transport = WebsocketServerTransport(
        params=WebsocketServerParams(
            # NOTE(review): an empty host presumably binds every interface - confirm.
            host="",
            port=int(os.environ["PORT"]),
            audio_out_enabled=True,
            add_wav_header=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            audio_in_filter=NoisereduceFilter(),
        )
    )

    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o",
    )
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="829ccd10-f8b3-43cd-b8a0-4aeaa81f3b30",  # British Lady
    )

    context = OpenAILLMContext(messages=[])
    context_aggregator = llm.create_context_aggregator(context)
    intake = IntakeProcessor(context)

    # Register functions. Every list_* tool shares save_data as its handler
    # and differs only in the start callback that advances the conversation.
    llm.register_function("verify_birthday", intake.verify_birthday)
    for tool_name, on_start in (
        ("list_prescriptions", intake.start_prescriptions),
        ("list_allergies", intake.start_allergies),
        ("list_conditions", intake.start_conditions),
        ("list_visit_reasons", intake.start_visit_reasons),
    ):
        llm.register_function(tool_name, intake.save_data, start_callback=on_start)

    fl = FrameLogger("LLM Output")
    pipeline = Pipeline([
        transport.input(),               # WebSocket input
        stt,                             # Speech-To-Text
        context_aggregator.user(),       # User responses
        llm,                             # LLM
        fl,                              # Frame logger
        tts,                             # Text-To-Speech
        transport.output(),              # WebSocket output
        context_aggregator.assistant(),  # Assistant responses
    ])
    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start each connection from a clean slate. The context object is
        # shared with the aggregators already in the pipeline, so it is reset
        # in place rather than replaced.
        context.messages.clear()
        # Constructing an IntakeProcessor re-adds the system prompt and resets
        # the tool list to verify_birthday. The instance itself is discarded:
        # the handlers registered above do not depend on per-instance state.
        IntakeProcessor(context)
        logger.info(f"New client connected. Fresh context created: {context}")
        await task.queue_frames([OpenAILLMContextFrame(context)])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Queueing an EndFrame shuts the pipeline down once the client leaves.
        await task.queue_frames([EndFrame()])

    runner = PipelineRunner()
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())