-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathupload_bike_data.py
149 lines (110 loc) · 3.88 KB
/
upload_bike_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#%%
# Download Hamburg "DB Connect" bike-counter stations from the SensorThings
# API and upload one cleaned JSON file per station to S3.
from io import StringIO
from itertools import chain
import json
from multiprocessing import Pool
import json  # NOTE(review): duplicate of the `import json` above — harmless but redundant
import logging
from urllib.request import urlopen
import boto3
from tqdm.contrib.concurrent import process_map
# Target S3 bucket for the exported per-station files.
bucket = "biking-data"
# NOTE(review): csv_buffer appears unused anywhere in this file — confirm before removing.
csv_buffer = StringIO()
# SensorThings "Things" endpoint, filtered to Things owned by "DB Connect".
base_bike_url = "https://iot.hamburg.de/v1.1/Things?$skip=0&$top=5000&$filter=((properties%2FownerThing+eq+%27DB+Connect%27))"
def get_stations():
    """Fetch all DB Connect bike-counter Things from the SensorThings API.

    Returns the list under the response's "value" key.
    """
    raw = urlopen(base_bike_url).read()
    payload = json.loads(raw)
    return payload["value"]
def get_station_urls(station):
    """Extract id, datastream link, and description from one Thing record.

    Returns a dict with keys "thingID", "datastream_url", "description",
    or None (after logging an error) when a required key is missing.
    """
    try:
        # BUGFIX: the original caught ValueError, but dict lookups raise
        # KeyError, so the handler never fired — and had it fired, the
        # return below would have raised NameError on unbound locals.
        thingID = station["@iot.id"]
        # NOTE(review): this key looks garbled by the page scrape (likely a
        # SensorThings "...@iot.navigationLink" key) — confirm against the API.
        datastream_url = station["[email protected]"]
        description = station["description"]
    except KeyError:
        logging.error("Not valid value found.")
        return None
    return {
        "thingID": thingID,
        "datastream_url": datastream_url,
        "description": description,
    }
def get_obs_stream(station):
    # Resolve a station's first datastream into an observations link plus the
    # station's (x, y) coordinates extracted from its observedArea geometry.
    # Returns a dict, or implicitly None when a required key is missing.
    try:
        response = json.loads(urlopen(station["datastream_url"]).read())["value"]
        # Only the first datastream of each Thing is kept.
        keep_datastream = response[0]
        # NOTE(review): these bracketed keys look mangled by the page scrape
        # (likely SensorThings "...@iot.navigationLink" keys) — confirm.
        obs_stream = keep_datastream["[email protected]"]
        coordinatesAll = keep_datastream["observedArea"]["coordinates"]
        # Defaults; overwritten unconditionally by the unpack below.
        coordinatesX = "NA"
        coordinatesY = "NA"

        def get_correct_vals(item):
            # Walk an arbitrarily nested GeoJSON-style coordinate array and
            # return the first (x, y) pair whose values both lie in (0, 180].
            if any(isinstance(subitem, list) for subitem in item):
                for subitem in item:
                    # Recurse into deeper nesting (e.g. polygon rings).
                    if any(isinstance(subsubitem, list) for subsubitem in subitem):
                        return get_correct_vals(subitem)
                    if len(subitem) == 2:
                        if (
                            subitem[0] > 0
                            and subitem[0] <= 180
                            and subitem[1] > 0
                            and subitem[1] <= 180
                        ):
                            return subitem[0], subitem[1]
            else:
                # Flat [x, y] pair: return it as-is, no range check.
                return item[0], item[1]
            # NOTE(review): if no pair passes the range check, execution falls
            # off the end returning None, and the unpack below raises
            # TypeError — which is NOT caught (only KeyError is). Verify.

        coordinatesX, coordinatesY = get_correct_vals(coordinatesAll)
        return {
            "thingID": station["thingID"],
            "description": station["description"],
            "obs_stream": obs_stream,
            "coordinatesX": coordinatesX,
            "coordinatesY": coordinatesY,
        }
    except KeyError:
        # Missing keys (e.g. no observedArea) are logged; the function then
        # implicitly returns None — callers must handle that.
        logging.error(
            f"No proper observation stream for ID {station['thingID']} found."
        )
def get_obs(station):
    """Page through a station's observations endpoint, 5000 records at a time.

    Stops at the first empty page (hard cap: 2000 pages) and returns the
    concatenated list of raw observation dicts.
    """
    obs_url = (
        station["obs_stream"]
        + "?$top=5000&$skip={}&$filter=date(phenomenontime)+gt+date('2010-01-01')"
    )
    list_of_obs = []
    for skip in range(0, 10000000, 5000):
        obs_iter = json.loads(urlopen(obs_url.format(skip)).read())["value"]
        if not obs_iter:
            break
        # PERF: the original used `list_of_obs = list_of_obs + obs_iter`,
        # which rebuilds the whole list every page (quadratic overall);
        # extend() appends in place at O(page) cost.
        list_of_obs.extend(obs_iter)
    return list_of_obs
def clean_obs(obs):
    """Reduce a raw observation record to the three fields we export."""
    return {
        "result": obs["result"],
        "resultTime": obs["resultTime"],
        "observationID": obs["@iot.id"],
    }
def export_to_s3(name, file):
    """Serialize `file` as indented JSON and upload it to the module bucket.

    `name` is the S3 object key; `file` must be JSON-serializable.
    """
    body = json.dumps(file, indent=4).encode("UTF-8")
    s3_resource = boto3.resource("s3")
    target = s3_resource.Object(bucket, name)
    target.put(Body=body)
def export_obs_for_station(station):
    """Full pipeline for one station: resolve, download, clean, upload to S3.

    Skips the station when no usable observation stream could be resolved
    (get_obs_stream logs the problem and returns None in that case).
    """
    station_stream = get_station_urls(station)
    obs_stream = get_obs_stream(station_stream)
    if obs_stream is None:
        # BUGFIX: get_obs_stream returns None on malformed stations; the
        # original passed that None straight into get_obs, raising an
        # uncaught TypeError that killed the worker process.
        return
    list_of_cleaned_obs = [clean_obs(obs) for obs in get_obs(obs_stream)]
    meta_dict = {
        key: obs_stream[key]
        for key in ("thingID", "description", "coordinatesX", "coordinatesY")
    }
    meta_dict["obs"] = list_of_cleaned_obs
    filename = f"bike_station_{station['@iot.id']}.json"
    export_to_s3(filename, meta_dict)
def run():
    """Fetch all stations and export each in parallel with a progress bar."""
    all_stations = get_stations()
    # BUGFIX: the original wrapped this call in `with Pool() as p:` but never
    # used `p` — process_map spawns and manages its own worker pool, so the
    # extra multiprocessing.Pool only wasted processes. `res` was also unused.
    process_map(export_obs_for_station, all_stations)


if __name__ == "__main__":
    run()