-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
283 lines (215 loc) · 9.02 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import logging
import os
import re
from datetime import datetime
from bs4 import BeautifulSoup
import requests
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, MessageHandler, filters, CallbackQueryHandler, ContextTypes
from tinydb import TinyDB, Query
from tinydb.operations import increment
from pytube import YouTube
# Configure root logging once at import time (timestamp - logger name - level - message)
# and create the module-level logger used by every function below.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# All settings are read from the environment when the module is imported;
# a missing variable raises KeyError immediately.
# API key sent with every YouTube Data API search request.
YOUTUBE_API_KEY = os.environ['YOUTUBE_API_KEY']
# Bot token for Telegram.
# NOTE(review): main() reads os.environ['TELEGRAM_API_KEY'] again instead of using this constant.
TELEGRAM_API_KEY = os.environ['TELEGRAM_API_KEY']
# Semicolon-separated list of domains; links containing one of them are ignored without a reply.
IGNORED_DOMAINS = os.environ['IGNORED_DOMAINS']
# DOWNLOAD_DIR receives the extracted audio files, DATA_DIR holds the database file.
DOWNLOAD_DIR = os.environ['DOWNLOAD_DIR']
DATA_DIR = os.environ['DATA_DIR']
# Open the TinyDB database stored as db_music.json inside DATA_DIR.
db = TinyDB(DATA_DIR + '/db_music.json')
def tguser_to_dict(user):
    """
    Builds a plain dictionary out of a Telegram user object.
    @user: The user object; its id, first_name, last_name and username attributes are read.
    @return: A dictionary with the keys 'id', 'first_name', 'last_name' and 'username'.
    """
    copied_fields = ('id', 'first_name', 'last_name', 'username')
    return {field: getattr(user, field) for field in copied_fields}
def save_entry(chat_id, youtube_url, song_title, artist, user):
    """
    Records that a song was mentioned in a chat.
    If the chat already has an entry for the YouTube URL, its 'mentions' counter is
    incremented; otherwise a new entry with 'mentions' set to 1 is inserted.
    @chat_id: The chat ID the song was mentioned in.
    @youtube_url: The YouTube URL of the video.
    @song_title: The title of the song.
    @artist: The artist of the song.
    @user: The Telegram user object of the person who mentioned the song.
    @return: The stored entry for this chat and URL, as returned by the database.
    """
    entry = Query()
    # TinyDB conditions have to be combined with `&`. Python's `and` cannot be
    # overloaded and simply evaluates to one of its operands, which silently
    # dropped the chat_id condition and matched entries from other chats.
    same_song_in_chat = (entry.chat_id == chat_id) & (entry.youtube_url == youtube_url)

    if db.contains(same_song_in_chat):
        logger.info(f'Entry {youtube_url} already exists in the database!')
        db.update(increment('mentions'), same_song_in_chat)
    else:
        # First mention in this chat: store who posted it and when.
        db.insert({
            'chat_id': chat_id,
            'youtube_url': youtube_url,
            'song_title': song_title,
            'artist': artist,
            'user': tguser_to_dict(user),
            'mentions': 1,
            'date': datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
        })
    return db.get(same_song_in_chat)
def _sanitize_title(title):
    """
    Turns a video title into a file-name friendly string.
    @title: The raw video title.
    @return: The title with the listed punctuation characters and all spaces replaced by underscores.
    """
    # Raw string with the brackets, backslash and hyphen escaped so that the whole
    # expression is ONE character class. Unescaped, the class ended at the first
    # `]` and the pattern practically never matched.
    without_punctuation = re.sub(r"[!@#$%^&*()\[\]{};:,./<>?\\|`~\-=_+]", " ", title)
    return without_punctuation.replace(" ", "_")


def extract_audio(youtube_url, output_path):
    """
    Extracts the audio from a YouTube video and saves it to the specified output path.
    @youtube_url: The YouTube URL of the video.
    @output_path: The directory to save the audio file in.
    @return: The path to the audio file, or None if the extraction failed (the error is logged).
    """
    try:
        # Create a YouTube object using the provided URL
        yt = YouTube(youtube_url)
        title = _sanitize_title(yt.title)
        audio_stream = yt.streams.get_audio_only()
        # Download the audio stream to the specified output path
        audio_path = audio_stream.download(
            output_path=output_path, filename=f'{title}.mp4')
    except Exception as e:
        # Best effort by design: report the failure (with traceback) and let the
        # caller decide what to do with the missing file.
        logger.exception(f'Error during audio extraction from {youtube_url}: {e}')
        return None
    logger.info(f'Audio {youtube_url} extraction successful!')
    return audio_path
