From 3e33053b47ca95b725738db11023d7109167ea2a Mon Sep 17 00:00:00 2001
From: Dries Schaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Mon, 9 Dec 2024 09:49:07 +0100
Subject: [PATCH] FEAT: asynchronous publishing for the nextflow runner. (#736)

---
 CHANGELOG.md                                  |   4 +
 .../arguments/_processOutputValues.nf         |  20 ++-
 .../runners/nextflow/states/publishFiles.nf   | 154 ++++++++++++++++++
 .../runners/nextflow/states/publishStates.nf  |  58 ++-----
 .../workflowFactory/workflowFactory.nf        | 131 +++++++++++++--
 .../multiple_emit_channels/config.vsh.yaml    |  31 ++++
 .../src/multiple_emit_channels/main.nf        |  63 +++++++
 .../runners/nextflow/NextflowScriptTest.scala |  31 ++++
 8 files changed, 426 insertions(+), 66 deletions(-)
 create mode 100644 src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf
 create mode 100644 src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml
 create mode 100644 src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a10ded3b..4160fcc94 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 TODO add summary
 
+## NEW FEATURES
+
+* `Nextflow` runner: allow emitting multiple output channels (PR #736).
+
 ## MINOR CHANGES
 
 * `viash-hub`: Change the url for viash-hub Git access to packages.viash-hub.com (PR #774).
diff --git a/src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf b/src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf
index 01eb4ca6d..aded1e8e8 100644
--- a/src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf
+++ b/src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf
@@ -1,12 +1,5 @@
-Map _processOutputValues(Map outputs, Map config, String id, String key) {
+Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
   if (!workflow.stubRun) {
-    config.allArguments.each { arg ->
-      if (arg.direction == "output" && arg.required) {
-        assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
-          "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
-      }
-    }
-
     outputs = outputs.collectEntries { name, value ->
       def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
       assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
@@ -18,3 +11,14 @@
   }
   return outputs
 }
+
+void _checkAllRequiredOutputsPresent(Map outputs, Map config, String id, String key) {
+  if (!workflow.stubRun) {
+    config.allArguments.each { arg ->
+      if (arg.direction == "output" && arg.required) {
+        assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
+          "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf b/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf
new file mode 100644
index 000000000..a0270d168
--- /dev/null
+++ b/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf
@@ -0,0 +1,154 @@
+def publishFiles(Map args) {
+  def key_ = args.get("key")
+
+  assert key_ != null : "publishFiles: key must be specified"
+
+  workflow publishFilesWf {
+    take: input_ch
+    main:
+      input_ch
+        | map { tup ->
+          def id_ = tup[0]
+          def state_ = tup[1]
+
+          // the input files and the target output filenames
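+          // collectInputOutputPaths is assumed here to return a list of
+          // [inputFile, outputFilename] pairs; transposing it yields the list
+          // of input files and the list of target output filenames.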
+          def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
+          def inputFiles_ = inputoutputFilenames_[0]
+          def outputFilenames_ = inputoutputFilenames_[1]
+
+          [id_, inputFiles_, outputFilenames_]
+        }
+        | publishFilesProc
+    emit: input_ch
+  }
+  return publishFilesWf
+}
+
+process publishFilesProc {
+  // todo: check publishpath?
+  publishDir path: "${getPublishDir()}/", mode: "copy"
+  tag "$id"
+  input:
+    tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
+  output:
+    tuple val(id), path{outputFiles}
+  script:
+  def copyCommands = [
+      inputFiles instanceof List ? inputFiles : [inputFiles],
+      outputFiles instanceof List ? outputFiles : [outputFiles]
+    ]
+    .transpose()
+    .collectMany{infile, outfile ->
+      if (infile.toString() != outfile.toString()) {
+        [
+          "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
+          "cp -r '${infile.toString()}' '${outfile.toString()}'"
+        ]
+      } else {
+        // no need to copy if infile is the same as outfile
+        []
+      }
+    }
+  """
+  echo "Copying output files to destination folder"
+  ${copyCommands.join("\n  ")}
+  """
+}
+
+
+// this assumes that the state contains no values other than those specified in the config
+def publishFilesByConfig(Map args) {
+  def config = args.get("config")
+  assert config != null : "publishFilesByConfig: config must be specified"
+
+  def key_ = args.get("key", config.name)
+  assert key_ != null : "publishFilesByConfig: key must be specified"
+
+  workflow publishFilesSimpleWf {
+    take: input_ch
+    main:
+      input_ch
+        | map { tup ->
+          def id_ = tup[0]
+          def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
+          def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
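+          // Worked example (hypothetical values): with id_ == "foo" and
+          // key_ == "multiple_emit_channels", the template '$id.$key.foo.h5ad'
+          // is instantiated further below to 'foo.multiple_emit_channels.foo.h5ad'.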
+
+          // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
+          //   - key is a String
+          //   - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
+          //   - inputPath is a List[Path]
+          //   - outputFilename is a List[String]
+          //   - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
+          def processedState =
+            config.allArguments
+              .findAll { it.direction == "output" }
+              .collectMany { par ->
+                def plainName_ = par.plainName
+                // if the state does not contain the key, it's an optional
+                // argument for which the component did not generate any
+                // output, OR multiple channels were emitted and the output
+                // was not added via the channel that is currently being
+                // processed
+                if (!state_.containsKey(plainName_)) {
+                  return []
+                }
+                def value = state_[plainName_]
+                // if the parameter is not a file, it should be stored
+                // in the state as-is, but is not something that needs
+                // to be copied from the source path to the dest path
+                if (par.type != "file") {
+                  return [[inputPath: [], outputFilename: []]]
+                }
+                // if the orig state does not contain this filename,
+                // it's an optional argument for which the user specified
+                // that it should not be returned as a state
+                if (!origState_.containsKey(plainName_)) {
+                  return []
+                }
+                def filenameTemplate = origState_[plainName_]
+                // if the parameter is multiple: true, fetch the template
+                if (par.multiple && filenameTemplate instanceof List) {
+                  filenameTemplate = filenameTemplate[0]
+                }
+                // instantiate the template
+                def filename = filenameTemplate
+                  .replaceAll('\\$id', id_)
+                  .replaceAll('\\$\\{id\\}', id_)
+                  .replaceAll('\\$key', key_)
+                  .replaceAll('\\$\\{key\\}', key_)
+                if (par.multiple) {
+                  // if the parameter is multiple: true, the filename
+                  // should contain a wildcard '*' that is replaced with
+                  // the index of the file
+                  assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
+                  def outputPerFile = value.withIndex().collect{ val, ix ->
+                    def filename_ix = filename.replace("*", ix.toString())
+                    def inputPath = val instanceof File ? val.toPath() : val
+                    [inputPath: inputPath, outputFilename: filename_ix]
+                  }
+                  def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
+                    [key, outputPerFile.collect{dic -> dic[key]}]
+                  }
+                  return [[key: plainName_] + transposedOutputs]
+                } else {
+                  def inputPath = value instanceof File ? value.toPath() : value
+                  return [[inputPath: [inputPath], outputFilename: [filename]]]
+                }
+              }
+
+          def inputPaths = processedState.collectMany{it.inputPath}
+          def outputFilenames = processedState.collectMany{it.outputFilename}
+
+          [id_, inputPaths, outputFilenames]
+        }
+        | publishFilesProc
+    emit: input_ch
+  }
+  return publishFilesSimpleWf
+}
+
+
+
diff --git a/src/main/resources/io/viash/runners/nextflow/states/publishStates.nf b/src/main/resources/io/viash/runners/nextflow/states/publishStates.nf
index e3fd40e85..c57fbdfea 100644
--- a/src/main/resources/io/viash/runners/nextflow/states/publishStates.nf
+++ b/src/main/resources/io/viash/runners/nextflow/states/publishStates.nf
@@ -54,8 +54,6 @@ def publishStates(Map args) {
 
           // the input files and the target output filenames
           def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
-          def inputFiles_ = inputoutputFilenames_[0]
-          def outputFilenames_ = inputoutputFilenames_[1]
 
           def yamlFilename = yamlTemplate_
             .replaceAll('\\$id', id_)
@@ -68,7 +66,7 @@ def publishStates(Map args) {
           // convert state to yaml blob
           def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
 
-          [id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
+          [id_, yamlBlob_, yamlFilename]
         }
         | publishStatesProc
     emit: input_ch
@@ -80,33 +78,17 @@ process publishStatesProc {
   publishDir path: "${getPublishDir()}/", mode: "copy"
   tag "$id"
   input:
-    tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
+    tuple val(id), val(yamlBlob), val(yamlFile)
   output:
-    tuple val(id), path{[yamlFile] + outputFiles}
+    tuple val(id), path{[yamlFile]}
   script:
-  def copyCommands = [
-    inputFiles instanceof List ? inputFiles : [inputFiles],
-    outputFiles instanceof List ? outputFiles : [outputFiles]
-  ]
-    .transpose()
-    .collectMany{infile, outfile ->
-      if (infile.toString() != outfile.toString()) {
-        [
-          "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
-          "cp -r '${infile.toString()}' '${outfile.toString()}'"
-        ]
-      } else {
-        // no need to copy if infile is the same as outfile
-        []
-      }
-    }
   """
-mkdir -p "\$(dirname '${yamlFile}')"
-echo "Storing state as yaml"
-echo '${yamlBlob}' > '${yamlFile}'
-echo "Copying output files to destination folder"
-${copyCommands.join("\n  ")}
-"""
+  mkdir -p "\$(dirname '${yamlFile}')"
+  echo "Storing state as yaml"
+  cat > '${yamlFile}' << HERE
+${yamlBlob}
+HERE
+  """
 }
@@ -137,13 +119,10 @@ def publishStatesByConfig(Map args) {
             .replaceAll('\\$\\{key\\}', key_)
           def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
 
-          // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
+          // the processed state is a list of [key, value] tuples, where
           //   - key is a String
           //   - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
-          //   - inputPath is a List[Path]
-          //   - outputFilename is a List[String]
           //   - (key, value) are the tuples that will be saved to the state.yaml file
-          //   - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
           def processedState =
             config.allArguments
               .findAll { it.direction == "output" }
@@ -160,7 +139,7 @@ def publishStatesByConfig(Map args) {
                 // in the state as-is, but is not something that needs
                 // to be copied from the source path to the dest path
                 if (par.type != "file") {
-                  return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
+                  return [[key: plainName_, value: value]]
                 }
                 // if the orig state does not contain this filename,
                 // it's an optional argument for which the user specified
@@ -191,13 +170,9 @@ def publishStatesByConfig(Map args) {
                 if (yamlDir != null) {
                   value_ = yamlDir.relativize(value_)
                 }
-                def inputPath = val instanceof File ? val.toPath() : val
-                [value: value_, inputPath: inputPath, outputFilename: filename_ix]
-              }
-              def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
-                [key, outputPerFile.collect{dic -> dic[key]}]
+                  return value_
                 }
-              return [[key: plainName_] + transposedOutputs]
+                return [["key": plainName_, "value": outputPerFile]]
               } else {
                 def value_ = java.nio.file.Paths.get(filename)
                 // if id contains a slash
@@ -205,18 +180,17 @@ def publishStatesByConfig(Map args) {
                   value_ = yamlDir.relativize(value_)
                 }
                 def inputPath = value instanceof File ? value.toPath() : value
-                return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
+                return [["key": plainName_, value: value_]]
               }
             }
+
           def updatedState_ = processedState.collectEntries{[it.key, it.value]}
 
-          def inputPaths = processedState.collectMany{it.inputPath}
-          def outputFilenames = processedState.collectMany{it.outputFilename}
 
           // convert state to yaml blob
           def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
 
-          [id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
+          [id_, yamlBlob_, yamlFilename]
         }
         | publishStatesProc
     emit: input_ch
diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf
index 50d78b4c9..2361cfa0a 100644
--- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf
+++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf
@@ -10,7 +10,8 @@ def _debug(workflowArgs, debugKey) {
 def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
   def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
   def key_ = workflowArgs["key"]
-
+  def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
+
   workflow workflowInstance {
     take: input_
 
@@ -167,12 +168,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
     }
 
     // TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
-    def chInitialOutput = chArgsWithDefaults
+    def chInitialOutputMulti = chArgsWithDefaults
       | _debug(workflowArgs, "processed")
       // run workflow
       | innerWorkflowFactory(workflowArgs)
-      // check output tuple
-      | map { id_, output_ ->
+    def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
+    assert chInitialOutputList.size() > 0 : "should have emitted at least one output channel"
+    // Add a channel ID to each event: a running number that designates the
+    // channel the event was emitted from. This number is used later to sort
+    // the events when they are gathered from across the channels.
+    def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
+      def newChannel = channel
+        | map {tuple ->
+          assert tuple instanceof List :
+            "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
+            "  Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
+            "  Expected class: List. Found: ${tuple.getClass()}"
+          def newEvent = [channelIndex] + tuple
+          return newEvent
+        }
+      return newChannel
+    }
+    // Put the events into one channel, covering the case where only one channel is emitted
+    def chInitialOutput = chInitialOutputList.size() > 1 ? \
+      chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
+      chInitialOutputListWithIndexedEvents[0]
+    def chInitialOutputProcessed = chInitialOutput
+      | map { tuple ->
+        def channelId = tuple[0]
+        def id_ = tuple[1]
+        def output_ = tuple[2]
 
         // see if output map contains metadata
         def meta_ =
@@ -185,19 +210,95 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
         output_ = output_.findAll{k, v -> k != "_meta"}
 
         // check value types
-        output_ = _processOutputValues(output_, meta.config, id_, key_)
+        output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
 
-        // simplify output if need be
-        if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
-          output_ = output_.values()[0]
+        [join_id, channelId, id_, output_]
+      }
+      // | view{"chInitialOutput: ${it.take(3)}"}
+
+    // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
+    def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
+      // input tuple format: [join_id, channel_id, id, output, prev_state, ...]
+      // output tuple format: [join_id, channel_id, id, new_state, ...]
+      | map{ tup ->
+        def new_state = workflowArgs.toState(tup.drop(2).take(3))
+        tup.take(3) + [new_state] + tup.drop(5)
+      }
+    if (workflowArgs.auto.publish == "state") {
+      def chPublishFiles = chPublishWithPreviousState
+        // input tuple format: [join_id, channel_id, id, new_state, ...]
+        // output tuple format: [join_id, channel_id, id, new_state]
+        | map{ tup ->
+          tup.take(4)
         }
-        [join_id, id_, output_]
+      safeJoin(chPublishFiles, chArgsWithDefaults, key_)
+        // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
+        // output tuple format: [id, new_state, orig_state]
+        | map { tup ->
+          tup.drop(2).take(3)
+        }
+        | publishFilesByConfig(key: key_, config: meta.config)
+    }
+    // Join the state from the events that were emitted from different channels
+    def chJoined = chInitialOutputProcessed
+      | map {tuple ->
+        def join_id = tuple[0]
+        def channel_id = tuple[1]
+        def id = tuple[2]
+        def other = tuple.drop(3)
+        // Below, groupTuple is used to join the events. To make sure resuming a workflow
+        // keeps working, the output state must be deterministic. This means the state needs
+        // to be sorted, using groupTuple's 'sort' argument. This argument can be set to
+        // 'hash', but hashing the state when it is large can be problematic in terms of
+        // performance. Therefore, a custom comparator function is provided. We add the
+        // channel ID to the states so that it can be used to sort the items.
+        def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
+        // A comparator that is provided to groupTuple's 'sort' argument is applied
+        // to all elements of the event tuple except the 'id'. The comparator
+        // closure that is used below expects its input to be a List, so the
+        // join_id and channel_id must also be wrapped in lists.
+        [[join_id], [channel_id], id] + stateWithChannelID
+      }
+      | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
+      | map {join_ids, _, id, statesWithChannelID ->
+        // Remove the channel IDs from the states
+        def states = statesWithChannelID.collect{it[1]}
+        def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
+        assert newJoinId.size() == 1 : "Multiple events were emitted for '$id'."
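+        // Merge the states that were emitted across the channels into a single
+        // state (newState below). Worked example (hypothetical values): merging
+        //   [step_1_output: f1, multiple_output: m1] and [step_3_output: f3, multiple_output: m2],
+        // where 'multiple_output' is declared 'multiple: true', yields
+        //   [step_1_output: f1, step_3_output: f3, multiple_output: [m1, m2]].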
+        def newJoinIdUnique = newJoinId[0]
+        def newState = states.inject([:]){ old_state, state_to_add ->
+          // First add the non-multiple arguments
+          def stateToAddNoMultiple = state_to_add.findAll{k, v -> !multipleArgs.contains(k)}
+          def overlap = old_state.keySet().intersect(stateToAddNoMultiple.keySet())
+          assert overlap.isEmpty() : "ID $id: multiple entries for argument(s) $overlap were emitted."
+          def return_state = old_state + stateToAddNoMultiple
+
+          // Add `multiple: true` arguments
+          def stateToAddMultiple = state_to_add.findAll{k, v -> multipleArgs.contains(k)}
+          stateToAddMultiple.each {k, v ->
+            def currentKey = return_state.getOrDefault(k, [])
+            def currentKeyList = currentKey instanceof List ? currentKey : [currentKey]
+            currentKeyList.add(v)
+            return_state[k] = currentKeyList
+          }
+          return return_state
+        }
+
+        _checkAllRequiredOutputsPresent(newState, meta.config, id, key_)
+        // simplify output if need be
+        if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
+          newState = newState.values()[0]
+        }
+
+        return [newJoinIdUnique, id, newState]
+      }
 
     // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
-    def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
+    def chNewState = safeJoin(chJoined, chRunFiltered, key_)
       // input tuple format: [join_id, id, output, prev_state, ...]
       // output tuple format: [join_id, id, new_state, ...]
       | map{ tup ->
@@ -206,23 +307,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
     }
 
     if (workflowArgs.auto.publish == "state") {
-      def chPublish = chNewState
+      def chPublishStates = chNewState
         // input tuple format: [join_id, id, new_state, ...]
         // output tuple format: [join_id, id, new_state]
         | map{ tup ->
           tup.take(3)
         }
 
-      safeJoin(chPublish, chArgsWithDefaults, key_)
+      safeJoin(chPublishStates, chArgsWithDefaults, key_)
         // input tuple format: [join_id, id, new_state, orig_state, ...]
         // output tuple format: [id, new_state, orig_state]
         | map { tup ->
           tup.drop(1).take(3)
-      }
+        }
         | publishStatesByConfig(key: key_, config: meta.config)
     }
-
-
     // remove join_id and meta
     chReturn = chNewState
       | map { tup ->
         // input tuple format: [join_id, id, new_state, ...]
diff --git a/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml
new file mode 100644
index 000000000..77d26638b
--- /dev/null
+++ b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/config.vsh.yaml
@@ -0,0 +1,31 @@
+name: multiple_emit_channels
+argument_groups:
+  - name: Arguments
+    arguments:
+      - name: "--input"
+        type: file
+        description: Input file
+        required: true
+        example: input.txt
+      - name: "--step_1_output"
+        required: true
+        type: file
+        direction: output
+      - name: "--step_3_output"
+        required: true
+        type: file
+        direction: output
+      - name: "--multiple_output"
+        required: true
+        multiple: true
+        type: file
+        direction: output
+resources:
+  - type: nextflow_script
+    path: main.nf
+    entrypoint: base
+dependencies:
+  - name: step1
+  - name: step3
+platforms:
+  - type: nextflow
diff --git a/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf
new file mode 100644
index 000000000..0b447d8ef
--- /dev/null
+++ b/src/test/resources/testnextflowvdsl3/src/multiple_emit_channels/main.nf
@@ -0,0 +1,63 @@
+workflow base {
+  take: input_ch
+  main:
+
+  step_1_ch = input_ch
+    // test fromState with a List[String] and toState with a closure
+    | step1.run(
+      fromState: ["input"],
+      toState: { id, output, state -> ["step_1_output": output.output, "multiple_output": output.output] }
+    )
+
+  step_3_ch = input_ch
+    // run step3 on the same input to emit a second output channel
+    | step3.run(
+      fromState: ["input"],
+      toState: { id, output, state -> ["step_3_output": output.output, "multiple_output": output.output] }
+    )
+
+  emit:
+  step_1_ch
+  step_3_ch
+}
+
+workflow test_base {
+  // todo: fix how `test_base` is able to access the test resources
+  Channel.value([
+    "foo",
+    [
+      "input": file("${params.rootDir}/resources/lines3.txt")
+    ]
+  ])
+    | multiple_emit_channels
+    | toList()
+    | view { output_list ->
+      assert output_list.size() == 1 : "output channel should contain 1 event"
+
+      def event = output_list[0]
+      assert event.size() == 2 : "outputs should contain two elements; [id, state]"
+      def id = event[0]
+
+      // check id
+      assert id == "foo" : "id should be foo"
+
+      // check state
+      def state = event[1]
+      assert state instanceof Map : "state should be a map"
+      assert "step_1_output" in state : "state should contain key 'step_1_output'"
+      assert "step_3_output" in state : "state should contain key 'step_3_output'"
+
+      def step_1_output = state.step_1_output
+      assert step_1_output instanceof Path : "step_1_output should be a file"
+      assert step_1_output.toFile().exists() : "step_1_output file should exist"
+
+      def step_3_output = state.step_3_output
+      assert step_3_output instanceof Path : "step_3_output should be a file"
+      assert step_3_output.toFile().exists() : "step_3_output file should exist"
+
+      assert "multiple_output" in state : "state should contain 'multiple_output'"
+      def multiple_output = state.multiple_output
+      assert multiple_output instanceof List : "multiple_output should be a list"
+      assert multiple_output.size() == 2 : "multiple_output should contain two files"
+    }
+}
\ No newline at end of file
diff --git a/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala b/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala
index 9194df96b..2e0a770cb 100644
--- a/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala
+++ b/src/test/scala/io/viash/runners/nextflow/NextflowScriptTest.scala
@@ -200,6 +200,37 @@ class NextflowScriptTest extends AnyFunSuite with BeforeAndAfterAll {
     assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr")
   }
 
+  test("Run multiple output channels standalone", NextflowTest) {
+    val (exitCode, stdOut, stdErr) = NextflowTestHelper.run(
+      mainScript = "target/nextflow/multiple_emit_channels/main.nf",
+      args = List(
+        "--id", "foo",
+        "--input", "resources/lines5.txt",
+        "--publish_dir", "output"
+      ),
+      cwd = tempFolFile
+    )
+
+    assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr")
+  }
+
+
+  test("Run multiple output channels check output", DockerTest, NextflowTest) {
+    val (exitCode, stdOut, stdErr) = NextflowTestHelper.run(
+      mainScript = "target/nextflow/multiple_emit_channels/main.nf",
+      entry = Some("test_base"),
+      args = List(
+        "--rootDir", tempFolStr,
+        "--publish_dir", "output"
+      ),
+      cwd = tempFolFile
+    )
+
+    assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr")
+  }
+
   test("Check whether --help is same as Viash's --help", NextflowTest) {
     // except that WorkflowHelper.nf will not print alternatives, and
     // will always prefix argument names with -- (so --foo, not -f or foo).