Skip to content

Commit

Permalink
execution now focuses on only local testing and will implement mocks …
Browse files Browse the repository at this point in the history
…for testing cluster interaction
  • Loading branch information
Acribbs committed Dec 31, 2024
1 parent 2239975 commit cc5e2b0
Showing 1 changed file with 20 additions and 28 deletions.
48 changes: 20 additions & 28 deletions tests/test_pipeline_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import cgatcore.iotools as iotools
from unittest import mock

# Check for DRMAA without importing it
HAVE_DRMAA = False
try:
import drmaa
drmaa.Session() # Try to create a session to verify DRMAA is properly configured
HAVE_DRMAA = True
except ImportError:
HAVE_DRMAA = False
except (ImportError, RuntimeError):
pass

try:
import paramiko
Expand Down Expand Up @@ -68,17 +71,18 @@ def validate_benchmark_data(data, statement):
assert data.statement == statement


@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_single_job_returns_runtime_information(to_cluster):
statement = "lsof > /dev/null && openssl speed md5"
benchmark_data = P.run(statement, to_cluster=to_cluster)
def test_single_job_returns_runtime_information():
"""Test that a single job returns runtime information when run locally."""
statement = "echo 'test' && sleep 1"
benchmark_data = P.run(statement, to_cluster=False)
assert isinstance(benchmark_data, list)
assert len(benchmark_data) == 1
validate_benchmark_data(benchmark_data.pop(), statement)


def test_multiple_jobs_return_runtime_information():
statements = ["lsof > /dev/null && openssl speed md5"] * 3
"""Test that multiple jobs return runtime information when run locally."""
statements = ["echo 'test' && sleep 1"] * 3
benchmark_data = P.run(statements, to_cluster=False)
assert isinstance(benchmark_data, list)
assert len(benchmark_data) == len(statements)
Expand All @@ -87,37 +91,25 @@ def test_multiple_jobs_return_runtime_information():


def test_array_job_returns_runtime_information():
statements = ["lsof > /dev/null && openssl speed md5"] * 3
"""Test that array jobs return runtime information when run locally."""
statements = ["echo 'test' && sleep 1"] * 3
benchmark_data = P.run(statements, job_array=True, to_cluster=False)
assert isinstance(benchmark_data, list)
assert len(benchmark_data) == len(statements)
for data, stmt in zip(benchmark_data, statements):
validate_benchmark_data(data, stmt)


@pytest.mark.skipif(not HAVE_DRMAA or QUEUE_MANAGER is None, reason="No cluster/DRMAA available for testing")
def test_job_should_fail_if_cancelled():
cancel_cmd = "scancel $SLURM_JOB_ID" if QUEUE_MANAGER == "slurm" else "qdel $SGE_TASK_ID"
with pytest.raises(OSError):
P.run(cancel_cmd, to_cluster=True)


@pytest.mark.skipif(not HAVE_DRMAA or QUEUE_MANAGER != "slurm", reason="Test requires SLURM and DRMAA")
def test_job_should_pass_if_memory_bounds_hit_with_io(work_dir):
statement = "python -c 'import numpy; a = numpy.random.rand(1000,1000)'"
P.run(statement, job_memory="unlimited")


@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_job_should_fail_if_wrong_arguments(to_cluster):
def test_job_should_fail_if_wrong_arguments():
"""Test that jobs fail with incorrect arguments."""
with pytest.raises(OSError):
P.run("ls -z", to_cluster=to_cluster)
P.run("ls -z", to_cluster=False)


@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_job_should_pass_if_unlimited_memory_required(to_cluster, work_dir):
statement = "python -c 'import numpy; a = numpy.random.rand(1000,1000)'"
P.run(statement, to_cluster=to_cluster, job_memory="unlimited")
def test_job_should_pass_if_unlimited_memory_required(work_dir):
"""Test that jobs pass with unlimited memory."""
statement = "python -c 'import numpy; a = numpy.random.rand(10,10)'"
P.run(statement, to_cluster=False, job_memory="unlimited")


if __name__ == "__main__":
Expand Down

0 comments on commit cc5e2b0

Please sign in to comment.