-
Notifications
You must be signed in to change notification settings - Fork 0
/
delta.py
384 lines (309 loc) · 11.7 KB
/
delta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
#!/usr/bin/env python
from __future__ import division, absolute_import, print_function
import re
import click
import sys
import time
import subprocess
import os
import colors
import string
import locale
try:
from itertools import izip_longest
except ImportError: # pragma no cover, python 3
from itertools import zip_longest as izip_longest
try:
from io import UnsupportedOperation
except ImportError:
class UnsupportedOperation(Exception):
pass
if sys.version_info[0] == 2:
    # Python 2 only: wrap sys.stdout in a codec writer so the unicode strings
    # produced by this module are encoded on their way out.
    import codecs
    _, encoding = locale.getdefaultlocale()
    if not encoding:
        # getdefaultlocale() returns (None, None) when the locale cannot be
        # determined; codecs.getwriter(None) would raise, so fall back to a
        # sane default instead of crashing at import time.
        encoding = 'UTF-8'
    sys.stdout = codecs.getwriter(encoding)(sys.stdout)
if not hasattr(subprocess, 'check_output'): # pragma no cover, python 2.6
    # Python 2.6 has no subprocess.check_output; install a minimal stand-in
    # that returns whatever the command wrote to its standard output.
    def check_output(cmd):
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        captured_stdout, _ = process.communicate()
        return captured_stdout

    subprocess.check_output = check_output

# Unique sentinel yielded by the feeds between groups of lines; consumers
# compare against it by identity.
separator = object()
class StringChunk(object):
    """A literal piece of a line that is reproduced verbatim when formatting."""

    def __init__(self, static_str):
        self.static_str = static_str

    def plain(self):
        """Return the chunk to use for plain output: this same object."""
        return self

    def whitespace(self):
        """Return a new chunk with every non-whitespace character blanked.

        Whitespace characters are kept as they are; anything else becomes a
        single space, so the result has the same length as the original.
        """
        blanked = ''.join(
            ch if ch in string.whitespace else ' '
            for ch in self.static_str
        )
        return self.__class__(blanked)

    def format(self, values, use_colors=True):
        """Return the literal text; ``values`` and ``use_colors`` are unused."""
        return self.static_str

    def as_regex(self):
        """Return a regex fragment matching exactly this literal text."""
        return re.escape(self.static_str)

    def __repr__(self): # pragma: no cover
        template = "'{!r}'"
        return template.format(self.static_str)
class NumberChunk(object):
    """A numeric field of a line, described as a ``str.format`` specification.

    The specification is assembled from ``prefix`` (literal text emitted
    before the field), ``align``, ``plus`` (sign flag), ``width`` and ``fmt``
    (precision/type suffix).
    """

    @staticmethod
    def detect(spaces, n, first, flex=True):
        """Build a NumberChunk reproducing how the number ``n`` was laid out.

        ``spaces`` is the whitespace found right before the number, ``n`` the
        number's text, ``first`` tells whether the number opens the line and
        ``flex`` allows widening narrow columns.
        """
        width = len(n)
        if spaces:
            # One space is emitted literally; the others count as field width.
            prefix, align = u' ', u''
            width += len(spaces) - 1
        else:
            prefix = u''
            align = u'' if first else u'<'

        whole, dot, frac = n.partition(u'.')
        if dot:
            min_width = len(frac) + 3
            fmt = u'.%df' % len(frac)
        else:
            min_width = 2
            fmt = u''

        if align == u'' and flex and width < min_width:
            width = min_width
        if len(whole) > 1 and whole.startswith(u'0'):
            # Leading zero in the input: keep zero padding in the output.
            width = u'0{0}'.format(width)
            align = u''
        return NumberChunk(prefix, align, u'+', width, fmt)

    def __init__(self, prefix, align, plus, width, fmt):
        self.prefix = prefix
        self.align = align
        self.plus = plus
        self.width = width
        self.fmt = fmt

    def plain(self):
        """Return a copy of this chunk with the sign flag removed."""
        cls = self.__class__
        return cls(self.prefix, self.align, u'', self.width, self.fmt)

    def whitespace(self):
        """Return the chunk to use on a whitespace-only line: this object."""
        return self

    def format_str(self):
        """Return the ``str.format`` template for this field."""
        parts = (self.prefix, self.align, self.plus, self.width, self.fmt)
        return u'%s{0:%s%s%s%s}' % parts

    def __repr__(self): # pragma: no cover
        return self.format_str()

    @staticmethod
    def colorize(val, s):
        """Return ``s`` in green for positive ``val``, red for negative.

        ``s`` is returned unchanged when ``val`` is ``None`` or is neither
        greater nor smaller than zero.
        """
        if val is None:
            return s
        if val > 0:
            return colors.green(s)
        if val < 0:
            return colors.red(s)
        return s

    def format(self, values, use_colors=True):
        """Consume the first item of ``values`` and return it formatted."""
        value = values.pop(0)
        text = self.format_str().format(value)
        return self.colorize(value, text) if use_colors else text

    def as_regex(self):
        """Return a capturing regex fragment matching a padded number."""
        return r'(\s*[0-9]+(?:\.[0-9]+)?)'
