Skip to content

Commit

Permalink
Merge pull request #194 from cgat-developers/AC-document
Browse files Browse the repository at this point in the history
Ac document
  • Loading branch information
Acribbs authored Jan 1, 2025
2 parents 145bf4e + 70fea88 commit 55ad2cf
Show file tree
Hide file tree
Showing 13 changed files with 1,805 additions and 120 deletions.
50 changes: 47 additions & 3 deletions cgatcore/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,53 @@
=============================================
This module provides a comprehensive set of tools to facilitate the creation and management
of data processing pipelines using CGAT Ruffus. It includes functionalities for pipeline control,
logging, parameterization, task execution, database uploads, temporary file management, and
integration with AWS S3.
of data processing pipelines using CGAT Ruffus. It includes functionalities for:
1. Pipeline Control
- Task execution and dependency management
- Command-line interface for pipeline operations
- Logging and error handling
2. Resource Management
- Cluster job submission and monitoring
- Memory and CPU allocation
- Temporary file handling
3. Configuration
- Parameter management via YAML configuration
- Cluster settings customization
- Pipeline state persistence
4. Cloud Integration
- AWS S3 support for input/output files
- Cloud-aware pipeline decorators
- Remote file handling
Example Usage
------------
A basic pipeline using local files:
.. code-block:: python
from cgatcore import pipeline as P
# Standard pipeline task
@P.transform("input.txt", suffix(".txt"), ".processed")
def process_local_file(infile, outfile):
# Processing logic here
pass
Using S3 integration:
.. code-block:: python
# S3-aware pipeline task
@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
# Processing logic here
pass
For detailed documentation, see: https://cgat-core.readthedocs.io/
"""


Expand Down
64 changes: 51 additions & 13 deletions cgatcore/pipeline/cluster.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,54 @@
'''cluster.py - cluster utility functions for ruffus pipelines
==============================================================
This module abstracts the DRMAA native specification and provides
convenience functions for running Drmaa jobs.
Currently SGE, SLURM, Torque and PBSPro are supported.
Reference
---------
'''
"""
cluster.py - Cluster job management for CGAT pipelines
====================================================
This module provides functionality for submitting and managing jobs on various
cluster platforms (SLURM, SGE, PBS/Torque). It handles:
1. Job Submission
- Resource allocation (memory, CPU cores)
- Queue selection and prioritization
- Job dependencies and scheduling
2. Platform Support
- SLURM Workload Manager
- Sun Grid Engine (SGE)
- PBS/Torque
- Local execution (multiprocessing)
3. Resource Management
- Memory limits and monitoring
- CPU allocation
- Job runtime constraints
- Temporary directory handling
Configuration
------------
Cluster settings can be configured in `.cgat.yml`:
.. code-block:: yaml
cluster:
queue_manager: slurm
queue: main
memory_resource: mem
memory_default: 4G
parallel_environment: dedicated
Available Parameters
------------------
- cluster_queue: Cluster queue to use (default: all.q)
- cluster_priority: Job priority (-10 to 10, default: -10)
- cluster_num_jobs: Maximum concurrent jobs (default: 100)
- cluster_memory_resource: Memory resource identifier
- cluster_memory_default: Default job memory (default: 4G)
- cluster_memory_ulimit: Enable memory limits via ulimit
- cluster_parallel_environment: Parallel environment name
- cluster_queue_manager: Queue management system
- cluster_tmpdir: Temporary directory location
For detailed documentation, see: https://cgat-core.readthedocs.io/
"""

import re
import math
Expand Down Expand Up @@ -484,7 +523,6 @@ def get_native_specification(self,
spec.append("-q {}".format(kwargs["queue"]))

spec.append(kwargs.get("options", ""))

return spec

def update_template(self, jt):
Expand Down
63 changes: 55 additions & 8 deletions cgatcore/pipeline/execution.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,62 @@
"""execution.py - Job control for ruffus pipelines
=========================================================
"""
execution.py - Task execution for CGAT pipelines
==============================================
This module handles the execution of pipeline tasks, providing support for:
1. Job Execution
- Local execution via subprocess
- Cluster job submission
- Python function execution
- Container-based execution
2. Resource Management
- Memory monitoring and limits
- CPU allocation
- Runtime constraints
- Working directory management
3. Error Handling
- Job failure detection
- Retry mechanisms
- Error logging and reporting
- Clean-up procedures
4. Execution Modes
- Synchronous (blocking) execution
- Asynchronous job submission
- Parallel task execution
- Dependency-aware scheduling
Session
-------
Usage Examples
-------------
1. Submit a command to the cluster:
This module manages a DRMAA session. :func:`start_session`
starts a session and :func:`close_session` closes it.
.. code-block:: python
statement = "samtools sort input.bam -o output.bam"
job_options = "-l mem_free=4G"
job_threads = 4
execution.run(statement,
job_options=job_options,
job_threads=job_threads)
2. Execute a Python function:
.. code-block:: python
def process_data(infile, outfile):
# Processing logic here
pass
Reference
---------
execution.submit(module="my_module",
function="process_data",
infiles="input.txt",
outfiles="output.txt",
job_memory="4G")
For detailed documentation, see: https://cgat-core.readthedocs.io/
"""

