diff --git a/cgatcore/pipeline/execution.py b/cgatcore/pipeline/execution.py index 9e0e393e..7679f252 100644 --- a/cgatcore/pipeline/execution.py +++ b/cgatcore/pipeline/execution.py @@ -788,16 +788,17 @@ def cleanup_failed_job(self, job_info): else: self.logger.info(f"Output file not found (already removed or not created): {outfile}") - def run(self, statement_list, job_memory=None, job_threads=None, image=None, volumes=None, env_vars=None, **kwargs): + def run(self, statement_list, job_memory=None, job_threads=None, container_runtime=None, image=None, volumes=None, env_vars=None, **kwargs): """ - Execute a list of statements with optional container support. + Execute a list of statements with optional container support for Docker or Singularity. Args: statement_list (list): List of commands to execute. job_memory (str): Memory requirements (e.g., "4G"). job_threads (int): Number of threads to use. - image (str): Container image to use (e.g., "ubuntu:20.04"). - volumes (list): List of volume mappings (e.g., ["/data:/data"]). + container_runtime (str): Container runtime to use ("docker" or "singularity"). + image (str): Container image to use (e.g., "ubuntu:20.04" or a Singularity SIF path). + volumes (list): List of volume mappings (e.g., ['/data:/data'] for Docker or ['/data:/data'] for Singularity). env_vars (dict): Environment variables for the container. **kwargs: Additional arguments passed to the executor. @@ -805,23 +806,16 @@ def run(self, statement_list, job_memory=None, job_threads=None, image=None, vol list: Benchmark data collected from executed jobs. """ # Validation checks - if image: - # If image is specified, validate related arguments - if not isinstance(image, str) or not image.strip(): - raise ValueError("A valid container image must be specified when 'image' is provided.") - - if volumes and not isinstance(volumes, list): - raise ValueError("Volumes must be a list of volume mappings (e.g., ['/data:/data']).") - - if env_vars and not isinstance(env_vars, dict): - raise ValueError("Environment variables must be provided as a dictionary (e.g., {'VAR': 'value'}).") + if container_runtime: + if container_runtime not in ["docker", "singularity"]: + raise ValueError("Container runtime must be 'docker' or 'singularity'.") + if not image: + raise ValueError("An image must be specified when using a container runtime.") else: - # If no image is specified, disallow container-specific arguments - if volumes: - raise ValueError("Volume mappings cannot be provided without specifying a container image.") - if env_vars: - raise ValueError("Environment variables cannot be provided without specifying a container image.") + # If no container runtime is specified, disallow container-specific arguments + if volumes or env_vars or image: + raise ValueError("Container-specific arguments (volumes, env_vars, or image) require a container runtime.") benchmark_data = [] for statement in statement_list: @@ -829,35 +823,53 @@ def run(self, statement_list, job_memory=None, job_threads=None, image=None, vol self.start_job(job_info) # Track the job lifecycle try: - # Prepare for containerised execution if an image is specified - if image: + # Prepare for containerised execution if a container runtime is specified + if container_runtime: volume_args = [] if volumes: - for volume in volumes: - volume_args.extend(["-v", volume]) + if container_runtime == "docker": + for volume in volumes: + volume_args.extend(["-v", volume]) + elif container_runtime == "singularity": + for volume in volumes: + volume_args.extend(["--bind", volume]) env_args = [] if env_vars: for key, value in env_vars.items(): - env_args.extend(["-e", f"{key}={value}"]) + if container_runtime == "docker": + env_args.extend(["-e", f"{key}={value}"]) + elif container_runtime == "singularity": + env_args.extend(["--env", f"{key}={value}"]) # Add standard environment variables if job_memory: - env_args.extend(["-e", f"JOB_MEMORY={job_memory}"]) + env_args.append(f"--env JOB_MEMORY={job_memory}" if container_runtime == "singularity" else "-e JOB_MEMORY={job_memory}") if job_threads: - env_args.extend(["-e", f"JOB_THREADS={job_threads}"]) - - # Construct the Docker run command - docker_command = [ - "docker", "run", "--rm", - *volume_args, - *env_args, - image, - "/bin/bash", "-c", - f"'{statement}'" # Wrap statement in quotes for proper execution - ] + env_args.append(f"--env JOB_THREADS={job_threads}" if container_runtime == "singularity" else "-e JOB_THREADS={job_threads}") + + # Construct the container command + if container_runtime == "docker": + container_command = [ + "docker", "run", "--rm", + *volume_args, + *env_args, + image, + "/bin/bash", "-c", + f"'{statement}'" # Wrap statement in quotes for proper execution + ] + elif container_runtime == "singularity": + container_command = [ + "singularity", "exec", + *volume_args, + *env_args, + image, + "/bin/bash", "-c", + f"'{statement}'" # Wrap statement in quotes for proper execution + ] + # Replace the original statement with the containerised version - statement = " ".join(docker_command) + statement = " ".join(container_command) # Execute the statement (containerised or not) full_statement, job_path = self.build_job_script(statement)