-
Notifications
You must be signed in to change notification settings - Fork 1
/
scrobbler.py
294 lines (241 loc) · 11.7 KB
/
scrobbler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
"""
This is a script to get up-to-date track information from a Spinitron page for a radio station,
and "scrobble" that information to a user's Last.fm profile (allowing the user to record the
songs they are listening to on the radio station).
This functionality requires prerequisites of an API key and API secret from Last.fm, as well as a Spinitron API
key associated with the desired station. These should be entered into the .env file with the variable names
LASTFM_API_KEY, LASTFM_API_SECRET, and SPINITRON_API_KEY.
Before starting scrobbling, there must also be a web service session established with an associated
session key. If one has not already been obtained, the script should be run with the --setup flag
to establish one. The session key should be entered into the .env file with the variable name
LASTFM_SESSION_KEY.
"""
import argparse
from datetime import datetime
from dateutil import parser, tz
from dotenv import load_dotenv, set_key
import hashlib
import logging
import os
import requests as r
import signal
import sys
import time
import webbrowser
import xml.etree.ElementTree as ET
# Endpoint that every Last.fm API request in this script is posted to
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"
# Pull env variables
# load_dotenv() runs first so values stored in the .env file are visible to os.getenv below
load_dotenv()
# Each of these is None when the corresponding variable is not set
lastfm_api_key = os.getenv("LASTFM_API_KEY")
lastfm_api_secret = os.getenv("LASTFM_API_SECRET")
lastfm_session_key = os.getenv("LASTFM_SESSION_KEY")
spinitron_api_key = os.getenv("SPINITRON_API_KEY")
# Authorization header passed along with requests to the Spinitron API
spinitron_headers={"Authorization": f"Bearer {spinitron_api_key}"}
# Record INFO-and-above messages in error.log; filemode='w' starts a fresh log file on every launch
logging.basicConfig(level=logging.INFO,
filename="error.log",
filemode='w',
format="%(asctime)s %(name)-4s %(levelname)s \n%(message)s\n")
def signal_handler(sig, frame):
    """Say goodbye and terminate the process with exit status 0.

    Intended for use as a signal handler (the script registers it for SIGINT).

    Args:
        sig (int): Number of the signal that was received (unused)
        frame (frame): Stack frame that was executing when the signal arrived (unused)
    """
    farewell = "Ctrl+C pressed, closing. Goodbye!"
    print(farewell)
    # Equivalent to sys.exit(0): raises SystemExit with a success status
    raise SystemExit(0)
def generate_signature(params):
    """Builds the md5 hash signature for a request, as specified in the Last.fm authentication specs

    Each parameter name is joined with its stringified value, in sorted order of
    the names; the API secret is appended to the result and the whole string is
    md5-hashed.

    Args:
        params (dict): Mapping of parameter names (str) to the values sent with the request

    Returns:
        str: The generated signature, as a hexadecimal string
    """
    name_value_pairs = "".join(name + str(params[name]) for name in sorted(params))
    payload = name_value_pairs + lastfm_api_secret
    return hashlib.md5(payload.encode("utf-8")).hexdigest()
def get_token():
    """Performs API call auth.getToken to fetch a request token

    Returns:
        str: Returned token, as a string

    Raises:
        RuntimeError: If the response does not contain a token
    """
    params = {
        "method": "auth.getToken",
        "api_key": lastfm_api_key,
    }
    params["api_sig"] = generate_signature(params)
    # A timeout keeps setup from hanging indefinitely when Last.fm cannot be reached
    response = r.post(LASTFM_API_URL, params=params, timeout=30)
    root = ET.fromstring(response.content)
    token = root.findtext("./token")
    if not token:
        # Report the error sent by Last.fm (if any) instead of failing on a missing element
        error = root.find("./error")
        if error is not None:
            detail = f"Last.fm error code {error.get('code')}: {error.text}"
        else:
            detail = f"HTTP status {response.status_code}"
        raise RuntimeError(f"Last.fm did not return a request token ({detail}).")
    return token
def get_session_key(token):
    """Performs API call to auth.getSession to create a web service session and fetch the associated session key

    Args:
        token (str): A request token, retrieved from auth.getToken

    Returns:
        str: Returned session key, as a string

    Raises:
        RuntimeError: If the response does not contain a session key
    """
    params = {
        "method": "auth.getSession",
        "api_key": lastfm_api_key,
        "token": token,
    }
    params["api_sig"] = generate_signature(params)
    # A timeout keeps setup from hanging indefinitely when Last.fm cannot be reached
    response = r.post(LASTFM_API_URL, params=params, timeout=30)
    root = ET.fromstring(response.content)
    session_key = root.findtext("./session/key")
    if not session_key:
        # Report the error sent by Last.fm (if any) instead of failing on a missing element
        error = root.find("./error")
        if error is not None:
            detail = f"Last.fm error code {error.get('code')}: {error.text}"
        else:
            detail = f"HTTP status {response.status_code}"
        raise RuntimeError(
            f"Last.fm did not return a session key ({detail}). "
            "Make sure the application was authorized for your account before confirming."
        )
    return session_key
