From f9c3b1c0d656f46a465551849d2499623f95b121 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Thu, 14 Mar 2024 12:15:27 -0700
Subject: [PATCH 01/17] add resources

---
 src/linker/pipeline.py |  2 ++
 src/linker/runner.py   | 62 ++++++++++++++++--------------------------
 2 files changed, 26 insertions(+), 38 deletions(-)

diff --git a/src/linker/pipeline.py b/src/linker/pipeline.py
index 95dc28a0..b9bb7b07 100644
--- a/src/linker/pipeline.py
+++ b/src/linker/pipeline.py
@@ -121,6 +121,7 @@ def write_implementation_rules(
         validation_file = str(
             results_dir / "input_validations" / implementation.validation_filename
         )
+        resources = self.config.slurm_resources if self.config.computing_environment == "slurm" else None
         validation_rule = InputValidationRule(
             name=implementation.name,
             input=input_files,
@@ -132,6 +133,7 @@ def write_implementation_rules(
             execution_input=input_files,
             validation=validation_file,
             output=output_files,
+            resources=resources,
             envvars=implementation.environment_variables,
             diagnostics_dir=str(diagnostics_dir),
             script_cmd=implementation.script_cmd,
diff --git a/src/linker/runner.py b/src/linker/runner.py
index d333bd00..05542c55 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -37,6 +37,8 @@ def main(
         ## See above
         "--envvars",
         "foo",
+        "--quiet",
+        "progress"
     ]
     argv.extend(environment_args)
     logger.info(f"Running Snakemake")
@@ -50,44 +52,28 @@ def get_environment_args(config, results_dir) -> List[str]:
 
     # TODO [MIC-4822]: launch a local spark cluster instead of relying on implementation
     elif config.computing_environment == "slurm":
-        # TODO: Add back slurm support
-        raise NotImplementedError(
-            "Slurm support is not yet implemented with snakemake integration"
-        )
-        # # Set up a single drmaa.session that is persistent for the duration of the pipeline
-        # # TODO [MIC-4468]: Check for slurm in a more meaningful way
-        # hostname = socket.gethostname()
-        # if "slurm" not in hostname:
-        #     raise RuntimeError(
-        #         f"Specified a 'slurm' computing-environment but on host {hostname}"
-        #     )
-        # os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so"
-        # diagnostics = results_dir / "diagnostics/"
-        # job_name = "snakemake-linker"
-        # resources = config.slurm_resources
-        # drmaa_args = get_cli_args(
-        #     job_name=job_name,
-        #     account=resources["account"],
-        #     partition=resources["partition"],
-        #     peak_memory=resources["memory"],
-        #     max_runtime=resources["time_limit"],
-        #     num_threads=resources["cpus"],
-        # )
-        # drmaa_cli_arguments = [
-        #     "--executor",
-        #     "drmaa",
-        #     "--drmaa-args",
-        #     drmaa_args,
-        #     "--drmaa-log-dir",
-        #     str(diagnostics),
-        # ]
-        # # slurm_args = [
-        # #     "--executor",
-        # #     "slurm",
-        # #     "--profile",
-        # #     "/ihme/homes/pnast/repos/linker/.config/snakemake/slurm"
-        # # ]
-        # return drmaa_cli_arguments
+        # Set up a single drmaa.session that is persistent for the duration of the pipeline
+        # TODO [MIC-4468]: Check for slurm in a more meaningful way
+        hostname = socket.gethostname()
+        if "slurm" not in hostname:
+            raise RuntimeError(
+                f"Specified a 'slurm' computing-environment but on host {hostname}"
+            )
+        os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so"
+        diagnostics = results_dir / "diagnostics/"
+        job_name = "snakemake-linker"
+        resources = config.slurm_resources
+        slurm_args = [
+            "--executor",
+            "slurm",
+            "--default-resources",
+            f"slurm_account={resources['account']}",
+            f"slurm_partition='{resources['partition']}'",
+            f"mem={resources['memory']}",
+            f"runtime={resources['time_limit']}",
+            f"nodes={resources['cpus']}",
+        ]
+        return slurm_args
     else:
         raise NotImplementedError(
             "only computing_environment 'local' and 'slurm' are supported; "

From a1bb55897812ac1e449e94b9548281968c11f988 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Fri, 15 Mar 2024 14:21:26 -0700
Subject: [PATCH 02/17] work on logging

---
 src/linker/rule.py   | 27 ++++++++++++++++++++++---
 src/linker/runner.py |  4 ++--
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/src/linker/rule.py b/src/linker/rule.py
index f92e9bc1..7453db72 100644
--- a/src/linker/rule.py
+++ b/src/linker/rule.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from typing import Callable, List
+from typing import Callable, List, Optional
 
 
 class Rule(ABC):
@@ -39,7 +39,8 @@ def _build_rule(self) -> str:
 rule all:
     input:
         final_output={self.target_files},
-        validation='{self.validation}'"""
+        validation='{self.validation}'
+    message: 'Grabbing final output' """
 
 
 @dataclass
@@ -52,6 +53,7 @@ class ImplementedRule(Rule):
         execution_input: List of file paths required by implementation
         validation: name of file created by InputValidationRule to check for compatible input
         output: List of file paths created by implementation
+        resources:
         envvars: Dictionary of environment variables to set
         diagnostics_dir: Directory for diagnostic files
         script_cmd: Command to execute
@@ -61,6 +63,7 @@ class ImplementedRule(Rule):
     execution_input: List[str]
     validation: str
     output: List[str]
+    resources: Optional[dict]
     envvars: dict
     diagnostics_dir: str
     script_cmd: str
@@ -70,13 +73,31 @@ def _build_rule(self) -> str:
             f"""
 rule:
     name: "{self.name}"
+    message: "Running Implementation {self.name}"
     input:
         implementation_inputs={self.execution_input},
         validation="{self.validation}"
-    output: {self.output}"""
+    output: {self.output}
+    log:
+        stdout="{self.diagnostics_dir}/implementation_logs/stdout",
+        stderr="{self.diagnostics_dir}/implementation_logs/stderr" """
+            + self._build_resources()
             + self._build_shell_command()
         )
 
+    def _build_resources(self) -> str:
+        # Include slurm partition for now, as snakemake has trouble
+        # parsing strings with periods like "all.q"
+        if not self.resources:
+            return ""
+        return f"""
+    resources:
+        slurm_partition='{self.resources['partition']}',
+        mem={self.resources['memory']},
+        runtime={self.resources['time_limit']},
+        nodes={self.resources['cpus']},
+        """
+
     def _build_shell_command(self) -> str:
         shell_cmd = f"""
 shell:
diff --git a/src/linker/runner.py b/src/linker/runner.py
index 05542c55..9c86cf27 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -37,8 +37,8 @@ def main(
         ## See above
         "--envvars",
         "foo",
-        "--quiet",
-        "progress"
+        # "--quiet",
+        # "progress"
     ]
     argv.extend(environment_args)
     logger.info(f"Running Snakemake")

From 636795ba5c5621be63cad55dae5ac92266fc2eb6 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 11:13:34 -0700
Subject: [PATCH 03/17] configure tmp dir

---
 src/linker/runner.py          | 16 ++++++++++------
 src/linker/utilities/paths.py |  2 +-
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/src/linker/runner.py b/src/linker/runner.py
index 5716d927..e2e4ad1e 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -24,7 +24,7 @@ def main(
     copy_configuration_files_to_results_directory(config, results_dir)
     snakefile = pipeline.build_snakefile(results_dir)
     environment_args = get_environment_args(config, results_dir)
-    singularity_args = get_singularity_args(config.input_data, results_dir)
+    singularity_args = get_singularity_args(config, results_dir)
     # We need to set a dummy environment variable to avoid logging a wall of text.
     # TODO [MIC-4920]: Remove when https://github.com/snakemake/snakemake-interface-executor-plugins/issues/55 merges
     os.environ["foo"] = "bar"
@@ -49,11 +49,15 @@ def main(
     snake_main(argv)
 
 
-def get_singularity_args(input_data: List[Path], results_dir: Path) -> str:
-    input_file_paths = ",".join(file.as_posix() for file in input_data)
+def get_singularity_args(config: Config, results_dir: Path) -> str:
+    input_file_paths = ",".join(file.as_posix() for file in config.input_data)
     singularity_args = "--no-home --containall"
-    LINKER_TEMP.mkdir(parents=True, exist_ok=True)
-    singularity_args += f" -B {LINKER_TEMP}:/tmp,{results_dir},{input_file_paths}"
+    # Bind linker temp dir to /tmp in the container
+    # Slurm will delete /tmp after job completion
+    # but we'll bind a subdirectory for local runs
+    linker_tmp_dir = LINKER_TEMP[config.computing_environment]
+    linker_tmp_dir.mkdir(parents=True, exist_ok=True)
+    singularity_args += f" -B {linker_tmp_dir}:/tmp,{results_dir},{input_file_paths}"
     return singularity_args
 
 
@@ -84,7 +88,7 @@ def get_environment_args(config: Config, results_dir: Path) -> List[str]:
             f"mem={resources['memory']}",
             f"runtime={resources['time_limit']}",
             f"nodes={resources['cpus']}",
-        ] 
+        ]
         return slurm_args
     else:
         raise NotImplementedError(
diff --git a/src/linker/utilities/paths.py b/src/linker/utilities/paths.py
index 8584c19a..8c2203ce 100644
--- a/src/linker/utilities/paths.py
+++ b/src/linker/utilities/paths.py
@@ -3,4 +3,4 @@
 # TODO: We'll need to update this to be more generic for external users and have a way of configuring this
 CONTAINER_DIR = "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images"
 IMPLEMENTATION_METADATA = Path(__file__).parent.parent / "implementation_metadata.yaml"
-LINKER_TEMP = Path("/tmp/linker")
+LINKER_TEMP = {"local": Path("/tmp/linker"), "slurm": Path("/tmp")}

From a887b4b0087295b5a95b553187d6abe8431dde90 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 11:13:49 -0700
Subject: [PATCH 04/17] cleanup

---
 src/linker/pipeline.py |  6 +++++-
 src/linker/rule.py     | 11 ++++-------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/linker/pipeline.py b/src/linker/pipeline.py
index 3cbfb2ce..85254625 100644
--- a/src/linker/pipeline.py
+++ b/src/linker/pipeline.py
@@ -121,7 +121,11 @@ def write_implementation_rules(
         validation_file = str(
             results_dir / "input_validations" / implementation.validation_filename
         )
-        resources = self.config.slurm_resources if self.config.computing_environment == "slurm" else None
+        resources = (
+            self.config.slurm_resources
+            if self.config.computing_environment == "slurm"
+            else None
+        )
         validation_rule = InputValidationRule(
             name=implementation.name,
             input=input_files,
diff --git a/src/linker/rule.py b/src/linker/rule.py
index 9f6a89d0..23d76c76 100644
--- a/src/linker/rule.py
+++ b/src/linker/rule.py
@@ -71,8 +71,10 @@ class ImplementedRule(Rule):
     script_cmd: str
 
     def _build_rule(self) -> str:
-        return (
-            f"""
+        return self._build_io() + self._build_resources() + self._build_shell_command()
+
+    def _build_io(self) -> str:
+        return f"""
 rule:
     name: "{self.name}"
     message: "Running Implementation {self.name}"
@@ -80,12 +82,7 @@ def _build_rule(self) -> str:
         implementation_inputs={self.execution_input},
         validation="{self.validation}"
     output: {self.output}
-    log:
-        stdout="{self.diagnostics_dir}/implementation_logs/stdout",
-        stderr="{self.diagnostics_dir}/implementation_logs/stderr",
     container: "{self.image_path}" """
-            + self._build_shell_command()
-        )
 
     def _build_resources(self) -> str:
         # Include slurm partition for now, as snakemake has trouble

From 69f1a9dd1889e7f6ddcc022d0b0f897aee7ca23d Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 11:25:12 -0700
Subject: [PATCH 05/17] fix broken tests

---
 tests/unit/rule_strings/implemented_rule.txt | 7 +++++++
 tests/unit/rule_strings/pipeline.txt         | 3 +++
 tests/unit/rule_strings/target_rule.txt      | 3 ++-
 tests/unit/test_rule.py                      | 5 +++++
 tests/unit/test_runner.py                    | 5 +++--
 5 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/tests/unit/rule_strings/implemented_rule.txt b/tests/unit/rule_strings/implemented_rule.txt
index 27ceb072..af79f3dd 100644
--- a/tests/unit/rule_strings/implemented_rule.txt
+++ b/tests/unit/rule_strings/implemented_rule.txt
@@ -1,11 +1,18 @@
 rule:
     name: "foo"
+    message: "Running Implementation foo"
     input:
         implementation_inputs=['foo', 'bar'],
         validation="bar"
     output: ['baz']
     container: "Multipolarity.sif"
+    resources:
+        slurm_partition='slurmpart',
+        mem=5,
+        runtime=1,
+        nodes=1337,
+
     shell:
         '''
         export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS=foo,bar
diff --git a/tests/unit/rule_strings/pipeline.txt b/tests/unit/rule_strings/pipeline.txt
index c6a66c4e..341f8579 100644
--- a/tests/unit/rule_strings/pipeline.txt
+++ b/tests/unit/rule_strings/pipeline.txt
@@ -3,6 +3,7 @@ rule all:
     input:
         final_output=['{snake_dir}/result.parquet'],
         validation='{snake_dir}/input_validations/final_validator'
+message: 'Grabbing final output'
 rule:
     name: "results_validator"
     input: ['{snake_dir}/result.parquet']
@@ -23,6 +24,7 @@ rule:
         validation_utils.validate_input_file_dummy(f)
 rule:
     name: "step_1_python_pandas"
+    message: "Running Implementation step_1_python_pandas"
     input:
         implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'],
         validation="{snake_dir}/input_validations/step_1_python_pandas_validator"
@@ -46,6 +48,7 @@ rule:
         validation_utils.validate_input_file_dummy(f)
 rule:
     name: "step_2_python_pandas"
+    message: "Running Implementation step_2_python_pandas"
     input:
         implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'],
         validation="{snake_dir}/input_validations/step_2_python_pandas_validator"
diff --git a/tests/unit/rule_strings/target_rule.txt b/tests/unit/rule_strings/target_rule.txt
index 16804ef5..05e08c6f 100644
--- a/tests/unit/rule_strings/target_rule.txt
+++ b/tests/unit/rule_strings/target_rule.txt
@@ -2,4 +2,5 @@
 rule all:
     input:
         final_output=['foo', 'bar'],
-        validation='bar'
\ No newline at end of file
+        validation='bar'
+    message: 'Grabbing final output'
\ No newline at end of file
diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py
index 93a1a48c..9da4d585 100644
--- a/tests/unit/test_rule.py
+++ b/tests/unit/test_rule.py
@@ -46,6 +46,11 @@ def test_implemented_rule_build_rule():
         execution_input=["foo", "bar"],
         validation="bar",
         output=["baz"],
+        resources={"partition": "slurmpart",
+            "time_limit": 1,
+            "memory": 5,
+            "cpus": 1337,
+        },
         envvars={"eggs": "coconut"},
         diagnostics_dir="spam",
         image_path="Multipolarity.sif",
diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py
index c60b1a48..28ce1905 100644
--- a/tests/unit/test_runner.py
+++ b/tests/unit/test_runner.py
@@ -1,13 +1,14 @@
 from tempfile import TemporaryDirectory
 
 from linker.runner import get_environment_args, get_singularity_args
+from linker.utilities.paths import LINKER_TEMP
 def test_get_singularity_args(default_config, test_dir):
     with TemporaryDirectory() as results_dir:
         assert (
-            get_singularity_args(default_config.input_data, results_dir)
-            == f"--no-home --containall -B /tmp/linker:/tmp,"
+            get_singularity_args(default_config, results_dir)
+            == f"--no-home --containall -B {LINKER_TEMP[default_config.computing_environment]}:/tmp,"
             f"{results_dir},"
             f"{test_dir}/input_data1/file1.csv,"
             f"{test_dir}/input_data2/file2.csv"

From 70121065a8e1da4db7c2d6ad77f0109dc58c45cc Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 11:25:43 -0700
Subject: [PATCH 06/17] lint

---
 tests/unit/test_rule.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py
index 9da4d585..42a2b056 100644
--- a/tests/unit/test_rule.py
+++ b/tests/unit/test_rule.py
@@ -46,11 +46,12 @@ def test_implemented_rule_build_rule():
         execution_input=["foo", "bar"],
         validation="bar",
         output=["baz"],
-        resources={"partition": "slurmpart",
-            "time_limit": 1,
-            "memory": 5,
-            "cpus": 1337,
-        },
+        resources={
+            "partition": "slurmpart",
+            "time_limit": 1,
+            "memory": 5,
+            "cpus": 1337,
+        },
         envvars={"eggs": "coconut"},
         diagnostics_dir="spam",
         image_path="Multipolarity.sif",

From 8bf5787d963e018fb0167521bc75d83fa2f00667 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 15:44:18 -0700
Subject: [PATCH 07/17] format output logging

---
 src/linker/rule.py                           | 5 ++++-
 src/linker/runner.py                         | 6 ++----
 tests/unit/rule_strings/implemented_rule.txt | 4 +++-
 tests/unit/rule_strings/pipeline.txt         | 6 ++++--
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/linker/rule.py b/src/linker/rule.py
index 23d76c76..51e2b9f6 100644
--- a/src/linker/rule.py
+++ b/src/linker/rule.py
@@ -82,6 +82,7 @@ def _build_io(self) -> str:
         implementation_inputs={self.execution_input},
         validation="{self.validation}"
     output: {self.output}
+    log: "{self.diagnostics_dir}/{self.name}-output.log"
     container: "{self.image_path}" """
 
     def _build_resources(self) -> str:
@@ -95,6 +96,7 @@ def _build_resources(self) -> str:
         mem={self.resources['memory']},
         runtime={self.resources['time_limit']},
         nodes={self.resources['cpus']},
+        slurm_extra="--output '{self.diagnostics_dir}/{self.name}-slurm-%j.log'"
         """
 
     def _build_shell_command(self) -> str:
@@ -107,8 +109,9 @@ def _build_shell_command(self) -> str:
         for var_name, var_value in self.envvars.items():
             shell_cmd += f"""
         export {var_name}={var_value}"""
+        # Log stdout/stderr to diagnostics directory
         shell_cmd += f"""
-        {self.script_cmd}
+        {self.script_cmd} > {{log}} 2>&1
         '''"""
 
         return shell_cmd
diff --git a/src/linker/runner.py b/src/linker/runner.py
index e2e4ad1e..dc88b384 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -38,8 +38,8 @@ def main(
         ## See above
         "--envvars",
         "foo",
-        # "--quiet",
-        # "progress"
+        "--quiet",
+        "progress",
         "--use-singularity",
         "--singularity-args",
         singularity_args,
@@ -76,8 +76,6 @@ def get_environment_args(config: Config, results_dir: Path) -> List[str]:
                 f"Specified a 'slurm' computing-environment but on host {hostname}"
             )
         os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so"
-        diagnostics = results_dir / "diagnostics/"
-        job_name = "snakemake-linker"
         resources = config.slurm_resources
         slurm_args = [
             "--executor",
diff --git a/tests/unit/rule_strings/implemented_rule.txt b/tests/unit/rule_strings/implemented_rule.txt
index af79f3dd..175027a7 100644
--- a/tests/unit/rule_strings/implemented_rule.txt
+++ b/tests/unit/rule_strings/implemented_rule.txt
@@ -6,12 +6,14 @@ rule:
         implementation_inputs=['foo', 'bar'],
         validation="bar"
     output: ['baz']
+    log: "spam/foo-output.log"
     container: "Multipolarity.sif"
     resources:
         slurm_partition='slurmpart',
         mem=5,
         runtime=1,
         nodes=1337,
+        slurm_extra="--output 'spam/foo-slurm-%j.log'"
 
     shell:
         '''
@@ -19,5 +21,5 @@ rule:
         export DUMMY_CONTAINER_OUTPUT_PATHS=baz
         export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY=spam
         export eggs=coconut
-        echo hello world
+        echo hello world > {log} 2>&1
         '''
\ No newline at end of file
diff --git a/tests/unit/rule_strings/pipeline.txt b/tests/unit/rule_strings/pipeline.txt
index 341f8579..1dec24ee 100644
--- a/tests/unit/rule_strings/pipeline.txt
+++ b/tests/unit/rule_strings/pipeline.txt
@@ -29,13 +29,14 @@ rule:
         implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'],
         validation="{snake_dir}/input_validations/step_1_python_pandas_validator"
     output: ['{snake_dir}/intermediate/1_step_1/result.parquet']
+    log: "{snake_dir}/diagnostics/1_step_1/step_1_python_pandas-output.log"
     container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
     shell:
         '''
         export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={test_dir}/input_data1/file1.csv,{test_dir}/input_data2/file2.csv
         export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
         export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/1_step_1
-        python /dummy_step.py
+        python /dummy_step.py > {log} 2>&1
         '''
 rule:
     name: "step_2_python_pandas_validator"
@@ -53,11 +54,12 @@ rule:
         implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'],
         validation="{snake_dir}/input_validations/step_2_python_pandas_validator"
     output: ['{snake_dir}/result.parquet']
+    log: "{snake_dir}/diagnostics/2_step_2/step_2_python_pandas-output.log"
     container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
     shell:
         '''
         export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
         export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/result.parquet
         export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/2_step_2
-        python /dummy_step.py
+        python /dummy_step.py > {log} 2>&1
         '''
\ No newline at end of file

From cb719a0a922582c3780e9f828938624956553b29 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 16:02:22 -0700
Subject: [PATCH 08/17] adjust some comments

---
 src/linker/rule.py            | 2 +-
 src/linker/runner.py          | 4 +---
 src/linker/utilities/paths.py | 2 ++
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/linker/rule.py b/src/linker/rule.py
index 51e2b9f6..043f8d80 100644
--- a/src/linker/rule.py
+++ b/src/linker/rule.py
@@ -53,7 +53,7 @@ class ImplementedRule(Rule):
         execution_input: List of file paths required by implementation
         validation: name of file created by InputValidationRule to check for compatible input
         output: List of file paths created by implementation
-        resources:
+        resources: Computational resources used by executor (e.g. SLURM)
         envvars: Dictionary of environment variables to set
         diagnostics_dir: Directory for diagnostic files
         image_path: Path to Singularity image
diff --git a/src/linker/runner.py b/src/linker/runner.py
index dc88b384..f7921184 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -38,6 +38,7 @@ def main(
         ## See above
         "--envvars",
         "foo",
+        ## Suppress some of the snakemake output
         "--quiet",
         "progress",
         "--use-singularity",
         "--singularity-args",
         singularity_args,
@@ -52,9 +53,6 @@ def main(
 def get_singularity_args(config: Config, results_dir: Path) -> str:
     input_file_paths = ",".join(file.as_posix() for file in config.input_data)
     singularity_args = "--no-home --containall"
-    # Bind linker temp dir to /tmp in the container
-    # Slurm will delete /tmp after job completion
-    # but we'll bind a subdirectory for local runs
     linker_tmp_dir = LINKER_TEMP[config.computing_environment]
     linker_tmp_dir.mkdir(parents=True, exist_ok=True)
     singularity_args += f" -B {linker_tmp_dir}:/tmp,{results_dir},{input_file_paths}"
diff --git a/src/linker/utilities/paths.py b/src/linker/utilities/paths.py
index 8c2203ce..3bcc3fe5 100644
--- a/src/linker/utilities/paths.py
+++ b/src/linker/utilities/paths.py
@@ -3,4 +3,6 @@
 # TODO: We'll need to update this to be more generic for external users and have a way of configuring this
 CONTAINER_DIR = "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images"
 IMPLEMENTATION_METADATA = Path(__file__).parent.parent / "implementation_metadata.yaml"
+# Bind linker temp dir to /tmp in the container. Slurm will delete /tmp after job completion,
+# but we'll bind a subdirectory for local runs
 LINKER_TEMP = {"local": Path("/tmp/linker"), "slurm": Path("/tmp")}

From 6e430e983ecffb72b7cd6cec06f2d2b2d351ad82 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Mon, 18 Mar 2024 16:02:44 -0700
Subject: [PATCH 09/17] add local / "slurm" arguments test

---
 tests/unit/test_runner.py | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py
index 28ce1905..f7615682 100644
--- a/tests/unit/test_runner.py
+++ b/tests/unit/test_runner.py
@@ -2,6 +2,8 @@
 from linker.runner import get_environment_args, get_singularity_args
 from linker.utilities.paths import LINKER_TEMP
+from linker.configuration import Config
+from pathlib import Path
 
 
 def test_get_singularity_args(default_config, test_dir):
@@ -15,6 +17,22 @@ def test_get_singularity_args(default_config, test_dir):
     )
 
 
-def test_get_environment_args(default_config, test_dir):
-    assert default_config.computing_environment == "local"
-    assert get_environment_args(default_config, test_dir) == []
+def test_get_environment_args(default_config_params, test_dir):
+    config = Config(**default_config_params)
+    assert get_environment_args(config, test_dir) == []
+
+    slurm_config_params = default_config_params
+    slurm_config_params.update({"computing_environment": Path(f"{test_dir}/spark_environment.yaml")})
+    slurm_config = Config(**slurm_config_params)
+    resources = slurm_config.slurm_resources
+    assert get_environment_args(slurm_config, test_dir) == [
+    "--executor",
+    "slurm",
+    "--default-resources",
+    f"slurm_account={resources['account']}",
+    f"slurm_partition='{resources['partition']}'",
+    f"mem={resources['memory']}",
+    f"runtime={resources['time_limit']}",
+    f"nodes={resources['cpus']}",
+    ]
+

From abdc59c0d6f4e95f41d32ca065d47241e7fa7c8f Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Tue, 19 Mar 2024 09:41:18 -0700
Subject: [PATCH 10/17] add local / slurm specific string tests

---
 .../rule_strings/implemented_rule_local.txt   | 18 +++++
 ...ed_rule.txt => implemented_rule_slurm.txt} |  0
 .../{pipeline.txt => pipeline_local.txt}      |  0
 tests/unit/rule_strings/pipeline_slurm.txt    | 79 +++++++++++++++++++
 tests/unit/test_pipeline.py                   | 28 ++++++-
 tests/unit/test_rule.py                       | 31 +++++++-
 6 files changed, 150 insertions(+), 6 deletions(-)
 create mode 100644 tests/unit/rule_strings/implemented_rule_local.txt
 rename tests/unit/rule_strings/{implemented_rule.txt => implemented_rule_slurm.txt} (100%)
 rename tests/unit/rule_strings/{pipeline.txt => pipeline_local.txt} (100%)
 create mode 100644 tests/unit/rule_strings/pipeline_slurm.txt

diff --git a/tests/unit/rule_strings/implemented_rule_local.txt b/tests/unit/rule_strings/implemented_rule_local.txt
new file mode 100644
index 00000000..cbace484
--- /dev/null
+++ b/tests/unit/rule_strings/implemented_rule_local.txt
@@ -0,0 +1,18 @@
+
+rule:
+    name: "foo"
+    message: "Running Implementation foo"
+    input:
+        implementation_inputs=['foo', 'bar'],
+        validation="bar"
+    output: ['baz']
+    log: "spam/foo-output.log"
+    container: "Multipolarity.sif"
+    shell:
+        '''
+        export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS=foo,bar
+        export DUMMY_CONTAINER_OUTPUT_PATHS=baz
+        export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY=spam
+        export eggs=coconut
+        echo hello world > {log} 2>&1
+        '''
\ No newline at end of file
diff --git a/tests/unit/rule_strings/implemented_rule.txt b/tests/unit/rule_strings/implemented_rule_slurm.txt
similarity index 100%
rename from tests/unit/rule_strings/implemented_rule.txt
rename to tests/unit/rule_strings/implemented_rule_slurm.txt
diff --git a/tests/unit/rule_strings/pipeline.txt b/tests/unit/rule_strings/pipeline_local.txt
similarity index 100%
rename from tests/unit/rule_strings/pipeline.txt
rename to tests/unit/rule_strings/pipeline_local.txt
diff --git a/tests/unit/rule_strings/pipeline_slurm.txt b/tests/unit/rule_strings/pipeline_slurm.txt
new file mode 100644
index 00000000..5aa3f4a4
--- /dev/null
+++ b/tests/unit/rule_strings/pipeline_slurm.txt
@@ -0,0 +1,79 @@
+from linker.utilities import validation_utils
+rule all:
+    input:
+        final_output=['{snake_dir}/result.parquet'],
+        validation='{snake_dir}/input_validations/final_validator'
+message: 'Grabbing final output'
+rule:
+    name: "results_validator"
+    input: ['{snake_dir}/result.parquet']
+    output: touch("{snake_dir}/input_validations/final_validator")
+    localrule: True
+    message: "Validating results input"
+    run:
+        for f in input:
+            validation_utils.validate_input_file_dummy(f)
+rule:
+    name: "step_1_python_pandas_validator"
+    input: ['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv']
+    output: touch("{snake_dir}/input_validations/step_1_python_pandas_validator")
+    localrule: True
+    message: "Validating step_1_python_pandas input"
+    run:
+        for f in input:
+            validation_utils.validate_input_file_dummy(f)
+rule:
+    name: "step_1_python_pandas"
+    message: "Running Implementation step_1_python_pandas"
+    input:
+        implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'],
+        validation="{snake_dir}/input_validations/step_1_python_pandas_validator"
+    output: ['{snake_dir}/intermediate/1_step_1/result.parquet']
+    log: "{snake_dir}/diagnostics/1_step_1/step_1_python_pandas-output.log"
+    container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
+    resources:
+        slurm_partition='some-partition',
+        mem=42,
+        runtime=42,
+        nodes=42,
+        slurm_extra="--output '{snake_dir}/diagnostics/1_step_1/step_1_python_pandas-slurm-%j.log'"
+
+    shell:
+        '''
+        export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={test_dir}/input_data1/file1.csv,{test_dir}/input_data2/file2.csv
+        export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
+        export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/1_step_1
+        python /dummy_step.py > {log} 2>&1
+        '''
+rule:
+    name: "step_2_python_pandas_validator"
+    input: ['{snake_dir}/intermediate/1_step_1/result.parquet']
+    output: touch("{snake_dir}/input_validations/step_2_python_pandas_validator")
+    localrule: True
+    message: "Validating step_2_python_pandas input"
+    run:
+        for f in input:
+            validation_utils.validate_input_file_dummy(f)
+rule:
+    name: "step_2_python_pandas"
+    message: "Running Implementation step_2_python_pandas"
+    input:
+        implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'],
+        validation="{snake_dir}/input_validations/step_2_python_pandas_validator"
+    output: ['{snake_dir}/result.parquet']
+    log: "{snake_dir}/diagnostics/2_step_2/step_2_python_pandas-output.log"
+    container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
+    resources:
+        slurm_partition='some-partition',
+        mem=42,
+        runtime=42,
+        nodes=42,
+        slurm_extra="--output '{snake_dir}/diagnostics/2_step_2/step_2_python_pandas-slurm-%j.log'"
+
+    shell:
+        '''
+        export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
+        export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/result.parquet
+        export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/2_step_2
+        python /dummy_step.py > {log} 2>&1
+        '''
\ No newline at end of file
diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py
index 30f011eb..b193406c 100644
--- a/tests/unit/test_pipeline.py
+++ b/tests/unit/test_pipeline.py
@@ -3,6 +3,10 @@
 from tempfile import TemporaryDirectory
 
 from linker.pipeline import Pipeline
+from linker.configuration import Config
+
+PIPELINE_STRINGS = { "pipeline_local": "rule_strings/pipeline_local.txt",
+    "pipeline_slurm": "rule_strings/pipeline_slurm.txt",}
 
 
 def test__get_implementations(default_config, mocker):
@@ -53,12 +57,32 @@ def test_get_diagnostic_dir(default_config, mocker):
     )
 
 
-def test_build_snakefile(default_config, mocker, test_dir):
+def test_build_snakefile_local(default_config, mocker, test_dir):
     mocker.patch("linker.implementation.Implementation.validate", return_value={})
     pipeline = Pipeline(default_config)
     with TemporaryDirectory() as snake_dir:
         snakefile = pipeline.build_snakefile(Path(snake_dir))
-        expected_file_path = Path(os.path.dirname(__file__)) / "rule_strings/pipeline.txt"
+        expected_file_path = Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_local"]
         with open(expected_file_path) as expected_file:
             expected = expected_file.read()
             expected = expected.replace("{snake_dir}", snake_dir)
             expected = expected.replace("{test_dir}", test_dir)
         snake_str = snakefile.read_text()
         snake_str_lines = snake_str.split("\n")
         expected_lines = expected.split("\n")
         assert len(snake_str_lines) == len(expected_lines)
         for i, expected_line in enumerate(expected_lines):
             assert snake_str_lines[i].strip() == expected_line.strip()
+
+def test_build_snakefile_slurm(default_config_params, mocker, test_dir):
+    slurm_config_params = default_config_params
+    slurm_config_params.update({"computing_environment": Path(f"{test_dir}/spark_environment.yaml")})
+    slurm_config = Config(**slurm_config_params)
+    mocker.patch("linker.implementation.Implementation.validate", return_value={})
+    pipeline = Pipeline(slurm_config)
+    with TemporaryDirectory() as snake_dir:
+        snakefile = pipeline.build_snakefile(Path(snake_dir))
+        expected_file_path = Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_slurm"]
         with open(expected_file_path) as expected_file:
             expected = expected_file.read()
             expected = expected.replace("{snake_dir}", snake_dir)
diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py
index 42a2b056..c5fb906e 100644
--- a/tests/unit/test_rule.py
+++ b/tests/unit/test_rule.py
@@ -6,7 +6,8 @@
 RULE_STRINGS = {
     "target_rule": "rule_strings/target_rule.txt",
-    "implemented_rule": "rule_strings/implemented_rule.txt",
+    "implemented_rule_local": "rule_strings/implemented_rule_local.txt",
+    "implemented_rule_slurm": "rule_strings/implemented_rule_slurm.txt",
     "validation_rule": "rule_strings/validation_rule.txt",
 }
 
@@ -39,8 +40,30 @@ def test_target_rule_build_rule():
     for i, expected_line in enumerate(expected_lines):
         assert rulestring_lines[i].strip() == expected_line.strip()
 
-
-def test_implemented_rule_build_rule():
+def test_implemented_rule_build_rule_local():
+    rule = ImplementedRule(
+        name="foo",
+        execution_input=["foo", "bar"],
+        validation="bar",
+        output=["baz"],
+        resources=None,
+        envvars={"eggs": "coconut"},
+        diagnostics_dir="spam",
+        image_path="Multipolarity.sif",
+        script_cmd="echo hello world",
+    )
+
+    file_path = Path(os.path.dirname(__file__)) / RULE_STRINGS["implemented_rule_local"]
+    with open(file_path) as expected_file:
+        expected = expected_file.read()
+    rulestring = rule._build_rule()
+    rulestring_lines = rulestring.split("\n")
+    expected_lines = expected.split("\n")
+    assert len(rulestring_lines) == len(expected_lines)
+    for i, expected_line in enumerate(expected_lines):
+        assert rulestring_lines[i].strip() == expected_line.strip()
+
+def test_implemented_rule_build_rule_slurm():
     rule = ImplementedRule(
         name="foo",
         execution_input=["foo", "bar"],
         validation="bar",
         output=["baz"],
@@ -57,7 +80,7 @@ def test_implemented_rule_build_rule():
         image_path="Multipolarity.sif",
         script_cmd="echo hello world",
     )
-    file_path = Path(os.path.dirname(__file__)) / RULE_STRINGS["implemented_rule"]
+    file_path = Path(os.path.dirname(__file__)) / RULE_STRINGS["implemented_rule_slurm"]
     with open(file_path) as expected_file:
         expected = expected_file.read()
     rulestring = rule._build_rule()

From 98b28011deeea3e6612a6d20c3c15bd0bc089e4e Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Tue, 19 Mar 2024 09:41:33 -0700
Subject: [PATCH 11/17] lint

---
 tests/unit/test_pipeline.py | 23 ++++++++++++++++-------
 tests/unit/test_rule.py     |  6 ++++--
 tests/unit/test_runner.py   | 31 ++++++++++++++++---------------
 3 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py
index b193406c..3557b948 100644
--- a/tests/unit/test_pipeline.py
+++ b/tests/unit/test_pipeline.py
@@ -2,11 +2,13 @@
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
-from linker.pipeline import Pipeline
 from linker.configuration import Config
+from linker.pipeline import Pipeline
 
-PIPELINE_STRINGS = { "pipeline_local": "rule_strings/pipeline_local.txt",
-    "pipeline_slurm": "rule_strings/pipeline_slurm.txt",}
+PIPELINE_STRINGS = {
+    "pipeline_local": "rule_strings/pipeline_local.txt",
+    "pipeline_slurm": "rule_strings/pipeline_slurm.txt",
+}
 
 
 def test__get_implementations(default_config, mocker):
@@ -62,7 +64,9 @@ def test_build_snakefile_local(default_config, mocker, test_dir):
     pipeline = Pipeline(default_config)
     with TemporaryDirectory() as snake_dir:
         snakefile = pipeline.build_snakefile(Path(snake_dir))
-        expected_file_path = Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_local"]
+        expected_file_path = (
+            Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_local"]
+        )
         with open(expected_file_path) as expected_file:
             expected = expected_file.read()
             expected = expected.replace("{snake_dir}", snake_dir)
@@ -74,15 +78,20 @@ def test_build_snakefile_local(default_config, mocker, test_dir):
     for i, expected_line in enumerate(expected_lines):
         assert snake_str_lines[i].strip() == expected_line.strip()
 
+
 def test_build_snakefile_slurm(default_config_params, mocker, test_dir):
-    slurm_config_params = default_config_params
-    slurm_config_params.update({"computing_environment": Path(f"{test_dir}/spark_environment.yaml")})
+    slurm_config_params = default_config_params
+    slurm_config_params.update(
+        {"computing_environment": Path(f"{test_dir}/spark_environment.yaml")}
+    )
     slurm_config = Config(**slurm_config_params)
     mocker.patch("linker.implementation.Implementation.validate", return_value={})
     pipeline = Pipeline(slurm_config)
     with TemporaryDirectory() as snake_dir:
         snakefile = pipeline.build_snakefile(Path(snake_dir))
-        expected_file_path = Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_slurm"]
+        expected_file_path = (
+            Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_slurm"]
+        )
         with open(expected_file_path) as expected_file:
             expected = expected_file.read()
             expected = expected.replace("{snake_dir}", snake_dir)
diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py
index c5fb906e..761f1b77 100644
--- a/tests/unit/test_rule.py
+++ b/tests/unit/test_rule.py
@@ -40,6 +40,7 @@ def test_target_rule_build_rule():
     for i, expected_line in enumerate(expected_lines):
         assert rulestring_lines[i].strip() == expected_line.strip()
 
+
 def test_implemented_rule_build_rule_local():
     rule = ImplementedRule(
         name="foo",
@@ -52,7 +53,7 @@ def test_implemented_rule_build_rule_local():
         image_path="Multipolarity.sif",
         script_cmd="echo hello world",
     )
-    
+
     file_path = Path(os.path.dirname(__file__)) / RULE_STRINGS["implemented_rule_local"]
     with open(file_path) as expected_file:
         expected = expected_file.read()
@@ -62,7 +63,8 @@ def test_implemented_rule_build_rule_local():
     assert len(rulestring_lines) == len(expected_lines)
     for i, expected_line in enumerate(expected_lines):
         assert rulestring_lines[i].strip() == expected_line.strip()
-    
+
+
 def test_implemented_rule_build_rule_slurm():
     rule = ImplementedRule(
         name="foo",
diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py
index f7615682..7b9f2837 100644
--- a/tests/unit/test_runner.py
+++ b/tests/unit/test_runner.py
@@ -1,9 +1,9 @@
+from pathlib import Path
 from tempfile import TemporaryDirectory
 
+from linker.configuration import Config
 from linker.runner import get_environment_args, get_singularity_args
 from linker.utilities.paths import LINKER_TEMP
-from linker.configuration import Config
-from pathlib import Path
 
 
 def test_get_singularity_args(default_config, test_dir):
@@ -20,19 +20,20 @@ def test_get_environment_args(default_config_params, test_dir):
     config = Config(**default_config_params)
     assert get_environment_args(config, test_dir) == []
-    
-    slurm_config_params = default_config_params
-    slurm_config_params.update({"computing_environment": Path(f"{test_dir}/spark_environment.yaml")})
+
+    slurm_config_params = default_config_params
+    slurm_config_params.update(
+        {"computing_environment": Path(f"{test_dir}/spark_environment.yaml")}
+    )
     slurm_config = Config(**slurm_config_params)
     resources = slurm_config.slurm_resources
     assert get_environment_args(slurm_config, test_dir) == [
-    "--executor",
-    "slurm",
-    "--default-resources",
-    f"slurm_account={resources['account']}",
-    f"slurm_partition='{resources['partition']}'",
-    f"mem={resources['memory']}",
-    f"runtime={resources['time_limit']}",
-    f"nodes={resources['cpus']}",
-    ]
-
+        "--executor",
+        "slurm",
+        "--default-resources",
+        f"slurm_account={resources['account']}",
+        f"slurm_partition='{resources['partition']}'",
+        f"mem={resources['memory']}",
+        f"runtime={resources['time_limit']}",
+        f"nodes={resources['cpus']}",
+    ]

From 448421699606bbf0583b8221ac1b167cdabe31ad Mon Sep 17 00:00:00 2001
From: Steve Bachmeier
Date: Tue, 12 Mar 2024 07:22:40 -0700
Subject: [PATCH 12/17] better check for if on slurm

---
 src/linker/runner.py           | 9 +++++----
 tests/unit/test_slurm_utils.py | 9 +++++++++
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/linker/runner.py b/src/linker/runner.py
index f7921184..78dee4a3 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -10,6 +10,7 @@
 from linker.pipeline import Pipeline
 from linker.utilities.data_utils import copy_configuration_files_to_results_directory
 from linker.utilities.paths import LINKER_TEMP
+from linker.utilities.slurm_utils import is_on_slurm
 
 
 def main(
@@ -67,11 +68,11 @@ def get_environment_args(config: Config, results_dir: Path) -> List[str]:
     # TODO [MIC-4822]: launch a local spark cluster instead of relying on implementation
     elif config.computing_environment == "slurm":
         # Set up a single drmaa.session that is persistent for the duration of the pipeline
-        # TODO [MIC-4468]: Check for slurm in a more meaningful way
-        hostname = socket.gethostname()
-        if "slurm" not in hostname:
+        if not is_on_slurm():
             raise RuntimeError(
-                f"Specified a 'slurm' computing-environment but on host {hostname}"
+                f"A 'slurm' computing environment is specified but it has been "
+                "determined that the current host is not on a slurm cluster "
+                f"(host: {socket.gethostname()})."
             )
         os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so"
         resources = config.slurm_resources
diff --git a/tests/unit/test_slurm_utils.py b/tests/unit/test_slurm_utils.py
index 001b5d7d..475414a4 100644
--- a/tests/unit/test_slurm_utils.py
+++ b/tests/unit/test_slurm_utils.py
@@ -26,6 +26,15 @@
 IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true"
 
 
+def test_is_on_slurm():
+    # export SLURM_ROOT
+    os.environ["SLURM_ROOT"] = "/some/path"
+    assert is_on_slurm()
+    # unset SLURM_ROOT
+    del os.environ["SLURM_ROOT"]
+    assert not is_on_slurm()
+
+
 @pytest.mark.skipif(
     IN_GITHUB_ACTIONS,
     reason="Github Actions does not have access to our file system and so no drmaa.",

From 0965558b756b513249cfbed942c0acd1e3a21c60 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Tue, 19 Mar 2024 10:00:09 -0700
Subject: [PATCH 13/17] use steve's check on slurm

---
 src/linker/utilities/slurm_utils.py | 5 +++++
 tests/unit/test_slurm_utils.py      | 1 +
 2 files changed, 6 insertions(+)

diff --git a/src/linker/utilities/slurm_utils.py b/src/linker/utilities/slurm_utils.py
index 7c1edf52..33566f9c 100644
--- a/src/linker/utilities/slurm_utils.py
+++ b/src/linker/utilities/slurm_utils.py
@@ -11,6 +11,11 @@
 from linker.configuration import Config
 
 
+def is_on_slurm() -> bool:
+    """Returns True if the current environment is a SLURM cluster."""
+    return "SLURM_ROOT" in os.environ
+
+
 def get_slurm_drmaa() -> "drmaa":
     """Returns object() to bypass RuntimeError when not on a DRMAA-compliant system"""
     try:
diff --git a/tests/unit/test_slurm_utils.py b/tests/unit/test_slurm_utils.py
index 475414a4..0642d4bd 100644
--- a/tests/unit/test_slurm_utils.py
+++ b/tests/unit/test_slurm_utils.py
@@ -11,6 +11,7 @@
     _generate_spark_cluster_job_template,
     get_cli_args,
     get_slurm_drmaa,
+    is_on_slurm,
 )
 
 CLI_KWARGS = {

From c5c955ad8fbf7f55424254671b8e5c265ccc2aad Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Tue, 19 Mar 2024 10:05:43 -0700
Subject: [PATCH 14/17] ignore slurm check on GHA

---
 tests/unit/test_runner.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py
index 7b9f2837..5fad4b7d 100644
--- a/tests/unit/test_runner.py
+++ b/tests/unit/test_runner.py
@@ -1,10 +1,15 @@
+import os
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
+import pytest
+
 from linker.configuration import Config
 from linker.runner import get_environment_args, get_singularity_args
 from linker.utilities.paths import LINKER_TEMP
 
+IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true"
+
 
 def test_get_singularity_args(default_config, test_dir):
     with TemporaryDirectory() as results_dir:
@@ -17,10 +22,16 @@
     )
 
 
-def test_get_environment_args(default_config_params, test_dir):
+def test_get_environment_args_local(default_config_params, test_dir):
     config = Config(**default_config_params)
     assert get_environment_args(config, test_dir) == []
 
+
+@pytest.mark.skipif(
+    IN_GITHUB_ACTIONS,
+    reason="Github Actions does not have access to our file system and so no SLURM.",
+)
+def test_get_environment_args_slurm(default_config_params, test_dir):
     slurm_config_params = default_config_params
     slurm_config_params.update(
         {"computing_environment": Path(f"{test_dir}/spark_environment.yaml")}
     )

From 79eb332c836f70a6c2fdb8c403101c50cf6e6800 Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Wed, 20 Mar 2024 11:29:50 -0700
Subject: [PATCH 15/17] add executor dep

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index df140ae0..7cba218b 100644
--- a/setup.py
+++ b/setup.py
@@ -17,12 +17,12 @@
 install_requirements = [
     "click",
     "docker",
-    "drmaa",
     "loguru",
     "pandas",
     "pyyaml",
     "pyarrow",
     "snakemake>=8.0.0",
+    "snakemake-executor-plugin-slurm",
 ]
 
 setup_requires = ["setuptools_scm"]

From d9442131469f561458098d733115d81deba1659c Mon Sep 17 00:00:00 2001
From: Patrick Nast
Date: Wed, 20 Mar 2024 11:39:10 -0700
Subject: [PATCH 16/17] add step name to rules

---
 src/linker/pipeline.py                           |  3 ++-
 src/linker/rule.py                               | 16 ++++++++--------
 src/linker/runner.py                             |  2 --
 src/linker/utilities/paths.py                    |  4 ++--
 .../unit/rule_strings/implemented_rule_local.txt |  2 +-
 .../unit/rule_strings/implemented_rule_slurm.txt |  2 +-
 tests/unit/rule_strings/pipeline_local.txt       |  4 ++--
 tests/unit/rule_strings/pipeline_slurm.txt       |  4 ++--
 tests/unit/test_rule.py                          |  6 ++++--
 9 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/linker/pipeline.py b/src/linker/pipeline.py
index 85254625..8a815913 100644
--- a/src/linker/pipeline.py
+++ b/src/linker/pipeline.py
@@ -133,7 +133,8 @@ def write_implementation_rules(
             validator=implementation.step.input_validator,
         )
         implementation_rule = ImplementedRule(
-            name=implementation.name,
+            step_name=implementation.step_name,
+            implementation_name=implementation.name,
             execution_input=input_files,
             validation=validation_file,
             output=output_files,
diff --git a/src/linker/rule.py b/src/linker/rule.py
index 043f8d80..c7f92f8d 100644
--- a/src/linker/rule.py
+++ b/src/linker/rule.py
@@ -49,7 +49,8 @@ class ImplementedRule(Rule):
     A rule that defines the execution of an implementation
 
     Parameters:
-        name: Name
+        step_name: Name of step
+        implementation_name: Name of implementation
         execution_input: List of file paths required by implementation
         validation: name of file created by InputValidationRule to check for compatible input
         output: List of file paths created by implementation
@@ -60,7 +61,8 @@ class ImplementedRule(Rule):
         script_cmd: Command to execute
     """
 
-    name: str
+    step_name: str
+    implementation_name: str
     execution_input: List[str]
     validation: str
     output: List[str]
@@ -76,18 +78,16 @@ def _build_rule(self) -> str:
 
     def _build_io(self) -> str:
         return f"""
 rule:
-    name: "{self.name}"
-    message: "Running Implementation {self.name}"
+    name: "{self.implementation_name}"
+    message: "Running {self.step_name} implementation: {self.implementation_name}"
     input:
         implementation_inputs={self.execution_input},
         validation="{self.validation}"
     output: {self.output}
-    log: "{self.diagnostics_dir}/{self.name}-output.log"
+    log: "{self.diagnostics_dir}/{self.implementation_name}-output.log"
     container: "{self.image_path}" """
 
     def _build_resources(self) -> str:
-        # Include slurm partition for now, as snakemake has trouble
-        # parsing strings with periods like "all.q"
         if not self.resources:
             return ""
         return f"""
@@ -96,7 +96,7 @@ def _build_resources(self) -> str:
         mem={self.resources['memory']},
         runtime={self.resources['time_limit']},
         nodes={self.resources['cpus']},
-        slurm_extra="--output '{self.diagnostics_dir}/{self.name}-slurm-%j.log'"
+        slurm_extra="--output '{self.diagnostics_dir}/{self.implementation_name}-slurm-%j.log'"
         """
 
     def _build_shell_command(self) -> str:
diff --git a/src/linker/runner.py b/src/linker/runner.py
index 78dee4a3..4ce9939f 100644
--- a/src/linker/runner.py
+++ b/src/linker/runner.py
@@ -67,14 +67,12 @@ def get_environment_args(config: Config, results_dir: Path) -> List[str]:
relying on implementation elif config.computing_environment == "slurm": - # Set up a single drmaa.session that is persistent for the duration of the pipeline if not is_on_slurm(): raise RuntimeError( f"A 'slurm' computing environment is specified but it has been " "determined that the current host is not on a slurm cluster " f"(host: {socket.gethostname()})." ) - os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so" resources = config.slurm_resources slurm_args = [ "--executor", diff --git a/src/linker/utilities/paths.py b/src/linker/utilities/paths.py index 3bcc3fe5..e82ef803 100644 --- a/src/linker/utilities/paths.py +++ b/src/linker/utilities/paths.py @@ -3,6 +3,6 @@ # TODO: We'll need to update this to be more generic for external users and have a way of configuring this CONTAINER_DIR = "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images" IMPLEMENTATION_METADATA = Path(__file__).parent.parent / "implementation_metadata.yaml" -# Bind linker temp dir to /tmp in the container. Slurm will delete /tmp after job completion, -# but we'll bind a subdirectory for local runs +# Bind linker temp dir to /tmp in the container. +# For now, put slurm in /tmp to avoid creating a subdir with a prolog script LINKER_TEMP = {"local": Path("/tmp/linker"), "slurm": Path("/tmp")} diff --git a/tests/unit/rule_strings/implemented_rule_local.txt b/tests/unit/rule_strings/implemented_rule_local.txt index cbace484..dfb7f754 100644 --- a/tests/unit/rule_strings/implemented_rule_local.txt +++ b/tests/unit/rule_strings/implemented_rule_local.txt @@ -1,7 +1,7 @@ rule: name: "foo" - message: "Running Implementation foo" + message: "Running foo_step implementation: foo" input: implementation_inputs=['foo', 'bar'], validation="bar" diff --git a/tests/unit/rule_strings/implemented_rule_slurm.txt b/tests/unit/rule_strings/implemented_rule_slurm.txt index 175027a7..88343d30 100644 --- a/tests/unit/rule_strings/implemented_rule_slurm.txt +++ b/tests/unit/rule_strings/implemented_rule_slurm.txt @@ -1,7 +1,7 @@ rule: name: "foo" - message: "Running Implementation foo" + message: "Running foo_step implementation: foo" input: implementation_inputs=['foo', 'bar'], validation="bar" diff --git a/tests/unit/rule_strings/pipeline_local.txt b/tests/unit/rule_strings/pipeline_local.txt index 1dec24ee..5e507a05 100644 --- a/tests/unit/rule_strings/pipeline_local.txt +++ b/tests/unit/rule_strings/pipeline_local.txt @@ -24,7 +24,7 @@ rule: validation_utils.validate_input_file_dummy(f) rule: name: "step_1_python_pandas" - message: "Running Implementation step_1_python_pandas" + message: "Running step_1 implementation: step_1_python_pandas" input: implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'], validation="{snake_dir}/input_validations/step_1_python_pandas_validator" @@ -49,7 +49,7 @@ rule: validation_utils.validate_input_file_dummy(f) rule: name: "step_2_python_pandas" - message: "Running Implementation step_2_python_pandas" + message: "Running step_2 implementation: step_2_python_pandas" input: implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'], validation="{snake_dir}/input_validations/step_2_python_pandas_validator" diff --git a/tests/unit/rule_strings/pipeline_slurm.txt b/tests/unit/rule_strings/pipeline_slurm.txt index 5aa3f4a4..f31bae71 100644 --- a/tests/unit/rule_strings/pipeline_slurm.txt +++ b/tests/unit/rule_strings/pipeline_slurm.txt @@ -24,7 +24,7 @@ rule: validation_utils.validate_input_file_dummy(f) rule: name: 
"step_1_python_pandas" - message: "Running Implementation step_1_python_pandas" + message: "Running step_1 implementation: step_1_python_pandas" input: implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'], validation="{snake_dir}/input_validations/step_1_python_pandas_validator" @@ -56,7 +56,7 @@ rule: validation_utils.validate_input_file_dummy(f) rule: name: "step_2_python_pandas" - message: "Running Implementation step_2_python_pandas" + message: "Running step_2 implementation: step_2_python_pandas" input: implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'], validation="{snake_dir}/input_validations/step_2_python_pandas_validator" diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py index 761f1b77..07c839d8 100644 --- a/tests/unit/test_rule.py +++ b/tests/unit/test_rule.py @@ -43,7 +43,8 @@ def test_target_rule_build_rule(): def test_implemented_rule_build_rule_local(): rule = ImplementedRule( - name="foo", + step_name="foo_step", + implementation_name="foo", execution_input=["foo", "bar"], validation="bar", output=["baz"], @@ -67,7 +68,8 @@ def test_implemented_rule_build_rule_local(): def test_implemented_rule_build_rule_slurm(): rule = ImplementedRule( - name="foo", + step_name="foo_step", + implementation_name="foo", execution_input=["foo", "bar"], validation="bar", output=["baz"], From ce581ba6d63228f946bda7d42467fb1518ed1a9a Mon Sep 17 00:00:00 2001 From: Patrick Nast Date: Wed, 20 Mar 2024 12:54:12 -0700 Subject: [PATCH 17/17] parameterize tests --- tests/unit/test_pipeline.py | 42 ++++++++++--------------------- tests/unit/test_rule.py | 49 ++++++++++++++----------------------- 2 files changed, 31 insertions(+), 60 deletions(-) diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 3557b948..4f4e0ac3 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -2,12 +2,14 @@ from pathlib import Path from tempfile import TemporaryDirectory +import pytest + from linker.configuration import Config from linker.pipeline import Pipeline PIPELINE_STRINGS = { - "pipeline_local": "rule_strings/pipeline_local.txt", - "pipeline_slurm": "rule_strings/pipeline_slurm.txt", + "local": "rule_strings/pipeline_local.txt", + "slurm": "rule_strings/pipeline_slurm.txt", } @@ -59,38 +61,20 @@ def test_get_diagnostic_dir(default_config, mocker): ) -def test_build_snakefile_local(default_config, mocker, test_dir): - mocker.patch("linker.implementation.Implementation.validate", return_value={}) - pipeline = Pipeline(default_config) - with TemporaryDirectory() as snake_dir: - snakefile = pipeline.build_snakefile(Path(snake_dir)) - expected_file_path = ( - Path(os.path.dirname(__file__)) / PIPELINE_STRINGS["pipeline_local"] +@pytest.mark.parametrize("computing_environment", ["local", "slurm"]) +def test_build_snakefile(default_config_params, mocker, test_dir, computing_environment): + config_params = default_config_params + if computing_environment == "slurm": + config_params.update( + {"computing_environment": Path(f"{test_dir}/spark_environment.yaml")} ) - with open(expected_file_path) as expected_file: - expected = expected_file.read() - expected = expected.replace("{snake_dir}", snake_dir) - expected = expected.replace("{test_dir}", test_dir) - snake_str = snakefile.read_text() - snake_str_lines = snake_str.split("\n") - expected_lines = expected.split("\n") - assert len(snake_str_lines) == len(expected_lines) - for i, expected_line in enumerate(expected_lines): - assert 
             assert snake_str_lines[i].strip() == expected_line.strip()
diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py
index 07c839d8..554150a2 100644
--- a/tests/unit/test_rule.py
+++ b/tests/unit/test_rule.py
@@ -2,6 +2,8 @@
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
+import pytest
+
 from linker.rule import ImplementedRule, InputValidationRule, Rule, TargetRule
 
 RULE_STRINGS = {
@@ -41,50 +43,35 @@ def test_target_rule_build_rule():
         assert rulestring_lines[i].strip() == expected_line.strip()
 
 
-def test_implemented_rule_build_rule_local():
+@pytest.mark.parametrize("computing_environment", ["local", "slurm"])
+def test_implemented_rule_build_rule(computing_environment):
+    if computing_environment == "slurm":
+        resources = {
+            "partition": "slurmpart",
+            "time_limit": 1,
+            "memory": 5,
+            "cpus": 1337,
+        }
+    else:
+        resources = None
+
     rule = ImplementedRule(
         step_name="foo_step",
         implementation_name="foo",
         execution_input=["foo", "bar"],
         validation="bar",
         output=["baz"],
-        resources=None,
+        resources=resources,
         envvars={"eggs": "coconut"},
         diagnostics_dir="spam",
         image_path="Multipolarity.sif",
         script_cmd="echo hello world",
     )
-    file_path = Path(os.path.dirname(__file__)) / RULE_STRINGS["implemented_rule_local"]
-    with open(file_path) as expected_file:
-        expected = expected_file.read()
-    rulestring = rule._build_rule()
-    rulestring_lines = rulestring.split("\n")
-    expected_lines = expected.split("\n")
-    assert len(rulestring_lines) == len(expected_lines)
-    for i, expected_line in enumerate(expected_lines):
-        assert rulestring_lines[i].strip() == expected_line.strip()
-
-
-def test_implemented_rule_build_rule_slurm():
-    rule = ImplementedRule(
-        step_name="foo_step",
-        implementation_name="foo",
-        execution_input=["foo", "bar"],
-        validation="bar",
-        output=["baz"],
-        resources={
-            "partition": "slurmpart",
-            "time_limit": 1,
-            "memory": 5,
-            "cpus": 1337,
-        },
-        envvars={"eggs": "coconut"},
-        diagnostics_dir="spam",
-        image_path="Multipolarity.sif",
-        script_cmd="echo hello world",
+    file_path = (
+        Path(os.path.dirname(__file__))
+        / RULE_STRINGS[f"implemented_rule_{computing_environment}"]
     )
     with open(file_path) as expected_file:
        expected = expected_file.read()
    rulestring = rule._build_rule()