diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf index f4a9b4b69..7d4e03ede 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf @@ -10,7 +10,8 @@ def _debug(workflowArgs, debugKey) { def workflowFactory(Map args, Map defaultWfArgs, Map meta) { def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta) def key_ = workflowArgs["key"] - + def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName} + workflow workflowInstance { take: input_ @@ -195,7 +196,7 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { // | view{"chInitialOutput: ${it.take(3)}"} // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...] - def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chModifiedFiltered, key_) + def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_) // input tuple format: [join_id, id, output, prev_state, ...] // output tuple format: [join_id, id, new_state, ...] | map{ tup -> @@ -222,13 +223,26 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { | groupTuple(by: 1, sort: 'hash', size: chInitialOutputList.size(), remainder: true) | map {join_ids, id, states -> def newJoinId = join_ids.unique{a, b -> a <=> b} - assert newJoinId.size() == 1: "Multiple join IDs were emitted for '$id'." + assert newJoinId.size() == 1: "Multiple events were emitted for '$id'." def newJoinIdUnique = newJoinId[0] def newState = states.inject([:]){ old_state, state_to_add -> - def overlap = old_state.keySet().intersect(state_to_add.keySet()) - assert overlap.isEmpty() : "ID $id: multiple entries for for argument(s) $overlap were emitted." - def return_state = old_state + state_to_add - return return_state + def stateToAddNoMultiple = state_to_add.findAll{k, v -> !multipleArgs.contains(k)} + // First add non multiple arguments + + def overlap = old_state.keySet().intersect(stateToAddNoMultiple.keySet()) + assert overlap.isEmpty() : "ID $id: multiple entries for " + + " argument(s) $overlap were emitted." + def return_state = old_state + stateToAddNoMultiple + + // Add `multiple: true` arguments + def stateToAddMultiple = state_to_add.findAll{k, v -> multipleArgs.contains(k)} + stateToAddMultiple.each {k, v -> + def currentKey = return_state.getOrDefault(k, []) + def currentKeyList = currentKey instanceof List ? currentKey : [currentKey] + currentKeyList.add(v) + return_state[k] = currentKeyList + } + return return_state } // simplify output if need be diff --git a/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml new file mode 100644 index 000000000..77d26638b --- /dev/null +++ b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml @@ -0,0 +1,31 @@ +name: multiple_emit_channels +argument_groups: + - name: Outputs + arguments: + - name: "--input" + type: file + description: Input file + required: true + example: input.txt + - name: "--step_1_output" + required: true + type: file + direction: output + - name: "--step_3_output" + required: true + type: file + direction: output + - name: "--multiple_output" + required: true + multiple: true + type: file + direction: output +resources: + - type: nextflow_script + path: main.nf + entrypoint: base +dependencies: + - name: step1 + - name: step3 +platforms: + - type: nextflow diff --git a/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf new file mode 100644 index 000000000..0b447d8ef --- /dev/null +++ b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf @@ -0,0 +1,63 @@ +workflow base { + take: input_ch + main: + + step_1_ch = input_ch + // test fromstate and tostate with list[string] + | step1.run( + fromState: ["input"], + toState: { id, output, state -> ["step_1_output": output.output, "multiple_output": output.output] } + ) + + step_3_ch = input_ch + // test fromstate and tostate with map[string, string] + | step3.run( + fromState: ["input"], + toState: { id, output, state -> ["step_3_output": output.output, "multiple_output": output.output] } + ) + + emit: + step_1_ch + step_3_ch +} + +workflow test_base { + // todo: fix how `test_base` is able to access the test resources + Channel.value([ + "foo", + [ + "input": file("${params.rootDir}/resources/lines3.txt") + ] + ]) + | multiple_emit_channels + | toList() + | view { output_list -> + assert output_list.size() == 1 : "output channel should contain 1 event" + + def event = output_list[0] + assert event.size() == 2 : "outputs should contain two elements; [id, state]" + def id = event[0] + + // check id + assert id == "foo" : "id should be foo" + + // check state + def state = event[1] + assert state instanceof Map : "state should be a map" + assert "step_1_output" in state : "state should contain key 'step_1_output'" + assert "step_3_output" in state : "state should contain key 'step_3_output'" + + def step_1_output = state.step_1_output + assert step_1_output instanceof Path: "step_1_output should be a file" + assert step_1_output.toFile().exists() : "step_1_output file should exist" + + def step_3_output = state.step_3_output + assert step_3_output instanceof Path: "step_3_output should be a file" + assert step_3_output.toFile().exists() : "step_3_output file should exist" + + assert "multiple_output" in state : "state should contain 'multiple_output'" + def multiple_output = state.multiple_output + assert multiple_output instanceof List: "multiple_output should be a list" + assert multiple_output.size() == 2 + } +} \ No newline at end of file diff --git a/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala b/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala index 9194df96b..2e0a770cb 100644 --- a/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala +++ b/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala @@ -200,6 +200,37 @@ class NextflowScriptTest extends AnyFunSuite with BeforeAndAfterAll { assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr") } + + test("Run multiple output channels standalone", NextflowTest) { + val (exitCode, stdOut, stdErr) = NextflowTestHelper.run( + mainScript = "target/nextflow/multiple_emit_channels/main.nf", + args = List( + "--id", "foo", + "--input", "resources/lines5.txt", + "--publish_dir", "output" + ), + cwd = tempFolFile + ) + + assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr") + } + + + test("Run multiple output channels check output", DockerTest, NextflowTest) { + val (exitCode, stdOut, stdErr) = NextflowTestHelper.run( + mainScript = "target/nextflow/multiple_emit_channels/main.nf", + entry = Some("test_base"), + args = List( + "--rootDir", tempFolStr, + "--publish_dir", "output" + ), + cwd = tempFolFile + ) + + assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr") + } + + test("Check whether --help is same as Viash's --help", NextflowTest) { // except that WorkflowHelper.nf will not print alternatives, and // will always prefix argument names with -- (so --foo, not -f or foo).