def update_np(session_key, artist, track, album=None, duration=None):
    """Calls track.updateNowPlaying to tell Last.fm that the user has started listening to a new track

    Args:
        session_key (str): A session key for an associated web service session generated by auth.getSession
        artist (str): The artist name
        track (str): The track name
        album (str, optional): The album name
        duration (int, optional): The length of the track in seconds

    Returns:
        int: Status code of the response
    """
    payload = {
        "method": "track.updateNowPlaying",
        "artist": artist,
        "track": track,
        "api_key": lastfm_api_key,
        "sk": session_key,
    }
    # Optional fields are only sent when they hold a truthy value
    optional_fields = {"album": album, "duration": duration}
    payload.update({name: value for name, value in optional_fields.items() if value})
    # The signature must cover every parameter, so it is computed last
    payload["api_sig"] = generate_signature(payload)

    result = r.post(LASTFM_API_URL, params=payload)
    if result.ok:
        return result.status_code

    # Report the HTTP error; the status code is still handed back to the caller
    handle_lastfm_http_error(response=result, request_type="NP")
    return result.status_code
def request_scrobble(session_key, artist, track, timestamp, album=None, duration=None):
    """Performs API call to track.scrobble to indicate to Last.fm that the user has listened to a song

    Args:
        session_key (str): A session key for an associated web service session generated by auth.getSession
        artist (str): The artist name
        track (str): The track name
        timestamp (int or float): The time the track started playing (UTC), in UNIX timestamp format.
            Fractional seconds are discarded before the request is sent
        album (str, optional): The album name
        duration (int, optional): The length of the track in seconds

    Returns:
        int: Status code of the response
    """
    params = {
        "method": "track.scrobble",
        "artist": artist,
        "track": track,
        # Last.fm specifies an integer number of seconds; a float would be sent (and signed) as e.g. "1700000000.0"
        "timestamp": int(timestamp),
        "api_key": lastfm_api_key,
        "sk": session_key,
    }
    if album:
        params["album"] = album
    if duration:
        params["duration"] = duration
    # The signature must cover every parameter, so it is computed last
    params["api_sig"] = generate_signature(params)
    response = r.post(LASTFM_API_URL, params=params)
    # Handle http error if necessary
    if not response.ok:
        handle_lastfm_http_error(response=response, request_type="scrobble")
    return response.status_code
def handle_lastfm_http_error(response, request_type):
    """Helper function for update_np and request_scrobble, which takes the returned response from an HTTP error, parses, and logs the information.

    The message always contains the HTTP status code and reason. When the response body is XML
    containing an <error> element, its code and text are appended; otherwise a note that the body
    could not be parsed is appended. The message is printed and logged at ERROR level.

    Args:
        response (requests.Response): Response object that is returned by an HTTP request. Should only be responses where response.ok is false (status code >= 400)
        request_type (str): A string indicating the source of the HTTP error ('NP' if it comes from an NP request, 'scrobble' if it comes from a scrobble request)
    """
    http_error_str = f"An HTTP error occured while making a {request_type} request.\nHTTP error code {response.status_code}: {response.reason}"
    try:
        # Get error info sent from last.fm if available
        error_element = ET.fromstring(response.content).find('./error')
    except Exception:
        # Parsing is best-effort: a malformed body must not break error reporting.
        # Catching Exception (not a bare except) lets Ctrl+C and sys.exit still propagate.
        error_element = None
    if error_element is not None:
        http_error_str += f"\nLast.fm error code {error_element.get('code')}: {error_element.text}"
    else:
        http_error_str += "\nCould not parse response data for more information."
    print(http_error_str)
    logging.error(http_error_str)
def setup():
    """Walks the user through establishing a web service session when none exists yet

    Fetches a request token, opens the Last.fm authorization page for it, waits for the
    user to confirm on standard input, and then exchanges the token for a session key.

    Returns:
        str: Established session key
    """
    request_token = get_token()

    # Prompt user to authorize for their account
    auth_url = f"http://www.last.fm/api/auth/?api_key={lastfm_api_key}&token={request_token}"
    print(f"You should be redirected to a Last.fm page where you can authorize this application for your account. If not, please go here to do so: {auth_url}")
    webbrowser.open(auth_url)

    # Block until the user enters exactly 'Y'; 'x' (in either case) cancels
    print("Please enter 'Y' to confirm once you have authorized the application for your account, or enter 'x' to cancel.")
    while True:
        answer = input()
        if answer.lower() == 'x':
            print("Exiting. Goodbye!")
            sys.exit(0)
        if answer == 'Y':
            break

    established_key = get_session_key(request_token)
    print(f"Your session key is {established_key}")
    return established_key
def _fetch_latest_spin():
    """Fetches the most recent spin from Spinitron

    Returns:
        dict or None: The newest spin, or None when Spinitron could not be reached
        or returned an empty list of spins
    """
    try:
        response = r.get("https://spinitron.com/api/spins?count=1", headers=spinitron_headers, timeout=30)
    except (r.ConnectionError, r.Timeout) as error:
        # Transient network trouble should not end a long-running scrobbling session
        fetch_error_str = f"Could not reach Spinitron: {error}"
        print(fetch_error_str)
        logging.error(fetch_error_str)
        return None
    items = response.json()["items"]
    return items[0] if items else None


