forked from kgiusti/oslo-messaging-clients
-
Notifications
You must be signed in to change notification settings - Fork 0
/
my-client.py
executable file
·125 lines (108 loc) · 4.78 KB
/
my-client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
#
#import eventlet
#eventlet.monkey_patch()
import optparse, sys, time
import logging
from oslo.config import cfg
from oslo import messaging
loggy = logging.getLogger(__name__)
loggy.setLevel(logging.WARNING)
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)
loggy.addHandler(ch)
def handle_config_option(option, opt_string, opt_value, parser):
name, value = opt_value
print "Name=%s" % name
print "Value=%s" % value
setattr(cfg.CONF, name, int(float(value)))
def main(argv=None):
_usage = """Usage: %prog [options] <topic> <method> [<arg-name> <arg-value>]*"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("--exchange", action="store", default="my-exchange")
parser.add_option("--server", action="store")
parser.add_option("--namespace", action="store", default="my-namespace")
parser.add_option("--fanout", action="store_true")
parser.add_option("--timeout", action="store", type="int")
parser.add_option("--cast", action="store_true")
parser.add_option("--repeat", action="store", type="int", default=1,
help="Repeat the request N times (0=forever)")
parser.add_option("--version", action="store", default="1.1")
parser.add_option("--url", action="store", default="qpid://localhost")
parser.add_option("--topology", action="store", type="int", default=2,
help="QPID Topology version to use.")
parser.add_option("--auto-delete", action="store_true",
help="Set amqp_auto_delete to True")
parser.add_option("--durable", action="store_true",
help="Set amqp_durable_queues to True")
parser.add_option("--config", action="callback",
callback=handle_config_option, nargs=2, type="string",
help="set a config variable (--config name value)")
parser.add_option("--payload", type="string",
help="Path to a data file to use as message body.")
parser.add_option("--quiet", action="store_true",
help="Supress console output")
opts, extra = parser.parse_args(args=argv)
if not extra:
print "<topic> not supplied!!"
return -1
topic = extra[0]
extra = extra[1:]
if not opts.quiet: print "Calling server on topic %s, server=%s exchange=%s namespace=%s fanout=%s" % (
topic, opts.server, opts.exchange, opts.namespace, str(opts.fanout))
logging.basicConfig(level=logging.WARNING) #make this an option
method = None
args = {}
if extra:
method = extra[0]
extra = extra[1:]
args = dict([(extra[x], extra[x+1]) for x in range(0, len(extra)-1, 2)])
if not opts.quiet: print "Method=%s, args=%s" % (method, str(args))
if opts.payload:
if not opts.quiet: print("Loading payload file %s" % opts.payload)
with open(opts.payload) as f:
args["payload"] = f.read()
transport = messaging.get_transport(cfg.CONF, url=opts.url)
if opts.topology:
if not opts.quiet: print "Using QPID topology version %d" % opts.topology
cfg.CONF.qpid_topology_version = opts.topology
if opts.auto_delete:
if not opts.quiet: print "Enable auto-delete"
cfg.CONF.amqp_auto_delete = True
if opts.durable:
if not opts.quiet: print "Enable durable queues"
cfg.CONF.amqp_durable_queues = True
target = messaging.Target(exchange=opts.exchange,
topic=topic,
namespace=opts.namespace,
server=opts.server,
fanout=opts.fanout,
version=opts.version)
client = messaging.RPCClient(transport, target,
timeout=opts.timeout,
version_cap=opts.version)
test_context = {"application": "my-client",
"time": time.ctime(),
"cast": opts.cast}
start = time.time()
repeat = 0
while opts.repeat == 0 or repeat < opts.repeat:
try:
if opts.cast:
client.cast( test_context, method, **args )
else:
rc = client.call( test_context, method, **args )
if not opts.quiet: print "Return value=%s" % str(rc)
except Exception as e:
loggy.error("Unexpected exception occured: %s" % str(e))
return -1
repeat += 1
stop = time.time()
delta = (stop - start)
pktsec = (repeat / delta)
print ("Payload file: %s\nPacket sent/received: %s\nTime (sec): %s\nPackets/sec: %s" % (opts.payload, repeat, delta, pktsec))
# @todo Need this until synchronous send available
transport.cleanup()
return 0
if __name__ == "__main__":
sys.exit(main())