class Format(object):
    """An ordered list of chunks describing the layout of one kind of line.

    ``regex`` is the concatenation of every chunk's regex fragment and is
    used to recognise later lines that share this layout.
    """

    def __init__(self, chunks, colors=True):
        self.chunks = chunks
        self.colors = colors
        pattern = ''.join([chunk.as_regex() for chunk in self.chunks])
        self.regex = re.compile(pattern)

    def plain(self):
        """Return a colorless Format built from each chunk's plain variant."""
        plain_chunks = [chunk.plain() for chunk in self.chunks]
        return self.__class__(plain_chunks, False)

    def whitespace(self):
        """Return a Format built from each chunk's whitespace variant."""
        blank_chunks = [chunk.whitespace() for chunk in self.chunks]
        return self.__class__(blank_chunks, self.colors)

    def format_values(self, values, use_colors):
        """Yield the rendered text of every chunk, in order.

        Chunks draw from a private copy of ``values``; a ``use_colors`` of
        ``None`` means "use this Format's own ``colors`` setting".
        """
        remaining = list(values)
        colorize = self.colors if use_colors is None else use_colors
        for chunk in self.chunks:
            yield chunk.format(remaining, colorize)

    def format(self, values, use_colors=None):
        """Return the whole line rendered with ``values``."""
        pieces = self.format_values(values, use_colors)
        return ''.join(pieces)

    def __repr__(self): # pragma: no cover
        return repr(self.chunks)
class Parser(object):
    """Learn line layouts and compute numeric deltas between similar lines.

    ``values`` maps every Format learned so far to the reference values the
    next matching line is compared against.

    :param flex: allow NumberChunk.detect to widen narrow numeric columns.
    :param absolute: when true, deltas are always computed against the first
        values seen for a layout rather than the previous ones.
    :param use_colors: color setting given to every Format that is created.
    """

    def __init__(self, flex=True, absolute=False, use_colors=True):
        self.values = {}
        self.flex = flex
        self.absolute = absolute
        self.use_colors = use_colors

    @staticmethod
    def num(n):
        """Convert the text ``n`` to ``float`` if it has a dot, else ``int``."""
        if u'.' not in n:
            return int(n)
        return float(n)

    @staticmethod
    def grouper(iterable, n, fillvalue=None):
        """Yield successive ``n``-tuples of ``iterable``, padded with ``fillvalue``."""
        # The same iterator repeated n times makes izip_longest pull n
        # consecutive items for each tuple.
        args = [iter(iterable)] * n
        return izip_longest(*args, fillvalue=fillvalue)

    def parse(self, line):
        """Learn the layout of ``line`` and remember its values.

        :returns: ``(plain format, None, values)``; the ``None`` in the
            deltas slot signals that there is nothing to compare against yet.
        """
        values = []
        chunks = []
        # The split yields (text, spaces, number) triples followed by the
        # trailing text, which grouper pads with None.
        elts = re.split(r'(\s*)([0-9]+(?:\.[0-9]+)?)', line)
        for i, (prefix, spaces, number) in enumerate(self.grouper(elts, 3)):
            if prefix:
                chunks.append(StringChunk(prefix))
            if number is not None:
                values.append(self.num(number))
                opens_line = i == 0 and not prefix
                # Forward the configured flex setting: it used to be dropped
                # here, which left the --no-flex option without any effect.
                chunks.append(
                    NumberChunk.detect(spaces, number, opens_line, self.flex))
        fmt = Format(chunks, self.use_colors)
        self.values[fmt] = values
        return fmt.plain(), None, values

    def process(self, line):
        """Return ``(format, deltas, values)`` for ``line``.

        The line is matched against every known layout; on the first match
        the deltas against the stored reference values are returned. A line
        that matches nothing is handed to :meth:`parse`.
        """
        for fmt, old_values in self.values.items():
            m = fmt.regex.match(line)
            if m:
                values = [self.num(v) for v in m.groups()]
                deltas = [n - o for n, o in zip(values, old_values)]
                if not self.absolute:
                    # Relative mode: the next delta is against this line.
                    # Replacing an existing key is safe during iteration and
                    # we return right away.
                    self.values[fmt] = values
                return fmt, deltas, values
        return self.parse(line)
