-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
280 lines (231 loc) · 9.91 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
##### Libraries #####
import dotenv
dotenv.load_dotenv(".env")
import os
os.environ["HF_HOME"] = os.getenv("HF_HOME")
import time
import json
import json5
import openai
import typing
import asyncio
import discord
import logging
import functools
import subprocess
from discord.threads import Thread
from typing import Union, List, Dict
from qwen_agent.utils.utils import extract_code
from libs import (
HfBaseModel,
HfZephyr7bBeta,
HfQwen,
HfDeepseekCoderInstruct,
VllmDockerLcModel,
VllmDockerQwenAgent,
)
from discord.channel import (
TextChannel,
DMChannel,
GroupChannel,
PartialMessageable,
VoiceChannel,
StageChannel,
)
MessageableChannel = Union[TextChannel, VoiceChannel, StageChannel, Thread,
DMChannel, PartialMessageable, GroupChannel]
VllmDockerModel = Union[VllmDockerLcModel, VllmDockerQwenAgent]
##### Parameters #####
# Deployment settings; all values come from the .env file loaded at import time.
MODEL_NAME : str = str(os.getenv("MODEL_NAME"))
MAX_MODEL_LEN : int = int(os.getenv("MAX_MODEL_LEN"))
VLLM_PORT : int = int(os.getenv("VLLM_PORT"))
DISCORD_TOKEN : str = str(os.getenv("DISCORD_TOKEN"))
# Log levels: discord.py's own logger only shows warnings; this app logs info+.
DC_LOG_LEVEL : int = logging.WARNING
MAIN_LOG_LEVEL: int = logging.INFO
# MAIN_LOG_LEVEL: int = logging.DEBUG
##### Loggers #####
# Attach a plain stream handler to discord.py's library logger.
DC_LOGGER = logging.getLogger("discord")
DC_LOGGER.addHandler(logging.StreamHandler())
DC_LOGGER.setLevel(DC_LOG_LEVEL)
# Application logger; format strings are taken from the environment
# (LOG_FMT / LOG_DATE_FMT must be set, otherwise a KeyError is raised here).
MAIN_LOGGER = logging.getLogger("Main")
MAIN_LOGGER.setLevel(MAIN_LOG_LEVEL)
MAIN_HANDLER = logging.StreamHandler()
MAIN_HANDLER.setLevel(MAIN_LOG_LEVEL)
MAIN_HANDLER.setFormatter(logging.Formatter('\n'+os.environ["LOG_FMT"], datefmt=os.environ["LOG_DATE_FMT"]))
MAIN_LOGGER.addHandler(MAIN_HANDLER)
##### Functions #####
def to_thread(func: typing.Callable) -> typing.Callable[..., typing.Coroutine]:
    """Decorator: make blocking `func` awaitable by running it in a worker thread.

    Fix: the return annotation was `typing.Coroutine`, but the decorator
    returns an async *callable* (which produces a coroutine when called).
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper
async def log_and_send(
    channel: MessageableChannel,
    message: str,
    level: int = logging.INFO
) -> None:
    """Log `message` at `level` and echo it to `channel` as a system notice."""
    MAIN_LOGGER.log(level, message)
    await channel.send(f"**[SYSTEM]** *{message}*")
def check_server_is_started(model: VllmDockerModel) -> bool:
    """Probe the vLLM server with a dummy prompt; True iff it answers."""
    try:
        model(":)")
    except openai.APIConnectionError:
        return False
    return True
@to_thread
def report_server_started(
    model: VllmDockerModel,
    channel: typing.Optional[MessageableChannel] = None
) -> None:
    """Block (in a worker thread) until the dockerized vLLM server answers.

    Fix: some call sites invoke this as ``report_server_started(model,
    channel)``; the original signature only accepted `model`, so those calls
    raised TypeError inside the worker thread. `channel` is accepted for
    backward compatibility and ignored (this function runs synchronously in
    a thread and cannot await channel.send).
    """
    # The container takes roughly 5.5 minutes to come up; skip pointless probes.
    first_sleep_time = 330
    MAIN_LOGGER.debug(f"First time waiting for the docker to start... sleep for {first_sleep_time} secs.")
    time.sleep(first_sleep_time)
    while True:
        try:
            model(":)")  # any successful completion means the server is up
            return
        except openai.APIConnectionError:
            MAIN_LOGGER.debug("Docker is still starting... sleep for 5 secs.")
            time.sleep(5)
async def start_docker(
    model: VllmDockerModel,
    channel: MessageableChannel
) -> None:
    """Start the vLLM docker container, reporting progress on `channel`."""
    await log_and_send(channel, "Checking the docker is started or not...")
    if check_server_is_started(model):
        await log_and_send(channel, "The docker is already started!")
    else:
        await log_and_send(channel, "Starting the docker... This takes about 6 minutes.")
        await channel.send("**[SYSTEM]** *I will notice you when the docker is successfully started.*")
        # Fix: run the long docker-compose call in a worker thread so it does
        # not block the bot's event loop for the whole container startup.
        await asyncio.to_thread(
            subprocess.check_call,
            [ "docker-compose", "-f", "docker-compose.yml", "start" ]
        )
        await report_server_started(model)
        await log_and_send(channel, "The docker has successfully started!")
async def restart_docker(
    model: VllmDockerModel,
    channel: MessageableChannel
) -> None:
    """Restart the running vLLM docker container, reporting on `channel`."""
    await log_and_send(channel, "Checking the docker is started or not...")
    if check_server_is_started(model):
        await log_and_send(channel, "Restarting the docker... This takes about 6 minutes.")
        await channel.send("**[SYSTEM]** *I will notice you when the docker is successfully restarted.*")
        # Fix: run docker-compose off the event loop instead of blocking it.
        await asyncio.to_thread(
            subprocess.check_call,
            [ "docker-compose", "-f", "docker-compose.yml", "restart" ]
        )
        # Fix: report_server_started() takes only the model; passing the
        # channel as well raised TypeError inside the worker thread.
        await report_server_started(model)
        await log_and_send(channel, "The docker has successfully restarted!")
    else:
        await log_and_send(channel, "The docker isn't started yet, please use the command \"!Start\" instead.")
async def force_restart_docker(
    model: VllmDockerModel,
    channel: MessageableChannel
) -> None:
    """Restart the container without first probing whether it is running."""
    await log_and_send(channel, "Force restarting the docker... This takes about 6 minutes.\n" + \
        "I will notice you when the docker is successfully restarted.")
    # Fix: run docker-compose off the event loop instead of blocking it.
    await asyncio.to_thread(
        subprocess.check_call,
        [ "docker-compose", "-f", "docker-compose.yml", "restart" ]
    )
    # Fix: report_server_started() takes only the model; passing the channel
    # as well raised TypeError inside the worker thread.
    await report_server_started(model)
    await log_and_send(channel, "The docker has successfully restarted!")
async def stop_docker(
    model: VllmDockerModel,
    channel: MessageableChannel
) -> None:
    """Stop the vLLM docker container, reporting progress on `channel`."""
    await log_and_send(channel, "Stopping the docker...")
    # Fix: run docker-compose in a worker thread so stopping the container
    # does not block the bot's event loop.
    await asyncio.to_thread(
        subprocess.check_call,
        [ "docker-compose", "-f", "docker-compose.yml", "stop" ]
    )
    await log_and_send(channel, "The docker has successfully stopped!")
def split_message(message: str) -> List[str]:
    """Split `message` into chunks of at most ~1900 chars on newline boundaries.

    Discord caps messages at 2000 characters; 1900 leaves headroom for the
    markdown fences this function inserts. When a split lands inside a
    ``` code block, the current chunk is closed with "```" and the next chunk
    reopens the block with "```" (or "```py\\n" for python blocks).

    Fix: fence state is now updated for every line. The original only updated
    it for lines that fit in the current chunk, so a fence line that itself
    triggered a split desynchronized the open/close bookkeeping.
    """
    if len(message) <= 1900: return [ message ]
    split_messages = [ '' ]
    in_markdown = False
    markdown_python = False
    for line in message.split('\n'):
        if len(split_messages[-1]) + len(line) <= 1900:
            split_messages[-1] += line + '\n'
        else:
            chunk = line
            if in_markdown:
                # Close the fence in the full chunk and reopen it in the new one.
                split_messages[-1] += "```"
                chunk = f"```py\n{line}" if markdown_python else f"```{line}"
            split_messages.append(chunk + '\n')
        # Track fences on the original line text (before any reopening prefix),
        # regardless of which chunk the line landed in.
        if "```" in line:
            in_markdown = not in_markdown
            markdown_python = in_markdown and "```py" in line
    # Same logger instance as the module-level MAIN_LOGGER ("Main").
    logging.getLogger("Main").debug(f"Response was splitted into {len(split_messages)} slices.")
    return split_messages
def process_qwen_response_list(response_list: List[Dict]) -> List[tuple]:
    """Convert a Qwen-agent message list into (role, content) display tuples.

    - "assistant" messages are relabeled "Bot".
    - "function" result messages derive their role from the function name,
      e.g. "code_interpreter" -> "Code Interpreter".
    - Messages with no content but a "function_call" entry are summarized as
      a "Function Call" line; code_interpreter arguments are skipped because
      they are not reliably parseable.
    - Fix: messages with neither content nor a function call are now skipped;
      the original reused stale loop variables for them (NameError on the
      first iteration). The return annotation also now matches the tuples
      actually returned (it claimed List[Dict]).
    """
    adjusted_response_list = []
    for response in response_list:
        content: str = response.get("content")
        if content:
            if response["role"] == "assistant":
                role = "Bot"
            elif response["role"] == "function":
                # Turn "some_function_name" into "Some Function Name".
                function_name: str = response["name"]
                role = ' '.join(n.capitalize() for n in function_name.split('_'))
            else:
                role = response["role"]
        elif "function_call" in response:
            role = "Function Call"
            called_function = response['function_call']['name']
            content = f"Called function: {called_function}\n"
            if called_function == "code_interpreter":
                content += "Arguments: Skipped due to parsing problem."
        else:
            # Neither content nor a function call: nothing to display.
            continue
        adjusted_response_list.append((role, content))
    return adjusted_response_list
##### Classes #####
class DiscordBot(discord.Client):
    """Discord client that relays the owner's messages to an LLM backend.

    Plain messages are answered by `self.model`; "!Start" / "!Restart" /
    "!ForceRestart" / "!Stop" control the vLLM docker container when the
    model is one of the docker-backed wrappers.
    """
    def __init__(
        self,
        model: HfBaseModel | VllmDockerModel,
        intents: discord.Intents,
        **options: typing.Any  # fix: was `dotenv.Any`, which does not exist
    ) -> None:
        super().__init__(intents=intents, **options)
        self.model = model

    async def on_ready(self) -> None:
        MAIN_LOGGER.info(f"Discord bot \"{self.user}\" connected!")

    async def on_message(self, dc_msg: discord.message.Message) -> None:
        # Only react to the configured user id; this also prevents the bot
        # from replying to its own messages.
        if dc_msg.author.id != int(os.getenv("USER_ID")): return
        MAIN_LOGGER.debug(f"dc_msg: {dc_msg}")
        message = dc_msg.content
        message_pruned = message[:20] + "..." if len(message) > 20 else message
        MAIN_LOGGER.info(f"Received message: \"{message_pruned}\" from \"{dc_msg.author.name}\".")
        # Fix: `type(self.model) is VllmDockerModel` is always False because
        # VllmDockerModel is a typing.Union, so the docker commands below were
        # unreachable. Check against the concrete classes instead.
        if isinstance(self.model, (VllmDockerLcModel, VllmDockerQwenAgent)):
            if message == "!Start":
                await start_docker(self.model, dc_msg.channel)
                return
            elif message == "!Restart":
                await restart_docker(self.model, dc_msg.channel)
                return
            elif message == "!ForceRestart":
                await force_restart_docker(self.model, dc_msg.channel)
                return
            elif message == "!Stop":
                await stop_docker(self.model, dc_msg.channel)
                return
        if type(self.model) is not VllmDockerQwenAgent:
            # Single-string response models.
            response = self.model(dc_msg.content)
            MAIN_LOGGER.debug(f"Generated response: \"{response}\".")
            await dc_msg.channel.send(response)
            response_pruned = response[:20] + "..." if len(response) > 20 else response
            MAIN_LOGGER.info(f"Replied: \"{response_pruned}\".")
        else:
            # Qwen-agent returns a message list; render each as "# Role: ...".
            response_list = self.model(dc_msg.content)
            response_list = process_qwen_response_list(response_list)
            for role, content in response_list:
                split_messages = split_message(f"# {role}:\n{content}")
                for message in split_messages:
                    await dc_msg.channel.send(message)
                    # Fix: time.sleep(1) blocked the whole event loop between
                    # sends; asyncio.sleep yields control instead.
                    await asyncio.sleep(1)
                content_pruned = content[:20] + "..." if len(content) > 20 else content
                MAIN_LOGGER.info(f"Replied: \"{content_pruned}\".")
##### Execution #####
if __name__ == "__main__":
    # Alternative backend, kept for reference:
    # model: VllmDockerLcModel = VllmDockerLcModel(MODEL_NAME, MAX_MODEL_LEN, VLLM_PORT)
    # Qwen-agent wrapper around the dockerized vLLM server.
    model: VllmDockerQwenAgent = VllmDockerQwenAgent(MODEL_NAME, VLLM_PORT)
    bot = DiscordBot(model=model, intents=discord.Intents.default())
    # Blocks until the process is stopped; authenticates with the .env token.
    bot.run(DISCORD_TOKEN)