Skip to content

Commit

Permalink
Spawn a different publishing process for each output argument
Browse files Browse the repository at this point in the history
  • Loading branch information
DriesSchaumont committed Dec 24, 2024
1 parent 7ebb587 commit e041735
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 123 deletions.
248 changes: 128 additions & 120 deletions src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf
Original file line number Diff line number Diff line change
@@ -1,67 +1,136 @@
def publishFiles(Map args) {
def key_ = args.get("key")
def _publishingProcessFactory(String wfKey) {
// autodetect process key
def procKeyPrefix = "PublishFiles_${wfKey}"
def scriptMeta = nextflow.script.ScriptMeta.current()
def existing = scriptMeta.getProcessNames().findAll{it.startsWith(procKeyPrefix)}
def numbers = existing.collect{it.replace(procKeyPrefix, "0").toInteger()}
def newNumber = (numbers + [-1]).max() + 1

assert key_ != null : "publishFiles: key must be specified"

workflow publishFilesWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1]
def procKey = newNumber == 0 ? procKeyPrefix : "$procKeyPrefix$newNumber"

// the input files and the target output filenames
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
def inputFiles_ = inputoutputFilenames_[0]
def outputFilenames_ = inputoutputFilenames_[1]

[id_, inputFiles_, outputFilenames_]
}
| publishFilesProc
emit: input_ch
if (newNumber > 0) {
log.warn "Key for module '${wfKey}' is duplicated.\n",
"If you run a component multiple times in the same workflow,\n" +
"it's recommended you set a unique key for every call,\n" +
"for example: ${wfKey}.run(key: \"foo\")."
}
return publishFilesWf

def publishDir = getPublishDir()

// generate process string
def procStr =
"""nextflow.enable.dsl=2
|
|process $procKey {
|publishDir path: "$publishDir", mode: "copy"
|tag "\$id"
|input:
| tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|output:
| tuple val(id), path{outputFiles}
|script:
|def copyCommands = [
| inputFiles instanceof List ? inputFiles : [inputFiles],
| outputFiles instanceof List ? outputFiles : [outputFiles]
|]
| .transpose()
| .collectMany{infile, outfile ->
| if (infile.toString() != outfile.toString()) {
| [
| "[ -d \\"\\\$(dirname '\${outfile.toString()}')\\" ] || mkdir -p \\"\\\$(dirname '\${outfile.toString()}')\\"",
| "cp -r '\${infile.toString()}' '\${outfile.toString()}'"
| ]
| } else {
| // no need to copy if infile is the same as outfile
| []
| }
| }
|
|\"\"\"
|echo "Copying output files to destination folder"
|\${copyCommands.join("\\n ")}
|\"\"\"
|}
|""".stripMargin()

// write process to temp file
def tempFile = java.nio.file.Files.createTempFile("viash-process-${procKey}-", ".nf")
// addShutdownHook { java.nio.file.Files.deleteIfExists(tempFile) }
tempFile.text = procStr

// create process from temp file
def binding = new nextflow.script.ScriptBinding([:])
def session = nextflow.Nextflow.getSession()
def parser = new nextflow.script.ScriptParser(session)
.setModule(true)
.setBinding(binding)
def moduleScript = parser.runScript(tempFile)
.getScript()

// register module in meta
def module = new nextflow.script.IncludeDef.Module(name: procKey)
scriptMeta.addModule(moduleScript, module.name, module.alias)

// retrieve and return process from meta
return scriptMeta.getProcess(procKey)
}

