mk_index.py
# python mk_index.py &    (execute it in the background)
# Import access-log data from Traffic Server's squid.blog into Elasticsearch
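# Flow: traffic_logcat converts the binary squid.blog into a text log, each line is
# parsed into a document, and the documents are bulk-indexed into Elasticsearch.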
import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os.path
import psutil
import subprocess
import time
#Log data information
FILE_PATH_BLOG = "/usr/local/var/log/trafficserver/squid.blog"
FILE_PATH = "./squid.log.elasticsearch"
#ES information
ES_HOST = {
"host" : "10.0.0.158",
"port" : 9200
}
INDEX_NAME = 'ats'
TYPE_NAME = 'accesslog'
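# Number of parsed lines to buffer before each bulk request to Elasticsearch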
COMMIT_DATA_PER_TIME = 5000
bulk_data = []
POLL_INTERVAL = 10 #10 seconds
#For runCommand
procs_id = 0
procs = {}
procs_data = []
# Create the Elasticsearch client (the index itself is created in indexPrepare)
es = Elasticsearch(hosts = [ES_HOST])
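# Create the index with explicit field mappings; skipped if the index already exists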
def indexPrepare():
""" #delete index
if es.indices.exists(INDEX_NAME):
print("deleting '%s' index..." % (INDEX_NAME))
res = es.indices.delete(index = INDEX_NAME)
print(" response: '%s'" % (res))
"""
request_body = {
"mappings" : {
"accesslog" : {
"properties" : {
"accessTime" : {
"type" : "date" ,
"format" : "dateOptionalTime"
},
"authUserName" : {
"type" : "string"
},
"cacheResult" : {
"type" : "string"
},
"cacheCode" : {
"type" : "long"
},
"clientIP" : {
"type" : "string"
},
"contentLength" : {
"type" : "long"
},
"contentType" : {
"type" : "string"
},
"hierarchyRoute" : {
"type" : "string"
},
"requestMethod" : {
"type" : "string",
"index" : "not_analyzed"
},
"requestURL" : {
"type" : "string",
"index" : "not_analyzed"
},
"responseCode" : {
"type" : "long"
},
"routerServer" : {
"type" : "string"
},
"spentTime" : {
"type" : "long"
}
}
}
},
"settings" : {
"number_of_shards": 5,
"number_of_replicas": 0
}
}
if not es.indices.exists(INDEX_NAME):
print("creating '%s' index..." % (INDEX_NAME))
res = es.indices.create(index = INDEX_NAME, body = request_body)
print(" response: '%s'" % (res))
def indexBulkData():
bulk_data = []
logFile = open(FILE_PATH,'r')
line_cnt= 0
for line in logFile:
line= line.replace("\n","")
items=line.split(' ')
#data example
# ['1424376277.821', '0', '10.0.0.210', 'TCP_MEM_HIT/200',
# '86949', 'GET', 'http://www.citrix.co.jp/products.html?posit=glnav', '-', 'NONE/-', 'text/html']
cacheCode = 0
# refer to squid-netscape-result-codes
#https://www.websense.com/content/support/library/web/v78/wcg_help/cachrslt.aspx
#Cache Hit : tcp_hit,tcp_refresh_hit, tcp_mem_hit, tcp_ims_hit
cacheResult = items[3].split('/')[0].lower()
if cacheResult in ("tcp_hit", "tcp_ims_hit", "tcp_mem_hit", "tcp_refresh_hit"):
cacheCode = 1
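# One bulk action per log line; no _id is supplied, so Elasticsearch generates one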
data_dict = {
"_index": INDEX_NAME,
"_type": TYPE_NAME,
"_source": {
'accessTime': datetime.datetime.fromtimestamp(float(items[0])).strftime('%Y-%m-%dT%H:%M:%SZ'), #The client request timestamp
'spentTime': items[1], #The time Traffic Server spent processing the client request.
#The number of milliseconds between the time the client established the connection with Traffic Server
#and the time Traffic Server sent the last byte of the response back to the client.
'clientIP': items[2], #The IP address of the client's host machine.
'cacheResult': items[3].split('/')[0], #The cache result code; how the cache responded to the request: HIT, MISS, and so on.
'cacheCode' : cacheCode, #1 = cache hit, 0 = cache miss
'responseCode': items[3].split('/')[1], #The proxy response status code (the HTTP response status code from Traffic Server to the client).
'contentLength': items[4], #The length of the Traffic Server response to the client in bytes, including headers and content.
'requestMethod': items[5], #The client request method: GET, POST, and so on.
'requestURL': items[6], #The client request canonical URL;
#blanks and other characters that might not be parsed by log analysis tools are
#replaced by escape sequences. The escape sequence is a percentage sign
#followed by the ASCII code number of the replaced character in hex.
'authUserName': items[7], #The username of the authenticated client.
#A hyphen (-) means that no authentication was required.
'hierarchyRoute': items[8].split('/')[0], # The proxy hierarchy route.
'routerServer': items[8].split('/')[1], # The route Traffic Server used to retrieve the object.
'contentType': items[9] # The proxy response content type. The object content type taken from the Traffic Server response header.
}
}
#bulk_data.append(op_dict)
bulk_data.append(data_dict)
line_cnt += 1
if line_cnt == COMMIT_DATA_PER_TIME:
# bulk index the data
#print("bulk indexing...")
#print(bulk_data)
helpers.bulk(es,bulk_data)
es.indices.refresh()
#print(" response: '%s'" % (res))
line_cnt = 0
bulk_data = []
logFile.close()
#print(bulk_data)
if len(bulk_data) > 0:
#print bulk_data
res = helpers.bulk(es,bulk_data)
es.indices.refresh()
# sanity check
#print("searching...")
res = es.search(index = INDEX_NAME, size=3, body={"query": {"match_all": {}}})
#print(" response: '%s'" % (res))
# Runs command silently
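# With busy_wait=True the child process is polled and its io_counters/cpu_times are
# sampled via psutil until it exits; return_stdout=True captures stdout via a pipe
# instead of discarding it to /dev/null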
def runCommand(cmd, use_shell = False, return_stdout = False, busy_wait = False, poll_duration = 0.5):
# Sanitize cmd to string
cmd = [str(x) for x in cmd]
if return_stdout:
proc = psutil.Popen(cmd, shell = use_shell, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
else:
proc = psutil.Popen(cmd, shell = use_shell,
stdout = open('/dev/null', 'w'),
stderr = open('/dev/null', 'w'))
global procs_id
global procs
global procs_data
proc_id = procs_id
procs[proc_id] = proc
procs_id += 1
data = { }
#print(proc_id)
while busy_wait:
returncode = proc.poll()
if returncode is None:
try:
data = proc.as_dict(attrs = ['io_counters', 'cpu_times'])
except Exception as e:
pass
time.sleep(poll_duration)
else:
break
(stdout, stderr) = proc.communicate()
returncode = proc.returncode
del procs[proc_id]
if returncode != 0:
raise Exception(stderr)
else:
if data:
procs_data.append(data)
return stdout
#Convert squid.blog to squid.log.elasticsearch (if it exists) with traffic_logcat
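#Steps: remove any stale converted file, run traffic_logcat on the binary log, then
#truncate squid.blog so the next poll only picks up new entries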
def prepareLogFile():
result = True
result = os.path.isfile(FILE_PATH_BLOG)
if os.path.isfile(FILE_PATH):
#rm -f ./squid.log.elasticsearch
cmd = ['rm','-f', FILE_PATH]
#print(cmd,)
runCommand(cmd, return_stdout = False, busy_wait = True)
if result: #In case squid.blog exists
#traffic_logcat /usr/local/var/log/trafficserver/squid.blog -o ./squid.log.elasticsearch
cmd = ['traffic_logcat',FILE_PATH_BLOG,'-o',FILE_PATH]
#print(cmd,)
runCommand(cmd, return_stdout = False, busy_wait = True)
result = os.path.isfile(FILE_PATH) #double check FILE_PATH exists
f = open(FILE_PATH_BLOG,'w') #clear the contents
f.close()
return result
#Main function
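# Poll loop: every POLL_INTERVAL seconds, convert any new log data and index it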
if __name__ == '__main__':
while True:
if prepareLogFile(): #if squid.blog exists and was converted to squid.log.elasticsearch by traffic_logcat successfully
#print("Inserting...")
indexPrepare()
indexBulkData()
time.sleep(POLL_INTERVAL)