forked from benhoyt/graphyte
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraphyte.py
224 lines (192 loc) · 8.42 KB
/
graphyte.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Send data to Graphite metrics server (synchronously or on a background thread).
For example usage, see README.rst.
This code is licensed under a permissive MIT license -- see LICENSE.txt.
The graphyte project lives on GitHub here:
https://github.com/Jetsetter/graphyte
"""
import atexit
import logging
try:
import queue
except ImportError:
import Queue as queue # Python 2.x compatibility
import socket
import threading
import time
__all__ = ['Sender', 'init', 'send']
__version__ = '1.5'
default_sender = None
logger = logging.getLogger(__name__)
class Sender:
def __init__(self, host, port=2003, prefix=None, timeout=5, interval=None,
queue_size=None, log_sends=False, protocol='tcp', batch_size=1000):
"""Initialize a Sender instance, starting the background thread to
send messages at given interval (in seconds) if "interval" is not
None. Send at most "batch_size" messages per socket send operation (default=1000).
Default protocol is TCP; use protocol='udp' for UDP.
"""
self.host = host
self.port = port
self.prefix = prefix
self.timeout = timeout
self.interval = interval
self.log_sends = log_sends
self.protocol = protocol
self.batch_size = batch_size
if self.interval is not None:
if queue_size is None:
queue_size = int(round(interval)) * 100
self._queue = queue.Queue(maxsize=queue_size)
self._thread = threading.Thread(target=self._thread_loop)
self._thread.daemon = True
self._thread.start()
atexit.register(self.stop)
def __del__(self):
self.stop()
def stop(self):
"""Tell the sender thread to finish and wait for it to stop sending
(should be at most "timeout" seconds).
"""
if self.interval is not None:
self._queue.put_nowait(None)
self._thread.join()
self.interval = None
def build_message(self, metric, value, timestamp, tags={}):
"""Build a Graphite message to send and return it as a byte string."""
if not metric or metric.split(None, 1)[0] != metric:
raise ValueError('"metric" must not have whitespace in it')
if not isinstance(value, (int, float)):
raise TypeError('"value" must be an int or a float, not a {}'.format(
type(value).__name__))
tags_suffix = ''.join(';{}={}'.format(x[0], x[1]) for x in sorted(tags.items()))
message = u'{}{}{} {} {}\n'.format(
self.prefix + '.' if self.prefix else '',
metric,
tags_suffix,
value,
int(round(timestamp))
)
message = message.encode('utf-8')
return message
def send(self, metric, value, timestamp=None, tags={}):
"""Send given metric and (int or float) value to Graphite host.
Performs send on background thread if "interval" was specified when
creating this Sender.
If a "tags" dict is specified, send the tags to the Graphite host along with the metric.
"""
if timestamp is None:
timestamp = time.time()
message = self.build_message(metric, value, timestamp, tags)
if self.interval is None:
self.send_socket(message)
else:
try:
self._queue.put_nowait(message)
except queue.Full:
logger.error('queue full when sending {!r}'.format(message))
def send_message(self, message):
if self.protocol == 'tcp':
sock = socket.create_connection((self.host, self.port), self.timeout)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.send(message)
finally: # sockets don't support "with" statement on Python 2.x
sock.close()
elif self.protocol == 'udp':
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.sendto(message, (self.host, self.port))
finally:
sock.close()
else:
raise ValueError('"protocol" must be \'tcp\' or \'udp\', not {!r}'.format(self.protocol))
def send_socket(self, message):
"""Low-level function to send message bytes to this Sender's socket.
You should usually call send() instead of this function (unless you're
subclassing or writing unit tests).
"""
if self.log_sends:
start_time = time.time()
try:
self.send_message(message)
except Exception as error:
logger.error('error sending message {!r}: {}'.format(message, error))
else:
if self.log_sends:
elapsed_time = time.time() - start_time
logger.info('sent message {!r} to {}:{} in {:.03f} seconds'.format(
message, self.host, self.port, elapsed_time))
def _thread_loop(self):
"""Background thread used when Sender is in asynchronous/interval mode."""
last_check_time = time.time()
messages = []
while True:
# Get first message from queue, blocking until the next time we
# should be sending
time_since_last_check = time.time() - last_check_time
time_till_next_check = max(0, self.interval - time_since_last_check)
try:
message = self._queue.get(timeout=time_till_next_check)
except queue.Empty:
pass
else:
if message is None:
# None is the signal to stop this background thread
break
messages.append(message)
# Get any other messages currently on queue without blocking,
# paying attention to None ("stop thread" signal)
should_stop = False
while True:
try:
message = self._queue.get_nowait()
except queue.Empty:
break
if message is None:
should_stop = True
break
messages.append(message)
if should_stop:
break
# If it's time to send, send what we've collected
current_time = time.time()
if current_time - last_check_time >= self.interval:
last_check_time = current_time
for i in range(0, len(messages), self.batch_size):
batch = messages[i:i + self.batch_size]
self.send_socket(b''.join(batch))
messages = []
# Send any final messages before exiting thread
for i in range(0, len(messages), self.batch_size):
batch = messages[i:i + self.batch_size]
self.send_socket(b''.join(batch))
def init(*args, **kwargs):
"""Initialize default Sender instance with given args."""
global default_sender
default_sender = Sender(*args, **kwargs)
def send(*args, **kwargs):
"""Send message using default Sender instance."""
default_sender.send(*args, **kwargs)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('metric',
help='name of metric to send')
parser.add_argument('value', type=float,
help='numeric value to send')
parser.add_argument('-s', '--server', default='localhost',
help='hostname of Graphite server to send to, default %(default)s')
parser.add_argument('-p', '--port', type=int, default=2003,
help='port to send message to, default %(default)d')
parser.add_argument('-u', '--udp', action='store_true',
help='send via UDP instead of TCP')
parser.add_argument('-t', '--timestamp', type=int,
help='Unix timestamp for message (defaults to current time)')
parser.add_argument('-q', '--quiet', action='store_true',
help="quiet mode (don't log send to stdout)")
args = parser.parse_args()
if not args.quiet:
logging.basicConfig(level=logging.INFO, format='%(message)s')
sender = Sender(args.server, port=args.port, log_sends=not args.quiet,
protocol='udp' if args.udp else 'tcp')
sender.send(args.metric, args.value, timestamp=args.timestamp)