-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathgNMI_Subscribe.py
277 lines (227 loc) · 12 KB
/
gNMI_Subscribe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/python
##############################################################################
# #
# gNMI_Subscribe.py #
# #
# History Change Log: #
# #
# 1.0 [SW] 2017/06/02 first version #
# 1.1 [SW] 2017/07/06 timeout behavior improved #
# 1.2 [SW] 2017/08/08 logging improved, options added #
# 1.3 [SW] 2017/12/04 support for gNMI v0.4 #
# #
# Objective: #
# #
# Testing tool for the gNMI (GRPC Network Management Interface) in Python #
# #
# Features supported: #
# #
# - gNMI Subscribe (based on Nokia SROS 15.0 TLM feature-set) #
# - secure and insecure mode #
# - multiple subscriptions paths #
# #
# Not yet supported: #
# #
# - Disable server name verification against TLS cert (opt: noHostCheck) #
# - Disable cert validation against root certificate (InsecureSkipVerify) #
# #
# License: #
# #
# Licensed under the MIT license #
# See LICENSE.md delivered with this project for more information. #
# #
# Author: #
# #
# Sven Wisotzky #
# mail: sven.wisotzky(at)nokia.com #
##############################################################################
"""
gNMI Subscribe Client in Python Version 1.3
Copyright (C) 2017 Nokia. All Rights Reserved.
"""
__title__ = "gNMI_Subscribe"
__version__ = "1.3"
__status__ = "released"
__author__ = "Sven Wisotzky"
__date__ = "2017 December 4th"
##############################################################################
import argparse
import re
import sys
import os
import logging
import time
##############################################################################
# Matches a '/' only when everything after it up to the end of the string is
# made of non-bracket characters or complete [...] groups, i.e. the '/' is
# not located inside a [key=value] selector.
_PATH_SEPARATOR = re.compile(r'/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)')


def list_from_path(path='/'):
    """Split a path string into a list of path elements.

    The string is split on '/' characters that are outside of [...] key
    selectors, so 'port[port-id=1/1/1]' stays one element.  The empty
    element produced by a leading and/or trailing '/' is dropped.

    Args:
        path: path string such as '/state/port[port-id=1/1/1]/statistics'.

    Returns:
        List of element strings; an empty list for None, '' and '/'.
    """
    if not path:
        return []

    parts = _PATH_SEPARATOR.split(path)
    # Same slicing as before: drop the first item for a leading '/',
    # the last item for a trailing '/'.
    first = 1 if path[0] == '/' else 0
    last = -1 if path[-1] == '/' else None
    return parts[first:last]
def path_from_string(path='/'):
    """Convert a path string into a gnmi_pb2.Path message.

    Every element returned by list_from_path() becomes one PathElem: the
    text before the first '[' is the element name and each '[key=value]'
    selector becomes an entry of the element's key map.

    Relies on the module-level name gnmi_pb2, which is imported in the
    script's __main__ section.

    Args:
        path: path string, e.g. '/state/port[port-id=1/1/1]/statistics'.

    Returns:
        gnmi_pb2.Path holding one PathElem per path element.

    Raises:
        ValueError: if a [...] selector does not contain '='.
    """
    elems = []
    for segment in list_from_path(path):
        name = segment.split('[', 1)[0]
        keys = {}
        for selector in re.findall(r'\[(.*?)\]', segment):
            if '=' not in selector:
                # Previously surfaced as an opaque dict() construction error.
                raise ValueError(
                    'invalid key [%s] in path element %r, expected [name=value]'
                    % (selector, segment))
            key, value = selector.split('=', 1)
            keys[key] = value
        elems.append(gnmi_pb2.PathElem(name=name, key=keys))
    return gnmi_pb2.Path(elem=elems)
