load_all_diffs.py
#!/usr/bin/env python3.10
# Args:
## $1 - credentials in the "user:password" format for API authentication
## $2 - destination JSON file whose "XXX" placeholder is replaced with the backup number
## $3..$x - source CSV files to scan for API urls whose raw data is then loaded
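# Example invocation (a minimal sketch; the file names below are only illustrative):
##   ./load_all_diffs.py "user:app_password" diffs_XXX.json pull_requests.csv comments.csv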
import csv
from datetime import datetime
import json
import sys
import time

import requests
from requests.auth import HTTPBasicAuth
# Bitbucket usually allows 1000 requests per hour; stay slightly below the limit for headroom
# https://support.atlassian.com/bitbucket-cloud/docs/api-request-limits/
BITBUCKET_RATE_LIMIT = 970
# Wait a full hour plus a minute of jitter before starting the next batch
BITBUCKET_RATE_LIMIT_INTERVAL_SECONDS = 3600 + 60
API_PREFIX = "https://api.bitbucket.org/2.0/"
PATTERN_REPLACE_VALUE = "XXX"
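# E.g. with a destination of "out_XXX.json" (an illustrative name, not part of this repo),
# intermediate backups become out_1.json, out_2.json, ... and the full dump is out_final.json.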


def init():
    try:
        # see https://stackoverflow.com/a/15063941/6818663
        csv.field_size_limit(sys.maxsize)
    except OverflowError:
        maxLong = (1 << 31) - 1
        # Looks like Windows uses long instead of long long
        csv.field_size_limit(maxLong)


def parse_args(argv):
    # Split "user:password" only on the first colon so the password may contain ':'
    user, password = argv[1].split(':', 1)
    global AUTH
    AUTH = HTTPBasicAuth(user, password)
    global DST_FILE
    DST_FILE = argv[2]
    global SRC_FILES
    SRC_FILES = list(argv[3:])


def single_query(url):
    # One authenticated GET; raise on any HTTP error so the caller can log it
    res = requests.get(url, auth=AUTH)
    res.raise_for_status()
    return res.text


def load_csv_data():
    # Flatten every cell of every source CSV file into a single list of strings
    res = []
    for f in SRC_FILES:
        with open(f, "r", encoding="utf8") as src:
            inReader = csv.reader(src)
            for row in inReader:
                res.extend(row)
    return res


def select_only_urls(data):
    res = [d for d in data if d.startswith(API_PREFIX)]
    # need only unique urls
    res = set(res)
    return res


def load_data_from_urls_with_backup(urls):
    step = 0
    i = 0
    # For saving results
    res = {}
    for d in urls:
        if i == 0:
            # Remember when the current rate-limit window started
            ts = time.time()
        i += 1
        try:
            res[d] = single_query(d)
        except requests.exceptions.HTTPError as e:
            print(f"HTTP exception was caught for url '{d}'")
            print(f"HTTP code {e.response.status_code}")
            print(e.response.text)
            print()
        except Exception as e:
            print(f"Exception was caught for url '{d}'")
            print(e)
            print()
        if i >= BITBUCKET_RATE_LIMIT:
            i = 0
            step += 1
            # Intermediate backup before waiting out the current rate-limit window
            jsonTxt = json.dumps(res)
            resFileName = DST_FILE.replace(PATTERN_REPLACE_VALUE, str(step))
            with open(resFileName, "w", encoding="utf8") as f:
                f.write(jsonTxt)
            print("File", resFileName, "was written")
            newTs = ts + BITBUCKET_RATE_LIMIT_INTERVAL_SECONDS
            # Never pass a negative value to time.sleep()
            tsDiff = max(0, newTs - time.time())
            print(datetime.now(), "Will resume in", tsDiff, "seconds")
            time.sleep(tsDiff)
    # saving full results
    jsonTxt = json.dumps(res)
    resFileName = DST_FILE.replace(PATTERN_REPLACE_VALUE, 'final')
    with open(resFileName, "w", encoding="utf8") as f:
        f.write(jsonTxt)
    print("Final file", resFileName, "was written")


def main(argv):
    init()
    parse_args(argv)
    data = load_csv_data()
    urls = select_only_urls(data)
    load_data_from_urls_with_backup(urls)


if __name__ == '__main__':
    main(sys.argv)