Skip to content

Commit

Permalink
Add runIf to runEach (#660)
Browse files Browse the repository at this point in the history
Co-authored-by: Robrecht Cannoodt <[email protected]>
  • Loading branch information
DriesSchaumont and rcannood authored Aug 13, 2024
1 parent 10a0ca3 commit 205f395
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

TODO add summary

## BREAKING CHANGES

* `NextflowPlatform`: Swap the order of execution of `runIf` and `filter` when calling `.run()`. This means that `runIf` is now executed before `filter` (PR #660).

## NEW FUNCTIONALITY

* `ExecutableRunner`: Add a `---docker_image_id` flag to view the Docker image ID of a built executable (PR #741).
Expand All @@ -10,6 +14,8 @@ TODO add summary

* `config schema`: Add `label` & `summary` fields for Config, PackageConfig, argument groups, and all argument types (PR #743).

* `NextflowPlatform`: Added `runIf` functionality to `runEach` (PR #660).

## MINOR CHANGES

* `ExecutableRunner`: Add parameter `docker_automount_prefix` to allow for a custom prefix for automounted folders (PR #739).
Expand Down
23 changes: 21 additions & 2 deletions src/main/resources/io/viash/runners/nextflow/channel/runEach.nf
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ def runEach(Map args) {
def fromState_ = args.fromState
def toState_ = args.toState
def filter_ = args.filter
def runIf_ = args.runIf
def id_ = args.id

assert !runIf_ || runIf_ instanceof Closure: "runEach: must pass a Closure to runIf."

workflow runEachWf {
take: input_ch
main:
Expand All @@ -49,7 +52,20 @@ def runEach(Map args) {
[new_id] + tup.drop(1)
}
: filter_ch
def data_ch = id_ch | map{tup ->
def chPassthrough = null
def chRun = null
if (runIf_) {
def idRunIfBranch = id_ch.branch{ tup ->
run: runIf_(tup[0], tup[1], comp_)
passthrough: true
}
chPassthrough = idRunIfBranch.passthrough
chRun = idRunIfBranch.run
} else {
chRun = id_ch
chPassthrough = Channel.empty()
}
def data_ch = chRun | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
Expand Down Expand Up @@ -87,8 +103,11 @@ def runEach(Map args) {
[tup[0], new_state] + tup.drop(3)
}
: out_ch

def return_ch = post_ch
| concat(chPassthrough)

post_ch
return_ch
}

// mix all results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,31 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
tuple
}

def chModifiedFiltered = workflowArgs.filter ?
chModified | filter{workflowArgs.filter(it)} :
chModified

def chRun = null
def chPassthrough = null
if (workflowArgs.runIf) {
def runIfBranch = chModifiedFiltered.branch{ tup ->
def runIfBranch = chModified.branch{ tup ->
run: workflowArgs.runIf(tup[0], tup[1])
passthrough: true
}
chRun = runIfBranch.run
chPassthrough = runIfBranch.passthrough
} else {
chRun = chModifiedFiltered
chRun = chModified
chPassthrough = Channel.empty()
}

def chRunFiltered = workflowArgs.filter ?
chRun | filter{workflowArgs.filter(it)} :
chRun

def chArgs = workflowArgs.fromState ?
chRun | map{
chRunFiltered | map{
def new_data = workflowArgs.fromState(it.take(2))
[it[0], new_data]
} :
chRun | map {tup -> tup.take(2)}
chRunFiltered | map {tup -> tup.take(2)}

// fill in defaults
def chArgsWithDefaults = chArgs
Expand Down Expand Up @@ -196,7 +197,7 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
// | view{"chInitialOutput: ${it.take(3)}"}

// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
def chNewState = safeJoin(chInitialOutput, chModifiedFiltered, key_)
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
// input tuple format: [join_id, id, output, prev_state, ...]
// output tuple format: [join_id, id, new_state, ...]
| map{ tup ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workflow base {
])
| step1.run(
filter: { id, data -> id != "three" },
runIf: { id, data -> data.input.size() == 2 }
runIf: { id, data -> data.input.size() == 2 || id == "three" }
)
| view{"output: $it"}
| toSortedList{ a, b -> a[0] <=> b[0] }
Expand Down

0 comments on commit 205f395

Please sign in to comment.