def gen_request( opt ):
    """Generator yielding the single SubscribeRequest described by `opt`.

    One Subscription is created per entry of opt.xpaths and all of them are
    wrapped in one SubscriptionList.  The request is logged through the
    module-level `log` logger before it is yielded.

    Args:
        opt: parsed command line options; reads xpaths, submode, suppress,
            interval, heartbeat, prefix, qos, mode, aggregate, encoding
            and use_alias.

    Yields:
        gnmi_pb2.SubscribeRequest (exactly one).
    """
    # opt.interval is multiplied by 10^9 before it is sent.
    # NOTE(review): opt.heartbeat is forwarded unscaled - confirm which unit
    # callers are expected to pass.
    subscriptions = [
        gnmi_pb2.Subscription(
            path=path_from_string(xpath),
            mode=opt.submode,
            suppress_redundant=opt.suppress,
            sample_interval=opt.interval*1000000000,
            heartbeat_interval=opt.heartbeat
        )
        for xpath in opt.xpaths
    ]

    # An empty prefix or a zero QoS marking leaves the field unset.
    prefix = path_from_string(opt.prefix) if opt.prefix else None
    qos = gnmi_pb2.QOSMarking(marking=opt.qos) if opt.qos else None

    request = gnmi_pb2.SubscribeRequest(
        subscribe=gnmi_pb2.SubscriptionList(
            prefix=prefix,
            mode=opt.mode,
            allow_aggregation=opt.aggregate,
            encoding=opt.encoding,
            subscription=subscriptions,
            use_aliases=opt.use_alias,
            qos=qos
        )
    )

    log.info('Sending SubscribeRequest\n'+str(request))
    yield request
