Be quiet by default, add tracing logging
Still need to find a frontend for the log messages, but this will give you timings for each of the steps and stages, and hopefully in the future even for each individual filter.
jelmervdl committed Sep 21, 2023
1 parent c960408 commit 0311261
Showing 2 changed files with 326 additions and 98 deletions.
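The tracing helpers this commit calls into (logging.trace, logging.trace_context, logging.span, logging.event, logging.update) live in the second changed file, presumably the new opuscleaner/logging.py, which is not rendered on this page. As a rough, hypothetical sketch of the decorator/span pattern the new clean.py code relies on (not the actual implementation, whose names, fields and output format may differ), something along these lines would already yield per-step timings:

import contextlib
import functools
import json
import sys
import time


@contextlib.contextmanager
def span(name, **fields):
    """Record the wall-clock duration of a block as one JSON line on stderr.

    Hypothetical stand-in for opuscleaner.logging.span; the real module may
    write elsewhere and record richer data.
    """
    start = time.monotonic()
    try:
        yield
    finally:
        record = {'span': name, 'duration': time.monotonic() - start, **fields}
        json.dump(record, sys.stderr)
        sys.stderr.write('\n')


def trace(fn):
    """Decorator: record every call to `fn` as a span named after it."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        with span(fn.__name__):
            return fn(*args, **kwargs)
    return wrapper


@trace
def example_step():
    time.sleep(0.1)  # stand-in for a pipeline step


if __name__ == '__main__':
    example_step()  # emits e.g. {"span": "example_step", "duration": 0.100...}

In the diff below these helpers are used both as decorators on whole stages (babysit_child, run_pipeline, run_parallel, the ProcessPool context manager) and as context managers around individual batches, which is what should eventually make it possible to time each filter separately.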
198 changes: 100 additions & 98 deletions opuscleaner/clean.py
@@ -25,6 +25,7 @@

from pydantic import parse_obj_as

from opuscleaner import logging
from opuscleaner.config import COL_PY, FILTER_PATH
from opuscleaner.filters import list_filters, set_global_filters, filter_format_command, Filter, FilterStep, FilterPipeline, quote, format_shell
from opuscleaner._util import none_throws, ThreadPool, CancelableQueue, Cancelled
@@ -44,18 +45,23 @@
MergeQueue = CancelableQueue[Union[None,Tuple[int,str]]]


@logging.trace
def babysit_child(n: int, child: Popen, name: str, print_queue: PrintQueue, ctrl_queue: ControlQueue) -> None:
"""Thread that looks after a child process and passes (and prefixes) all of
its stderr to a queue. It will tell the parent thread about the end of the
child through the ctrl_queue.
"""
logging.update(pid=child.pid)

prefix = f'[{name}] '.encode()

for line in none_throws(child.stderr):
print_queue.put(prefix + line)

child.wait()

logging.event('child_exited', n=n, retval=child.returncode)

ctrl_queue.put((n, child.returncode))


@@ -95,6 +101,7 @@ class Child(NamedTuple):
process: Popen
babysitter: Thread

@logging.trace_context
class ProcessPool:
"""Context manager for spawning and babysitting child processes that are
siblings connected by their pipes.
@@ -138,8 +145,6 @@ def __exit__(self, err_type, err_inst, err_trace) -> None:
# Wait for the children to exit, and depending on their retval exit early
running_children = len(self.children)

print(f"Waiting for {running_children} subprocesses to finish...", file=sys.stderr)

# If we hit __exit__ due to an exception, just terminate everything
if err_type:
for child in self.children:
@@ -154,6 +159,8 @@ def __exit__(self, err_type, err_inst, err_trace) -> None:
child_i, retval = self.ctrl_queue.get()
running_children -= 1

logging.event('child_stopped', child_i=child_i, retval=retval)

# Early exit when a process errored out. SIGPIPE is returned by
# processes that can no longer write to the next one. E.g. when
# `head -n 10` stops reading because it has read its 10 lines.
@@ -242,8 +249,6 @@ def run(self, pool:ProcessPool, stdin:BinaryIO, stdout:BinaryIO, *, tee:bool=Fal
if not is_last_step and not tee:
stdin = none_throws(child.stdout)

pool.print_queue.put(f'[run.py] step {pool.name}{i}/{step.name}: Started {step.command} (pid {child.pid})\n'.encode())

# If we are tee-ing for debug, shunt the output to a separate file
# TODO: uncompressed at the moment. Might be trouble.
if tee:
@@ -291,8 +296,6 @@ def split_input(print_queue:PrintQueue, parallel: int, batch_queue: BatchQueue,

fh.close()

print_queue.put(f'[run.py] Wrote {lines} lines to batch {batch_index}: {fh.name}\n'.encode())

try:
if lines > 0:
batch_queue.put((batch_index, fh.name))
@@ -313,6 +316,7 @@ def split_input(print_queue:PrintQueue, parallel: int, batch_queue: BatchQueue,
batch_queue.put(None)


@logging.trace
def run_pipeline(print_queue:PrintQueue, batch_queue:BatchQueue, merge_queue:MergeQueue, pipeline:Pipeline) -> None:
"""Receives an input filename from `batch_queue`, and once that has been processed
with `pipeline`, it will post the output filename to `merge_queue`.
@@ -339,11 +343,10 @@ def run_pipeline(print_queue:PrintQueue, batch_queue:BatchQueue, merge_queue:Mer
# Write pipeline output to tempfile that is then passed on to merger.
stdout = NamedTemporaryFile(delete=False)

print_queue.put(f'[run.py] Filtering chunk {filename} to {stdout.name}\n'.encode())

# Open chunk file and process pool and run the pipeline with it.
# The pool's __exit__() will make us wait till the pipeline is done.
with logging.span('run_pipeline_batch', batch_index=batch_index), \
    open(filename, 'rb') as stdin, \
    ProcessPool(print_queue, env={'TMPDIR': tmpdir}, print_prefix=f'{batch_index}/') as pool:
pipeline.run(pool, stdin, stdout)

@@ -379,10 +382,8 @@ def merge_output(print_queue:PrintQueue, parallel:int, merge_queue:MergeQueue, s
if next_batch_index in pending_batches:
batch_index, filename = next_batch_index, pending_batches[next_batch_index]

print_queue.put(f'[run.py] Merging {filename} into output\n'.encode())

try:
with logging.span(f'merge_output_batch', batch_index=batch_index), open(filename, 'rb') as fh:
copyfileobj(fh, stdout)
except Exception as exc:
raise RuntimeError(f'Error while merging batch {batch_index}') from exc
@@ -409,6 +410,7 @@ def merge_output(print_queue:PrintQueue, parallel:int, merge_queue:MergeQueue, s
if len(pending_batches) and next_batch_index <= max(pending_batches.keys()):
raise RuntimeError(f'Not all batches got merged: {next_batch_index=} <= {max(pending_batches.keys())=}')

@logging.trace
def run_parallel(pipeline:Pipeline, stdin:BinaryIO, stdout:BinaryIO, *, parallel:int, batch_size:int, print_queue: PrintQueue) -> None:
batch_queue: BatchQueue = CancelableQueue(parallel * 2)

@@ -426,8 +428,6 @@ def run_parallel(pipeline:Pipeline, stdin:BinaryIO, stdout:BinaryIO, *, parallel
# Read from `merge_queue` and combine files in order.
pool.start(merge_output, print_queue, parallel, merge_queue, stdout)


print_queue.put(f'[run.py] Waiting for pipelines to finish\n'.encode())
try:
pool.join()
except BaseException as exc: # Note: also catches KeyboardInterrupt
Expand All @@ -447,112 +447,114 @@ def main() -> None:
parser.add_argument('--batch-size', type=int, default=1_000_000, help='Batch size in lines that each parallel copy processes (only if --parallel > 1)')
parser.add_argument('--first', type=int, default=0, help='Limit reading input to the N first lines')
parser.add_argument('--dump', action='store_true', help='Print shell script instead')
parser.add_argument('--trace', type=argparse.FileType('w'), help='Write tracing JSON to file')
parser.add_argument('pipeline', metavar='PIPELINE', type=argparse.FileType('r'), help='Pipeline steps specification file, e.g. *.filters.json')
parser.add_argument('languages', metavar='LANG', type=str, nargs='*', help='Language codes of the columns in the input TSV. Only used when --input is set')

args = parser.parse_args()

with logging.Context(file=args.trace), logging.span('main'):
    # default search path for the data files is next to the configuration file
    # which is the default save location for empty-train.
    if not args.basedir:
        args.basedir = os.path.dirname(args.pipeline.name) or os.getcwd()

    if args.input is not None and not args.languages:
        parser.error('When --input is specified, each column\'s LANG has to be specified as well.')

    # load all filter definitions (we need to, to get their name)
    filters = {
        definition.name: definition
        for definition in list_filters(args.filters)
    }

    # set_global_filters() provides the filters to the validators in FilterPipeline
    set_global_filters(filters)
    pipeline_config = parse_obj_as(FilterPipeline, json.load(args.pipeline))

    # Order of columns. Matches datasets.py:list_datasets(path)
    languages: List[str] = args.languages if args.input else [filename.rsplit('.', 2)[1] for filename in pipeline_config.files]

    # Directory plus basename to write debug (`--tee`) files to
    basename: str = 'stdin' if args.input else os.path.commonprefix(pipeline_config.files).rstrip('.')

    pipeline = Pipeline(filters, languages, pipeline_config)

    # Input for next child
    stdin: BinaryIO

    # Output of this program
    stdout:BinaryIO = args.output

    # If we're just dumping the pipeline, do so to the specified output
    if args.dump:
        pipeline.dump(TextIOWrapper(stdout))
        sys.exit(0)

    # Queue filled by the babysitters with the stderr of the children, consumed
    # by `print_lines()` to prevent racing on stderr.
    print_queue = SimpleQueue() # type: SimpleQueue[Optional[bytes]]

    # First start the print thread so that we get immediate feedback from the
    # children even if all of them haven't started yet.
    print_thread = Thread(target=print_lines, args=[print_queue, sys.stderr.buffer])
    print_thread.start()

    # Start child processes, each reading the output from the previous sibling
    try:
        with ProcessPool(print_queue) as pool:
            # If we're not reading from stdin, read from files and paste them together
            if args.input:
                stdin = args.input
            else:
                # Open `gunzip` for each language file
                gunzips = [
                    pool.start(f'gunzip {filename}',
                        ['gzip', '-cd', filename],
                        stdout=PIPE,
                        stderr=PIPE,
                        cwd=args.basedir)
                    for filename in pipeline_config.files
                ]

                fds = [none_throws(gunzip.stdout).fileno() for gunzip in gunzips]

                # .. and a `paste` to combine them into columns
                paste = pool.start('paste',
                    ['paste'] + [f'/dev/fd/{fd}' for fd in fds],
                    stdout=PIPE,
                    stderr=PIPE,
                    pass_fds=fds)

                # Now that `paste` has inherited all the children, close our connection to them
                for gunzip in gunzips:
                    none_throws(gunzip.stdout).close()

                stdin = none_throws(paste.stdout)

            # If we only want the first N lines processed, use `head` to chop those off.
            if args.first > 0:
                head = pool.start('head',
                    ['head', '-n', str(args.first)],
                    stdin=stdin,
                    stdout=PIPE,
                    stderr=PIPE)

                stdin.close() # now taken over by `head`.
                stdin = none_throws(head.stdout)

            if args.parallel > 1:
                run_parallel(pipeline, stdin, stdout, print_queue=print_queue, parallel=args.parallel, batch_size=args.batch_size)
            else:
                pipeline.run(pool, stdin, stdout, tee=args.tee, basename=basename)
    except:
        # If we didn't cleanly exit all processes, we err as well
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)
    finally:
        # Tell print thread to stop (there are no more babysitters now to produce printable stuff)
        print_queue.put(None)
        print_thread.join()


if __name__ == '__main__':
main()
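For completeness, a hedged usage sketch of the new flag: the path below is a placeholder and running the cleaner as a module assumes opuscleaner is importable (an installed checkout may expose a console script instead), but --trace, --parallel and the PIPELINE positional are exactly the arguments parsed above.

import subprocess
import sys

# Run the cleaner on a pipeline definition and write tracing JSON to a file.
# 'path/to/dataset.filters.json' is a placeholder; the cleaned corpus itself
# goes to the cleaner's regular output, not to the trace file.
subprocess.run(
    [sys.executable, '-m', 'opuscleaner.clean',
     '--trace', 'trace.json',   # new in this commit
     '--parallel', '4',
     'path/to/dataset.filters.json'],
    check=True,
)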
