Skip to content

Commit

Permalink
refactor validate_inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
akikuno committed Aug 9, 2023
1 parent b41a5f9 commit 5f55a71
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 119 deletions.
173 changes: 92 additions & 81 deletions src/DAJIN2/preprocess/validate_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,59 @@
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen
import xml.etree.ElementTree as ET

import mappy


def _exist_file(input_file: str):
if not Path(input_file).exists():
raise FileNotFoundError(f"{input_file} is not found")


########################################################################
# Check if the sample is in the proper format.
########################################################################


def _fastq_extension(fastq_path: str):
def _validate_file_existence(input_file: str):
if not Path(input_file).exists():
raise FileNotFoundError(f"{input_file} is not found")


def _validate_fastq_extension(fastq_path: str):
if not re.search(r".fastq$|.fastq.gz$|.fq$|.fq.gz$", fastq_path):
raise AttributeError(f"{fastq_path} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'")


# Validate that the file is in the proper format. Only the top 100 records are inspected.
def _fastq_content(fastq_path: str):
    """Check that ``fastq_path`` begins with readable FASTQ records.

    At most the first 100 records yielded by ``mappy.fastx_read`` are examined.

    Raises:
        AttributeError: if no record can be read, or if an inspected record
            has no quality string.
    """
    inspected = 0
    for _, _, qual in mappy.fastx_read(fastq_path):
        # mappy reports qual as None for records without a quality line
        # (FASTA input), so such a file is not FASTQ.
        if qual is None:
            raise AttributeError(f"{fastq_path} is not a FASTQ format")
        inspected += 1
        if inspected == 100:
            break
    if inspected == 0:
        raise AttributeError(f"{fastq_path} is not a FASTQ format")


def _fasta_content(fasta_path: str):
    """Check the records of the FASTA file at ``fasta_path``.

    The file must yield at least one record, every header and every sequence
    must be unique, and one header must be exactly ``control``.

    Raises:
        AttributeError: if any of those conditions is not met.
    """
    headers, sequences = [], []
    for header, sequence, _ in mappy.fastx_read(fasta_path):
        headers.append(header)
        sequences.append(sequence)
    if not headers:
        raise AttributeError(f"{fasta_path} is not a FASTA format")
    if len(set(headers)) < len(headers):
        raise AttributeError(f"{fasta_path} must include unique identifiers")
    if len(set(sequences)) < len(sequences):
        raise AttributeError(f"{fasta_path} must include unique DNA sequences")
    if "control" not in headers:
        raise AttributeError(f"One of the headers in the {fasta_path} must be '>control'")
raise ValueError(f"{fastq_path} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'")


# Validate that the file is in the proper format.
# Only the top 100 records are inspected.
def _validate_fastq_content(fastq_path: str):
    """Check that ``fastq_path`` begins with readable FASTQ records.

    At most the first 100 records yielded by ``mappy.fastx_read`` are examined.

    Raises:
        ValueError: if no record can be read, or if an inspected record has
            no quality string.
    """
    from itertools import islice

    # islice stops pulling from the reader after 100 records; filtering on the
    # index inside a comprehension would still iterate the whole file.
    records = list(islice(mappy.fastx_read(fastq_path), 100))
    # mappy reports qual as None for records without a quality line (FASTA
    # input), so such a file is not FASTQ.
    if not records or any(qual is None for _, _, qual in records):
        raise ValueError(f"{fastq_path} is not a FASTQ format")


def _validate_fasta_content(fasta_path: str):
    """Check the records of the FASTA file at ``fasta_path``.

    The file must yield at least one record, every header and every sequence
    must be unique, and one header must be exactly ``control``.

    Raises:
        ValueError: if any of those conditions is not met.
    """
    names, seqs = [], []
    try:
        for name, seq, _ in mappy.fastx_read(fasta_path):
            names.append(name)
            seqs.append(seq)
        # An empty result means nothing could be parsed as a FASTA record.
        if not names:
            raise ValueError
    except ValueError:
        raise ValueError(f"{fasta_path} is not a proper FASTA format")
    if len(set(names)) != len(names):
        raise ValueError(f"{fasta_path} must include unique identifiers")
    if len(set(seqs)) != len(seqs):
        raise ValueError(f"{fasta_path} must include unique DNA sequences")
    if "control" not in names:
        raise ValueError(f"One of the headers in the {fasta_path} must be '>control'")


