From c7c77e05e76f1b5de3ef43da6abbd1955c6f16aa Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:27:52 +0000 Subject: [PATCH 1/3] Nextflow runner: add runElse. --- .../io/viash/runners/nextflow/channel/runEach.nf | 13 ++++++++++++- .../nextflow/workflowFactory/workflowFactory.nf | 9 +++++++++ .../src/test_wfs/filter_runif/main.nf | 3 ++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/resources/io/viash/runners/nextflow/channel/runEach.nf b/src/main/resources/io/viash/runners/nextflow/channel/runEach.nf index 3fbe6cc02..ef2a211e8 100644 --- a/src/main/resources/io/viash/runners/nextflow/channel/runEach.nf +++ b/src/main/resources/io/viash/runners/nextflow/channel/runEach.nf @@ -27,9 +27,11 @@ def runEach(Map args) { def toState_ = args.toState def filter_ = args.filter def runIf_ = args.runIf + def runElse_ = args.runElse def id_ = args.id assert !runIf_ || runIf_ instanceof Closure: "runEach: must pass a Closure to runIf." + assert !runElse_ || runElse_ instanceof Closure: "runElse: must pass a Closure to runElse." workflow runEachWf { take: input_ch @@ -65,6 +67,15 @@ def runEach(Map args) { chRun = id_ch chPassthrough = Channel.empty() } + def chPassthroughElse = null + if (runElse_){ + chPassthroughElse = chPassthrough.map{tup -> + runElse_(tup[0], tup[1], comp_) + } + } else { + chPassthroughElse = Channel.empty() + } + def data_ch = chRun | map{tup -> def new_data = tup[1] if (fromState_ instanceof Map) { @@ -105,7 +116,7 @@ def runEach(Map args) { : out_ch def return_ch = post_ch - | concat(chPassthrough) + | concat(chPassthroughElse) return_ch } diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf index 2361cfa0a..ed8972651 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf @@ -118,6 +118,15 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { chPassthrough = Channel.empty() } + def chPassthroughElse = null + if (runElse_){ + chPassthroughElse = chPassthrough.map{tup -> + runElse_(tup[0], tup[1]) + } + } else { + chPassthroughElse = Channel.empty() + } + def chRunFiltered = workflowArgs.filter ? chRun | filter{workflowArgs.filter(it)} : chRun diff --git a/src/test/resources/testnextflowvdsl3/src/test_wfs/filter_runif/main.nf b/src/test/resources/testnextflowvdsl3/src/test_wfs/filter_runif/main.nf index a5be5998f..2ba67b40b 100644 --- a/src/test/resources/testnextflowvdsl3/src/test_wfs/filter_runif/main.nf +++ b/src/test/resources/testnextflowvdsl3/src/test_wfs/filter_runif/main.nf @@ -12,7 +12,8 @@ workflow base { ]) | step1.run( filter: { id, data -> id != "three" }, - runIf: { id, data -> data.input.size() == 2 || id == "three" } + runIf: { id, data -> data.input.size() == 2 || id == "three" }, + runElse: {it} ) | view{"output: $it"} | toSortedList{ a, b -> a[0] <=> b[0] } From 49fff61c54dbf2ad9d6b4255f23a7975f1216b02 Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:51:31 +0000 Subject: [PATCH 2/3] Fix missing variable --- .../viash/runners/nextflow/workflowFactory/workflowFactory.nf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf index ed8972651..27ea30649 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf @@ -119,9 +119,9 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } def chPassthroughElse = null - if (runElse_){ + if (workflowArgs.runElse_){ chPassthroughElse = chPassthrough.map{tup -> - runElse_(tup[0], tup[1]) + workflowArgs.runElse_(tup[0], tup[1]) } } else { chPassthroughElse = Channel.empty() From cb40df2f91a2fe7a314b36ac01127e1b9f22d7df Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Thu, 2 Jan 2025 09:09:00 +0000 Subject: [PATCH 3/3] Add runElse to valid argument list --- .../runners/nextflow/workflowFactory/processWorkflowArgs.nf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf index 7645d7ca3..53f65f99b 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf @@ -14,7 +14,7 @@ def processWorkflowArgs(Map args, Map defaultWfArgs, Map meta) { assert key ==~ /^[a-zA-Z_]\w*$/ : "Error in module '$key': Expected process argument 'key' to consist of only letters, digits or underscores. Found: ${key}" // check for any unexpected keys - def expectedKeys = ["key", "directives", "auto", "map", "mapId", "mapData", "mapPassthrough", "filter", "runIf", "fromState", "toState", "args", "renameKeys", "debug"] + def expectedKeys = ["key", "directives", "auto", "map", "mapId", "mapData", "mapPassthrough", "filter", "runIf", "runElse", "fromState", "toState", "args", "renameKeys", "debug"] def unexpectedKeys = workflowArgs.keySet() - expectedKeys assert unexpectedKeys.isEmpty() : "Error in module '$key': unexpected arguments to the '.run()' function: '${unexpectedKeys.join("', '")}'"