def search_song_on_youtube(song_title, artist):
    """
    Searches YouTube for a song with the given title and artist and returns the first result.
    @song_title: The title of the song, as plain (not percent-encoded) text.
    @artist: The artist of the song, as plain (not percent-encoded) text.
    @return: The YouTube URL of the first result.
    Raises requests.HTTPError if the API answers with an error status and
    IndexError if the search returns no video.
    """
    # Pass the query through `params` so that requests URL-encodes it. Splicing the
    # raw text into the URL broke searches for titles containing '&', '#' or '+'.
    params = {
        'part': 'snippet',
        'q': f'{song_title} {artist}'.strip(),
        'type': 'video',
        'key': YOUTUBE_API_KEY,
    }
    # A timeout keeps a stalled connection from blocking the bot forever.
    response = requests.get(
        'https://www.googleapis.com/youtube/v3/search', params=params, timeout=10)
    response.raise_for_status()

    items = response.json().get('items') or []
    if not items:
        raise IndexError(f'YouTube search returned no results for "{params["q"]}"')
    # Get the first result
    video_id = items[0]['id']['videoId']
    return f"https://www.youtube.com/watch?v={video_id}"
def extract_song_info_from_apple_music_link(link):
    """
    Extracts the song title and artist from an Apple Music link.
    The title is taken from the fourth '/'-separated component after the protocol
    (the link is expected to look like protocol://host/language/album/title/...);
    the artist is scraped from the linked web page.
    @link: The Apple Music link.
    @return: The song title and artist. The artist is an empty string if it could not
             be found on the page.
    Raises ValueError if the link does not have the expected number of components.
    """
    # Local import: only this function needs it.
    from urllib.parse import unquote

    # maxsplit=1 keeps links working that contain another '://' later on
    # (e.g. inside a query parameter).
    _protocol, resource_name_and_rest = link.split('://', 1)
    _resource_name, _language, _album, title_slug, *_rest = \
        resource_name_and_rest.split('/')
    # Decode percent-escapes so non-ASCII titles are stored and searched as text.
    song_title = unquote(title_slug).strip()

    # Fetch the webpage; the timeout keeps a stalled connection from blocking the bot.
    response = requests.get(link, timeout=10)
    soup = BeautifulSoup(response.text, "html.parser")
    # The artist is read from the first anchor element with the class "click-action".
    artist_element = soup.find('a', class_='click-action')
    if artist_element is None:
        # Page layout not as expected: fall back to a title-only result instead of
        # crashing with an AttributeError.
        logger.warning(f'No artist found on Apple Music page {link}')
        artist = ''
    else:
        artist = artist_element.text.strip()
    return song_title, artist
def extract_song_info_from_spotify_link(link):
    """
    Extracts the song title and artist from a Spotify link.
    The link is rewritten to its /embed/ variant and the text of the first two
    anchor elements of that page is used as song title and artist.
    @link: The Spotify link.
    @return: The song title and artist.
    Raises ValueError if the link contains no '://' and IndexError if the embed
    page has fewer than two anchor elements.
    """
    # maxsplit=1 keeps links working that contain another '://' later on.
    protocol, resource_name_and_rest = link.split('://', 1)
    resource_name, *rest = resource_name_and_rest.split('/')
    # Construct the modified URL with /embed after the resource name
    embed_path = "/".join(rest)
    modified_link = f'{protocol}://{resource_name}/embed/{embed_path}'

    # Fetch the webpage; the timeout keeps a stalled connection from blocking the bot.
    response = requests.get(modified_link, timeout=10)
    soup = BeautifulSoup(response.text, "html.parser")
    anchors = soup.find_all('a')
    if len(anchors) < 2:
        # Same exception type as the bare index access, but with a usable message.
        raise IndexError(
            f'Expected at least two links on Spotify embed page {modified_link}, '
            f'found {len(anchors)}')
    # The first anchor holds the song, the second one the artist.
    song = anchors[0].text.strip()
    artist = anchors[1].text.strip()
    return song, artist