process publishFilesProc {
// todo: check publishpath?
publishDir path: "${getPublishDir()}/", mode: "copy"
tag "$id"
input:
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
output:
tuple val(id), path{outputFiles}
script:
def copyCommands = [
inputFiles instanceof List ? inputFiles : [inputFiles],
outputFiles instanceof List ? outputFiles : [outputFiles]
]
.transpose()
.collectMany{infile, outfile ->
if (infile.toString() != outfile.toString()) {
[
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
"cp -r '${infile.toString()}' '${outfile.toString()}'"
]
} else {
// no need to copy if infile is the same as outfile
[]
}
def processStateForPar(par, id_, key_, state_, origState_) {
def plainName_ = par.plainName
// if the state does not contain the key, it's an
// optional argument for which the component did
// not generate any output OR multiple channels were emitted
// and the output was just not added to using the channel
// that is now being parsed
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the pararameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)

if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def inputPath = val instanceof File ? val.toPath() : val
[inputPath: inputPath, outputFilename: filename_ix]
}
"""
echo "Copying output files to destination folder"
${copyCommands.join("\n ")}
"""
}
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
[key, outputPerFile.collect{dic -> dic[key]}]
}
return [[key: plainName_] + transposedOutputs]
}

def value_ = java.nio.file.Paths.get(filename)
def inputPath = value instanceof File ? value.toPath() : value
return [[inputPath: [inputPath], outputFilename: [filename]]]

}

// this assumes that the state contains no other values other than those specified in the config
def publishFilesByConfig(Map args) {
def config = args.get("config")
assert config != null : "publishFilesByConfig: config must be specified"

def key_ = args.get("key", config.name)
def parameter_info = args.get("par")
assert parameter_info != null : "publishFilesByConfig: par must be specified"
def key_ = args.get("key")
assert key_ != null : "publishFilesByConfig: key must be specified"

workflow publishFilesSimpleWf {
Expand All @@ -72,79 +141,18 @@ def publishFilesByConfig(Map args) {
def id_ = tup[0]
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']


// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where

// the processed state is a list of [key, inputPath, outputFilename] tuples, where
// - key is a String
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
// - inputPath is a List[Path]
// - outputFilename is a List[String]
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
def processedState =
config.allArguments
.findAll { it.direction == "output" }
.collectMany { par ->
def plainName_ = par.plainName
// if the state does not contain the key, it's an
// optional argument for which the component did
// not generate any output OR multiple channels were emitted
// and the output was just not added to using the channel
// that is now being parsed
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the parameter is not a file, it should be stored
// in the state as-is, but is not something that needs
// to be copied from the source path to the dest path
if (par.type != "file") {
return [[inputPath: [], outputFilename: []]]
}
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the pararameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def inputPath = val instanceof File ? val.toPath() : val
[inputPath: inputPath, outputFilename: filename_ix]
}
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
[key, outputPerFile.collect{dic -> dic[key]}]
}
return [[key: plainName_] + transposedOutputs]
} else {
def value_ = java.nio.file.Paths.get(filename)
def inputPath = value instanceof File ? value.toPath() : value
return [[inputPath: [inputPath], outputFilename: [filename]]]
}
}

// - (inputPath, outputFilename) are the files that will be copied from src to dest
def processedState = processStateForPar(parameter_info, id_, key_, state_, origState_)
def inputPaths = processedState.collectMany{it.inputPath}
def outputFilenames = processedState.collectMany{it.outputFilename}


[id_, inputPaths, outputFilenames]
}
| publishFilesProc
| _publishingProcessFactory(parameter_info.plainName)
emit: input_ch
}
return publishFilesSimpleWf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
| map{ tup ->
tup.take(4)
}

safeJoin(chPublishFiles, chArgsWithDefaults, key_)
def chPublishFilesWithDefaults = safeJoin(chPublishFiles, chArgsWithDefaults, key_)
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
// output tuple format: [id, new_state, orig_state]
| map { tup ->
tup.drop(2).take(3)
}
| publishFilesByConfig(key: key_, config: meta.config)

meta.config.allArguments.findAll {
it.type == "file" && it.direction == "output"
}.each{par ->
chPublishFilesWithDefaults
| publishFilesByConfig(key: key_, par: par)
}

}
// Join the state from the events that were emitted from different channels
def chJoined = chInitialOutputProcessed
Expand Down

0 comments on commit e041735

Please sign in to comment.