Skip to content

Commit

Permalink
added singularity to run function
Browse files (browse the repository at this point in the history)
  • Loading branch information
Acribbs committed Dec 1, 2024
1 parent c0775d2 commit 403f2c2
Showing 1 changed file with 49 additions and 37 deletions.
86 changes: 49 additions & 37 deletions cgatcore/pipeline/execution.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -788,76 +788,88 @@ def cleanup_failed_job(self, job_info):
else:
self.logger.info(f"Output file not found (already removed or not created): {outfile}")

def run(self, statement_list, job_memory=None, job_threads=None, image=None, volumes=None, env_vars=None, **kwargs):
def run(self, statement_list, job_memory=None, job_threads=None, container_runtime=None, image=None, volumes=None, env_vars=None, **kwargs):
"""
Execute a list of statements with optional container support.
Execute a list of statements with optional container support for Docker or Singularity.
Args:
statement_list (list): List of commands to execute.
job_memory (str): Memory requirements (e.g., "4G").
job_threads (int): Number of threads to use.
image (str): Container image to use (e.g., "ubuntu:20.04").
volumes (list): List of volume mappings (e.g., ["/data:/data"]).
container_runtime (str): Container runtime to use ("docker" or "singularity").
image (str): Container image to use (e.g., "ubuntu:20.04" or a Singularity SIF path).
volumes (list): List of volume mappings in 'source:destination' form (e.g., ['/data:/data']); passed with -v for Docker or --bind for Singularity.
env_vars (dict): Environment variables for the container.
**kwargs: Additional arguments passed to the executor.
Returns:
list: Benchmark data collected from executed jobs.
"""
# Validation checks
if image:
# If image is specified, validate related arguments
if not isinstance(image, str) or not image.strip():
raise ValueError("A valid container image must be specified when 'image' is provided.")

if volumes and not isinstance(volumes, list):
raise ValueError("Volumes must be a list of volume mappings (e.g., ['/data:/data']).")

if env_vars and not isinstance(env_vars, dict):
raise ValueError("Environment variables must be provided as a dictionary (e.g., {'VAR': 'value'}).")
if container_runtime:
if container_runtime not in ["docker", "singularity"]:
raise ValueError("Container runtime must be 'docker' or 'singularity'.")
if not image:
raise ValueError("An image must be specified when using a container runtime.")

else:
# If no image is specified, disallow container-specific arguments
if volumes:
raise ValueError("Volume mappings cannot be provided without specifying a container image.")
if env_vars:
raise ValueError("Environment variables cannot be provided without specifying a container image.")
# If no container runtime is specified, disallow container-specific arguments
if volumes or env_vars or image:
raise ValueError("Container-specific arguments (volumes, env_vars, or image) require a container runtime.")

benchmark_data = []
for statement in statement_list:
job_info = {"statement": statement}
self.start_job(job_info) # Track the job lifecycle

try:
# Prepare for containerised execution if an image is specified
if image:
# Prepare for containerised execution if a container runtime is specified
if container_runtime:
volume_args = []
if volumes:
for volume in volumes:
volume_args.extend(["-v", volume])
if container_runtime == "docker":
for volume in volumes:
volume_args.extend(["-v", volume])
elif container_runtime == "singularity":
for volume in volumes:
volume_args.extend(["--bind", volume])

env_args = []
if env_vars:
for key, value in env_vars.items():
env_args.extend(["-e", f"{key}={value}"])
if container_runtime == "docker":
env_args.extend(["-e", f"{key}={value}"])
elif container_runtime == "singularity":
env_args.extend(["--env", f"{key}={value}"])

# Add standard environment variables
if job_memory:
env_args.extend(["-e", f"JOB_MEMORY={job_memory}"])
env_args.append(f"--env JOB_MEMORY={job_memory}" if container_runtime == "singularity" else "-e JOB_MEMORY={job_memory}")
if job_threads:
env_args.extend(["-e", f"JOB_THREADS={job_threads}"])

# Construct the Docker run command
docker_command = [
"docker", "run", "--rm",
*volume_args,
*env_args,
image,
"/bin/bash", "-c",
f"'{statement}'" # Wrap statement in quotes for proper execution
]
env_args.append(f"--env JOB_THREADS={job_threads}" if container_runtime == "singularity" else "-e JOB_THREADS={job_threads}")

# Construct the container command
if container_runtime == "docker":
container_command = [
"docker", "run", "--rm",
*volume_args,
*env_args,
image,
"/bin/bash", "-c",
f"'{statement}'" # Wrap statement in quotes for proper execution
]
elif container_runtime == "singularity":
container_command = [
"singularity", "exec",
*volume_args,
*env_args,
image,
"/bin/bash", "-c",
f"'{statement}'" # Wrap statement in quotes for proper execution
]

# Replace the original statement with the containerised version
statement = " ".join(docker_command)
statement = " ".join(container_command)

# Execute the statement (containerised or not)
full_statement, job_path = self.build_job_script(statement)
Expand Down

0 comments on commit 403f2c2

Please sign in to comment.