-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqueryAPI.py
281 lines (209 loc) · 9.36 KB
/
queryAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
__author__ = 'Mauricio Roman'
""" This module contains functions for generic querying of Loggly API
The code for initiating the search and fetching the results was forked from
https://github.com/mostlyjason/LogglyExportTools
This API has functions to grab more than 5000 results from the Loggly API
"""
import json, csv, time, pytz, traceback
import re, math, sys
import requests # For REST API
import urllib2, simplejson # For logging with Loggly
from numpy import *
import datetime
def initiateSearch(accountFqdn, query, searchFrom, searchTo, username, password):
""" Initiates the search, and returns a temporary ID """
# We request results in pages of 500 events
# We search in ascending order, so as to build a constantly growing timeline
search_url = ("https://" + accountFqdn + "/apiv2/search?q=" + query + "&from=" +
str(searchFrom) + "&until=" + str(searchTo) + "&order=asc&size=500")
# print "Search URL: " + search_url
# We launch the search
r = requests.get(search_url, auth=(username, password), timeout=60, verify=False)
try:
rsid = r.json()['rsid']['id']
# print "rsid: " + str(rsid)
except ValueError:
print "Error obtaining data"
return -1
return rsid
def fetchResults(accountFqdn, rsid, username, password, newFile, formatFunc, jsonFlag):
""" Collects the results from a search that was initialized, one page at a time, and returns the entire list
As one of its inputs, it requires a formatting function, located in the queryFormat module
"""
#We loop through all the pages, each of which has 500 events, up to max_events
page = 0
d = []
max_events = 5000
total_events = 1
page_size = 500
events_loaded = 0
while(page * page_size < min(total_events, max_events)):
results_url = "http://" + accountFqdn + "/apiv2/events"
url_params = {'rsid':str(rsid),'page':str(page)}
retries = 0
resp1 = ""
resp2 = ""
while(retries < 5):
r = requests.get(results_url, params=url_params, auth=(username, password), timeout=300, verify=False)
try:
#resp1 is the raw json tree, which includes info on total events
resp1 = r.json()
#resp2 is a table with the features we want
try:
resp2 = formatFunc(resp1)
except KeyError:
raise ValueError('invalid input for format function')
newFile = 0
break
except ValueError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
print "*** print_tb:"
traceback.print_tb(exc_traceback, file=sys.stdout)
text = ""
if(len(r.text) > 128):
text = r.text[0:127]
else:
text = r.text
print "Error status code: " + str(e) + " " + str(r.status_code) + " " + str(text)
retries = retries + 1
# End Inner While Loop
if(retries == 5):
print "Too many retries, giving up"
break
# If this is the first page...
if not d:
total_events = resp1['total_events']
# If total_events exceeds return capacity, break and return
if jsonFlag:
d = {}
d['events'] = []
if total_events > max_events:
break
if jsonFlag:
for e in resp2['events']:
d['events'].append(e)
# Upper bound on events loaded...
events_loaded = len(d['events'])
else:
for e in resp2['events']:
d.append(e)
# Exact number of events loaded
events_loaded = len(d)
page = page + 1
total = [total_events, events_loaded]
return (total, d)
def getSearch(query, formatFunc, searchFrom_init, searchToFinal, step,
outputFile, newFile, jsonFlag,
accountFqdn, username, password, loggly_key = None):
""" Collects search results in several steps, adjusting the time step dynamically """
session_id = int(round(time.time() * 1000))
# If our flag indicates a new file, open for writing, else append to existing file
if newFile:
f = open(outputFile, 'w')
else:
f = open(outputFile, 'a')
# If writing to CSV, open file as CSV
if not jsonFlag:
wr = csv.writer(f)
else:
totalData = {}
totalData['events'] = []
i = 0
delta_step = []
searchFrom = []
searchTo = []
total_events = []
events_loaded = []
delta_step.append(step) # Step in seconds
searchFrom.append(searchFrom_init)
searchTo.append(searchFrom[0] + datetime.timedelta(0,delta_step[0]))
print "step, total events, events loaded"
while searchTo[i] <= searchToFinal and searchTo[i] != searchFrom[i]:
rsid = initiateSearch(accountFqdn, query, searchFrom[i], searchTo[i],
username, password)
if rsid > 0:
newFile = 0
([x1, x2], data) = fetchResults(accountFqdn, rsid, username, password, newFile, formatFunc, jsonFlag)
total_events.append(x1)
events_loaded.append(x2)
# Print to console for control of process
print str(i) + "," + str(total_events[i]) + "," + str(events_loaded[i])
# We only write to file if the total events returned is less than the limit
write_flag = (total_events[i] <= events_loaded[i])
# We log the query results, before changing the search from and to parameters
if loggly_key is not None:
logQuery(loggly_key, session_id, searchFrom[i], searchTo[i], events_loaded[i], total_events[i], write_flag, i)
# Calculate the time step dynamically based on previous data
delta_step.append(getTimeStep(searchFrom, searchTo, searchToFinal, total_events, write_flag, i, step))
# Write only if we get all the records that we asked for, as there is no guarantee that, if we receive
# a subset of all records, that they will start at the SearchFrom time
if(write_flag):
if jsonFlag:
for e in data['events']:
totalData['events'].append(e)
else:
for item in data:
wr.writerow(item)
searchFrom.append(searchTo[i])
else:
searchFrom.append(searchFrom[i])
#endif
searchTo.append(searchFrom[i+1] + datetime.timedelta(0,delta_step[i+1]))
else:
print "Could not initiate search...exiting"
break
time.sleep(3) # Pause a few seconds so that we do not overwhelm Loggly servers
i += 1
if jsonFlag:
print "Total Events: " + str(len(totalData['events']))
f.write(json.dumps(totalData))
f.close()
return delta_step.pop()
def getTimeStep(searchFrom, searchTo, searchToFinal, total_events, write_flag, count, step):
""" Calculates time step...taking the latest history into account
The API returns 5000 records maximum, and there is no guarantee that
these will be time aligned with the initial time requested. For simplicity,
we prefer to only accept returns that are below 5000 records """
max_events = 5000
vel = []
# Calculate average "speed" using the last 30 records
for i in range(count+1)[-30:]:
diff = searchTo[i] - searchFrom[i]
delta_sec = diff.total_seconds()
vel.append(total_events[i] / delta_sec)
vel = array(vel)
avg = average(vel)
std_dev = std(vel)
if(avg + std_dev == 0):
return step
#If we failed in our previous estimation, use the point velocity measure plus one std. dev.
if not write_flag:
delta_sec = max_events / (vel[-1:] + std_dev)
# if we are not writing, the max time step can't be more than than the final time minus the current from
maxDelta = (searchToFinal - searchFrom[count]).total_seconds()
if(int(asscalar(delta_sec)) > maxDelta):
return maxDelta
#Otherwise, let us just use the average plus one std. dev.
else:
delta_sec = max_events / (avg + std_dev)
# if we are writing, the max time step can't be more than the final minus the current to, which will become the new from time
maxDelta = (searchToFinal - searchTo[count]).total_seconds()
if(int(asscalar(delta_sec)) > maxDelta):
return maxDelta
return asscalar(delta_sec)
def logQuery(loggly_key, session_id, searchFrom, searchTo, events_loaded, total_events, write_flag, count):
""" Sends a log to Loggly reporting the status of the search """
log_data = "PLAINTEXT=" + urllib2.quote(simplejson.dumps(
{
'timestamp':str(datetime.datetime.today()) , 'level':'info', 'session_id':session_id,
'from':str(searchFrom), 'until':str(searchTo),
'events_loaded':events_loaded, 'total_events':total_events, 'write_flag':write_flag, 'count':count
}))
# Send log data to Loggly
urllib2.urlopen("https://logs-01.loggly.com/inputs/" + loggly_key + "/tag/queryAPI/", log_data)
def getUTCtime(timezone, t0, t1, dst0, dst1):
""" Transforms time from 'timezone' to UTC """
local = pytz.timezone (timezone)
t0 = local.localize(t0, is_dst=dst0).astimezone(pytz.utc)
t1 = local.localize(t1, is_dst=dst1).astimezone(pytz.utc)
return t0, t1