diff --git a/docs/reference/nextflow_vdsl3/import_module.qmd b/docs/reference/nextflow_vdsl3/import_module.qmd index 93f399fc1..3532c35a3 100644 --- a/docs/reference/nextflow_vdsl3/import_module.qmd +++ b/docs/reference/nextflow_vdsl3/import_module.qmd @@ -74,14 +74,6 @@ workflow { - `auto.transcript`: If `true`, the module's transcript will be published to the `params.transcriptDir` folder. Default: `false` (inherited from Viash config). -- `map` (`Function`): Apply a map over the incoming tuple. Example: `{ tup -> [ tup[0], [input: tup[1].output] ] + tup.drop(2) }`. Default: `null`. - -- `mapId` (`Function`): Apply a map over the ID element of a tuple (i.e. the first element). Example: `{ id -> id + "_foo" }`. Default: `null`. - -- `mapData` (`Function`): Apply a map over the data element of a tuple (i.e. the second element). Example: `{ data -> [ input: data.output ] }`. Default: `null`. - -- `mapPassthrough` (`Function`): Apply a map over the passthrough elements of a tuple (i.e. the tuple excl. the first two elements). Example: `{ pt -> pt.drop(1) }`. Default: `null`. - - `filter` (`Function`): Filter the channel. Example: `{ tup -> tup[0] == "foo" }`. Default: `null`. - `fromState`: Fetch data from the state and pass it to the module without altering the current state. `fromState` should be `null`, `List[String]`, `Map[String, String]` or a function. diff --git a/src/main/resources/io/viash/runners/nextflow/DataflowHelper.nf b/src/main/resources/io/viash/runners/nextflow/DataflowHelper.nf deleted file mode 100644 index 450ad7492..000000000 --- a/src/main/resources/io/viash/runners/nextflow/DataflowHelper.nf +++ /dev/null @@ -1,219 +0,0 @@ -// This helper file will be deprecated soon -dataflowHelperDeprecationWarningPrinted = false - -def dataflowHelperDeprecationWarning() { - if (!dataflowHelperDeprecationWarningPrinted) { - dataflowHelperDeprecationWarningPrinted = true - System.err.println("Warning: the functions in the DataflowHelper.nf (setWorkflowArguments, getWorkflowArguments) are deprecated and will be removed in Viash 0.9.0.") - } -} - -/* usage: -| setWorkflowArguments( - pca: [ "input": "input", "obsm_output": "obsm_pca" ] - harmonypy: [ "obs_covariates": "obs_covariates", "obsm_input": "obsm_pca" ], - find_neighbors: [ "obsm_input": "obsm_pca" ], - umap: [ "output": "output" ] -) -*/ - -def setWorkflowArguments(Map args) { - dataflowHelperDeprecationWarning() - - def wfKey = args.key != null ? args.key : "setWorkflowArguments" - args.keySet().removeAll(["key"]) - - - /* - data = [a:1, b:2, c:3] - // args = [foo: ["a", "b"], bar: ["b"]] - args = [foo: [a: 'a', out: "b"], bar: [in: "b"]] - */ - - workflow setWorkflowArgumentsInstance { - take: - input_ - - main: - output_ = input_ - | map{ tup -> - assert tup.size() : "Event should have length 2 or greater. Expected format: [id, data]." - def id = tup[0] - def data = tup[1] - def passthrough = tup.drop(2) - - // determine new data - def toRemove = args.collectMany{ _, dataKeys -> - // dataKeys is a map but could also be a list - dataKeys instanceof List ? dataKeys : dataKeys.values() - }.unique() - def newData = data.findAll{!toRemove.contains(it.key)} - - // determine splitargs - def splitArgs = args. - collectEntries{procKey, dataKeys -> - // dataKeys is a map but could also be a list - def newSplitData = dataKeys - .collectEntries{ val -> - newKey = val instanceof String ? val : val.key - origKey = val instanceof String ? val : val.value - [ newKey, data[origKey] ] - } - .findAll{it.value} - [procKey, newSplitData] - } - - // return output - [ id, newData, splitArgs] + passthrough - } - - emit: - output_ - } - - return setWorkflowArgumentsInstance.cloneWithName(wfKey) -} - -/* usage: -| getWorkflowArguments("harmonypy") -*/ - - -def getWorkflowArguments(Map args) { - dataflowHelperDeprecationWarning() - - def inputKey = args.inputKey != null ? args.inputKey : "input" - def wfKey = "getWorkflowArguments_" + args.key - - workflow getWorkflowArgumentsInstance { - take: - input_ - - main: - output_ = input_ - | map{ tup -> - assert tup.size() : "Event should have length 3 or greater. Expected format: [id, data, splitArgs]." - - def id = tup[0] - def data = tup[1] - def splitArgs = tup[2].clone() - - def passthrough = tup.drop(3) - - // try to infer arg name - if (data !instanceof Map) { - data = [[ inputKey, data ]].collectEntries() - } - assert splitArgs instanceof Map: "Third element of event (id: $id) should be a map" - assert splitArgs.containsKey(args.key): "Third element of event (id: $id) should have a key ${args.key}" - - def newData = data + splitArgs.remove(args.key) - - [ id, newData, splitArgs] + passthrough - } - - emit: - output_ - } - - return getWorkflowArgumentsInstance.cloneWithName(wfKey) - -} - - -def strictMap(Closure clos) { - dataflowHelperDeprecationWarning() - - def numArgs = clos.class.methods.find{it.name == "call"}.parameterCount - - workflow strictMapWf { - take: - input_ - - main: - output_ = input_ - | map{ tup -> - if (tup.size() != numArgs) { - throw new RuntimeException("Closure does not have the same number of arguments as channel tuple.\nNumber of closure arguments: $numArgs\nChannel tuple: $tup") - } - clos(tup) - } - - emit: - output_ - } - - return strictMapWf -} - -def passthroughMap(Closure clos) { - dataflowHelperDeprecationWarning() - - def numArgs = clos.class.methods.find{it.name == "call"}.parameterCount - - workflow passthroughMapWf { - take: - input_ - - main: - output_ = input_ - | map{ tup -> - def out = clos(tup.take(numArgs)) - out + tup.drop(numArgs) - } - - emit: - output_ - } - - return passthroughMapWf -} - -def passthroughFlatMap(Closure clos) { - dataflowHelperDeprecationWarning() - - def numArgs = clos.class.methods.find{it.name == "call"}.parameterCount - - workflow passthroughFlatMapWf { - take: - input_ - - main: - output_ = input_ - | flatMap{ tup -> - def out = clos(tup.take(numArgs)) - def pt = tup.drop(numArgs) - for (o in out) { - o.addAll(pt) - } - out - } - - emit: - output_ - } - - return passthroughFlatMapWf -} - -def passthroughFilter(Closure clos) { - dataflowHelperDeprecationWarning() - - def numArgs = clos.class.methods.find{it.name == "call"}.parameterCount - - workflow passthroughFilterWf { - take: - input_ - - main: - output_ = input_ - | filter{ tup -> - clos(tup.take(numArgs)) - } - - emit: - output_ - } - - return passthroughFilterWf -} diff --git a/src/main/resources/io/viash/runners/nextflow/channel/preprocessInputs.nf b/src/main/resources/io/viash/runners/nextflow/channel/preprocessInputs.nf deleted file mode 100644 index 487157e61..000000000 --- a/src/main/resources/io/viash/runners/nextflow/channel/preprocessInputs.nf +++ /dev/null @@ -1,65 +0,0 @@ -// This helper file will be deprecated soon -preprocessInputsDeprecationWarningPrinted = false - -def preprocessInputsDeprecationWarning() { - if (!preprocessInputsDeprecationWarningPrinted) { - preprocessInputsDeprecationWarningPrinted = true - System.err.println("Warning: preprocessInputs() is deprecated and will be removed in Viash 0.9.0.") - } -} - -/** - * Generate a nextflow Workflow that allows processing a channel of - * Vdsl3 formatted events and apply a Viash config to them: - * - Gather default parameters from the Viash config and make - * sure that they are correctly formatted (see applyConfig method). - * - Format the input parameters (also using the applyConfig method). - * - Apply the default parameter to the input parameters. - * - Do some assertions: - * ~ Check if the event IDs in the channel are unique. - * - * The events in the channel are formatted as tuples, with the - * first element of the tuples being a unique id of the parameter set, - * and the second element containg the the parameters themselves. - * Optional extra elements of the tuples will be passed to the output as is. - * - * @param args A map that must contain a 'config' key that points - * to a parsed config (see readConfig()). Optionally, a - * 'key' key can be provided which can be used to create a unique - * name for the workflow process. - * - * @return A workflow that allows processing a channel of Vdsl3 formatted events - * and apply a Viash config to them. - */ -def preprocessInputs(Map args) { - preprocessInputsDeprecationWarning() - - def config = args.config - assert config instanceof Map : - "Error in preprocessInputs: config must be a map. " + - "Expected class: Map. Found: config.getClass() is ${config.getClass()}" - def key_ = args.key ?: config.name - - // Get different parameter types (used throughout this function) - def defaultArgs = config.allArguments - .findAll { it.containsKey("default") } - .collectEntries { [ it.plainName, it.default ] } - - map { tup -> - def id = tup[0] - def data = tup[1] - def passthrough = tup.drop(2) - - def new_data = (defaultArgs + data).collectEntries { name, value -> - def par = config.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") } - - if (par != null) { - value = _checkArgumentType("input", par, value, "in module '$key_' id '$id'") - } - - [ name, value ] - } - - [ id, new_data ] + passthrough - } -} diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf index 7645d7ca3..eff6006c8 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf @@ -14,7 +14,7 @@ def processWorkflowArgs(Map args, Map defaultWfArgs, Map meta) { assert key ==~ /^[a-zA-Z_]\w*$/ : "Error in module '$key': Expected process argument 'key' to consist of only letters, digits or underscores. Found: ${key}" // check for any unexpected keys - def expectedKeys = ["key", "directives", "auto", "map", "mapId", "mapData", "mapPassthrough", "filter", "runIf", "fromState", "toState", "args", "renameKeys", "debug"] + def expectedKeys = ["key", "directives", "auto" "filter", "runIf", "fromState", "toState", "args", "renameKeys", "debug"] def unexpectedKeys = workflowArgs.keySet() - expectedKeys assert unexpectedKeys.isEmpty() : "Error in module '$key': unexpected arguments to the '.run()' function: '${unexpectedKeys.join("', '")}'" @@ -74,19 +74,12 @@ def processWorkflowArgs(Map args, Map defaultWfArgs, Map meta) { workflowArgs.directives.keySet().removeAll(["publishDir", "cpus", "memory", "label"]) } - for (nam in ["map", "mapId", "mapData", "mapPassthrough", "filter", "runIf"]) { + for (nam in ["filter", "runIf"]) { if (workflowArgs.containsKey(nam) && workflowArgs[nam]) { assert workflowArgs[nam] instanceof Closure : "Error in module '$key': Expected process argument '$nam' to be null or a Closure. Found: class ${workflowArgs[nam].getClass()}" } } - // TODO: should functions like 'map', 'mapId', 'mapData', 'mapPassthrough' be deprecated as well? - for (nam in ["map", "mapData", "mapPassthrough", "renameKeys"]) { - if (workflowArgs.containsKey(nam) && workflowArgs[nam] != null) { - log.warn "module '$key': workflow argument '$nam' is deprecated and will be removed in Viash 0.9.0. Please use 'fromState' and 'toState' instead." - } - } - // check fromState workflowArgs["fromState"] = _processFromState(workflowArgs.get("fromState"), key, meta.config) diff --git a/src/main/scala/io/viash/runners/nextflow/NextflowHelper.scala b/src/main/scala/io/viash/runners/nextflow/NextflowHelper.scala index 3f4cee500..5f0b85c5b 100644 --- a/src/main/scala/io/viash/runners/nextflow/NextflowHelper.scala +++ b/src/main/scala/io/viash/runners/nextflow/NextflowHelper.scala @@ -109,22 +109,6 @@ object NextflowHelper { | // auto settings | auto: readJsonBlob('''${jsonPrinter.print(autoJson)}'''), | - | // Apply a map over the incoming tuple - | // Example: `{ tup -> [ tup[0], [input: tup[1].output] ] + tup.drop(2) }` - | map: null, - | - | // Apply a map over the ID element of a tuple (i.e. the first element) - | // Example: `{ id -> id + "_foo" }` - | mapId: null, - | - | // Apply a map over the data element of a tuple (i.e. the second element) - | // Example: `{ data -> [ input: data.output ] }` - | mapData: null, - | - | // Apply a map over the passthrough elements of a tuple (i.e. the tuple excl. the first two elements) - | // Example: `{ pt -> pt.drop(1) }` - | mapPassthrough: null, - | | // Filter the channel | // Example: `{ tup -> tup[0] == "foo" }` | filter: null, diff --git a/src/test/resources/testnextflowvdsl3/src/test_wfs/concurrency/main.nf b/src/test/resources/testnextflowvdsl3/src/test_wfs/concurrency/main.nf index e3a649ad2..28f49779e 100644 --- a/src/test/resources/testnextflowvdsl3/src/test_wfs/concurrency/main.nf +++ b/src/test/resources/testnextflowvdsl3/src/test_wfs/concurrency/main.nf @@ -41,7 +41,6 @@ workflow base { | step1.run( key: "step1bis", - // TODO: renameKeys, map, mapId, mapData will be deprecated // TODO: test filter, runIf map: { id, state -> def new_state = state + [ @@ -50,18 +49,18 @@ workflow base { ] [id, new_state] }, - mapId: { id -> - "${id}_modified" - }, - mapData: { state -> - state + [another_key: "bar"] - }, - renameKeys: ["original_id": "oid"], fromState: { id, state -> ["input": state.file] }, toState: { id, output, state -> - state + ["step1bis_output": output.output] + def original_id = state.original_id + def new_state = state.findAll{k, v -> k != "original_id"} + + ["${id}_modified", state + + ["step1bis_output": output.output] + + ["another_key": "bar"] + + ["oid": original_id] + ] } ) diff --git a/src/test/resources/testnextflowvdsl3/workflows/pipeline1/main.nf b/src/test/resources/testnextflowvdsl3/workflows/pipeline1/main.nf index 55bb7d816..067dccd1c 100644 --- a/src/test/resources/testnextflowvdsl3/workflows/pipeline1/main.nf +++ b/src/test/resources/testnextflowvdsl3/workflows/pipeline1/main.nf @@ -66,45 +66,4 @@ workflow base { // : Channel[(String, File)] } -workflow test_map_mapdata_mapid_arguments { - channelValue - | view{ "DEBUG1: $it" } - | step1.run( - auto: [simplifyOutput: true] - ) - | view{ "DEBUG2: $it" } - | step2.run( - // test map - map: { [ it[0], [ "input1" : it[1], "input2" : it[2] ] ] }, - auto: [simplifyOutput: true] - ) - | view { "DEBUG3: $it" } - | step3.run( - // test id - mapId: {it + "_bar"}, - // test mapdata - mapData: { [ "input": [ it.output1 , it.output2 ] ] }, - auto: [simplifyOutput: true] - ) - /* TESTING */ - | view{ "DEBUG4: $it"} - | toList() - | view { output_list -> - assert output_list.size() == 1 : "output channel should contain 1 event" - - def output = output_list[0] - assert output.size() == 2 : "outputs should contain two elements; [id, output]" - def id = output[0] - - // check id - assert id == "foo_bar" : "id should be foo_bar" - - // check final output file - def output_str = output[1].readLines().join("\n") - assert output_str.matches('^11 .*$') : 'output should match ^11 .*$' - - // return something to print - "DEBUG5: $output" - } -} diff --git a/src/test/scala/io/viash/runners/nextflow/Vdsl3ModuleTest.scala b/src/test/scala/io/viash/runners/nextflow/Vdsl3ModuleTest.scala index e8188119a..035cf38d1 100644 --- a/src/test/scala/io/viash/runners/nextflow/Vdsl3ModuleTest.scala +++ b/src/test/scala/io/viash/runners/nextflow/Vdsl3ModuleTest.scala @@ -63,19 +63,6 @@ class Vdsl3ModuleTest extends AnyFunSuite with BeforeAndAfterAll { assert(!lines2.isDefined) } - test("Test map/mapData/id arguments", DockerTest, NextflowTest) { - - val (exitCode, stdOut, stdErr) = NextflowTestHelper.run( - mainScript = "workflows/pipeline1/main.nf", - entry = Some("test_map_mapdata_mapid_arguments"), - args = List("--publish_dir", "output"), - cwd = tempFolFile - ) - - assert(exitCode == 0, s"\nexit code was $exitCode\nStd output:\n$stdOut\nStd error:\n$stdErr") - } - - test("Check whether --help is same as Viash's --help", NextflowTest) { // except that WorkflowHelper.nf will not print alternatives, and // will always prefix argument names with -- (so --foo, not -f or foo).