def validate_files(SAMPLE: str, CONTROL: str, ALLELE: str) -> None:
_exist_file(CONTROL)
_exist_file(SAMPLE)
_exist_file(ALLELE)
_fastq_extension(CONTROL)
_fastq_content(CONTROL)
_fastq_extension(SAMPLE)
_fastq_content(SAMPLE)
_fasta_content(ALLELE)
for file in [CONTROL, SAMPLE, ALLELE]:
_validate_file_existence(file)
for file in [CONTROL, SAMPLE]:
_validate_fastq_extension(file)
_validate_fastq_content(file)
_validate_fasta_content(ALLELE)


########################################################################
Expand Down Expand Up @@ -92,58 +89,72 @@ def exists_cached_genome(genome: str, tempdir: Path, exists_cache_control: bool)
########################################################################


def _check_url_availability(url: str) -> bool:
def _is_webpage_available(url: str) -> bool:
try:
_ = urlopen(url, timeout=10)
return True
except (URLError, TimeoutError):
with urlopen(url) as response:
return 200 <= response.status < 300
except URLError:
return False


def get_first_available_url(urls: list[str]) -> str | None:
return next((url for url in urls if _check_url_availability(url)), None)
return next((url for url in urls if _is_webpage_available(url)), None)


def is_genome_listed_in_UCSC(genome: str, ucsc_url: str) -> bool:
    """Report whether the DAS endpoint under ``ucsc_url`` returns data for ``genome``.

    Requests ``{ucsc_url}/cgi-bin/das/{genome}/dna?segment=1:1,10`` with a
    10-second timeout.

    Returns:
        True when the response body is non-empty; False when the body is
        empty or the request fails with ``URLError`` or ``TimeoutError``.
    """
    url = f"{ucsc_url}/cgi-bin/das/{genome}/dna?segment=1:1,10"
    try:
        # The context manager closes the response even if read() raises.
        with urlopen(url, timeout=10) as response:
            return bool(response.read())
    except (URLError, TimeoutError):
        return False
def _fetch_xml_data(url: str) -> bytes:
"""Fetch XML data from a given URL."""
with urlopen(url) as response:
return response.read()


def _extract_genome_ids_from_xml(xml_data: bytes) -> set:
"""Extract genome IDs from XML data."""
root = ET.fromstring(xml_data)
return {cc.attrib["id"] for child in root for cc in child if cc.tag == "SOURCE"}


def _get_genome_ids_in_ucsc(url_das: str) -> set:
    """Download the XML at ``url_das`` and return the genome IDs it contains."""
    return _extract_genome_ids_from_xml(_fetch_xml_data(url_das))


def is_genome_in_ucsc_ids(genome: str, url_das: str) -> bool:
    """Return True when ``genome`` is among the genome IDs listed at ``url_das``."""
    return genome in _get_genome_ids_in_ucsc(url_das)


