Check DaskWorker performance for all tests #708

Open · wants to merge 13 commits into base: master
pydra/conftest.py (26 changes: 1 addition & 25 deletions)
@@ -16,7 +16,7 @@
 
 
 def pytest_generate_tests(metafunc):
-    if "plugin_dask_opt" in metafunc.fixturenames:
+    if "plugin" in metafunc.fixturenames:
         if bool(shutil.which("sbatch")):
             Plugins = ["slurm"]
         else:
@@ -34,33 +34,9 @@
                 bool(shutil.which("sbatch"))
                 and metafunc.config.getoption("psij") == "slurm"
             ):
                 Plugins.remove("slurm")
         except ValueError:
             pass
-        metafunc.parametrize("plugin_dask_opt", Plugins)
-
-    if "plugin" in metafunc.fixturenames:
-        use_dask = False
-        try:
-            use_dask = metafunc.config.getoption("dask")
-        except ValueError:
-            pass
-        if use_dask:
-            Plugins = []
-        elif bool(shutil.which("sbatch")):
-            Plugins = ["slurm"]
-        else:
-            Plugins = ["cf"]
-        try:
-            if metafunc.config.getoption("psij"):
-                Plugins.append("psij-" + metafunc.config.getoption("psij"))
-            if (
-                bool(shutil.which("sbatch"))
-                and metafunc.config.getoption("psij") == "slurm"
-            ):
-                Plugins.remove("slurm")
-        except ValueError:
-            pass
         metafunc.parametrize("plugin", Plugins)
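
For reference, the consolidated hook presumably reads as below once the collapsed middle of the hunk is expanded. This is a sketch, not a verbatim copy of the file: the dask branch is an assumption inferred from the deleted code's metafunc.config.getoption("dask") call and from the PR's goal of running the DaskWorker everywhere.

import shutil


def pytest_generate_tests(metafunc):
    if "plugin" in metafunc.fixturenames:
        if bool(shutil.which("sbatch")):
            Plugins = ["slurm"]
        else:
            Plugins = ["cf"]
        try:
            # assumption: --dask now adds the dask worker to every plugin-parametrized test
            if metafunc.config.getoption("dask"):
                Plugins.append("dask")
            if metafunc.config.getoption("psij"):
                Plugins.append("psij-" + metafunc.config.getoption("psij"))
            if (
                bool(shutil.which("sbatch"))
                and metafunc.config.getoption("psij") == "slurm"
            ):
                Plugins.remove("slurm")
        except ValueError:
            pass
        metafunc.parametrize("plugin", Plugins)

With the separate plugin_dask_opt fixture gone, every test that requests plugin is parametrized over one shared plugin list, so dask performance is checked across the whole suite rather than a hand-picked subset.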


pydra/engine/tests/test_node_task.py (47 changes: 19 additions & 28 deletions)
@@ -366,15 +366,14 @@ def test_odir_init():
 # Tests for tasks without state (i.e. no splitter)
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1(plugin_dask_opt, tmp_path):
+def test_task_nostate_1(plugin, tmp_path):
     """task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
     nn.cache_dir = tmp_path
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     # checking the results
@@ -407,15 +406,14 @@ def test_task_nostate_1_call():
     assert nn.output_dir.exists()
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1_call_subm(plugin_dask_opt, tmp_path):
+def test_task_nostate_1_call_subm(plugin, tmp_path):
     """task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
     nn.cache_dir = tmp_path
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         nn(submitter=sub)
 
     # checking the results
@@ -425,15 +423,14 @@ def test_task_nostate_1_call_subm(plugin_dask_opt, tmp_path):
     assert nn.output_dir.exists()
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_1_call_plug(plugin_dask_opt, tmp_path):
+def test_task_nostate_1_call_plug(plugin, tmp_path):
     """task without splitter"""
     nn = fun_addtwo(name="NA", a=3)
     nn.cache_dir = tmp_path
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None
 
-    nn(plugin=plugin_dask_opt)
+    nn(plugin=plugin)
 
     # checking the results
     results = nn.result()
@@ -557,25 +554,23 @@ def test_task_nostate_7():
 # Testing caching for tasks without states
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_cachedir(plugin_dask_opt, tmp_path):
+def test_task_nostate_cachedir(plugin, tmp_path):
     """task with provided cache_dir using pytest tmp_path"""
     cache_dir = tmp_path / "test_task_nostate"
     cache_dir.mkdir()
     nn = fun_addtwo(name="NA", a=3, cache_dir=cache_dir)
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     # checking the results
     results = nn.result()
     assert results.output.out == 5
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt):
+def test_task_nostate_cachedir_relativepath(tmp_path, plugin):
     """task with provided cache_dir as relative path"""
     os.chdir(tmp_path)
     cache_dir = "test_task_nostate"
@@ -585,7 +580,7 @@ def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt):
     assert np.allclose(nn.inputs.a, [3])
     assert nn.state is None
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     # checking the results
@@ -595,8 +590,7 @@ def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt):
     shutil.rmtree(cache_dir)
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_nostate_cachelocations(plugin_dask_opt, tmp_path):
+def test_task_nostate_cachelocations(plugin, tmp_path):
     """
     Two identical tasks with provided cache_dir;
     the second task has cache_locations and should not recompute the results
@@ -607,11 +601,11 @@ def test_task_nostate_cachelocations(plugin_dask_opt, tmp_path):
     cache_dir2.mkdir()
 
     nn = fun_addtwo(name="NA", a=3, cache_dir=cache_dir)
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     nn2 = fun_addtwo(name="NA", a=3, cache_dir=cache_dir2, cache_locations=cache_dir)
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn2)
 
     # checking the results
@@ -737,9 +731,8 @@ def test_task_nostate_cachelocations_updated(plugin, tmp_path):
 # Tests for tasks with states (i.e. with splitter)
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
 @pytest.mark.parametrize("input_type", ["list", "array"])
-def test_task_state_1(plugin_dask_opt, input_type, tmp_path):
+def test_task_state_1(plugin, input_type, tmp_path):
     """task with the simplest splitter"""
     a_in = [3, 5]
     if input_type == "array":
@@ -752,7 +745,7 @@ def test_task_state_1(plugin_dask_opt, input_type, tmp_path):
     assert nn.state.splitter_rpn == ["NA.a"]
     assert (nn.inputs.a == np.array([3, 5])).all()
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     # checking the results
@@ -1082,8 +1075,7 @@ def test_task_state_6a(plugin, tmp_path):
     assert odir.exists()
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_state_comb_1(plugin_dask_opt, tmp_path):
+def test_task_state_comb_1(plugin, tmp_path):
     """task with the simplest splitter and combiner"""
     nn = fun_addtwo(name="NA").split(a=[3, 5], splitter="a").combine(combiner="a")
     nn.cache_dir = tmp_path
@@ -1096,7 +1088,7 @@ def test_task_state_comb_1(plugin_dask_opt, tmp_path):
     assert nn.state.splitter_final is None
     assert nn.state.splitter_rpn_final == []
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     assert nn.state.states_ind == [{"NA.a": 0}, {"NA.a": 1}]
@@ -1459,8 +1451,7 @@ def test_task_state_comb_contdim_2(tmp_path):
 # Testing caching for tasks with states
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_task_state_cachedir(plugin_dask_opt, tmp_path):
+def test_task_state_cachedir(plugin, tmp_path):
     """task with a state and provided cache_dir using pytest tmp_path"""
     cache_dir = tmp_path / "test_task_nostate"
     cache_dir.mkdir()
@@ -1469,7 +1460,7 @@ def test_task_state_cachedir(plugin_dask_opt, tmp_path):
     assert nn.state.splitter == "NA.a"
     assert (nn.inputs.a == np.array([3, 5])).all()
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(nn)
 
     # checking the results
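
All of the conversions above follow the same shape. A minimal standalone version of the pattern, pinned to the dask worker, looks roughly like this (a sketch assuming pydra is installed with its dask extra; fun_addtwo is redefined here as a stand-in for the test helper):

import pydra


@pydra.mark.task
def fun_addtwo(a: int) -> int:
    return a + 2  # mirrors the helper exercised by test_task_nostate_1


nn = fun_addtwo(name="NA", a=3)
with pydra.Submitter(plugin="dask") as sub:
    sub(nn)
assert nn.result().output.out == 5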
pydra/engine/tests/test_shelltask.py (14 changes: 6 additions & 8 deletions)
@@ -26,15 +26,14 @@
     pytest.skip("SLURM not available in windows", allow_module_level=True)
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
 @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
-def test_shell_cmd_1(plugin_dask_opt, results_function, tmp_path):
+def test_shell_cmd_1(plugin, results_function, tmp_path):
     """simple command, no arguments"""
     cmd = ["pwd"]
     shelly = ShellCommandTask(name="shelly", executable=cmd, cache_dir=tmp_path)
     assert shelly.cmdline == " ".join(cmd)
 
-    res = results_function(shelly, plugin=plugin_dask_opt)
+    res = results_function(shelly, plugin=plugin)
     assert Path(res.output.stdout.rstrip()) == shelly.output_dir
     assert res.output.return_code == 0
     assert res.output.stderr == ""
@@ -108,7 +107,7 @@ def test_shell_cmd_2b(plugin, results_function, tmp_path):
 
 
 @pytest.mark.flaky(reruns=2)
-def test_shell_cmd_3(plugin_dask_opt, tmp_path):
+def test_shell_cmd_3(plugin, tmp_path):
     """commands without arguments
     splitter = executable
     """
@@ -119,7 +118,7 @@ def test_shell_cmd_3(plugin_dask_opt, tmp_path):
     shelly.cache_dir = tmp_path
 
     # assert shelly.cmdline == ["pwd", "whoami"]
-    res = shelly(plugin=plugin_dask_opt)
+    res = shelly(plugin=plugin)
     assert Path(res[0].output.stdout.rstrip()) == shelly.output_dir[0]
 
     if "USER" in os.environ:
@@ -2174,8 +2173,7 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmp_path
 # customised input_spec in Workflow
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_wf_shell_cmd_2(plugin_dask_opt, tmp_path):
+def test_wf_shell_cmd_2(plugin, tmp_path):
     """a workflow with input with defined output_file_template (str)
     that requires wf.lzin
     """
@@ -2213,7 +2211,7 @@ def test_wf_shell_cmd_2(plugin_dask_opt, tmp_path):
 
     wf.set_output([("out_f", wf.shelly.lzout.out1), ("out", wf.shelly.lzout.stdout)])
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         wf(submitter=sub)
 
     res = wf.result()
pydra/engine/tests/test_submitter.py (10 changes: 4 additions & 6 deletions)
@@ -128,8 +128,7 @@ def test_wf_in_wf(plugin, tmpdir):
     assert res.output.out == 7
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_wf2(plugin_dask_opt, tmpdir):
+def test_wf2(plugin, tmpdir):
     """workflow as a node
     workflow-node with one task and no splitter
     """
@@ -143,15 +142,14 @@ def test_wf2(plugin_dask_opt, tmpdir):
     wf.set_output([("out", wf.wfnd.lzout.out)])
     wf.cache_dir = tmpdir
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)
 
     res = wf.result()
     assert res.output.out == 3
 
 
-@pytest.mark.flaky(reruns=2)  # when dask
-def test_wf_with_state(plugin_dask_opt, tmpdir):
+def test_wf_with_state(plugin, tmpdir):
     wf = Workflow(name="wf_with_state", input_spec=["x"])
     wf.add(sleep_add_one(name="taska", x=wf.lzin.x))
     wf.add(sleep_add_one(name="taskb", x=wf.taska.lzout.out))
@@ -160,7 +158,7 @@ def test_wf_with_state(plugin_dask_opt, tmpdir):
     wf.set_output([("out", wf.taskb.lzout.out)])
     wf.cache_dir = tmpdir
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)
 
     res = wf.result()
pydra/engine/tests/test_workflow.py (15 changes: 6 additions & 9 deletions)
@@ -378,8 +378,7 @@ def test_wf_2d_outpasdict(plugin, tmpdir):
     assert wf.output_dir.exists()
 
 
-@pytest.mark.flaky(reruns=3)  # when dask
-def test_wf_3(plugin_dask_opt, tmpdir):
+def test_wf_3(plugin, tmpdir):
     """testing None value for an input"""
     wf = Workflow(name="wf_3", input_spec=["x", "y"])
     wf.add(fun_addvar_none(name="addvar", a=wf.lzin.x, b=wf.lzin.y))
@@ -389,7 +388,7 @@ def test_wf_3(plugin_dask_opt, tmpdir):
     wf.inputs.y = None
     wf.cache_dir = tmpdir
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)
 
     assert wf.output_dir.exists()
@@ -1200,8 +1199,7 @@ def test_wf_3sernd_ndst_1a(plugin, tmpdir):
 # workflows with structures A -> C, B -> C
 
 
-@pytest.mark.flaky(reruns=3)  # when dask
-def test_wf_3nd_st_1(plugin_dask_opt, tmpdir):
+def test_wf_3nd_st_1(plugin, tmpdir):
     """workflow with three tasks, third one connected to two previous tasks,
     splitter on the workflow level
     """
@@ -1214,7 +1212,7 @@ def test_wf_3nd_st_1(plugin_dask_opt, tmpdir):
     wf.set_output([("out", wf.mult.lzout.out)])
     wf.cache_dir = tmpdir
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)
 
     results = wf.result()
@@ -1228,8 +1226,7 @@ def test_wf_3nd_st_1(plugin_dask_opt, tmpdir):
     assert odir.exists()
 
 
-@pytest.mark.flaky(reruns=3)  # when dask
-def test_wf_3nd_ndst_1(plugin_dask_opt, tmpdir):
+def test_wf_3nd_ndst_1(plugin, tmpdir):
     """workflow with three tasks, third one connected to two previous tasks,
     splitter on the tasks levels
     """
@@ -1242,7 +1239,7 @@ def test_wf_3nd_ndst_1(plugin_dask_opt, tmpdir):
     wf.set_output([("out", wf.mult.lzout.out)])
     wf.cache_dir = tmpdir
 
-    with Submitter(plugin=plugin_dask_opt) as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)
 
     results = wf.result()
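
The workflow tests follow suit: a split workflow such as test_wf_with_state can now be driven by whichever worker the plugin fixture yields, dask included. A minimal sketch of that pattern pinned to dask (the task body here is illustrative, not copied from the suite, and the split call assumes the current pydra API):

import pydra


@pydra.mark.task
def add_one(x: int) -> int:
    return x + 1


wf = pydra.Workflow(name="wf_with_state", input_spec=["x"])
wf.add(add_one(name="taska", x=wf.lzin.x))
wf.split("x", x=[1, 2, 3])  # one workflow state per input value
wf.set_output([("out", wf.taska.lzout.out)])

with pydra.Submitter(plugin="dask") as sub:
    sub(wf)

assert [res.output.out for res in wf.result()] == [2, 3, 4]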