async def search_song(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handler function that is called when a text message is received.
    If the message contains an Apple Music or Spotify link, searches YouTube for the
    song, stores the mention in the database and replies with the first result and a
    download button. Links to ignored domains and messages without a link are skipped
    silently; any other link gets an explanatory reply.
    @param update: The update object.
    @param context: The context object.
    @return: None
    """
    message = update.message
    # Updates such as edited messages carry no `message`; there is nothing to do then.
    if message is None or not message.text:
        return
    user = message.from_user
    chat_id = message.chat_id

    # Extract the first link of the message
    link_match = re.search(r"(https?://[^\s]+)", message.text)
    if link_match is None:
        logger.warning('Ignoring message since it does not contain a link.')
        # Stop here: without this return the code below ran with `link` unbound.
        return
    link = link_match.group(0)

    # Drop empty entries (unset variable, trailing ';'): an empty string is
    # contained in every link and would make the bot ignore everything.
    ignored_domains = [domain.strip() for domain in IGNORED_DOMAINS.split(";") if domain.strip()]

    # Extract the song title and artist from the link
    if "apple.com" in link:
        song_title, artist = extract_song_info_from_apple_music_link(link)
    elif "spotify.com" in link:
        song_title, artist = extract_song_info_from_spotify_link(link)
    elif any(domain in link for domain in ignored_domains):
        return
    else:
        await message.reply_text(
            f"Sorry {user.first_name}, I only support Apple Music (apple.com) and Spotify links (spotify.com).")
        return

    # Search YouTube for the song and record the mention
    youtube_url = search_song_on_youtube(song_title, artist)
    entry = save_entry(chat_id, youtube_url, song_title, artist, user)
    keyboard = [[InlineKeyboardButton(
        "Download 🚀", callback_data=youtube_url)]]
    download_markup = InlineKeyboardMarkup(keyboard)

    if entry['mentions'] == 1:
        # Send the YouTube link as a message
        await message.reply_text(f"Here is the Youtube Link, keep on bangin\' 😎\n{youtube_url}", reply_markup=download_markup)
    else:
        # Users without a last name are stored with None; leave it out instead of
        # printing the text "None".
        first_user = entry['user']
        first_user_name = ' '.join(
            part for part in (first_user['first_name'], first_user['last_name']) if part)
        await message.reply_text(f"This song has been mention here {entry['mentions']} times and was first mentioned by {first_user_name}, keep on bagin\' 😎\n{youtube_url}", reply_markup=download_markup)
async def download_button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handler function that is called when the download button is clicked.
    Downloads the audio of the YouTube URL stored in the button, uploads it to the
    chat as a document and removes the local file afterwards.
    @param update: The update object.
    @param context: The context object.
    @return: None
    """
    query = update.callback_query
    # Acknowledge the button click
    await query.answer()
    youtube_url = query.data
    chat_id = query.message.chat_id
    # Edit message so that button disappears
    await query.edit_message_text(text=f'Download started, audio will be coming shortly, keep on scratchin\' 😘\n{youtube_url}')

    path = extract_audio(youtube_url, DOWNLOAD_DIR)
    if path is None:
        # extract_audio logs the reason and returns None on failure; tell the user
        # instead of crashing on open(None).
        await context.bot.send_message(
            chat_id=chat_id, text=f'Sorry, the download failed.\n{youtube_url}')
        return

    try:
        # Upload file; the context manager closes the handle even if the upload fails.
        with open(path, 'rb') as audio_file:
            await context.bot.send_document(chat_id=chat_id, document=audio_file)
    finally:
        # Clean file, whether or not the upload succeeded.
        os.remove(path)
def main():
    """
    Entry point of the bot: builds the Telegram application, registers the
    handlers and polls for updates.
    """
    # Build the application with the bot token taken from the environment
    builder = Application.builder()
    application = builder.token(os.environ['TELEGRAM_API_KEY']).build()

    # Text messages go to search_song, button clicks to download_button
    registrations = (
        (MessageHandler, (filters.TEXT, search_song)),
        (CallbackQueryHandler, (download_button,)),
    )
    for handler_type, handler_args in registrations:
        application.add_handler(handler_type(*handler_args))

    # Run the bot until it is stopped
    application.run_polling()


# Only start the bot when the file is executed as a script.
if __name__ == '__main__':
    main()