def validate_genome_and_fetch_urls(genome: str) -> dict[str, str]:
ucsc_blat_servers = [
"https://genome.ucsc.edu/cgi-bin/hgBlat",
"https://genome-asia.ucsc.edu/cgi-bin/hgBlat",
"https://genome-euro.ucsc.edu/cgi-bin/hgBlat",
]
ucsc_das_servers = [
"https://genome.ucsc.edu/cgi-bin/das/dsn/",
"https://genome-asia.ucsc.edu/cgi-bin/das/dsn/",
"https://genome-euro.ucsc.edu/cgi-bin/das/dsn",
]
goldenpath_servers = [
"https://hgdownload.cse.ucsc.edu/goldenPath",
"http://hgdownload-euro.soe.ucsc.edu/goldenPath",
]
available_servers = {
"blat": get_first_available_url(ucsc_blat_servers),
"das": get_first_available_url(ucsc_das_servers),
"goldenpath": get_first_available_url(goldenpath_servers),
server_lists = {
"blat": [
"https://genome.ucsc.edu/cgi-bin/hgBlat",
"https://genome-asia.ucsc.edu/cgi-bin/hgBlat",
"https://genome-euro.ucsc.edu/cgi-bin/hgBlat",
],
"das": [
"https://genome.ucsc.edu/cgi-bin/das/dsn/",
"https://genome-asia.ucsc.edu/cgi-bin/das/dsn/",
"https://genome-euro.ucsc.edu/cgi-bin/das/dsn",
],
"goldenpath": [
"https://hgdownload.cse.ucsc.edu/goldenPath",
"http://hgdownload-euro.soe.ucsc.edu/goldenPath",
],
}

if not available_servers["blat"]:
raise URLError("All UCSC blat servers are currently down. Please wait for a while and try again.")
available_servers = {key: get_first_available_url(urls) for key, urls in server_lists.items()}

if not available_servers["goldenpath"]:
raise URLError("All UCSC GoldenPath servers are currently down. Please wait for a while and try again.")
error_messages = {
"blat": "All UCSC blat servers are currently down. Please wait for a while and try again.",
"das": "All UCSC DAS servers are currently down. Please wait for a while and try again.",
"goldenpath": "All UCSC GoldenPath servers are currently down. Please wait for a while and try again.",
}

if not available_servers["das"]:
raise URLError("All UCSC DAS servers are currently down. Please wait for a while and try again.")
for key, message in error_messages.items():
if not available_servers[key]:
raise URLError(message)

if not is_genome_listed_in_UCSC(genome, available_servers["das"]):
raise AttributeError(f"{genome} is not listed. Available genomes are in {available_servers['das']}")
if not is_genome_in_ucsc_ids(genome, available_servers["das"]):
raise ValueError(f"{genome} is not listed. Available genomes are in {available_servers['das']}")

return available_servers
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest


def pytest_addoption(parser):
    """Register the ``--runslow`` command-line flag (off by default)."""
    parser.addoption(
        "--runslow",
        action="store_true",
        default=False,
        help="run slow tests",
    )


def pytest_configure(config):
    """Declare the ``slow`` marker in the ``markers`` ini setting."""
    marker_description = "slow: mark test as slow to run"
    config.addinivalue_line("markers", marker_description)


def pytest_runtest_setup(item):
    """Skip items carrying the ``slow`` keyword unless ``--runslow`` was given."""
    if "slow" not in item.keywords:
        return
    # The option is only consulted for slow items.
    if not item.config.getoption("--runslow"):
        pytest.skip("need --runslow option to run")
80 changes: 42 additions & 38 deletions tests/src/preprocess/test_validate_inputs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import pytest
from DAJIN2.preprocess import validate_inputs
from importlib import reload

reload(validate_inputs)


###############################################################################
Expand All @@ -13,7 +10,7 @@
def test_exists():
with pytest.raises(FileNotFoundError) as e:
test = "filenotfound.txt"
validate_inputs._exist_file(test)
validate_inputs._validate_file_existence(test)
assert str(e.value) == f"{test} is not found"


Expand All @@ -23,92 +20,99 @@ def test_exists():


def test_fastq_extension():
with pytest.raises(AttributeError) as e:
with pytest.raises(ValueError) as e:
test = "test.fqq"
validate_inputs._fastq_extension("test.fqq")
validate_inputs._validate_fastq_extension("test.fqq")
assert str(e.value) == f"{test} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'"


def test_fastq_error_not_fastq_format():
with pytest.raises(AttributeError):
with pytest.raises(ValueError):
fastq_path = "tests/data/preprocess/validate_inputs/empty.fq"
_ = validate_inputs._fastq_content(fastq_path)
_ = validate_inputs._validate_fastq_content(fastq_path)


def test_fastq_without_error():
fasta_path = "tests/data/preprocess/validate_inputs/control.fq.gz"
assert validate_inputs._fastq_content(fasta_path) is None
assert validate_inputs._validate_fastq_content(fasta_path) is None


###############################################################################
# validate FASTA
###############################################################################


def test_fasta_error_not_fasta_format():
with pytest.raises(AttributeError) as e:
def test_non_proper_fasta_format():
with pytest.raises(ValueError) as e:
fasta_path = "tests/data/preprocess/validate_inputs/empty.fa"
_ = validate_inputs._fasta_content(fasta_path)
assert str(e.value) == f"{fasta_path} is not a FASTA format"
_ = validate_inputs._validate_fasta_content(fasta_path)
assert str(e.value) == f"{fasta_path} is not a proper FASTA format"


def test_fasta_error_duplicated_identifiers():
with pytest.raises(AttributeError) as e:
with pytest.raises(ValueError) as e:
fasta_path = "tests/data/preprocess/validate_inputs/duplicated_name.fa"
_ = validate_inputs._fasta_content(fasta_path)
_ = validate_inputs._validate_fasta_content(fasta_path)
assert str(e.value) == f"{fasta_path} must include unique identifiers"


def test_fasta_error_duplicated_sequences():
with pytest.raises(AttributeError) as e:
with pytest.raises(ValueError) as e:
fasta_path = "tests/data/preprocess/validate_inputs/duplicated_seq.fa"
_ = validate_inputs._fasta_content(fasta_path)
_ = validate_inputs._validate_fasta_content(fasta_path)
assert str(e.value) == f"{fasta_path} must include unique DNA sequences"


def test_fasta_error_without_control():
with pytest.raises(AttributeError) as e:
with pytest.raises(ValueError) as e:
fasta_path = "tests/data/preprocess/validate_inputs/no_control.fa"
_ = validate_inputs._fasta_content(fasta_path)
_ = validate_inputs._validate_fasta_content(fasta_path)
assert str(e.value) == f"One of the headers in the {fasta_path} must be '>control'"


def test_fasta_without_error():
fasta_path = "tests/data/preprocess/validate_inputs/design_stx2.fa"
assert validate_inputs._fasta_content(fasta_path) is None
assert validate_inputs._validate_fasta_content(fasta_path) is None


###############################################################################
# validate URL
###############################################################################


@pytest.mark.skip("This test takes long time due to URL access")
@pytest.mark.slow
def test_available_url_pass():
flag = validate_inputs._check_url_availabilities(["https://example.com"])
assert flag == [True]
assert validate_inputs._is_webpage_available("https://example.com") is True


@pytest.mark.skip("This test takes long time due to URL access")
@pytest.mark.slow
def test_available_url_fail():
flag = validate_inputs._check_url_availabilities(["https://example_xxx.com"])
assert flag == [False]
assert validate_inputs._is_webpage_available("https://example_xxx.com") is False


@pytest.mark.slow
def test_get_first_available_url():
    """The first URL that is available is returned, skipping the one before it that is not."""
    candidates = ["https://example_xxx.com", "https://example.com"]
    assert validate_inputs.get_first_available_url(candidates) == "https://example.com"


@pytest.mark.slow
def test_get_first_available_url_not_found():
    """None is returned when no URL in the list is available."""
    candidates = ["https://example_xxx.com", "https://example_yyy.com"]
    assert validate_inputs.get_first_available_url(candidates) is None


@pytest.mark.skip("This test takes long time due to URL access")
@pytest.mark.slow
def test_available_genome_pass():
genome = "mm10"
ucsc_url = "https://genome.ucsc.edu/"
assert validate_inputs._is_listed(genome, ucsc_url) is None
url_das = "https://genome.ucsc.edu/cgi-bin/das/dsn"
assert validate_inputs.is_genome_in_ucsc_ids(genome, url_das) is True


@pytest.mark.skip("This test takes long time due to URL access")
@pytest.mark.slow
def test_available_genome_fail():
genome = "xxxx"
ucsc_url = "https://genome.ucsc.edu/"
with pytest.raises(AttributeError) as e:
validate_inputs._is_listed(genome, ucsc_url)
assert (
str(e.value)
== f"{genome} is not listed in UCSC genome browser. Available genomes are in {ucsc_url}/cgi-bin/das/dsn"
)
genome = "mm12345"
url_das = "https://genome.ucsc.edu/cgi-bin/das/dsn"
assert validate_inputs.is_genome_in_ucsc_ids(genome, url_das) is False

0 comments on commit 5f55a71

Please sign in to comment.