Skip to content

Commit

Permalink
Add tests and fix arguments with multiple values
Browse files Browse the repository at this point in the history
  • Loading branch information
DriesSchaumont committed Nov 18, 2024
1 parent 6b3c5b4 commit 60ca517
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ def _debug(workflowArgs, debugKey) {
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
def key_ = workflowArgs["key"]

def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}

workflow workflowInstance {
take: input_

Expand Down Expand Up @@ -195,7 +196,7 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
// | view{"chInitialOutput: ${it.take(3)}"}

// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chModifiedFiltered, key_)
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
// input tuple format: [join_id, id, output, prev_state, ...]
// output tuple format: [join_id, id, new_state, ...]
| map{ tup ->
Expand All @@ -222,13 +223,26 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
| groupTuple(by: 1, sort: 'hash', size: chInitialOutputList.size(), remainder: true)
| map {join_ids, id, states ->
def newJoinId = join_ids.unique{a, b -> a <=> b}
assert newJoinId.size() == 1: "Multiple join IDs were emitted for '$id'."
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
def newJoinIdUnique = newJoinId[0]
def newState = states.inject([:]){ old_state, state_to_add ->
def overlap = old_state.keySet().intersect(state_to_add.keySet())
assert overlap.isEmpty() : "ID $id: multiple entries for for argument(s) $overlap were emitted."
def return_state = old_state + state_to_add
return return_state
def stateToAddNoMultiple = state_to_add.findAll{k, v -> !multipleArgs.contains(k)}
// First add non multiple arguments

def overlap = old_state.keySet().intersect(stateToAddNoMultiple.keySet())
assert overlap.isEmpty() : "ID $id: multiple entries for " +
" argument(s) $overlap were emitted."
def return_state = old_state + stateToAddNoMultiple

// Add `multiple: true` arguments
def stateToAddMultiple = state_to_add.findAll{k, v -> multipleArgs.contains(k)}
stateToAddMultiple.each {k, v ->
def currentKey = return_state.getOrDefault(k, [])
def currentKeyList = currentKey instanceof List ? currentKey : [currentKey]
currentKeyList.add(v)
return_state[k] = currentKeyList
}
return return_state
}

// simplify output if need be
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: multiple_emit_channels
namespace: test_wfs
argument_groups:
- name: Outputs
arguments:
- name: "--input"
type: file
description: Input file
required: true
example: input.txt
- name: "--step_1_output"
required: true
type: file
direction: output
- name: "--step_2_output"
required: true
type: file
direction: output
resources:
- type: nextflow_script
path: main.nf
entrypoint: base
dependencies:
- name: step1
- name: step3
platforms:
- type: nextflow
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
workflow base {
take: input_ch
main:

step_1_ch = input_ch
// test fromstate and tostate with list[string]
| step1.run(
fromState: ["input"],
toState: { id, output, state -> ["step_1_output": output.output] }
)

step_2_ch = input_ch
// test fromstate and tostate with map[string, string]
| step3.run(
fromState: ["input"],
toState: { id, output, state -> ["step_3_output": output.output] }
)

emit:
step_1_ch
step_3_ch
}

0 comments on commit 60ca517

Please sign in to comment.