# bot.py - Telegram bot that answers ApeWorX questions using the Claude API.
import os
import yaml
import datetime
from threading import Lock
from telegram import Update, ParseMode
from telegram.ext import Updater, CommandHandler, CallbackContext, MessageHandler, Filters
import requests
import anthropic
import logging
# Credentials come from the environment; each value is None when the variable is unset.
CLAUDE_KEY = os.environ.get('CLAUDE_KEY')
TELEGRAM_TOKEN = os.environ.get('TELEGRAM_TOKEN')

# Claude client. Per the original note, the SDK itself defaults to
# os.environ.get("ANTHROPIC_API_KEY") when no key is supplied.
client = anthropic.Anthropic(api_key=CLAUDE_KEY)

# Held by save_data() while the YAML state files are being written.
file_lock = Lock()

# Mutable bot state: admin list, group whitelist and usage data.
# load_data() rebinds all three at startup.
admins = dict()
groups = dict()
usage_data = dict()
def start(update: Update, context: CallbackContext) -> None:
    """Reply to ``/start`` with a short greeting.

    NOTE: this module defines ``start`` again further down; that later
    definition is the one bound by the time ``main()`` registers handlers.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # Command handlers can also receive edited messages, for which
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is not None:
        message.reply_text('Hello! Ask me anything about ApeWorX!')
# Knowledge-base text that handle_message() prefixes to every prompt; read once at import.
knowledge_base = ''
with open('knowledge-base.txt', mode='r', encoding="utf-8") as file:
    knowledge_base = file.read()

# Built-in admins and whitelisted groups. load_data() re-adds any of these
# that are missing from the YAML files.
DEFAULT_ADMINS = {'67950696': True}
DEFAULT_GROUPS = {
    group_id: {'messages_today': 0, 'last_reset': str(datetime.date.today())}
    for group_id in ('-1001868541493', '-4069234649')
}
def safe_split_message(text, max_length=4000):
    """Split *text* into chunks of at most *max_length* characters.

    Lines are kept whole where possible and every line keeps a trailing
    newline. When a split falls inside a fenced code block, the current chunk
    is closed with a fence and the next chunk reopens the block, so each chunk
    is balanced markdown on its own. Lines too long for a single chunk are
    hard-wrapped. Whitespace-only chunks are never returned.

    Args:
        text: Message text; ``None`` or ``""`` yields an empty list.
        max_length: Upper bound on chunk length. The bound is only guaranteed
            when it is larger than the fence overhead (about 25 characters).

    Returns:
        List of non-empty message chunks, in order.
    """
    fence = '```'
    closing = fence + '\n'
    max_lang = 16  # longest language tag that is repeated when a block is reopened
    # Worst case around one line: reopened fence line + the line's newline + closing fence.
    overhead = len(fence) + max_lang + 1 + 1 + len(closing)
    width = max(1, max_length - overhead)

    messages = []
    current = ''
    opener = None  # fence line used to reopen the block we are inside, else None

    for line in (text or '').split('\n'):
        is_fence = line.startswith(fence)
        inside_before = opener is not None
        # A fence line toggles the state; any other line leaves it unchanged.
        inside_after = inside_before != is_fence
        # Keep room for a closing fence whenever the chunk may end inside a block.
        reserve = len(closing) if inside_after else 0

        # Hard-wrap lines that could never fit into one chunk.
        pieces = [line[i:i + width] for i in range(0, len(line), width)] or ['']
        for piece in pieces:
            if current and len(current) + len(piece) + 1 + reserve > max_length:
                if inside_before:
                    current += closing
                if current.strip():
                    messages.append(current)
                current = (opener + '\n') if inside_before else ''
            current += piece + '\n'

        if is_fence:
            if inside_before:
                opener = None
            else:
                lang = line[len(fence):].strip()
                simple_tag = lang.isalnum() and len(lang) <= max_lang
                opener = fence + lang if simple_tag else fence

    # Text that ends inside a code block gets its fence closed.
    if opener is not None:
        current += closing
    if current.strip():
        messages.append(current)
    return messages
def _read_yaml_mapping(path):
    """Return the YAML document stored at *path*, or ``{}`` if missing or empty.

    Malformed YAML still raises ``yaml.YAMLError`` so that a corrupt state file
    is noticed instead of being silently replaced on the next save.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        return {}


def load_data():
    """Populate the module-level ``admins``, ``groups`` and ``usage_data``.

    State is read from ``admins.yml``, ``groups.yml`` and ``usage.yml``. Missing
    or empty files count as empty. Every entry of ``DEFAULT_ADMINS`` and
    ``DEFAULT_GROUPS`` that is absent from the loaded data is then added.
    """
    global admins, groups, usage_data

    import copy  # local import: only this function needs it

    # Handlers look ids up as strings, so normalise keys that YAML parsed as ints.
    admins = {str(key): value for key, value in _read_yaml_mapping('admins.yml').items()}
    groups = {str(key): value for key, value in _read_yaml_mapping('groups.yml').items()}
    usage_data = _read_yaml_mapping('usage.yml')

    # Insert deep copies so later mutation of the live state (e.g. the per-group
    # message counters) never writes through to the DEFAULT_* constants.
    for admin_id, value in DEFAULT_ADMINS.items():
        admins.setdefault(admin_id, copy.deepcopy(value))
    for group_id, group_data in DEFAULT_GROUPS.items():
        groups.setdefault(group_id, copy.deepcopy(group_data))
