diff --git a/src/DAJIN2/preprocess/validate_inputs.py b/src/DAJIN2/preprocess/validate_inputs.py index 40e0040b..1a2a0566 100644 --- a/src/DAJIN2/preprocess/validate_inputs.py +++ b/src/DAJIN2/preprocess/validate_inputs.py @@ -5,62 +5,59 @@ from pathlib import Path from urllib.error import URLError from urllib.request import urlopen +import xml.etree.ElementTree as ET import mappy -def _exist_file(input_file: str): - if not Path(input_file).exists(): - raise FileNotFoundError(f"{input_file} is not found") - - ######################################################################## # Check if the sample is in the proper format. ######################################################################## -def _fastq_extension(fastq_path: str): +def _validate_file_existence(input_file: str): + if not Path(input_file).exists(): + raise FileNotFoundError(f"{input_file} is not found") + + +def _validate_fastq_extension(fastq_path: str): if not re.search(r".fastq$|.fastq.gz$|.fq$|.fq.gz$", fastq_path): - raise AttributeError(f"{fastq_path} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'") - - -# Varidate if the file is in the proper format. See top 100 lines -def _fastq_content(fastq_path: str): - name, seq, qual = [], [], [] - for i, (n, s, q) in enumerate(mappy.fastx_read(fastq_path)): - name.append(n) - seq.append(s) - qual.append(q) - if i == 100: - break - if not (len(name) == len(seq) == len(qual) > 0): - raise AttributeError(f"{fastq_path} is not a FASTQ format") - - -def _fasta_content(fasta_path: str): - name, seq = [], [] - for n, s, _ in mappy.fastx_read(fasta_path): - name.append(n) - seq.append(s) - if not len(name) == len(seq) > 0: - raise AttributeError(f"{fasta_path} is not a FASTA format") - if len(name) > len(set(name)): - raise AttributeError(f"{fasta_path} must include unique identifiers") - if len(seq) > len(set(seq)): - raise AttributeError(f"{fasta_path} must include unique DNA sequences") - if name.count("control") == 0: - raise AttributeError(f"One of the headers in the {fasta_path} must be '>control'") + raise ValueError(f"{fastq_path} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'") + + +# Varidate if the file is in the proper format. +# See top 100 lines +def _validate_fastq_content(fastq_path: str): + try: + names, seqs, quals = zip(*[(n, s, q) for i, (n, s, q) in enumerate(mappy.fastx_read(fastq_path)) if i < 100]) + if not (len(names) == len(seqs) == len(quals) > 0): + raise ValueError + except ValueError: + raise ValueError(f"{fastq_path} is not a FASTQ format") + + +def _validate_fasta_content(fasta_path: str): + try: + names, seqs = zip(*[(n, s) for n, s, _ in mappy.fastx_read(fasta_path)]) + if len(names) != len(seqs) or not names: + raise ValueError + except ValueError: + raise ValueError(f"{fasta_path} is not a proper FASTA format") + if len(names) != len(set(names)): + raise ValueError(f"{fasta_path} must include unique identifiers") + if len(seqs) != len(set(seqs)): + raise ValueError(f"{fasta_path} must include unique DNA sequences") + if "control" not in names: + raise ValueError(f"One of the headers in the {fasta_path} must be '>control'") def validate_files(SAMPLE: str, CONTROL: str, ALLELE: str) -> None: - _exist_file(CONTROL) - _exist_file(SAMPLE) - _exist_file(ALLELE) - _fastq_extension(CONTROL) - _fastq_content(CONTROL) - _fastq_extension(SAMPLE) - _fastq_content(SAMPLE) - _fasta_content(ALLELE) + for file in [CONTROL, SAMPLE, ALLELE]: + _validate_file_existence(file) + for file in [CONTROL, SAMPLE]: + _validate_fastq_extension(file) + _validate_fastq_content(file) + _validate_fasta_content(ALLELE) ######################################################################## @@ -92,58 +89,72 @@ def exists_cached_genome(genome: str, tempdir: Path, exists_cache_control: bool) ######################################################################## -def _check_url_availability(url: str) -> bool: +def _is_webpage_available(url: str) -> bool: try: - _ = urlopen(url, timeout=10) - return True - except (URLError, TimeoutError): + with urlopen(url) as response: + return 200 <= response.status < 300 + except URLError: return False def get_first_available_url(urls: list[str]) -> str | None: - return next((url for url in urls if _check_url_availability(url)), None) + return next((url for url in urls if _is_webpage_available(url)), None) -def is_genome_listed_in_UCSC(genome: str, ucsc_url: str) -> bool: - url = f"{ucsc_url}/cgi-bin/das/{genome}/dna?segment=1:1,10" - try: - response = urlopen(url, timeout=10) - return bool(response.read()) - except (URLError, TimeoutError): - return False +def _fetch_xml_data(url: str) -> bytes: + """Fetch XML data from a given URL.""" + with urlopen(url) as response: + return response.read() + + +def _extract_genome_ids_from_xml(xml_data: bytes) -> set: + """Extract genome IDs from XML data.""" + root = ET.fromstring(xml_data) + return {cc.attrib["id"] for child in root for cc in child if cc.tag == "SOURCE"} + + +def _get_genome_ids_in_ucsc(url_das: str) -> set: + """Get available genome IDs in UCSC.""" + xml_data = _fetch_xml_data(url_das) + return _extract_genome_ids_from_xml(xml_data) + + +def is_genome_in_ucsc_ids(genome: str, url_das: str) -> bool: + genome_ids = _get_genome_ids_in_ucsc(url_das) + return genome in genome_ids def validate_genome_and_fetch_urls(genome: str) -> dict[str, str]: - ucsc_blat_servers = [ - "https://genome.ucsc.edu/cgi-bin/hgBlat", - "https://genome-asia.ucsc.edu/cgi-bin/hgBlat", - "https://genome-euro.ucsc.edu/cgi-bin/hgBlat", - ] - ucsc_das_servers = [ - "https://genome.ucsc.edu/cgi-bin/das/dsn/", - "https://genome-asia.ucsc.edu/cgi-bin/das/dsn/", - "https://genome-euro.ucsc.edu/cgi-bin/das/dsn", - ] - goldenpath_servers = [ - "https://hgdownload.cse.ucsc.edu/goldenPath", - "http://hgdownload-euro.soe.ucsc.edu/goldenPath", - ] - available_servers = { - "blat": get_first_available_url(ucsc_blat_servers), - "das": get_first_available_url(ucsc_das_servers), - "goldenpath": get_first_available_url(goldenpath_servers), + server_lists = { + "blat": [ + "https://genome.ucsc.edu/cgi-bin/hgBlat", + "https://genome-asia.ucsc.edu/cgi-bin/hgBlat", + "https://genome-euro.ucsc.edu/cgi-bin/hgBlat", + ], + "das": [ + "https://genome.ucsc.edu/cgi-bin/das/dsn/", + "https://genome-asia.ucsc.edu/cgi-bin/das/dsn/", + "https://genome-euro.ucsc.edu/cgi-bin/das/dsn", + ], + "goldenpath": [ + "https://hgdownload.cse.ucsc.edu/goldenPath", + "http://hgdownload-euro.soe.ucsc.edu/goldenPath", + ], } - if not available_servers["blat"]: - raise URLError("All UCSC blat servers are currently down. Please wait for a while and try again.") + available_servers = {key: get_first_available_url(urls) for key, urls in server_lists.items()} - if not available_servers["goldenpath"]: - raise URLError("All UCSC GoldenPath servers are currently down. Please wait for a while and try again.") + error_messages = { + "blat": "All UCSC blat servers are currently down. Please wait for a while and try again.", + "das": "All UCSC DAS servers are currently down. Please wait for a while and try again.", + "goldenpath": "All UCSC GoldenPath servers are currently down. Please wait for a while and try again.", + } - if not available_servers["das"]: - raise URLError("All UCSC DAS servers are currently down. Please wait for a while and try again.") + for key, message in error_messages.items(): + if not available_servers[key]: + raise URLError(message) - if not is_genome_listed_in_UCSC(genome, available_servers["das"]): - raise AttributeError(f"{genome} is not listed. Available genomes are in {available_servers['das']}") + if not is_genome_in_ucsc_ids(genome, available_servers["das"]): + raise ValueError(f"{genome} is not listed. Available genomes are in {available_servers['das']}") return available_servers diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..b32fbd0e --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,14 @@ +import pytest + + +def pytest_addoption(parser): + parser.addoption("--runslow", action="store_true", default=False, help="run slow tests") + + +def pytest_configure(config): + config.addinivalue_line("markers", "slow: mark test as slow to run") + + +def pytest_runtest_setup(item): + if "slow" in item.keywords and not item.config.getoption("--runslow"): + pytest.skip("need --runslow option to run") diff --git a/tests/src/preprocess/test_validate_inputs.py b/tests/src/preprocess/test_validate_inputs.py index daf51e3e..1407cd8c 100644 --- a/tests/src/preprocess/test_validate_inputs.py +++ b/tests/src/preprocess/test_validate_inputs.py @@ -1,8 +1,5 @@ import pytest from DAJIN2.preprocess import validate_inputs -from importlib import reload - -reload(validate_inputs) ############################################################################### @@ -13,7 +10,7 @@ def test_exists(): with pytest.raises(FileNotFoundError) as e: test = "filenotfound.txt" - validate_inputs._exist_file(test) + validate_inputs._validate_file_existence(test) assert str(e.value) == f"{test} is not found" @@ -23,21 +20,21 @@ def test_exists(): def test_fastq_extension(): - with pytest.raises(AttributeError) as e: + with pytest.raises(ValueError) as e: test = "test.fqq" - validate_inputs._fastq_extension("test.fqq") + validate_inputs._validate_fastq_extension("test.fqq") assert str(e.value) == f"{test} requires extensions either 'fastq', 'fastq.gz', 'fq' or 'fq.gz'" def test_fastq_error_not_fastq_format(): - with pytest.raises(AttributeError): + with pytest.raises(ValueError): fastq_path = "tests/data/preprocess/validate_inputs/empty.fq" - _ = validate_inputs._fastq_content(fastq_path) + _ = validate_inputs._validate_fastq_content(fastq_path) def test_fastq_without_error(): fasta_path = "tests/data/preprocess/validate_inputs/control.fq.gz" - assert validate_inputs._fastq_content(fasta_path) is None + assert validate_inputs._validate_fastq_content(fasta_path) is None ############################################################################### @@ -45,37 +42,37 @@ def test_fastq_without_error(): ############################################################################### -def test_fasta_error_not_fasta_format(): - with pytest.raises(AttributeError) as e: +def test_non_proper_fasta_format(): + with pytest.raises(ValueError) as e: fasta_path = "tests/data/preprocess/validate_inputs/empty.fa" - _ = validate_inputs._fasta_content(fasta_path) - assert str(e.value) == f"{fasta_path} is not a FASTA format" + _ = validate_inputs._validate_fasta_content(fasta_path) + assert str(e.value) == f"{fasta_path} is not a proper FASTA format" def test_fasta_error_duplicated_identifiers(): - with pytest.raises(AttributeError) as e: + with pytest.raises(ValueError) as e: fasta_path = "tests/data/preprocess/validate_inputs/duplicated_name.fa" - _ = validate_inputs._fasta_content(fasta_path) + _ = validate_inputs._validate_fasta_content(fasta_path) assert str(e.value) == f"{fasta_path} must include unique identifiers" def test_fasta_error_duplicated_sequences(): - with pytest.raises(AttributeError) as e: + with pytest.raises(ValueError) as e: fasta_path = "tests/data/preprocess/validate_inputs/duplicated_seq.fa" - _ = validate_inputs._fasta_content(fasta_path) + _ = validate_inputs._validate_fasta_content(fasta_path) assert str(e.value) == f"{fasta_path} must include unique DNA sequences" def test_fasta_error_without_control(): - with pytest.raises(AttributeError) as e: + with pytest.raises(ValueError) as e: fasta_path = "tests/data/preprocess/validate_inputs/no_control.fa" - _ = validate_inputs._fasta_content(fasta_path) + _ = validate_inputs._validate_fasta_content(fasta_path) assert str(e.value) == f"One of the headers in the {fasta_path} must be '>control'" def test_fasta_without_error(): fasta_path = "tests/data/preprocess/validate_inputs/design_stx2.fa" - assert validate_inputs._fasta_content(fasta_path) is None + assert validate_inputs._validate_fasta_content(fasta_path) is None ############################################################################### @@ -83,32 +80,39 @@ def test_fasta_without_error(): ############################################################################### -@pytest.mark.skip("This test takes long time due to URL access") +@pytest.mark.slow def test_available_url_pass(): - flag = validate_inputs._check_url_availabilities(["https://example.com"]) - assert flag == [True] + assert validate_inputs._is_webpage_available("https://example.com") is True -@pytest.mark.skip("This test takes long time due to URL access") +@pytest.mark.slow def test_available_url_fail(): - flag = validate_inputs._check_url_availabilities(["https://example_xxx.com"]) - assert flag == [False] + assert validate_inputs._is_webpage_available("https://example_xxx.com") is False + + +@pytest.mark.slow +def test_get_first_available_url(): + test = validate_inputs.get_first_available_url(["https://example_xxx.com", "https://example.com"]) + answer = "https://example.com" + assert test == answer + + +@pytest.mark.slow +def test_get_first_available_url_not_found(): + test = validate_inputs.get_first_available_url(["https://example_xxx.com", "https://example_yyy.com"]) + answer = None + assert test == answer -@pytest.mark.skip("This test takes long time due to URL access") +@pytest.mark.slow def test_available_genome_pass(): genome = "mm10" - ucsc_url = "https://genome.ucsc.edu/" - assert validate_inputs._is_listed(genome, ucsc_url) is None + url_das = "https://genome.ucsc.edu/cgi-bin/das/dsn" + assert validate_inputs.is_genome_in_ucsc_ids(genome, url_das) is True -@pytest.mark.skip("This test takes long time due to URL access") +@pytest.mark.slow def test_available_genome_fail(): - genome = "xxxx" - ucsc_url = "https://genome.ucsc.edu/" - with pytest.raises(AttributeError) as e: - validate_inputs._is_listed(genome, ucsc_url) - assert ( - str(e.value) - == f"{genome} is not listed in UCSC genome browser. Available genomes are in {ucsc_url}/cgi-bin/das/dsn" - ) + genome = "mm12345" + url_das = "https://genome.ucsc.edu/cgi-bin/das/dsn" + assert validate_inputs.is_genome_in_ucsc_ids(genome, url_das) is False