Skip to content

Commit

Permalink
Added assay type filter for germline workflow launch
Browse files Browse the repository at this point in the history
* Added ICA pipeline orchestrator assay type filter using demux metadata
* Updated related test cases
  • Loading branch information
victorskl committed Feb 15, 2021
1 parent 24a82db commit 15e08c9
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 16 deletions.
5 changes: 4 additions & 1 deletion data_processors/pipeline/lambdas/demux_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

SAMPLE_ID_HEADER = 'Sample_ID (SampleSheet)'
OVERRIDECYCLES_HEADER = 'OverrideCycles'
TYPE_HEADER = 'Type'

logger = logging.getLogger()
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -123,7 +124,7 @@ def extract_requested_rows(df, requested_ids: list):
# filter rows by requested ids
subset_rows = df[df[SAMPLE_ID_HEADER].isin(requested_ids)]
# filter columns down to the data needed for workflow input
subset_cols = subset_rows[[SAMPLE_ID_HEADER, OVERRIDECYCLES_HEADER]]
subset_cols = subset_rows[[SAMPLE_ID_HEADER, OVERRIDECYCLES_HEADER, TYPE_HEADER]]
return subset_cols


Expand Down Expand Up @@ -185,8 +186,10 @@ def handler(event, context):
# turn metadata_df into format compatible with workflow input
sample_array = requested_metadata_df[SAMPLE_ID_HEADER].values.tolist()
orc_array = requested_metadata_df[OVERRIDECYCLES_HEADER].values.tolist()
type_array = requested_metadata_df[TYPE_HEADER].values.tolist()

return {
'samples': sample_array,
'override_cycles': orc_array,
'types': type_array,
}
27 changes: 25 additions & 2 deletions data_processors/pipeline/lambdas/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from data_portal.models import Workflow, SequenceRun, Batch, BatchRun
from data_processors.pipeline import services, constant
from data_processors.pipeline.constant import WorkflowType, WorkflowStatus
from data_processors.pipeline.lambdas import workflow_update, fastq
from data_processors.pipeline.lambdas import workflow_update, fastq, demux_metadata
from utils import libjson, libsqs, libssm