##############################################################################
if __name__ == '__main__':
    # Script entry point: parse options, set up logging, open the gRPC
    # channel, send one SubscribeRequest and log every response received.
    # NOTE: `log` and `gnmi_pb2` are deliberately bound at module level here
    # because path_from_string() and gen_request() use them as globals.
    prog = os.path.splitext(os.path.basename(sys.argv[0]))[0]

    parser = argparse.ArgumentParser()
    parser.add_argument('--version', action='version', version=prog+' '+__version__)

    group = parser.add_mutually_exclusive_group()
    group.add_argument('-q', '--quiet', action='store_true', help='disable logging')
    group.add_argument('-v', '--verbose', action='count', help='enhanced logging')

    group = parser.add_argument_group()
    group.add_argument('--server', default='localhost:57400', help='server/port (default: localhost:57400)')
    group.add_argument('--username', default='admin', help='username (default: admin)')
    group.add_argument('--password', default='admin', help='password (default: admin)')
    group.add_argument('--cert', metavar='<filename>', help='CA root certificate')
    group.add_argument('--tls', action='store_true', help='enable TLS security')
    group.add_argument('--ciphers', help='override environment "GRPC_SSL_CIPHER_SUITES"')
    group.add_argument('--altName', help='subjectAltName/CN override for server host validation')
    group.add_argument('--noHostCheck', action='store_true', help='disable server host validation')

    group = parser.add_argument_group()
    # Text mode: logging.StreamHandler writes str records, which a binary
    # stream rejects on Python 3.  The handler flushes after every record,
    # so no explicit unbuffered mode is needed.
    group.add_argument('--logfile', metavar='<filename>', type=argparse.FileType('w'), default='-', help='Specify the logfile (default: <stdout>)')
    group.add_argument('--stats', action='store_true', help='collect stats')

    group = parser.add_argument_group()
    group.add_argument('--interval', default=10, type=int, help='sample interval (default: 10s)')
    group.add_argument('--timeout', type=int, help='subscription duration in seconds (default: none)')
    group.add_argument('--heartbeat', type=int, help='heartbeat interval (default: none)')
    group.add_argument('--aggregate', action='store_true', help='allow aggregation')
    group.add_argument('--suppress', action='store_true', help='suppress redundant')
    group.add_argument('--submode', default=2, type=int, help='subscription mode [TARGET_DEFINED, ON_CHANGE, SAMPLE]')
    group.add_argument('--mode', default=0, type=int, help='[STREAM, ONCE, POLL]')
    group.add_argument('--encoding', default=0, type=int, help='[JSON, BYTES, PROTO, ASCII, JSON_IETF]')
    # Help text used to be a copy of the --encoding choices.
    group.add_argument('--qos', default=0, type=int, help='QoS marking (default: 0, not set)')
    group.add_argument('--use_alias', action='store_true', help='use alias')
    group.add_argument('--prefix', default='', help='gRPC path prefix (default: none)')
    group.add_argument('xpaths', nargs=argparse.REMAINDER, help='path(s) to subscriber (default: /)')
    options = parser.parse_args()

    if not options.xpaths:
        options.xpaths = ['/']

    if options.ciphers:
        os.environ["GRPC_SSL_CIPHER_SUITES"] = options.ciphers

    # setup logging
    if options.quiet:
        loghandler = logging.NullHandler()
        loglevel = logging.NOTSET
    else:
        if options.verbose is None:
            logformat = '%(asctime)s,%(msecs)-3d %(message)s'
        else:
            logformat = '%(asctime)s,%(msecs)-3d %(levelname)-8s %(threadName)s %(message)s'

        if options.verbose is None or options.verbose == 1:
            loglevel = logging.INFO
        else:
            loglevel = logging.DEBUG

        # For supported GRPC trace options check:
        #   https://github.com/grpc/grpc/blob/master/doc/environment_variables.md
        # Maps the -v count to (GRPC_TRACE, GRPC_VERBOSITY); these variables
        # are set before grpc is imported further down.
        grpc_tracing = {
            3: ("all", "ERROR"),
            4: ("api,call_error,channel,connectivity_state,op_failure", "INFO"),
            5: ("all", "INFO"),
            6: ("all", "DEBUG"),
        }
        if options.verbose in grpc_tracing:
            trace, verbosity = grpc_tracing[options.verbose]
            os.environ["GRPC_TRACE"] = trace
            os.environ["GRPC_VERBOSITY"] = verbosity

        timeformat = '%y/%m/%d %H:%M:%S'
        loghandler = logging.StreamHandler(options.logfile)
        loghandler.setFormatter(logging.Formatter(logformat, timeformat))

    log = logging.getLogger(prog)
    log.setLevel(loglevel)
    log.addHandler(loghandler)

    # Imported here (not at the top of the file) so that a missing package is
    # reported through the logger configured above.
    try:
        import grpc
        import gnmi_pb2
    except ImportError as err:
        log.error(str(err))
        # Non-zero exit status; quit() is a helper of the site module and
        # would have exited with status 0.
        sys.exit(1)

    if options.tls or options.cert:
        log.debug("Create SSL Channel")
        if options.cert:
            # Read the PEM file as bytes and close the handle right away.
            with open(options.cert, 'rb') as certfile:
                root_certificates = certfile.read()
            cred = grpc.ssl_channel_credentials(root_certificates=root_certificates)
            opts = []
            if options.altName:
                opts.append(('grpc.ssl_target_name_override', options.altName,))
            if options.noHostCheck:
                log.error('Disable server name verification against TLS cert is not yet supported!')
                # TODO: Clarify how to setup gRPC with SSLContext using check_hostname:=False

            channel = grpc.secure_channel(options.server, cred, opts)
        else:
            log.error('Disable cert validation against root certificate (InsecureSkipVerify) is not yet supported!')
            # TODO: Clarify how to setup gRPC with SSLContext using verify_mode:=CERT_NONE

            cred = grpc.ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
            channel = grpc.secure_channel(options.server, cred)
    else:
        log.info("Create insecure Channel")
        channel = grpc.insecure_channel(options.server)

    log.debug("Create gNMI stub")
    stub = gnmi_pb2.gNMIStub(channel)

    req_iterator = gen_request( options )
    metadata = [('username',options.username), ('password', options.password)]

    msgs = 0    # update messages received
    upds = 0    # individual updates contained in those messages
    secs = 0    # accumulated time between first update and sync_response
    start = 0   # time of the first update since the last sync (0: none yet)

    try:
        responses = stub.Subscribe(req_iterator, options.timeout, metadata=metadata)
        for response in responses:
            if response.HasField('sync_response'):
                log.debug('Sync Response received\n'+str(response))
                # Only account for time when an update was seen since the
                # last sync; otherwise `start` is 0 and the whole epoch
                # would be added to the statistics.
                if start:
                    secs += time.time() - start
                    start = 0
                if options.stats:
                    log.info("%d updates and %d messages within %1.2f seconds", upds, msgs, secs)
                    if secs > 0:
                        log.info("Statistics: %5.0f upd/sec, %5.0f msg/sec", upds/secs, msgs/secs)
            elif response.HasField('error'):
                log.error('gNMI Error '+str(response.error.code)+' received\n'+str(response.error.message))
            elif response.HasField('update'):
                if start == 0:
                    start = time.time()
                msgs += 1
                upds += len(response.update.update)
                if not options.stats:
                    log.info('Update received\n'+str(response))
            else:
                log.error('Unknown response received:\n'+str(response))

    except KeyboardInterrupt:
        log.info("%s stopped by user", prog)

    except grpc.RpcError as x:
        # details is a method; call it instead of logging the bound method.
        # Fall back to the exception text if the error carries no details().
        details = x.details() if hasattr(x, 'details') else str(x)
        log.error("grpc.RpcError received:\n%s", details)

    except Exception as err:
        log.error(err)

    if (msgs>1):
        log.info("%d update messages received", msgs)
# EOF