-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
165 lines (136 loc) · 5.44 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
""" python-squarelet handles authentication and requests to MuckRock services """
# Standard Library
import logging
from functools import partial
# Third Party
import ratelimit
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Local
from exceptions import APIError, CredentialsFailedError, DoesNotExistError
logger = logging.getLogger("squarelet")

# Not referenced by the code visible in this module; presumably a cap on bulk
# operations for callers of this package -- TODO confirm.
BULK_LIMIT = 25
# Default timeout handed to requests for every HTTP call (seconds).
TIMEOUT = 20
# At most RATE_LIMIT calls per RATE_PERIOD, as passed to ratelimit.limits.
RATE_LIMIT = 10
RATE_PERIOD = 1
# Auth service used when the caller does not supply auth_uri.
DEFAULT_AUTH_URI = "https://accounts.muckrock.com/api/"


class SquareletClient:
    """Handles token auth and requests

    The client exchanges a username/password (or a refresh token) for an
    access token at ``auth_uri`` and sends it as a Bearer ``Authorization``
    header on every request made against ``base_uri``.

    HTTP verbs are exposed dynamically: ``client.get(url, **kwargs)``,
    ``client.post(...)`` and so on are all routed through ``_request``.
    """

    # Verbs that __getattr__ turns into request methods
    _HTTP_METHODS = ("get", "post", "put", "delete", "patch", "head", "options")

    def __init__(
        self,
        base_uri,
        username=None,
        password=None,
        auth_uri=None,
        timeout=TIMEOUT,
        rate_limit=True,
        rate_limit_sleep=True,
    ):
        """Authenticate and prepare the HTTP session

        Args:
            base_uri: Prefix prepended to every relative request URL.
            username: Account user name used to obtain tokens.
            password: Account password used to obtain tokens.
            auth_uri: Base URI of the auth service; falls back to
                DEFAULT_AUTH_URI when falsy.
            timeout: Default timeout for each HTTP call.
            rate_limit: Wrap ``_request`` with ``ratelimit.limits``.
            rate_limit_sleep: When rate limiting is on, sleep and retry
                instead of raising once the limit is hit.

        Raises:
            ValueError: If no credentials or tokens are available.
        """
        self.username = username
        self.password = password
        self.base_uri = base_uri
        self.auth_uri = auth_uri or DEFAULT_AUTH_URI
        self.timeout = timeout
        self.session = requests.Session()
        self.access_token = None
        self.refresh_token = None
        self._set_tokens()

        if rate_limit:
            # Shadow the method on the instance with a rate limited wrapper
            limiter = ratelimit.limits(calls=RATE_LIMIT, period=RATE_PERIOD)
            self._request = limiter(self._request)
            # sleep_and_retry only has an effect on top of the limiter
            if rate_limit_sleep:
                self._request = ratelimit.sleep_and_retry(self._request)

    def _set_tokens(self):
        """Set the refresh and access tokens

        Preference order: refresh an existing refresh token, then log in with
        username/password, then keep a bare access token as it is.

        Raises:
            ValueError: If none of the above is available.
        """
        if self.refresh_token:
            self.access_token, self.refresh_token = self._refresh_tokens(
                self.refresh_token
            )
        elif self.username and self.password:
            # First authentication: no tokens yet, so log in with credentials
            self.access_token, self.refresh_token = self._get_tokens(
                self.username, self.password
            )
        elif not self.access_token:
            raise ValueError("No tokens found")
        # A lone access token cannot be refreshed, so it is used as it is

        if self.access_token:
            self.session.headers.update(
                {"Authorization": f"Bearer {self.access_token}"}
            )

    def _auth_post(self, endpoint, payload):
        """POST a JSON payload to an auth endpoint and return the decoded body

        The call deliberately bypasses ``_request``: auth URLs must not get
        the ``base_uri`` prefix, a failure must not trigger another token
        refresh, and the credentials must not reach the request log.
        """
        # Tolerate auth_uri with or without a trailing slash
        url = f"{self.auth_uri.rstrip('/')}/{endpoint}/"
        with self.requests_retry_session() as session:
            response = session.post(url, json=payload, timeout=self.timeout)
        self.raise_for_status(response)
        return response.json()

    def _get_tokens(self, username, password):
        """Get an access and refresh token in exchange for the username and password"""
        tokens = self._auth_post(
            "token", {"username": username, "password": password}
        )
        return tokens["access"], tokens["refresh"]

    def _refresh_tokens(self, refresh_token):
        """Refresh the access and refresh tokens"""
        tokens = self._auth_post("refresh", {"refresh": refresh_token})
        return tokens["access"], tokens["refresh"]

    def _request(self, method, url, raise_error=True, **kwargs):
        """Generic method to make API requests

        Args:
            method: HTTP verb.
            url: Path relative to ``base_uri`` (or a complete URL when
                ``full_url=True`` is passed).
            raise_error: Raise a custom error for 4xx/5xx responses.
            **kwargs: Forwarded to ``Session.request``.  Two keys are consumed
                here: ``set_tokens`` (default True; re-authenticate and retry
                once on 403/429) and ``full_url`` (default False).  A
                ``timeout`` key overrides the client default.

        Returns:
            The ``requests`` response object.
        """
        # pylint: disable=method-hidden
        # NOTE(review): kwargs are logged verbatim and may hold request bodies
        logger.info("request: %s - %s - %s", method, url, kwargs)

        # Let subclasses contribute headers or other kwargs
        kwargs.update(self.set_request_kwargs(**kwargs))

        set_tokens = kwargs.pop("set_tokens", True)
        full_url = kwargs.pop("full_url", False)
        if not full_url:
            url = f"{self.base_uri}{url}"

        # Client default timeout unless the caller supplied one
        request_kwargs = {"timeout": self.timeout, **kwargs}
        session = self.requests_retry_session(session=self.session)
        response = session.request(method, url, **request_kwargs)
        logger.debug("response: %s - %s", response.status_code, response.content)

        if response.status_code in (403, 429) and set_tokens:
            self._set_tokens()  # Refresh tokens
            # set_tokens=False keeps the retry from looping; url is already
            # absolute at this point, hence full_url=True
            return self._request(
                method,
                url,
                raise_error=raise_error,
                full_url=True,
                set_tokens=False,
                **kwargs,
            )

        if raise_error:
            self.raise_for_status(response)
        return response

    def set_request_kwargs(self, **kwargs):
        """Allow clients to customize request kwargs (e.g., adding headers)

        Receives the kwargs of the pending request and returns a dict that is
        merged over them.  The base implementation contributes nothing.
        """
        return {}

    def __getattr__(self, attr):
        """Generate methods for each HTTP request type (GET, POST, etc.)

        Only called for names normal attribute lookup did not find.
        """
        if attr in self._HTTP_METHODS:
            return partial(self._request, attr)
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{attr}'"
        )

    def requests_retry_session(
        self,
        retries=3,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 504),
        session=None,
    ):
        """Automatic retries for HTTP requests

        Mounts a retrying adapter for http:// and https:// on ``session`` (a
        new ``requests.Session`` when none is given) and returns the session.
        """
        session = session or requests.Session()
        retry = Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry)
        for scheme in ("http://", "https://"):
            session.mount(scheme, adapter)
        return session

    def raise_for_status(self, response):
        """Raise for status with a custom error class

        Raises:
            DoesNotExistError: The response status is 404.
            CredentialsFailedError: The response status is 401.
            APIError: Any other failure reported by ``raise_for_status``.
        """
        try:
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            failed = exc.response if exc.response is not None else response
            if failed.status_code == 404:
                raise DoesNotExistError(response=failed) from exc
            if failed.status_code == 401:
                raise CredentialsFailedError(response=failed) from exc
            raise APIError(response=failed) from exc