class Printer(object):
    """Write formatted lines, optional timestamps and separators to ``fp``."""

    def __init__(self, fp, timestamps, separators, orig, skip_zeros):
        self.fp = fp
        self.timestamps = timestamps
        self.separators = separators
        self.orig = orig
        self.skip_zeros = skip_zeros
        # Bookkeeping used to decide when a separator line is really emitted.
        self.separators_pending = 0
        self.lines_since_sep = 0
        self.multiline = False

    @classmethod
    def now(cls): # pragma: no cover
        """Return the current time as text."""
        return time.asctime()

    def separator(self):
        """Record that the feed reported a separator (if separators are on)."""
        if not self.separators:
            return
        if self.lines_since_sep == 1:
            self.multiline = False
        self.separators_pending += 1

    def print_separator(self):
        """Return the text of a separator line."""
        template = u'{0}\n' if self.timestamps else u'--- {0}\n'
        return template.format(self.now())

    def print_separator_if_needed(self):
        """Clear the pending counter and return a separator line, or None.

        A separator is produced when more than one separator is pending, or
        when at least one is pending while ``multiline`` is set.
        """
        pending, self.separators_pending = self.separators_pending, 0
        if pending > 1 or (pending and self.multiline):
            self.lines_since_sep = 0
            return self.print_separator()
        return None

    def print_line(self, line):
        """Return ``line``, prefixed with the time when timestamps are on."""
        if not self.timestamps:
            return line
        return u'{0}: {1}'.format(self.now(), line)

    def print_chunks(self, chunks):
        """Write every chunk to ``fp``, then flush it."""
        write = self.fp.write
        for piece in chunks:
            write(piece)
        self.fp.flush()

    def make_output(self, fmt, deltas, values):
        """Yield the output lines for one processed input line."""
        if deltas is None:
            # Nothing to compare against: show the line itself.
            yield self.print_line(fmt.format(values))
            return

        skip_delta = self.skip_zeros and all(d == 0 for d in deltas)

        if not self.orig:
            if not skip_delta:
                yield self.print_line(fmt.format(deltas))
            return

        if not len(values):
            # No numbers in the line: there is only the text to show.
            yield self.print_line(fmt.format(values))
            return

        yield self.print_line(fmt.plain().format(values))
        if not skip_delta:
            yield self.print_line(fmt.whitespace().format(deltas))

    def output(self, fmt, deltas, values):
        """Render one processed line and write it, with a separator if due."""
        if self.separators_pending == 0:
            # A second line arrived without a separator in between.
            self.multiline = True
        chunks = [
            text for text in self.make_output(fmt, deltas, values)
            if text is not None
        ]
        if not chunks:
            return
        sep = self.print_separator_if_needed()
        if sep:
            chunks.insert(0, sep)
        self.lines_since_sep += 1
        self.print_chunks(chunks)
def fd_feed(fd, sep_interval):
    """Yield the lines read from ``fd`` until it is exhausted.

    The ``separator`` sentinel is yielded before any line that took longer
    than ``sep_interval`` seconds to arrive.
    """
    running_py2 = sys.version_info[0] == 2
    while True:
        started = time.time()
        line = fd.readline()
        if running_py2:
            # Python 2 reads bytes; decode with the module-level encoding.
            line = line.decode(encoding)
        if not line:
            return
        waited = time.time() - started
        if waited > sep_interval:
            yield separator
        yield line
def command_feed(cmd, interval, count=None):
    """Run ``cmd`` repeatedly and yield its output line by line.

    Before the first line of every run the ``separator`` sentinel is
    yielded; each line is decoded and terminated with a newline.

    :param cmd: sequence of command arguments; a single-item sequence is
        treated as a shell command line and run through ``$SHELL -c``
        (``/bin/sh`` when ``SHELL`` is unset).
    :param interval: seconds to sleep between two consecutive runs.
    :param count: number of runs, or ``None`` to run until interrupted.
    :raises subprocess.CalledProcessError: propagated from
        ``subprocess.check_output`` when the command fails.
    """
    # getdefaultlocale() may report no encoding at all; decoding with None
    # raises TypeError, so fall back to the preferred encoding, then UTF-8.
    encoding = (locale.getdefaultlocale()[1]
                or locale.getpreferredencoding(False)
                or u'UTF-8')
    if len(cmd) == 1:
        shell = os.getenv(u'SHELL', u'/bin/sh')
        # tuple() lets callers pass a list as well as a tuple.
        cmd = (shell, u'-c') + tuple(cmd)
    while count is None or count > 0:
        if count is not None:
            count -= 1
        output = subprocess.check_output(cmd)
        for index, line in enumerate(output.splitlines()):
            if index == 0:
                yield separator
            yield line.decode(encoding) + u'\n'
        # Only wait when another run follows; sleeping after the final run
        # would merely delay the end of the feed.
        if count is None or count > 0:
            time.sleep(interval)