def run():
    """Execution to run when the user has already established a web service session

    Polls Spinitron in an endless loop. When a spin that has not been seen before is still
    playing, a "now playing" update is sent to Last.fm; if the spin is longer than 30 seconds
    the loop then sleeps until the spin's end time and scrobbles it. Otherwise the iteration
    counts as a miss: the loop waits 30 seconds, or 5 minutes after more than 10 consecutive
    misses. This function does not return.
    """
    print("Running.")
    # Loop - each iteration is a check to Spinitron for new song data. All paths have blocking of at least 5 seconds to avoid sending too many requests
    miss_count = 0
    last_spin_id = None
    while True:
        # Get most recent spin info from Spinitron
        current_spin = _fetch_latest_spin()
        if current_spin is None:
            # Nothing usable came back: treat it as a miss and keep the previously seen spin id
            miss_count += 1
            if miss_count > 10:
                miss_str = f"{miss_count} requests since last spin. Waiting 5 minutes before next request."
                print(miss_str)
                logging.info(miss_str)
                time.sleep(270)
            time.sleep(30)
            continue

        # Parse song data, get time difference between song end and current time
        spin_id = current_spin["id"]
        song_end_datetime = parser.parse(current_spin["end"])
        # Timezone-aware "now" in UTC (datetime.utcnow() is deprecated)
        current_datetime = datetime.now(tz.UTC)
        time_difference = (song_end_datetime - current_datetime).total_seconds()

        # Check if a new song is playing
        if (time_difference > 0) and (spin_id != last_spin_id):
            miss_count = 0
            print("Making NP request")
            update_np(session_key=lastfm_session_key, artist=current_spin["artist"], track=current_spin["song"], album=current_spin["release"], duration=current_spin["duration"])
            # Last.fm asks that we only scrobble songs longer than 30 seconds
            if current_spin["duration"] > 30:
                # Idle until end of song, then make scrobble request
                time.sleep(time_difference)
                print("Making scrobble request")
                # Last.fm wants the time the track STARTED playing, as whole seconds.
                # NOTE(review): derived as end - duration, assuming duration is the spin length in seconds - confirm against the Spinitron schema
                start_timestamp = int(song_end_datetime.timestamp() - current_spin["duration"])
                request_scrobble(session_key=lastfm_session_key, artist=current_spin["artist"], track=current_spin["song"], timestamp=start_timestamp, album=current_spin["release"], duration=current_spin["duration"])
            else:
                too_short_str = f"Song length too short to scrobble. Waiting for {time_difference} seconds..."
                print(too_short_str)
                logging.info(too_short_str)
                time.sleep(time_difference)  # Idle until end of song
            time.sleep(5)
        else:
            # Miss - loop has run without a new spin
            miss_count += 1
            # If miss occurs > 10 times in a row, idle for 5 minutes before next loop
            if miss_count > 10:
                miss_str = f"{miss_count} requests since last spin. Currently {-1*time_difference} seconds overdue according to last spin's end time value. Waiting 5 minutes before next request."
                print(miss_str)
                logging.info(miss_str)
                # 270 + the 30 below add up to the 5 minute wait
                time.sleep(270)
            time.sleep(30)
        last_spin_id = spin_id
def _is_placeholder(value):
    """Returns True when value is missing/empty or consists only of 'x' characters (the example value)"""
    return (not value) or all(char == 'x' for char in value.lower())


if __name__ == '__main__':
    # Ctrl+C handler
    signal.signal(signal.SIGINT, signal_handler)

    # Parse CLI args for --setup flag
    cli_parser = argparse.ArgumentParser()
    cli_parser.add_argument("--setup", action="store_true", help="Run script in setup mode (when a web service session has not been established)")
    args = cli_parser.parse_args()

    # Check if necessary env vars are either not present or left as example value
    if any(_is_placeholder(value) for value in (lastfm_api_key, lastfm_api_secret, spinitron_api_key)):
        print("Please make sure you have set your LASTFM_API_KEY, LASTFM_API_SECRET, and SPINITRON_API_KEY values in the \".env\" file.")
    elif args.setup:
        # If setup flag was set, run setup first, then set the obtained session key in the .env file and run main execution
        new_session_key = setup()
        set_key(".env", "LASTFM_SESSION_KEY", new_session_key)
        # Use the fresh key directly: re-running load_dotenv() would not replace a
        # LASTFM_SESSION_KEY value that is already present in the environment
        # (e.g. the example value), so run() would keep using the stale key
        lastfm_session_key = new_session_key
        run()
    elif _is_placeholder(lastfm_session_key):
        # Session key variable is not present or left as example value
        print("Please make sure you have set your LASTFM_SESSION_KEY value in the \".env\" file. If you have not yet established a web service session, please run the script in setup mode using the --setup argument.")
    else:
        run()