Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snakemake SLURM Functionality #77

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
# Third-party packages required at install time.
# NOTE: "drmaa" was removed in favor of the snakemake SLURM executor plugin
# (the diff header records 1 addition & 1 deletion for this hunk); keeping
# both would reintroduce the dropped dependency.
install_requirements = [
    "click",
    "docker",
    "loguru",
    "pandas",
    "pyyaml",
    "pyarrow",
    "snakemake>=8.0.0",
    "snakemake-executor-plugin-slurm",
]

setup_requires = ["setuptools_scm"]
Expand Down
9 changes: 8 additions & 1 deletion src/linker/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,24 @@ def write_implementation_rules(
validation_file = str(
results_dir / "input_validations" / implementation.validation_filename
)
resources = (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we may want to specify this as "slurm_resources" (as opposed to spark resources or in the future who knows what else)

Unless your thought is the logic will go here eventually regardless of resource type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think the more general "resources" make sense at least on the rule side, given that we may support non-slurm execution resources, and this would be the place that those would go (though not necessarily for something like spark,)
https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#snakefiles-standard-resources

self.config.slurm_resources
if self.config.computing_environment == "slurm"
else None
)
validation_rule = InputValidationRule(
name=implementation.name,
input=input_files,
output=validation_file,
validator=implementation.step.input_validator,
)
implementation_rule = ImplementedRule(
name=implementation.name,
step_name=implementation.step_name,
implementation_name=implementation.name,
execution_input=input_files,
validation=validation_file,
output=output_files,
resources=resources,
envvars=implementation.environment_variables,
diagnostics_dir=str(diagnostics_dir),
image_path=implementation.singularity_image_path,
Expand Down
40 changes: 30 additions & 10 deletions src/linker/rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, List
from typing import Callable, List, Optional


class Rule(ABC):
Expand Down Expand Up @@ -39,7 +39,8 @@ def _build_rule(self) -> str:
rule all:
input:
final_output={self.target_files},
validation='{self.validation}'"""
validation='{self.validation}'
message: 'Grabbing final output' """


@dataclass
Expand All @@ -48,37 +49,55 @@ class ImplementedRule(Rule):
A rule that defines the execution of an implementation

Parameters:
name: Name
step_name: Name of step
implementation_name: Name of implementation
execution_input: List of file paths required by implementation
validation: name of file created by InputValidationRule to check for compatible input
output: List of file paths created by implementation
resources: Computational resources used by executor (e.g. SLURM)
envvars: Dictionary of environment variables to set
diagnostics_dir: Directory for diagnostic files
image_path: Path to Singularity image
script_cmd: Command to execute
"""

name: str
step_name: str
implementation_name: str
execution_input: List[str]
validation: str
output: List[str]
resources: Optional[dict]
envvars: dict
diagnostics_dir: str
image_path: str
script_cmd: str

def _build_rule(self) -> str:
return (
f"""
return self._build_io() + self._build_resources() + self._build_shell_command()

def _build_io(self) -> str:
return f"""
rule:
name: "{self.name}"
name: "{self.implementation_name}"
message: "Running {self.step_name} implementation: {self.implementation_name}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message is snakemake's way of logging to stdout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it adds to the snakemake logging--example in the stdout posted above

input:
implementation_inputs={self.execution_input},
validation="{self.validation}"
output: {self.output}
log: "{self.diagnostics_dir}/{self.implementation_name}-output.log"
container: "{self.image_path}" """
+ self._build_shell_command()
)

def _build_resources(self) -> str:
    """Return the snakemake ``resources:`` section for this rule, or an
    empty string when no resources were configured (e.g. local execution).

    Maps linker's slurm resource config onto snakemake's standard resource
    keywords (slurm_partition, mem, runtime, nodes); ``slurm_extra``
    redirects the slurm job log into the diagnostics directory
    (``%j`` expands to the slurm job id).
    """
    # No resources configured -> contribute nothing to the rule text.
    if not self.resources:
        return ""
    # NOTE(review): the template's leading whitespace appears to have been
    # stripped during extraction; it must align with the rule body emitted
    # by _build_io for the generated Snakefile to parse — confirm indentation.
    return f"""
resources:
slurm_partition='{self.resources['partition']}',
mem={self.resources['memory']},
runtime={self.resources['time_limit']},
nodes={self.resources['cpus']},
slurm_extra="--output '{self.diagnostics_dir}/{self.implementation_name}-slurm-%j.log'"
"""

def _build_shell_command(self) -> str:
shell_cmd = f"""
Expand All @@ -90,8 +109,9 @@ def _build_shell_command(self) -> str:
for var_name, var_value in self.envvars.items():
shell_cmd += f"""
export {var_name}={var_value}"""
# Log stdout/stderr to diagnostics directory
shell_cmd += f"""
{self.script_cmd}
{self.script_cmd} > {{log}} 2>&1
patricktnast marked this conversation as resolved.
Show resolved Hide resolved
'''"""

return shell_cmd
Expand Down
71 changes: 28 additions & 43 deletions src/linker/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from linker.pipeline import Pipeline
from linker.utilities.data_utils import copy_configuration_files_to_results_directory
from linker.utilities.paths import LINKER_TEMP
from linker.utilities.slurm_utils import is_on_slurm


def main(
Expand All @@ -24,7 +25,7 @@ def main(
copy_configuration_files_to_results_directory(config, results_dir)
snakefile = pipeline.build_snakefile(results_dir)
environment_args = get_environment_args(config, results_dir)
singularity_args = get_singularity_args(config.input_data, results_dir)
singularity_args = get_singularity_args(config, results_dir)
# We need to set a dummy environment variable to avoid logging a wall of text.
# TODO [MIC-4920]: Remove when https://github.com/snakemake/snakemake-interface-executor-plugins/issues/55 merges
os.environ["foo"] = "bar"
Expand All @@ -38,6 +39,9 @@ def main(
## See above
"--envvars",
"foo",
## Suppress some of the snakemake output
"--quiet",
"progress",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this saying to suppress some output (--quiet) but to show progress...bars?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"progress" is the argument to "--quiet", it's suppressing some logging about the progress of execution

"--use-singularity",
"--singularity-args",
singularity_args,
Expand All @@ -47,11 +51,12 @@ def main(
snake_main(argv)


def get_singularity_args(config: Config, results_dir: Path) -> str:
    """Build the argument string passed to singularity via snakemake.

    Runs the container with ``--no-home --containall`` for isolation and
    binds (-B) three things into the container: the environment-specific
    linker temp dir as the container's /tmp, the results directory, and
    every input data file.

    Note: the flattened diff left the superseded pre-diff version of this
    function (the ``input_data: List[Path]`` signature and the module-level
    LINKER_TEMP path usage) interleaved with the new one; this is the
    post-diff function reconstructed.
    """
    input_file_paths = ",".join(file.as_posix() for file in config.input_data)
    singularity_args = "--no-home --containall"
    # Temp dir differs by computing environment (see LINKER_TEMP in paths.py).
    linker_tmp_dir = LINKER_TEMP[config.computing_environment]
    linker_tmp_dir.mkdir(parents=True, exist_ok=True)
    singularity_args += f" -B {linker_tmp_dir}:/tmp,{results_dir},{input_file_paths}"
    return singularity_args


Expand All @@ -62,44 +67,24 @@ def get_environment_args(config: Config, results_dir: Path) -> List[str]:

# TODO [MIC-4822]: launch a local spark cluster instead of relying on implementation
elif config.computing_environment == "slurm":
# TODO: Add back slurm support
raise NotImplementedError(
"Slurm support is not yet implemented with snakemake integration"
)
# # Set up a single drmaa.session that is persistent for the duration of the pipeline
# # TODO [MIC-4468]: Check for slurm in a more meaningful way
# hostname = socket.gethostname()
# if "slurm" not in hostname:
# raise RuntimeError(
# f"Specified a 'slurm' computing-environment but on host {hostname}"
# )
# os.environ["DRMAA_LIBRARY_PATH"] = "/opt/slurm-drmaa/lib/libdrmaa.so"
# diagnostics = results_dir / "diagnostics/"
# job_name = "snakemake-linker"
# resources = config.slurm_resources
# drmaa_args = get_cli_args(
# job_name=job_name,
# account=resources["account"],
# partition=resources["partition"],
# peak_memory=resources["memory"],
# max_runtime=resources["time_limit"],
# num_threads=resources["cpus"],
# )
# drmaa_cli_arguments = [
# "--executor",
# "drmaa",
# "--drmaa-args",
# drmaa_args,
# "--drmaa-log-dir",
# str(diagnostics),
# ]
# # slurm_args = [
# # "--executor",
# # "slurm",
# # "--profile",
# # "/ihme/homes/pnast/repos/linker/.config/snakemake/slurm"
# # ]
# return drmaa_cli_arguments
if not is_on_slurm():
raise RuntimeError(
f"A 'slurm' computing environment is specified but it has been "
"determined that the current host is not on a slurm cluster "
f"(host: {socket.gethostname()})."
)
resources = config.slurm_resources
slurm_args = [
"--executor",
"slurm",
"--default-resources",
f"slurm_account={resources['account']}",
f"slurm_partition='{resources['partition']}'",
f"mem={resources['memory']}",
f"runtime={resources['time_limit']}",
f"nodes={resources['cpus']}",
]
return slurm_args
else:
raise NotImplementedError(
"only computing_environment 'local' and 'slurm' are supported; "
Expand Down
4 changes: 3 additions & 1 deletion src/linker/utilities/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
# TODO: We'll need to update this to be more generic for external users and have a way of configuring this
# Location of the pre-built singularity images used by the pipeline.
CONTAINER_DIR = "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images"
# Metadata describing each step implementation, shipped inside the package.
IMPLEMENTATION_METADATA = Path(__file__).parent.parent / "implementation_metadata.yaml"
# Linker temp dir to bind to /tmp in the container, keyed by computing
# environment. For now, slurm uses /tmp directly to avoid creating a subdir
# with a prolog script. (The flattened diff also showed the superseded
# `LINKER_TEMP = Path("/tmp/linker")` assignment, which the dict below
# immediately shadowed; it is dropped here.)
LINKER_TEMP = {"local": Path("/tmp/linker"), "slurm": Path("/tmp")}
5 changes: 5 additions & 0 deletions src/linker/utilities/slurm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
from linker.configuration import Config


def is_on_slurm() -> bool:
    """Return True if the current host appears to be part of a SLURM cluster.

    Checks for the ``sbatch`` submission command on PATH, which is a more
    reliable signal than probing for the non-standard ``SLURM_ROOT``
    environment variable (per review feedback on this PR).
    """
    # Local import keeps the module's top-level dependencies unchanged.
    import shutil

    return shutil.which("sbatch") is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right, in my unmerged branch for Jenkins a better check is return shutil.which("sbatch") is not None



def get_slurm_drmaa() -> "drmaa":
"""Returns object() to bypass RuntimeError when not on a DRMAA-compliant system"""
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@

rule:
name: "foo"
message: "Running foo_step implementation: foo"
input:
implementation_inputs=['foo', 'bar'],
validation="bar"
output: ['baz']
log: "spam/foo-output.log"
container: "Multipolarity.sif"
shell:
'''
export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS=foo,bar
export DUMMY_CONTAINER_OUTPUT_PATHS=baz
export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY=spam
export eggs=coconut
echo hello world
echo hello world > {log} 2>&1
'''
25 changes: 25 additions & 0 deletions tests/unit/rule_strings/implemented_rule_slurm.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

rule:
name: "foo"
message: "Running foo_step implementation: foo"
input:
implementation_inputs=['foo', 'bar'],
validation="bar"
output: ['baz']
log: "spam/foo-output.log"
container: "Multipolarity.sif"
resources:
slurm_partition='slurmpart',
mem=5,
runtime=1,
nodes=1337,
slurm_extra="--output 'spam/foo-slurm-%j.log'"

shell:
'''
export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS=foo,bar
export DUMMY_CONTAINER_OUTPUT_PATHS=baz
export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY=spam
export eggs=coconut
echo hello world > {log} 2>&1
'''
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ rule all:
input:
final_output=['{snake_dir}/result.parquet'],
validation='{snake_dir}/input_validations/final_validator'
message: 'Grabbing final output'
rule:
name: "results_validator"
input: ['{snake_dir}/result.parquet']
Expand All @@ -23,17 +24,19 @@ rule:
validation_utils.validate_input_file_dummy(f)
rule:
name: "step_1_python_pandas"
message: "Running step_1 implementation: step_1_python_pandas"
input:
implementation_inputs=['{test_dir}/input_data1/file1.csv', '{test_dir}/input_data2/file2.csv'],
validation="{snake_dir}/input_validations/step_1_python_pandas_validator"
output: ['{snake_dir}/intermediate/1_step_1/result.parquet']
log: "{snake_dir}/diagnostics/1_step_1/step_1_python_pandas-output.log"
container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
shell:
'''
export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={test_dir}/input_data1/file1.csv,{test_dir}/input_data2/file2.csv
export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/1_step_1
python /dummy_step.py
python /dummy_step.py > {log} 2>&1
'''
rule:
name: "step_2_python_pandas_validator"
Expand All @@ -46,15 +49,17 @@ rule:
validation_utils.validate_input_file_dummy(f)
rule:
name: "step_2_python_pandas"
message: "Running step_2 implementation: step_2_python_pandas"
input:
implementation_inputs=['{snake_dir}/intermediate/1_step_1/result.parquet'],
validation="{snake_dir}/input_validations/step_2_python_pandas_validator"
output: ['{snake_dir}/result.parquet']
log: "{snake_dir}/diagnostics/2_step_2/step_2_python_pandas-output.log"
container: "/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif"
shell:
'''
export DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS={snake_dir}/intermediate/1_step_1/result.parquet
export DUMMY_CONTAINER_OUTPUT_PATHS={snake_dir}/result.parquet
export DUMMY_CONTAINER_DIAGNOSTICS_DIRECTORY={snake_dir}/diagnostics/2_step_2
python /dummy_step.py
python /dummy_step.py > {log} 2>&1
'''
Loading
Loading