def run(feed, parser, printer):
    """Drive ``printer`` with everything ``feed`` produces.

    The ``separator`` sentinel is forwarded to ``printer.separator()``; any
    other item is a line that goes through ``parser.process`` and whose
    result is handed to ``printer.output``.
    """
    for item in feed:
        if item is separator:
            printer.separator()
            continue
        processed = parser.process(item)
        printer.output(*processed)
def use_separators(cmd, separators, skip_zeros, timestamps):
    """Decide whether separator lines are shown.

    With a command, separators are shown unless the mode is ``'never'``.
    Without one, they are shown for ``'always'``, and for ``'auto'`` only
    when zero deltas are skipped and timestamps are off.
    """
    if cmd:
        return separators != 'never'
    if separators == 'always':
        return True
    return separators == 'auto' and skip_zeros and not timestamps
def use_colors(color, fd):
    """Decide whether output is colored.

    ``u'never'`` and ``u'always'`` are taken literally; any other mode
    enables colors only when ``fd`` is attached to a terminal. An ``fd``
    without a usable ``fileno()`` counts as not being a terminal.
    """
    if color in (u'never', u'always'):
        return color == u'always'
    try:
        on_terminal = os.isatty(fd.fileno())
    except (AttributeError, UnsupportedOperation):
        on_terminal = False
    return on_terminal
def real_cli(stdin, stdout, cmd, timestamps, interval, flex, separators, color, orig, skip_zeros, absolute, count):
    """Wire a feed, a Parser and a Printer together and run them.

    Lines come from repeated runs of ``cmd`` when one is given, otherwise
    from ``stdin``; everything is written to ``stdout``. The remaining
    parameters are the command-line options, passed through unchanged.
    """
    if cmd:
        feed = command_feed(cmd, interval, count)
    else:
        # Without a command, ``interval`` is the pause on stdin after which
        # a separator is reported.
        feed = fd_feed(stdin, interval)
    separators = use_separators(cmd, separators, skip_zeros, timestamps)
    # Color codes are written to ``stdout``, so that is the stream whose
    # terminal status matters. Probing ``stdin`` disabled colors for
    # ``cmd | delta`` and leaked escape codes into ``delta cmd > file``.
    color = use_colors(color, stdout)
    parser = Parser(flex, absolute, color)
    printer = Printer(stdout, timestamps, separators, orig, skip_zeros)
    try:
        run(feed, parser, printer)
    except (KeyboardInterrupt, IOError): # pragma: no cover
        # Deliberately silent: Ctrl-C and a closed output pipe are the normal
        # ways to stop this tool.
        pass
@click.command()
@click.option(u'-t/-T', u'--timestamps/--no-timestamps', help=u'Show timestamps on all output lines')
@click.option(u'-i', u'--interval', metavar=u'SECONDS', type=click.INT,
              help=u'Interval between command runs', default=1)
@click.option(u'-f/-F', u'--flex/--no-flex', help=u'Tweak column widths for better output (default is on)', default=True)
@click.option(u'--separators-auto', u'separators', flag_value=u'auto', help=u'Show chunk separators when needed (default)', default=True)
@click.option(u'-s', u'--separators', u'separators', flag_value=u'always', help=u'Always show chunk separators')
@click.option(u'-S', u'--no-separators', u'separators', flag_value=u'never', help=u'Never show chunk separators')
@click.option(u'-c', u'--color', type=click.Choice([u'never', u'auto', u'always']), help=u'Color output', default=u'auto')
@click.option(u'-o/-O', u'--orig/--no-orig', help=u'Show original output interleaved with deltas')
@click.option(u'-z/-Z', u'--skip-zeros/--with-zeros', help=u'Skip all-zero deltas')
@click.option(u'-a/-A', u'--absolute/--relative', help=u'Show deltas from original value, not last')
@click.option(u'-n', u'--count', metavar=u'NUMBER', type=click.INT, help=u'Number of command runs (default: until Ctrl-C)')
@click.argument(u'cmd', nargs=-1, required=False)
def cli(cmd, timestamps, interval, flex, separators, color, orig, skip_zeros, absolute, count): # pragma: no cover
    """Command-line entry point: forward the parsed options to real_cli.

    The process-wide ``sys.stdin`` and ``sys.stdout`` are used as the input
    and output streams.
    """
    real_cli(sys.stdin, sys.stdout, cmd, timestamps, interval, flex, separators, color, orig, skip_zeros, absolute, count)


if __name__ == u'__main__': # pragma: no cover
    cli()