Skip to content

Commit

Permalink
Update script to generate reference data for the integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
robomics committed Nov 12, 2024
1 parent ec95469 commit 3fd892e
Showing 1 changed file with 173 additions and 63 deletions.
236 changes: 173 additions & 63 deletions test/scripts/generate_integration_test_reference_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import subprocess as sp
import sys
import tempfile
from typing import List
from typing import Any, Dict, List

import pandas as pd


def existing_file(path: str) -> pathlib.Path:
Expand Down Expand Up @@ -105,6 +107,48 @@ def add_common_flags(parser):
)


def make_cross_product_sc(main_parser):
sc: argparse.ArgumentParser = main_parser.add_parser(
"cross-product",
help="Generate the reference dataset for NCHG cross-product.",
)

sc.add_argument(
"1d-domains",
type=existing_file,
help="Path to a BED3+ files with the list of domains to be processed.",
)

sc.add_argument(
"out-path",
type=pathlib.Path,
help="Output path where to store the resulting domains.",
)

sc.add_argument(
"--type",
type=str,
choices={"gw", "cis", "trans"},
default="gw",
help="Type of domains to be generated.",
)

sc.add_argument(
"--verbosity",
choices={"debug", "info", "warnings", "error", "critical"},
type=str,
default="info",
help="Tweak log verbosity.",
)

sc.add_argument(
"--force",
action="store_true",
default=False,
help="Overwrite existing file(s).",
)


def make_compute_sc(main_parser):
sc: argparse.ArgumentParser = main_parser.add_parser(
"compute",
Expand Down Expand Up @@ -286,6 +330,50 @@ def run_nchg_command(nchg_bin: pathlib.Path, subcmd: str, *args, **kwargs):
raise RuntimeError(f"{cmd} returned with exit code {res.returncode}")


def generate_2d_domains(
domains: pathlib.Path,
output_path: pathlib.Path,
domain_type: str,
force: bool,
) -> pathlib.Path:
if output_path.exists() and not force:
raise RuntimeError(f'refusing to overwrite file "{output_path}": pass --force to overwrite.')

parent_dir = output_path.parent
parent_dir.mkdir(exist_ok=True)

logging.info("reading domains from %s...", domains)

df = pd.read_table(domains, names=["chrom", "start", "end"], usecols=[0, 1, 2])
chroms = {name: i for i, name in enumerate(df["chrom"].unique())}

logging.info("parsed %d chromosomes...", len(chroms))
df["id"] = df["chrom"].map(chroms)

df = df.merge(df, how="cross", suffixes=("1", "2"))
df = (
df[(df["id1"] <= df["id2"]) & (df["start1"] <= df["start2"])]
.reset_index(drop=True)
.drop(columns=["id1", "id2"])
)

size = len(df)
logging.info("generated %d domain pairs...", size)
if domain_type == "cis":
logging.info("dropping trans domains...")
df = df[df["chrom1"] == df["chrom2"]]
logging.info("dropped %d domains...", size - len(df))
elif domain_type == "trans":
logging.info("dropping cis domains...")
df = df[df["chrom1"] != df["chrom2"]]
logging.info("dropped %d domains...", size - len(df))

logging.info("writing 2D domains to %s...", output_path)
df.to_csv(output_path, sep="\t", index=False, header=False)

return output_path


def run_nchg_compute(
nchg_bin: pathlib.Path,
uri: str,
Expand Down Expand Up @@ -395,6 +483,80 @@ def run_nchg_expected(
return output_h5


def generate_all_files(nchg_bin: pathlib.Path, force: bool, timeout: float, args: Dict[str, Any]):
suffix = pathlib.Path(args["cool-uri"].partition("::")[0]).stem

for dom_type in ["gw", "cis", "trans"]:
output = args["output-dir"] / "cross_product" / f"{suffix}.{dom_type}-domains.bedpe"
generate_2d_domains(args["domains"], output, dom_type, args["force"])

domains = args["output-dir"] / "cross_product" / f"{suffix}.cis-domains.bedpe"
outprefix = args["output-dir"] / "compute_with_domains" / suffix
run_nchg_compute(
nchg_bin,
args["cool-uri"],
outprefix,
domains,
args["nproc"],
force,
timeout,
)

outprefix = args["output-dir"] / "compute" / suffix
run_nchg_compute(
nchg_bin,
args["cool-uri"],
outprefix,
None,
args["nproc"],
force,
timeout,
)

output = args["output-dir"] / "merge" / f"{suffix}.parquet"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_merge(
nchg_bin,
outprefix,
output,
force,
timeout,
)

input = output
output = args["output-dir"] / "filter" / f"{suffix}.parquet"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_filter(
nchg_bin,
input,
output,
force,
timeout,
)

input = output
output = args["output-dir"] / "view" / f"{suffix}.tsv"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_view(
nchg_bin,
input,
output,
force,
timeout,
)

output = args["output-dir"] / "expected" / f"{suffix}.h5"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_expected(
nchg_bin,
args["cool-uri"],
output,
[],
force,
timeout,
)


def find_nchg_exec(path: pathlib.Path | None) -> pathlib.Path:
if path:
if shutil.which(path):
Expand All @@ -421,7 +583,14 @@ def main():

cmd = args["command"]

if cmd == "compute":
if cmd == "cross-product":
generate_2d_domains(
args["domains"],
args["out-path"],
args["type"],
args["force"],
)
elif cmd == "compute":
run_nchg_compute(
nchg_bin,
args["cool-uri"],
Expand Down Expand Up @@ -465,70 +634,11 @@ def main():
timeout,
)
elif not cmd:
suffix = pathlib.Path(args["cool-uri"].partition("::")[0]).stem
outprefix = args["output-dir"] / "compute_with_domains" / suffix
run_nchg_compute(
generate_all_files(
nchg_bin,
args["cool-uri"],
outprefix,
args["domains"],
args["nproc"],
force,
timeout,
)

outprefix = args["output-dir"] / "compute" / suffix
run_nchg_compute(
nchg_bin,
args["cool-uri"],
outprefix,
None,
args["nproc"],
force,
timeout,
)

output = args["output-dir"] / "merge" / f"{suffix}.parquet"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_merge(
nchg_bin,
outprefix,
output,
force,
timeout,
)

input = output
output = args["output-dir"] / "filter" / f"{suffix}.parquet"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_filter(
nchg_bin,
input,
output,
force,
timeout,
)

input = output
output = args["output-dir"] / "view" / f"{suffix}.tsv"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_view(
nchg_bin,
input,
output,
force,
timeout,
)

output = args["output-dir"] / "expected" / f"{suffix}.h5"
output.parent.mkdir(parents=True, exist_ok=True)
run_nchg_expected(
nchg_bin,
args["cool-uri"],
output,
[],
force,
timeout,
args,
)
else:
raise NotImplementedError
Expand Down

0 comments on commit 3fd892e

Please sign in to comment.