Skip to content

Commit

Permalink
adding batched lastz
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard C. Burhans committed Apr 30, 2024
1 parent 9de8344 commit 31de746
Show file tree
Hide file tree
Showing 3 changed files with 381 additions and 0 deletions.
20 changes: 20 additions & 0 deletions tools/batched_lastz/batched_lastz.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<tool id="batched_lastz" name="Batched Lastz" version="@TOOL_VERSION@+galaxy@VERSION_SUFFIX@" profile="@PROFILE@">
<description>: align batches of sequences</description>
<macros>
<import>macros.xml</import>
</macros>
<expand macro="requirements"/>
<command detect_errors="exit_code"><![CDATA[
run_lastz_tarball.py '--input=$input' '--output=$output' '--parallel=\${GALAXY_SLOTS:-2}'
]]></command>
<inputs>
<param argument="--tarball" type="data" format="tgz" label="Tarball"/>
</inputs>
<outputs>
<data name="output" label="Output"/>
</outputs>
<help><![CDATA[
TODO: Fill in help.
]]></help>
<expand macro="citations"/>
</tool>
26 changes: 26 additions & 0 deletions tools/batched_lastz/macros.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<macros>
<xml name="requirements">
<requirements>
<requirement type="package" version="@TOOL_VERSION@">lastz</requirement>
<yield/>
</requirements>
</xml>
<token name="@TOOL_VERSION@">1.04.22</token>
<token name="@VERSION_SUFFIX@">0</token>
<token name="@PROFILE@">21.05</token>
<xml name="citations">
<citations>
<citation type="bibtex">
@misc{
githublastz,
author = {Harris, Robert},
year = {2007},
title = {Improved pairwise alignment of genomic DNA},
publisher = {The Pennsylvania State University},
journal = {Ph. D. Thesis},
url = {http://www.bx.psu.edu/~rsharris/rsharris_phd_thesis_2007.pdf},
}
</citation>
</citations>
</xml>
</macros>
335 changes: 335 additions & 0 deletions tools/batched_lastz/run_lastz_tarball.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
#!/usr/bin/env python

import argparse
import concurrent.futures
import json
import multiprocessing
import os
import queue
import re
import shutil
import sys
import subprocess
import tarfile
import tempfile
import typing
import time


lastz_output_format_regex = re.compile(
r"^(?:axt\+?|blastn|cigar|differences|general-?.+|lav|lav\+text|maf[-+]?|none|paf(?::wfmash)?|rdotplot|sam-?|softsam-?|text)$",
re.IGNORECASE,
)


# Specifies the output format: lav, lav+text, axt, axt+, maf, maf+, maf-, sam, softsam, sam-, softsam-, cigar, BLASTN, PAF, PAF:wfmash, differences, rdotplot, text, general[:<fields>], or general-[:<fields>].
# ‑‑format=none can be used when no alignment output is desired.


def run_command(
instance: int,
input_queue: "queue.Queue[typing.Dict[str, typing.Any]]",
output_queue: "queue.Queue[float]",
debug: bool = False,
) -> None:
os.chdir("galaxy/files")

while True:
command_dict = input_queue.get()

if not command_dict:
return

args = ["lastz"]
args.extend(command_dict["args"])

stdin = command_dict["stdin"]
if stdin is not None:
stdin = open(stdin, "r")

stdout = command_dict["stdout"]
if stdout is not None:
stdout = open(stdout, "w")

stderr = command_dict["stderr"]
if stderr is not None:
stderr = open(stderr, "w")

begin = time.perf_counter()
p = subprocess.run(args, stdin=stdin, stdout=stdout, stderr=stderr)

for var in [stdin, stdout, stderr]:
if var is not None:
var.close()

if p.returncode != 0:
sys.exit(f"command failed: {' '.join(args)}")
else:
stderr = command_dict["stderr"]
if stderr is not None:
try:
statinfo = os.stat(stderr, follow_symlinks=False)
except:
statinfo = None

if statinfo is None:
sys.exit(f"unable to stat stderr file: {' '.join(args)}")

if statinfo.st_size != 0:
sys.exit(f"stderr file is not empty: {' '.join(args)}")

elapsed = time.perf_counter() - begin
output_queue.put(elapsed)

if debug:
print(f"runtime {elapsed}", file=sys.stderr, flush=True)


class BatchTar:
def __init__(self, pathname: str, debug: bool = False) -> None:
self.pathname = pathname
self.debug = debug
self.tarfile = None
self.commands: typing.List[typing.Dict[str, typing.Any]] = []
self._extract()
self._load_commands()

def batch_commands(self) -> typing.Iterator[typing.Dict[str, typing.Any]]:
for command in self.commands:
yield command

def _load_commands(self) -> None:
try:
f = open("galaxy/commands.json")
except FileNotFoundError:
sys.exit(
f"ERROR: input tarball missing galaxy/commands.json: {self.pathname}"
)

begin = time.perf_counter()
for json_line in f:
json_line = json_line.rstrip("\n")
try:
command_dict = json.loads(json_line)
except json.JSONDecodeError:
sys.exit(
f"ERROR: bad json line in galaxy/commands.json: {self.pathname}"
)

self._load_command(command_dict)

f.close()
elapsed = time.perf_counter() - begin

if self.debug:
print(
f"loaded {len(self.commands)} commands in {elapsed} seconds ",
file=sys.stderr,
flush=True,
)

def _load_command(self, command_dict: typing.Dict[str, typing.Any]) -> None:
# check command_dict structure
field_types: typing.Dict[str, typing.List[typing.Any]] = {
"executable": [str],
"args": [list],
"stdin": [str, "None"],
"stdout": [str, "None"],
"stderr": [str, "None"],
}

bad_format = False
for field_name in field_types.keys():
# missing field
if field_name not in command_dict:
bad_format = True
break

# incorrect field type
good_type = False
for field_type in field_types[field_name]:
if isinstance(field_type, str) and field_type == "None":
if command_dict[field_name] is None:
good_type = True
break
elif isinstance(command_dict[field_name], field_type):
good_type = True
break

if good_type is False:
bad_format = True

if not bad_format:
# all args must be strings
for arg in command_dict["args"]:
if not isinstance(arg, str):
bad_format = True
break

if bad_format:
sys.exit(
f"ERROR: unexpected json format in line in galaxy/commands.json: {self.pathname}"
)

self.commands.append(command_dict)

def _extract(self) -> None:
try:
self.tarball = tarfile.open(
name=self.pathname, mode="r:*", format=tarfile.GNU_FORMAT
)
except FileNotFoundError:
sys.exit(f"ERROR: unable to find input tarball: {self.pathname}")
except tarfile.ReadError:
sys.exit(f"ERROR: error reading input tarball: {self.pathname}")

begin = time.perf_counter()
self.tarball.extractall(filter="data")
self.tarball.close()
elapsed = time.perf_counter() - begin

if self.debug:
print(
f"Extracted tarball in {elapsed} seconds", file=sys.stderr, flush=True
)


class TarRunner:
def __init__(
self,
input_pathname: str,
output_pathname: str,
parallel: int,
debug: bool = False,
) -> None:
self.input_pathname = input_pathname
self.output_pathname = output_pathname
self.parallel = parallel
self.debug = debug
self.batch_tar = BatchTar(self.input_pathname, debug=self.debug)
self.output_file_format: typing.Dict[str, str] = {}
self.output_files: typing.Dict[str, typing.List[str]] = {}
self._set_output()
self._set_target_query()

def _set_output(self) -> None:
for command_dict in self.batch_tar.batch_commands():
output_file = None
output_format = None

for arg in command_dict["args"]:
if arg.startswith("--format="):
output_format = arg[9:]
elif arg.startswith("--output="):
output_file = arg[9:]

if output_file is None:
f = tempfile.NamedTemporaryFile(dir="galaxy/files", delete=False)
output_file = os.path.basename(f.name)
f.close()
command_dict["args"].append(f"--output={output_file}")

if output_format is None:
output_format = "lav"
command_dict["args"].append(f"--format={output_format}")

if not lastz_output_format_regex.match(output_format):
sys.exit(f"ERROR: invalid output format: {output_format}")

self.output_file_format[output_file] = output_format

for output_file, output_format in self.output_file_format.items():
self.output_files.setdefault(output_format, [])
self.output_files[output_format].append(output_file)

def _set_target_query(self) -> None:
for command_dict in self.batch_tar.batch_commands():
new_args: typing.List[str] = []

for arg in command_dict["args"]:
if arg.startswith("--target="):
new_args.insert(0, arg[9:])
elif arg.startswith("--query="):
new_args.insert(1, arg[8:])
else:
new_args.append(arg)

command_dict["args"] = new_args

def run(self) -> None:
run_times = []
begin = time.perf_counter()

with multiprocessing.Manager() as manager:
input_queue: queue.Queue[typing.Dict[str, typing.Any]] = manager.Queue()
output_queue: queue.Queue[float] = manager.Queue()

for command_dict in self.batch_tar.batch_commands():
input_queue.put(command_dict)

# use the empty dict as a sentinel
for _ in range(self.parallel):
input_queue.put({})

with concurrent.futures.ProcessPoolExecutor(
max_workers=self.parallel
) as executor:
futures = [
executor.submit(
run_command,
instance,
input_queue,
output_queue,
debug=self.debug,
)
for instance in range(self.parallel)
]

for f in concurrent.futures.as_completed(futures):
if not f.done() or f.cancelled() or f.exception() is not None:
sys.exit("lastz command failed")

while not output_queue.empty():
run_time = output_queue.get()
run_times.append(run_time)

elapsed = time.perf_counter() - begin

if self.debug:
print(f"elapsed {elapsed}", file=sys.stderr, flush=True)

self._cleanup()

def _cleanup(self) -> None:
num_output_files = len(self.output_files.keys())

for file_type, file_list in self.output_files.items():
with open(f"output.{file_type}", "w") as ofh:
for filename in file_list:
with open(f"galaxy/files/{filename}") as ifh:
for line in ifh:
ofh.write(line)

if num_output_files == 1:
file_type = list(self.output_files.keys())[0]
src_filename = f"output.{file_type}"
shutil.copy2(src_filename, self.output_pathname)


def main() -> None:
if not hasattr(tarfile, "data_filter"):
sys.exit("ERROR: extracting may be unsafe; consider updating Python")

parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str, required=True)
parser.add_argument("--output", type=str, required=True)
parser.add_argument("--parallel", type=int, default=1, required=False)
parser.add_argument("--debug", action="store_true", required=False)

args = parser.parse_args()
runner = TarRunner(args.input, args.output, args.parallel, args.debug)
runner.run()


if __name__ == "__main__":
main()

0 comments on commit 31de746

Please sign in to comment.