logger = logging.getLogger()
Expand Down Expand Up @@ -162,24 +162,47 @@ def prepare_germline_jobs(this_batch: Batch, this_batch_run: BatchRun, this_sqr:
"""
job_list = []
fastq_containers: List[dict] = libjson.loads(this_batch.context_data)

gds_volume_name = this_sqr.gds_volume_name
gds_folder_path = this_sqr.gds_folder_path
sample_sheet_name = this_sqr.sample_sheet_name

metadata: dict = demux_metadata.handler({
'gdsVolume': gds_volume_name,
'gdsBasePath': gds_folder_path,
'gdsSamplesheet': sample_sheet_name,
}, None)

for fastq_container in fastq_containers:
fastq_map = fastq_container['fastq_map']
for sample_name, bag in fastq_map.items():
fastq_list = bag['fastq_list'] # GERMLINE CWL workflow does not use this absolute gds path list, at the mo
sample_name_str: str = sample_name

# skip Undetermined samples
if sample_name_str == "Undetermined":
logger.warning(f"SKIP '{sample_name}' SAMPLE GERMLINE WORKFLOW LAUNCH.")
continue

# skip sample start with NTC_
if sample_name_str.startswith("NTC_"):
logger.warning(f"SKIP NTC SAMPLE '{sample_name}' GERMLINE WORKFLOW LAUNCH.")
continue

fastq_list = bag['fastq_list']  # GERMLINE CWL workflow does not currently use this absolute gds path list
# skip germline if assay type is not WGS
assay_type = metadata['types'][metadata['samples'].index(sample_name_str)]
if assay_type != "WGS":
logger.warning(f"SKIP {assay_type} SAMPLE '{sample_name}' GERMLINE WORKFLOW LAUNCH.")
continue

# skip: the GERMLINE CWL workflow does not accept multiple fastq directories
fastq_directories = bag['fastq_directories']
if len(fastq_directories) != 1:
logger.warning(f"SKIP SAMPLE '{sample_name}' GERMLINE WORKFLOW LAUNCH. "
f"GERMLINE CWL WORKFLOW EXPECT ONE FASTQ DIRECTORY. FOUND: {fastq_directories}")
continue

# skip: the GERMLINE CWL workflow does not accept multiple fastq_list_csv inputs
fastq_list_csv = bag['fastq_list_csv']
if len(fastq_list_csv) != 1:
logger.warning(f"SKIP SAMPLE '{sample_name}' GERMLINE WORKFLOW LAUNCH. "
Expand Down
20 changes: 19 additions & 1 deletion data_processors/pipeline/tests/test_demux_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def setUp(self) -> None:
],
demux_metadata.OVERRIDECYCLES_HEADER: [
"Y100;I8N2;I8N2;Y100",
]
],
demux_metadata.TYPE_HEADER: [
"WGS",
],
}
self.mock_metadata_df = pd.DataFrame(data=d)

Expand Down Expand Up @@ -142,3 +145,18 @@ def test_download_metadata(self):
print(my_df)
self.assertIsNotNone(my_df)
self.assertTrue(not my_df.empty)

@skip
def test_handler(self):
    """
    Integration test for the demux_metadata lambda handler against a real
    dev GDS run (hence @skip by default).

    aws sso login --profile dev && export AWS_PROFILE=dev
    python manage.py test data_processors.pipeline.tests.test_demux_metadata.DemuxMetaDataIntegrationTests.test_handler
    """
    # NOTE: handler() returns a plain dict with 'samples', 'override_cycles'
    # and 'types' keys -- it is NOT a DataFrame. The previous pd.DataFrame
    # annotation was wrong, and json.dumps() below would raise TypeError on
    # a DataFrame anyway.
    result: dict = demux_metadata.handler({
        'gdsVolume': "umccr-raw-sequence-data-dev",
        'gdsBasePath': "/200612_A01052_0017_BH5LYWDSXY",
        'gdsSamplesheet': "SampleSheet.csv",
    }, None)

    print(json.dumps(result))
    self.assertIsNotNone(result)
135 changes: 133 additions & 2 deletions data_processors/pipeline/tests/test_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
from datetime import datetime
from unittest import skip

from django.utils.timezone import make_aware
from libiap.openapi import libwes, libgds
from mockito import when

from data_portal.models import Workflow, BatchRun, Batch
from data_portal.models import Workflow, BatchRun, Batch, SequenceRun
from data_portal.tests.factories import WorkflowFactory, TestConstant
from data_processors.pipeline.constant import WorkflowType, WorkflowStatus
from data_processors.pipeline.lambdas import orchestrator
Expand Down Expand Up @@ -100,6 +101,45 @@ def test_germline(self):
)
when(libgds.FilesApi).list_files(...).thenReturn(mock_file_list)

when(orchestrator.demux_metadata).handler(...).thenReturn({
'samples': [
"NA12345 - 4KC",
"NA12345 - 4KC",
"PRJ111119_L1900000",
"PRJ111119_L1900000",
"MDX199999_L1999999_topup",
"MDX199999_L1999999_topup",
"L9111111_topup",
"L9111111_topup",
"NTC_L111111",
"NTC_L111111",
],
'override_cycles': [
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
],
'types': [
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
],
})

result = orchestrator.handler({
'wfr_id': TestConstant.wfr_id.value,
'wfv_id': TestConstant.wfv_id.value,
Expand Down Expand Up @@ -185,4 +225,95 @@ def test_bcl_unknown_type(self):


class OrchestratorIntegrationTests(PipelineIntegrationTestCase):
pass

@skip
def test_prepare_germline_jobs(self):
    """
    Integration test for orchestrator.prepare_germline_jobs() (hence @skip
    by default -- it hits real dev resources).

    1. uncomment @skip
    2. setup target_xx variables noted below
    3. hit the test like so:
    python manage.py test data_processors.pipeline.tests.test_orchestrator.OrchestratorIntegrationTests.test_prepare_germline_jobs
    """

    # NOTE: need to set the target bssh run gds output
    target_gds_folder_path = "/Runs/111111_A22222_0011_AGCTG2AGCC_r.ACGTlKjDgEy099ioQOeOWg"
    target_gds_volume_name = "bssh.agctbfda498038ed99eeeeee79999999"
    target_sample_sheet_name = "SampleSheet.csv"

    # NOTE: typically dict within json.dumps is the output of fastq lambda. See wiki for how to invoke fastq lambda
    # https://github.com/umccr/wiki/blob/master/computing/cloud/illumina/automation.md
    # Two samples on purpose: a regular PRJ sample and an NTC_ sample
    # (prepare_germline_jobs is expected to skip the NTC_ one).
    target_batch_context_data = json.dumps([
        {
            "locations": [
                "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC"
            ],
            "fastq_map": {
                "PRJ111111_L0000000_rerun": {
                    "fastq_list": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Project1/PRJ111111_L0000000_rerun_S4_L001_R1_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Project1/PRJ111111_L0000000_rerun_S4_L001_R2_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Project1/PRJ111111_L0000000_rerun_S4_L002_R1_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Project1/PRJ111111_L0000000_rerun_S4_L002_R2_001.fastq.gz"
                    ],
                    "fastq_directories": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Project1"
                    ],
                    "fastq_list_csv": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Reports/fastq_list.csv"
                    ],
                    "tags": []
                },
                "NTC_NebRNA111111KC_L0000000_rerun": {
                    "fastq_list": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/UMCCR/NTC_NebRNA111111KC_L0000000_rerun_S5_L001_R1_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/UMCCR/NTC_NebRNA111111KC_L0000000_rerun_S5_L001_R2_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/UMCCR/NTC_NebRNA111111KC_L0000000_rerun_S5_L002_R1_001.fastq.gz",
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/UMCCR/NTC_NebRNA111111KC_L0000000_rerun_S5_L002_R2_001.fastq.gz"
                    ],
                    "fastq_directories": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/UMCCR"
                    ],
                    "fastq_list_csv": [
                        "gds://umccr-fastq-data-dev/111111_A22222_0011_AGCTG2AGCC/Y111_I1_I1_Y111/Reports/fastq_list.csv"
                    ],
                    "tags": []
                },
            }
        }
    ])

    # Batch must be persisted: prepare_germline_jobs reads context_data from it
    mock_batch = Batch(name="Test", created_by="Test", context_data=target_batch_context_data)
    mock_batch.save()

    mock_batch_run = BatchRun(batch=mock_batch)
    mock_batch_run.save()

    # Minimal SequenceRun record; prepare_germline_jobs reads
    # gds_volume_name, gds_folder_path and sample_sheet_name from it to
    # fetch demux metadata. Remaining fields are presumably just required
    # by the model -- values are dummies. TODO confirm against model.
    mock_sqr_run = SequenceRun(
        run_id="r.ACGTlKjDgEy099ioQOeOWg",
        date_modified=make_aware(datetime.utcnow()),
        status="PendingAnalysis",
        instrument_run_id=TestConstant.sqr_name.value,
        gds_folder_path=target_gds_folder_path,
        gds_volume_name=target_gds_volume_name,
        reagent_barcode="NV9999999-RGSBS",
        v1pre3_id="666666",
        acl=["wid:acgtacgt-9999-38ed-99fa-94fe79523959"],
        flowcell_barcode="BARCODEEE",
        sample_sheet_name=target_sample_sheet_name,
        api_url=f"https://ilmn/v2/runs/r.ACGTlKjDgEy099ioQOeOWg",
        name=TestConstant.sqr_name.value,
        msg_attr_action="statuschanged",
        msg_attr_action_type="bssh.runs",
        msg_attr_action_date="2020-05-09T22:17:10.815Z",
        msg_attr_produced_by="BaseSpaceSequenceHub"
    )
    mock_sqr_run.save()

    job_list = orchestrator.prepare_germline_jobs(
        this_batch=mock_batch,
        this_batch_run=mock_batch_run,
        this_sqr=mock_sqr_run,
    )

    print(json.dumps(job_list))
    self.assertIsNotNone(job_list)
48 changes: 38 additions & 10 deletions data_processors/pipeline/tests/test_sqs_iap_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,36 @@ def setUp(self) -> None:
super(SQSIAPEventUnitTests, self).setUp()
when(demux_metadata).handler(...).thenReturn(
{
'samples': ['PTC_EXPn200908LL_L2000001'],
'override_cycles': ['Y100;I8N2;I8N2;Y100'],
'samples': [
"NA12345 - 4KC",
"NA12345 - 4KC",
"PRJ111119_L1900000",
"PRJ111119_L1900000",
"MDX199999_L1999999_topup",
"MDX199999_L1999999_topup",
"L9111111_topup",
"L9111111_topup",
],
'override_cycles': [
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
"Y100;I8N2;I8N2;Y100",
],
'types': [
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
"WGS",
],
}
)

Expand Down Expand Up @@ -549,10 +577,10 @@ def test_wes_runs_event_germline_alt(self):

mock_file_list: libgds.FileListResponse = libgds.FileListResponse()
mock_file_list.items = [
libgds.FileResponse(name="NA12345_S7_L001_R1_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L002_R1_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L001_R2_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L002_R2_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L001_R1_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L002_R1_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L001_R2_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L002_R2_001.fastq.gz"),
]
when(libgds.FilesApi).list_files(...).thenReturn(mock_file_list)

Expand Down Expand Up @@ -629,10 +657,10 @@ def test_wes_runs_event_run_succeeded(self):
# make Germline workflow launch skip
mock_file_list: libgds.FileListResponse = libgds.FileListResponse()
mock_file_list.items = [
libgds.FileResponse(name="NA12345_S7_L001_R1_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L002_R1_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L001_R2_001.fastq.gz"),
libgds.FileResponse(name="NA12345_S7_L002_R2_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L001_R1_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L002_R1_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L001_R2_001.fastq.gz"),
libgds.FileResponse(name="PRJ111119_L1900000_S7_L002_R2_001.fastq.gz"),
]
when(libgds.FilesApi).list_files(...).thenReturn(mock_file_list)

Expand Down

0 comments on commit 15e08c9

Please sign in to comment.