import collections
Expand Down
124 changes: 124 additions & 0 deletions docs/function_doc/pipeline.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,129 @@
# CGATcore Pipeline Module

The `pipeline` module is the core component of CGAT-core, providing essential functionality for building and executing computational pipelines.

## Core Functions

### Pipeline Decorators

```python
@transform(input_files, suffix(".input"), ".output")
def task_function(infile, outfile):
"""Transform a single input file to an output file."""
pass

@merge(input_files, "output.txt")
def merge_task(infiles, outfile):
"""Merge multiple input files into a single output."""
pass

@split(input_file, "*.split")
def split_task(infile, outfiles):
"""Split a single input file into multiple outputs."""
pass

@follows(previous_task)
def dependent_task():
"""Execute after previous_task completes."""
pass
```

### S3-Aware Decorators

```python
@s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
"""Process files directly from S3."""
pass

@s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt")
def merge_s3_files(infiles, outfile):
"""Merge multiple S3 files."""
pass
```

## Configuration Functions

### Pipeline Setup
```python
# Initialize pipeline
pipeline.initialize(options)

# Get pipeline parameters
params = pipeline.get_params()

# Configure cluster execution
pipeline.setup_cluster()
```

### Resource Management
```python
# Set memory requirements
pipeline.set_job_memory("4G")

# Set CPU requirements
pipeline.set_job_threads(4)

# Configure temporary directory
pipeline.set_tmpdir("/path/to/tmp")
```

## Execution Functions

### Running Tasks
```python
# Execute a command
pipeline.run("samtools sort input.bam")

# Submit a Python function
pipeline.submit(
module="my_module",
function="process_data",
infiles="input.txt",
outfiles="output.txt"
)
```

### Job Control
```python
# Check job status
pipeline.is_running(job_id)

# Wait for job completion
pipeline.wait_for_jobs()

# Clean up temporary files
pipeline.cleanup()
```

## Error Handling

```python
try:
pipeline.run("risky_command")
except pipeline.PipelineError as e:
pipeline.handle_error(e)
```

## Best Practices

1. **Resource Management**
- Always specify memory and CPU requirements
- Use appropriate cluster queue settings
- Clean up temporary files

2. **Error Handling**
- Implement proper error checking
- Use pipeline.log for logging
- Handle temporary file cleanup

3. **Performance**
- Use appropriate chunk sizes for parallel processing
- Monitor resource usage
- Optimize cluster settings

For more details, see the [Pipeline Overview](../pipeline_modules/overview.md) and [Writing Workflows](../defining_workflow/writing_workflows.md) guides.

::: cgatcore.pipeline
:members:
:show-inheritance:
Loading

0 comments on commit 55ad2cf

Please sign in to comment.