diff --git a/pydra/conftest.py b/pydra/conftest.py index 66a1d200f..5e29659de 100644 --- a/pydra/conftest.py +++ b/pydra/conftest.py @@ -16,7 +16,7 @@ def pytest_addoption(parser): def pytest_generate_tests(metafunc): - if "plugin_dask_opt" in metafunc.fixturenames: + if "plugin" in metafunc.fixturenames: if bool(shutil.which("sbatch")): Plugins = ["slurm"] else: @@ -37,30 +37,6 @@ def pytest_generate_tests(metafunc): Plugins.remove("slurm") except ValueError: pass - metafunc.parametrize("plugin_dask_opt", Plugins) - - if "plugin" in metafunc.fixturenames: - use_dask = False - try: - use_dask = metafunc.config.getoption("dask") - except ValueError: - pass - if use_dask: - Plugins = [] - elif bool(shutil.which("sbatch")): - Plugins = ["slurm"] - else: - Plugins = ["cf"] - try: - if metafunc.config.getoption("psij"): - Plugins.append("psij-" + metafunc.config.getoption("psij")) - if ( - bool(shutil.which("sbatch")) - and metafunc.config.getoption("psij") == "slurm" - ): - Plugins.remove("slurm") - except ValueError: - pass metafunc.parametrize("plugin", Plugins) diff --git a/pydra/engine/tests/test_node_task.py b/pydra/engine/tests/test_node_task.py index 4e182781b..3be1447a4 100644 --- a/pydra/engine/tests/test_node_task.py +++ b/pydra/engine/tests/test_node_task.py @@ -366,15 +366,14 @@ def test_odir_init(): # Tests for tasks without state (i.e. no splitter) -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1(plugin_dask_opt, tmp_path): +def test_task_nostate_1(plugin, tmp_path): """task without splitter""" nn = fun_addtwo(name="NA", a=3) nn.cache_dir = tmp_path assert np.allclose(nn.inputs.a, [3]) assert nn.state is None - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) # checking the results @@ -407,15 +406,14 @@ def test_task_nostate_1_call(): assert nn.output_dir.exists() -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1_call_subm(plugin_dask_opt, tmp_path): +def test_task_nostate_1_call_subm(plugin, tmp_path): """task without splitter""" nn = fun_addtwo(name="NA", a=3) nn.cache_dir = tmp_path assert np.allclose(nn.inputs.a, [3]) assert nn.state is None - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: nn(submitter=sub) # checking the results @@ -425,15 +423,14 @@ def test_task_nostate_1_call_subm(plugin_dask_opt, tmp_path): assert nn.output_dir.exists() -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_1_call_plug(plugin_dask_opt, tmp_path): +def test_task_nostate_1_call_plug(plugin, tmp_path): """task without splitter""" nn = fun_addtwo(name="NA", a=3) nn.cache_dir = tmp_path assert np.allclose(nn.inputs.a, [3]) assert nn.state is None - nn(plugin=plugin_dask_opt) + nn(plugin=plugin) # checking the results results = nn.result() @@ -557,8 +554,7 @@ def test_task_nostate_7(): # Testing caching for tasks without states -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_cachedir(plugin_dask_opt, tmp_path): +def test_task_nostate_cachedir(plugin, tmp_path): """task with provided cache_dir using pytest tmp_path""" cache_dir = tmp_path / "test_task_nostate" cache_dir.mkdir() @@ -566,7 +562,7 @@ def test_task_nostate_cachedir(plugin_dask_opt, tmp_path): assert np.allclose(nn.inputs.a, [3]) assert nn.state is None - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) # checking the results @@ -574,8 +570,7 @@ def test_task_nostate_cachedir(plugin_dask_opt, tmp_path): assert results.output.out == 5 -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt): +def test_task_nostate_cachedir_relativepath(tmp_path, plugin): """task with provided cache_dir as relative path""" os.chdir(tmp_path) cache_dir = "test_task_nostate" @@ -585,7 +580,7 @@ def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt): assert np.allclose(nn.inputs.a, [3]) assert nn.state is None - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) # checking the results @@ -595,8 +590,7 @@ def test_task_nostate_cachedir_relativepath(tmp_path, plugin_dask_opt): shutil.rmtree(cache_dir) -@pytest.mark.flaky(reruns=2) # when dask -def test_task_nostate_cachelocations(plugin_dask_opt, tmp_path): +def test_task_nostate_cachelocations(plugin, tmp_path): """ Two identical tasks with provided cache_dir; the second task has cache_locations and should not recompute the results @@ -607,11 +601,11 @@ def test_task_nostate_cachelocations(plugin_dask_opt, tmp_path): cache_dir2.mkdir() nn = fun_addtwo(name="NA", a=3, cache_dir=cache_dir) - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) nn2 = fun_addtwo(name="NA", a=3, cache_dir=cache_dir2, cache_locations=cache_dir) - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn2) # checking the results @@ -737,9 +731,8 @@ def test_task_nostate_cachelocations_updated(plugin, tmp_path): # Tests for tasks with states (i.e. with splitter) -@pytest.mark.flaky(reruns=2) # when dask @pytest.mark.parametrize("input_type", ["list", "array"]) -def test_task_state_1(plugin_dask_opt, input_type, tmp_path): +def test_task_state_1(plugin, input_type, tmp_path): """task with the simplest splitter""" a_in = [3, 5] if input_type == "array": @@ -752,7 +745,7 @@ def test_task_state_1(plugin_dask_opt, input_type, tmp_path): assert nn.state.splitter_rpn == ["NA.a"] assert (nn.inputs.a == np.array([3, 5])).all() - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) # checking the results @@ -1082,8 +1075,7 @@ def test_task_state_6a(plugin, tmp_path): assert odir.exists() -@pytest.mark.flaky(reruns=2) # when dask -def test_task_state_comb_1(plugin_dask_opt, tmp_path): +def test_task_state_comb_1(plugin, tmp_path): """task with the simplest splitter and combiner""" nn = fun_addtwo(name="NA").split(a=[3, 5], splitter="a").combine(combiner="a") nn.cache_dir = tmp_path @@ -1096,7 +1088,7 @@ def test_task_state_comb_1(plugin_dask_opt, tmp_path): assert nn.state.splitter_final is None assert nn.state.splitter_rpn_final == [] - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) assert nn.state.states_ind == [{"NA.a": 0}, {"NA.a": 1}] @@ -1459,8 +1451,7 @@ def test_task_state_comb_contdim_2(tmp_path): # Testing caching for tasks with states -@pytest.mark.flaky(reruns=2) # when dask -def test_task_state_cachedir(plugin_dask_opt, tmp_path): +def test_task_state_cachedir(plugin, tmp_path): """task with a state and provided cache_dir using pytest tmp_path""" cache_dir = tmp_path / "test_task_nostate" cache_dir.mkdir() @@ -1469,7 +1460,7 @@ def test_task_state_cachedir(plugin_dask_opt, tmp_path): assert nn.state.splitter == "NA.a" assert (nn.inputs.a == np.array([3, 5])).all() - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(nn) # checking the results diff --git a/pydra/engine/tests/test_shelltask.py b/pydra/engine/tests/test_shelltask.py index 5129113a0..4b3a8a2e5 100644 --- a/pydra/engine/tests/test_shelltask.py +++ b/pydra/engine/tests/test_shelltask.py @@ -26,15 +26,14 @@ pytest.skip("SLURM not available in windows", allow_module_level=True) -@pytest.mark.flaky(reruns=2) # when dask @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_shell_cmd_1(plugin_dask_opt, results_function, tmp_path): +def test_shell_cmd_1(plugin, results_function, tmp_path): """simple command, no arguments""" cmd = ["pwd"] shelly = ShellCommandTask(name="shelly", executable=cmd, cache_dir=tmp_path) assert shelly.cmdline == " ".join(cmd) - res = results_function(shelly, plugin=plugin_dask_opt) + res = results_function(shelly, plugin=plugin) assert Path(res.output.stdout.rstrip()) == shelly.output_dir assert res.output.return_code == 0 assert res.output.stderr == "" @@ -108,7 +107,7 @@ def test_shell_cmd_2b(plugin, results_function, tmp_path): @pytest.mark.flaky(reruns=2) -def test_shell_cmd_3(plugin_dask_opt, tmp_path): +def test_shell_cmd_3(plugin, tmp_path): """commands without arguments splitter = executable """ @@ -119,7 +118,7 @@ def test_shell_cmd_3(plugin_dask_opt, tmp_path): shelly.cache_dir = tmp_path # assert shelly.cmdline == ["pwd", "whoami"] - res = shelly(plugin=plugin_dask_opt) + res = shelly(plugin=plugin) assert Path(res[0].output.stdout.rstrip()) == shelly.output_dir[0] if "USER" in os.environ: @@ -2174,8 +2173,7 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmp_path # customised input_spec in Workflow -@pytest.mark.flaky(reruns=2) # when dask -def test_wf_shell_cmd_2(plugin_dask_opt, tmp_path): +def test_wf_shell_cmd_2(plugin, tmp_path): """a workflow with input with defined output_file_template (str) that requires wf.lzin """ @@ -2213,7 +2211,7 @@ def test_wf_shell_cmd_2(plugin_dask_opt, tmp_path): wf.set_output([("out_f", wf.shelly.lzout.out1), ("out", wf.shelly.lzout.stdout)]) - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: wf(submitter=sub) res = wf.result() diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index d65247e96..535b8d2df 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -128,8 +128,7 @@ def test_wf_in_wf(plugin, tmpdir): assert res.output.out == 7 -@pytest.mark.flaky(reruns=2) # when dask -def test_wf2(plugin_dask_opt, tmpdir): +def test_wf2(plugin, tmpdir): """workflow as a node workflow-node with one task and no splitter """ @@ -143,15 +142,14 @@ def test_wf2(plugin_dask_opt, tmpdir): wf.set_output([("out", wf.wfnd.lzout.out)]) wf.cache_dir = tmpdir - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(wf) res = wf.result() assert res.output.out == 3 -@pytest.mark.flaky(reruns=2) # when dask -def test_wf_with_state(plugin_dask_opt, tmpdir): +def test_wf_with_state(plugin, tmpdir): wf = Workflow(name="wf_with_state", input_spec=["x"]) wf.add(sleep_add_one(name="taska", x=wf.lzin.x)) wf.add(sleep_add_one(name="taskb", x=wf.taska.lzout.out)) @@ -160,7 +158,7 @@ def test_wf_with_state(plugin_dask_opt, tmpdir): wf.set_output([("out", wf.taskb.lzout.out)]) wf.cache_dir = tmpdir - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(wf) res = wf.result() diff --git a/pydra/engine/tests/test_workflow.py b/pydra/engine/tests/test_workflow.py index 598021c83..741e88954 100644 --- a/pydra/engine/tests/test_workflow.py +++ b/pydra/engine/tests/test_workflow.py @@ -378,8 +378,7 @@ def test_wf_2d_outpasdict(plugin, tmpdir): assert wf.output_dir.exists() -@pytest.mark.flaky(reruns=3) # when dask -def test_wf_3(plugin_dask_opt, tmpdir): +def test_wf_3(plugin, tmpdir): """testing None value for an input""" wf = Workflow(name="wf_3", input_spec=["x", "y"]) wf.add(fun_addvar_none(name="addvar", a=wf.lzin.x, b=wf.lzin.y)) @@ -389,7 +388,7 @@ def test_wf_3(plugin_dask_opt, tmpdir): wf.inputs.y = None wf.cache_dir = tmpdir - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(wf) assert wf.output_dir.exists() @@ -1200,8 +1199,7 @@ def test_wf_3sernd_ndst_1a(plugin, tmpdir): # workflows with structures A -> C, B -> C -@pytest.mark.flaky(reruns=3) # when dask -def test_wf_3nd_st_1(plugin_dask_opt, tmpdir): +def test_wf_3nd_st_1(plugin, tmpdir): """workflow with three tasks, third one connected to two previous tasks, splitter on the workflow level """ @@ -1214,7 +1212,7 @@ def test_wf_3nd_st_1(plugin_dask_opt, tmpdir): wf.set_output([("out", wf.mult.lzout.out)]) wf.cache_dir = tmpdir - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(wf) results = wf.result() @@ -1228,8 +1226,7 @@ def test_wf_3nd_st_1(plugin_dask_opt, tmpdir): assert odir.exists() -@pytest.mark.flaky(reruns=3) # when dask -def test_wf_3nd_ndst_1(plugin_dask_opt, tmpdir): +def test_wf_3nd_ndst_1(plugin, tmpdir): """workflow with three tasks, third one connected to two previous tasks, splitter on the tasks levels """ @@ -1242,7 +1239,7 @@ def test_wf_3nd_ndst_1(plugin_dask_opt, tmpdir): wf.set_output([("out", wf.mult.lzout.out)]) wf.cache_dir = tmpdir - with Submitter(plugin=plugin_dask_opt) as sub: + with Submitter(plugin=plugin) as sub: sub(wf) results = wf.result()