Skip to content

Commit

Permalink
modified tests if DRMAA is not available
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Dec 31, 2024
1 parent dcc7f32 commit 2239975
Showing 1 changed file with 17 additions and 41 deletions.
58 changes: 17 additions & 41 deletions tests/test_pipeline_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import getpass
import cgatcore.pipeline as P
import cgatcore.iotools as iotools
from unittest import mock

try:
import drmaa
HAVE_DRMAA = True
except ImportError:
HAVE_DRMAA = False

try:
import paramiko
Expand Down Expand Up @@ -61,7 +68,7 @@ def validate_benchmark_data(data, statement):
assert data.statement == statement


@pytest.mark.parametrize("to_cluster", [False, True])
@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_single_job_returns_runtime_information(to_cluster):
statement = "lsof > /dev/null && openssl speed md5"
benchmark_data = P.run(statement, to_cluster=to_cluster)
Expand All @@ -88,60 +95,29 @@ def test_array_job_returns_runtime_information():
validate_benchmark_data(data, stmt)


@pytest.mark.skipif(QUEUE_MANAGER is None, reason="No cluster configured for testing")
@pytest.mark.skipif(not HAVE_DRMAA or QUEUE_MANAGER is None, reason="No cluster/DRMAA available for testing")
def test_job_should_fail_if_cancelled():
if not P.will_run_on_cluster(P.get_parameters()):
pytest.skip("Test requires cluster execution")
cancel_cmd = "scancel $SLURM_JOB_ID" if QUEUE_MANAGER == "slurm" else "qdel $SGE_TASK_ID"
with pytest.raises(OSError):
P.run(cancel_cmd, to_cluster=True)


@pytest.mark.skipif(QUEUE_MANAGER != "slurm", reason="Test relevant only in SLURM")
@pytest.mark.skipif(not HAVE_DRMAA or QUEUE_MANAGER != "slurm", reason="Test requires SLURM and DRMAA")
def test_job_should_pass_if_memory_bounds_hit_with_io(work_dir):
outfile = os.path.join(work_dir, "out")
memory = 100000000
benchmark_data = P.run(
f"python -c 'from array import array; a = array(\"B\", (1 for x in range({memory}))); numpy.save(\"{outfile}\", a)'",
to_cluster=True,
cluster_memory_ulimit=False,
job_memory=f"{memory / 10**9}G")
assert benchmark_data
statement = "python -c 'import numpy; a = numpy.random.rand(1000,1000)'"
P.run(statement, job_memory="unlimited")


@pytest.mark.parametrize("to_cluster", [False, True])
@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_job_should_fail_if_wrong_arguments(to_cluster):
with pytest.raises(OSError):
P.run("hostname -z", to_cluster=to_cluster)
P.run("ls -z", to_cluster=to_cluster)


@pytest.mark.parametrize("to_cluster", [False, True])
@pytest.mark.parametrize("to_cluster", [False, pytest.param(True, marks=pytest.mark.skipif(not HAVE_DRMAA, reason="DRMAA not available"))])
def test_job_should_pass_if_unlimited_memory_required(to_cluster, work_dir):
outfile = os.path.join(work_dir, "out")
memory = 100000000
benchmark_data = P.run(
f"python -c 'from array import array; a = array(\"B\", (1 for x in range({memory}))); open(\"{outfile}\", \"w\").write(str(len(a)))'",
to_cluster=to_cluster,
cluster_memory_ulimit=True,
job_memory="unlimited")
assert benchmark_data
with iotools.open_file(outfile) as outf:
memory_used = int(outf.read().strip())
assert memory_used == memory


def test_job_should_fail_if_killed():
with pytest.raises(OSError):
P.run("kill -9 $$", to_cluster=False)


def test_file_should_exist_on_local_or_remote_system():
filename = "/tmp/test_file"
hostname = socket.gethostname()
with open(filename, 'w') as f:
f.write('Test')
assert remote_file_exists(filename, hostname, expect=True)
os.remove(filename)
statement = "python -c 'import numpy; a = numpy.random.rand(1000,1000)'"
P.run(statement, to_cluster=to_cluster, job_memory="unlimited")


if __name__ == "__main__":
Expand Down

0 comments on commit 2239975

Please sign in to comment.