def save_data():
    """Dump the admin list, group whitelist and usage data to their YAML files.

    ``file_lock`` is held for the whole operation so concurrent callers cannot
    interleave their writes.
    """
    with file_lock:
        for path, payload in (
            ('admins.yml', admins),
            ('groups.yml', groups),
            ('usage.yml', usage_data),
        ):
            with open(path, 'w') as out:
                yaml.dump(payload, out)
def start(update: Update, context: CallbackContext) -> None:
    """Reply to ``/start`` with a short greeting.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # Command handlers can also receive edited messages, for which
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is not None:
        message.reply_text('Hello! Ask me anything about ApeWorX!')
def add_admin(update: Update, context: CallbackContext) -> None:
    """Handle ``/add_admin <user_id>``: let the bot owner grant admin rights.

    Only the hard-coded owner may use the command. The argument must be a
    numeric Telegram user id; it is stored as a string key in ``admins`` and
    the state is persisted with ``save_data()``.

    Args:
        update: Incoming Telegram update.
        context: Handler context; ``context.args[0]`` is the new admin's id.
    """
    owner_id = '67950696'

    # effective_* accessors also cover edited messages, where update.message is None.
    message = update.effective_message
    user = update.effective_user
    if message is None:
        return

    if user is None or str(user.id) != owner_id:
        message.reply_text('You are not authorized to add admins.')
        return

    new_admin_id = context.args[0].strip() if context.args else ''
    # Reject a missing or non-numeric id instead of storing '' as an admin.
    if not (new_admin_id.isascii() and new_admin_id.isdigit()):
        message.reply_text('Usage: /add_admin <numeric user id>')
        return

    admins[new_admin_id] = True
    save_data()
    message.reply_text('Admin added successfully.')
def add_group(update: Update, context: CallbackContext) -> None:
    """Handle ``/add_group <chat_id>``: let an admin whitelist a group.

    The argument must be an integer chat id (a leading ``-`` is allowed). The
    group is stored with a fresh daily counter and the state is persisted with
    ``save_data()``.

    Args:
        update: Incoming Telegram update.
        context: Handler context; ``context.args[0]`` is the chat id to add.
    """
    # effective_* accessors also cover edited messages, where update.message is None.
    message = update.effective_message
    user = update.effective_user
    if message is None:
        return

    # Same truthiness check handle_message() uses, so an entry set to a falsy
    # value is treated as "not an admin" everywhere.
    if user is None or not admins.get(str(user.id)):
        message.reply_text('You are not authorized to add groups.')
        return

    new_group_id = context.args[0].strip() if context.args else ''
    digits = new_group_id[1:] if new_group_id.startswith('-') else new_group_id
    # Reject a missing or non-numeric id instead of whitelisting ''.
    if not (digits.isascii() and digits.isdigit()):
        message.reply_text('Usage: /add_group <numeric chat id>')
        return

    groups[new_group_id] = {'messages_today': 0, 'last_reset': str(datetime.date.today())}
    save_data()
    message.reply_text('Group added to whitelist successfully.')
def preaudit(update: Update, context: CallbackContext) -> None:
    """Handle ``/preaudit <url>``: fetch source code and ask Claude to review it.

    The document at the URL is downloaded, appended to a fixed review prompt
    and sent to Claude. The answer is split with ``safe_split_message`` and
    sent back as Markdown replies. Failures are reported to the chat.

    Args:
        update: Incoming Telegram update.
        context: Handler context; ``context.args[0]`` is the URL to fetch.
    """
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    url = context.args[0] if context.args else ''
    if not url:
        message.reply_text('Please provide a URL.')
        return

    # NOTE(review): there is no authorization check here, so anyone who can
    # message the bot can make it fetch an arbitrary URL and spend API quota.
    # Confirm whether this command should be limited to admins or whitelisted groups.
    try:
        # A timeout keeps a stalled server from blocking the handler indefinitely.
        page = requests.get(url, timeout=30)
        page.raise_for_status()
        code_content = page.text
        prompt = '''
/- Read and match the natspec documentation made for each function in the code above with its code, for each important function list the differences if they don't match perfectly.
/- Make a list with function signatures and assessments for parts that do not match according to your interpretation.
/- You can NEVER say that code is too long to make a review, you have more context size than the source code to craft your answer so you are allowed to make big analysis.
/- For large codebases it's ok to analyze only the most important functions (normally the external ones).
/- You don't need to execute any part of the code, just read it.
'''
        messages = [{
            "role": "user",
            "content": f"{prompt}\n\n{code_content}"
        }]
        completion = client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4000,
            temperature=0,
            messages=messages
        )
        bot_response = completion.content[0].text
        for msg in safe_split_message(bot_response):
            message.reply_text(msg, parse_mode=ParseMode.MARKDOWN)
    except requests.RequestException as e:
        message.reply_text(f"Error fetching data from the URL: {e}")
    except anthropic.APIError as e:
        # APIError is the SDK's base error; connection and timeout errors derive
        # from it. The qualified name is used because the bare names were never imported.
        message.reply_text(f"Claude API error: {str(e)}")
    except Exception as e:
        logging.exception("Unexpected error in preaudit")
        message.reply_text(f"Unexpected error: {str(e)}")
def handle_message(update: Update, context: CallbackContext) -> None:
    """Answer a question about Ape in a whitelisted chat using Claude.

    Registered for ``/prompt``, ``/p`` and for plain text matching ``^y\\s``.
    Chats not present in ``groups`` are ignored. Each group may receive 10
    answers per day; answers to admins do not count towards that limit. The
    knowledge base and the instructions are sent together with the question,
    and the answer is returned as Markdown replies.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    daily_limit = 10

    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if message is None or not message.text:
        return

    group_id = str(message.chat_id)
    group_data = groups.get(group_id)
    if group_data is None:
        # Chats that are not whitelisted get no reply.
        return

    # Start a new daily window when the stored date is not today.
    today = str(datetime.date.today())
    if group_data.get('last_reset') != today:
        group_data['messages_today'] = 0
        group_data['last_reset'] = today
    if group_data.get('messages_today', 0) >= daily_limit:
        message.reply_text('GPT limit for this group has been reached (10 msgs a day).')
        return

    # Drop the leading token (/p, /prompt or the bare "y") and keep the rest.
    parts = message.text.split(maxsplit=1)
    user_message = parts[1].strip() if len(parts) > 1 else ''

    replied = message.reply_to_message
    if not user_message and not replied:
        # Nothing to answer; avoid sending the whole knowledge base for an empty prompt.
        message.reply_text('Please provide a question.')
        return

    system_prompt = '''
/- You are a bot helping people understand Ape.
/- I have prefixed a KNOWLEDGE BASE that help you understand what is Ape.
/- The answer must exist within the source files, otherwise don't answer.
/- You can use ```language to write code that shows in a pretty way.
/- Do not invent anything about ape that is not in source files unless you said you were going creative.
/- False certanty about what ape can do is the worse thing you can do, avoid it at all costs.
/- ALWAYS Answer the user question using the source files and tell the source of your answer.
/- ALWAYS provide a % score of how much of your answer matches the KNOWLEDGE BASE.
/- If the task is of creative nature it's ok to go wild and beyond just the sources, but you MUST state that confidence score is -1 in that case.
'''
    knowledge_base_content = "---START OF KNOWLEDGE BASE---\n\n" + knowledge_base + "\n\n---END OF KNOWLEDGE BASE---"
    if replied:
        # Include the message being replied to as conversational context.
        content = f"{system_prompt}\n\n{knowledge_base_content}\n\nPrevious message: {replied.text}\n\nNew message: {user_message}"
    else:
        content = f"{system_prompt}\n\n{knowledge_base_content}\n\n{user_message}"

    messages = [{
        "role": "user",
        "content": content
    }]
    try:
        completion = client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4000,
            temperature=0,
            messages=messages
        )
        bot_response = completion.content[0].text
        for msg in safe_split_message(bot_response):
            message.reply_text(msg, parse_mode=ParseMode.MARKDOWN)

        # Only answers to non-admins count towards the group's daily limit.
        user = update.effective_user
        if user is None or not admins.get(str(user.id)):
            group_data['messages_today'] = group_data.get('messages_today', 0) + 1
        # Saved unconditionally so that a daily reset is persisted as well.
        save_data()
    except anthropic.APIError as e:
        # APIError is the SDK's base error; connection and timeout errors derive
        # from it. The qualified name is used because the bare names were never imported.
        error_message = f"Claude API error: {str(e)}"
        message.reply_text(error_message)
    except Exception as e:
        logging.exception("Unexpected error in handle_message")
        error_message = f"Unexpected error: {str(e)}"
        message.reply_text(error_message)
def main() -> None:
    """Load persisted state, register every handler and poll Telegram until stopped."""
    load_data()

    updater = Updater(TELEGRAM_TOKEN)
    register = updater.dispatcher.add_handler

    # Command name -> callback, registered in the listed order.
    command_table = (
        ("prompt", handle_message),
        ("p", handle_message),
        ("start", start),
        ("add_admin", add_admin),
        ("add_group", add_group),
        ("preaudit", preaudit),
    )
    for command_name, callback in command_table:
        register(CommandHandler(command_name, callback))

    # Plain text that starts with "y" followed by whitespace is handled like a prompt.
    register(MessageHandler(Filters.text & Filters.regex(r'^y\s'), handle_message))

    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()