-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubprocess2.py
188 lines (154 loc) · 6.15 KB
/
subprocess2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
Extensions to the subprocess module.

TODO:
  * serialized, safe wrapper threads
  * broken pipe signal handling
"""
import subprocess, io, os, threading
from subprocess import PIPE, STDOUT
class Subprocess(subprocess.Popen):
    """Popen variant with flexible stdin sources and exit-code checking.

    Behaves like ``subprocess.Popen`` with the following enhancements:

    * ``stdin`` may be another subprocess (a producer), any object with a
      ``fileno()``, an integer descriptor / ``PIPE`` constant, ``None``,
      a string, or any Python iterable.
    * ``errorlevel`` may be given to set the threshold for converting exit
      codes into exceptions: when the exit status is collected, a return
      code ``>= errorlevel`` or a negative return code raises
      ``subprocess.CalledProcessError``.
    """

    def __init__(self, args, **kw):
        """Start the child process.

        Accepts every ``subprocess.Popen`` keyword plus ``errorlevel``
        (default ``None``: never raise on exit status).
        """
        if 'stdin' in kw:
            kw['stdin'] = self._asfiledesc(kw['stdin'])
        self._errorlevel = kw.pop('errorlevel', None)
        # Kept only for the CalledProcessError message; capped at 200 chars.
        self._cmd = self._describe(args)
        subprocess.Popen.__init__(self, args=args, **kw)

    def _handle_exitstatus(self, sts, *args, **kw):
        """Record the exit status, then enforce the ``errorlevel`` threshold.

        Extra positional/keyword arguments are forwarded untouched because
        the signature of this private Popen hook differs between Python
        versions.

        Raises:
            subprocess.CalledProcessError: if ``errorlevel`` is set and the
                return code is ``>= errorlevel`` or negative.
        """
        subprocess.Popen._handle_exitstatus(self, sts, *args, **kw)
        if self._errorlevel is None:
            return
        if self.returncode >= self._errorlevel or self.returncode < 0:
            raise subprocess.CalledProcessError(self.returncode, self._cmd)

    @staticmethod
    def _describe(args):
        """Return a printable command line (at most 200 chars) for *args*."""
        if isinstance(args, str):
            # A plain command string (shell=True style) is already readable;
            # list2cmdline() would treat every character as an argument.
            text = args
        elif isinstance(args, bytes):
            text = args.decode('utf-8', 'replace')
        else:
            text = subprocess.list2cmdline(args)
        return text[:200]

    @staticmethod
    def _asfiledesc(arg):
        """Convert argument into something that has a file descriptor.

        ``None``, integers and objects exposing ``fileno`` are returned
        unchanged.  Strings are wrapped whole in an ``Iter2Pipe`` bridge
        thread; any other iterable is wrapped too, unless its iterator
        itself exposes ``fileno``.
        """
        if arg is None or isinstance(arg, int) or hasattr(arg, 'fileno'):
            return arg  # use as-is (None keeps Popen's default behaviour)
        if isinstance(arg, (str, bytes)):
            # special case strings to avoid iteration item by item
            return Iter2Pipe(arg)
        iterator = iter(arg)
        if hasattr(iterator, 'fileno'):  # an iterator with a fileno?
            return iterator              # (e.g. file, urlopen, producer)
        return Iter2Pipe(iterator)       # no, wrap in bridge thread
class Producer(Subprocess, io.BufferedReader):
    """Exposes a readable file-like interface to stdout of a subprocess.

    The object is both the process handle and a buffered binary reader
    over the child's stdout pipe.
    """

    def __init__(self, args, **kw):
        """Start the child with its stdout connected to this reader.

        Raises:
            ValueError: if the caller tries to supply ``stdout``.
        """
        if 'stdout' in kw:
            raise ValueError("Producer: stdout already overridden")
        kw['stdout'] = subprocess.PIPE
        Subprocess.__init__(self, args, **kw)
        # Duplicate the descriptor so the pipe stays open after Popen's own
        # stdout file object is disposed of.
        fd = os.dup(self.stdout.fileno())
        self.stdout.close()
        self.stdout = None
        # io.FileIO accepts only 'r', 'w', 'x', 'a' (optionally with '+');
        # the former 'rU' mode raised ValueError whenever universal_newlines
        # was requested.  The reader always yields raw bytes.
        io.BufferedReader.__init__(self, io.FileIO(fd, 'r'))
class Consumer(Subprocess, io.BufferedWriter):
    """Exposes a writable file-like interface to stdin of a subprocess.

    The object is both the process handle and a buffered binary writer
    over the child's stdin pipe.
    """

    def __init__(self, args, **kw):
        """Start the child with its stdin connected to this writer.

        Raises:
            ValueError: if the caller tries to supply ``stdin``.
        """
        if 'stdin' in kw:
            raise ValueError("Consumer: stdin already overridden")
        kw['stdin'] = subprocess.PIPE
        Subprocess.__init__(self, args, **kw)
        # Duplicate the descriptor so the pipe stays open after Popen's own
        # stdin file object is disposed of.
        fd = os.dup(self.stdin.fileno())
        self.stdin.close()
        self.stdin = None
        # The mode belongs to FileIO: the descriptor must be opened for
        # writing.  Passing 'w' to BufferedWriter made it the buffer_size
        # argument and left the raw file read-only.
        io.BufferedWriter.__init__(self, io.FileIO(fd, 'w'))
class _RawIterIO(io.BufferedIOBase):
""" Helper class for turning python iterator to a file-like object """
def __init__(self, iterable):
self.iterator = self.sourcereader(iterable)
self.remainder = None
def sourcereader(self, iterable):
for x in iterable:
if not isinstance(x, str):
x = '%s\n' % x
yield x
iterable = None # let gc do its work
while True:
yield ''
def readable(self):
return True
def read(self, n=None):
""" Read no more than n bytes from source """
if self.remainder:
data = self.remainder
self.remainder = None
else:
data = next(self.iterator)
if n is not None and 0 < n < len(data):
data, self.remainder = data[:n], data[n:]
return data
def close(self):
io.BufferedIOBase.close(self)
self.iterator = None
def make_readable(obj):
    """Adapt object into something that has a .read() method.

    * objects that already have ``read`` are returned unchanged;
    * byte strings are wrapped in an in-memory binary stream;
    * text strings are encoded as UTF-8 and wrapped the same way;
    * anything else is assumed to be iterable and is wrapped in a
      buffered reader over ``_RawIterIO``.
    """
    if hasattr(obj, 'read'):
        return obj
    if isinstance(obj, (bytes, bytearray)):
        return io.BytesIO(obj)
    if isinstance(obj, str):
        # io.BytesIO replaces cStringIO, which does not exist on Python 3;
        # the data ends up in os.write(), which needs bytes.
        return io.BytesIO(obj.encode('utf-8'))
    # assume it's somehow iterable:
    return io.BufferedReader(_RawIterIO(obj))
class Iter2Pipe(threading.Thread):
    """Bridge thread from python iterator to a pipe.

    The first call to ``fileno()`` creates an OS pipe and starts the
    thread, which copies everything readable from the source into the
    write end.  The read end is what ``fileno()`` returns.  ``close()``
    releases the read end and re-raises any exception the source raised
    inside the thread.
    """

    # Class-level defaults keep close()/__del__ safe on instances whose
    # __init__ did not complete.
    _readfd = None
    _writefd = None
    _bufsize = 2048
    _pipe_closed = False
    _pending_exception = None

    def __init__(self, obj):
        self.source = make_readable(obj)
        self._pending_exception = None
        threading.Thread.__init__(self)

    def fileno(self):
        """File descriptor through which iterator data may be read.

        Raises:
            ValueError: if the bridge has already been closed.
        """
        if self._pipe_closed:
            raise ValueError("I/O operation on closed Iter2Pipe")
        # Keyed on the descriptor, not on is_alive(): a thread that has
        # already finished must not be started a second time.
        if self._readfd is None:
            self._initthread()
        return self._readfd

    def close(self):
        """Release the read end and propagate an exception raised in the thread.

        Safe to call more than once.
        """
        self._pipe_closed = True
        readfd, self._readfd = self._readfd, None
        if readfd is not None:
            try:
                os.close(readfd)
            except OSError:
                pass  # best effort: the descriptor may already be gone
        # Propagate exception raised in thread:
        pending, self._pending_exception = self._pending_exception, None
        if pending:
            etype, evalue, tb = pending
            if hasattr(evalue, 'with_traceback'):
                raise evalue.with_traceback(tb)
            # Python 2 fallback.  Chosen by feature test rather than by
            # catching AttributeError, which would also swallow a pending
            # AttributeError coming from the source itself.
            exec('raise etype, evalue, tb')

    __del__ = close

    ### internal methods:

    def _initthread(self):
        """Create thread to read from iterable and write to pipe."""
        self._readfd, self._writefd = os.pipe()
        # ensure write side of pipe is not inherited by child process
        import fcntl
        flags = fcntl.fcntl(self._writefd, fcntl.F_GETFD)
        fcntl.fcntl(self._writefd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
        # Get pipe buffer size
        try:
            bufsize = os.fstat(self._writefd).st_blksize
        except (os.error, AttributeError):
            bufsize = 0
        # too small better than too big - may cause blocking
        self._bufsize = bufsize if bufsize > 0 else 2048
        self.start()

    def _write_all(self, data):
        """Write all of *data* to the pipe, continuing after partial writes."""
        view = memoryview(data)
        while len(view) > 0:
            written = os.write(self._writefd, view)
            view = view[written:]

    def run(self):
        """Thread main function: copy the source into the pipe until EOF."""
        import sys
        try:
            while True:
                try:
                    buf = self.source.read(self._bufsize)
                    if buf and not isinstance(
                            buf, (bytes, bytearray, memoryview)):
                        # text source: the pipe carries bytes
                        buf = buf.encode('utf-8')
                except Exception:
                    # handed over to whoever calls close()
                    self._pending_exception = sys.exc_info()
                    break
                if not buf:
                    break
                try:
                    self._write_all(buf)
                except OSError:
                    return  # reader went away; nothing left to do
        finally:
            try:
                os.close(self._writefd)
            finally:
                self.source.close()
# Public API: the PIPE and STDOUT constants imported from subprocess above,
# plus the three process wrapper classes defined in this module.
__all__ = [
'PIPE', 'STDOUT',
'Subprocess', 'Producer', 'Consumer',
]