From e041735dbed62030afb2590fbf83538b3498d55e Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Tue, 24 Dec 2024 09:26:33 +0000 Subject: [PATCH] Spawn a different publishing process for each output argument --- .../runners/nextflow/states/publishFiles.nf | 248 +++++++++--------- .../workflowFactory/workflowFactory.nf | 13 +- 2 files changed, 138 insertions(+), 123 deletions(-) diff --git a/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf b/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf index a0270d168..24af6c56b 100644 --- a/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf +++ b/src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf @@ -1,67 +1,136 @@ -def publishFiles(Map args) { - def key_ = args.get("key") +def _publishingProcessFactory(String wfKey) { + // autodetect process key + def procKeyPrefix = "PublishFiles_${wfKey}" + def scriptMeta = nextflow.script.ScriptMeta.current() + def existing = scriptMeta.getProcessNames().findAll{it.startsWith(procKeyPrefix)} + def numbers = existing.collect{it.replace(procKeyPrefix, "0").toInteger()} + def newNumber = (numbers + [-1]).max() + 1 - assert key_ != null : "publishFiles: key must be specified" - - workflow publishFilesWf { - take: input_ch - main: - input_ch - | map { tup -> - def id_ = tup[0] - def state_ = tup[1] + def procKey = newNumber == 0 ? procKeyPrefix : "$procKeyPrefix$newNumber" - // the input files and the target output filenames - def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose() - def inputFiles_ = inputoutputFilenames_[0] - def outputFilenames_ = inputoutputFilenames_[1] - - [id_, inputFiles_, outputFilenames_] - } - | publishFilesProc - emit: input_ch + if (newNumber > 0) { + log.warn "Key for module '${wfKey}' is duplicated.\n", + "If you run a component multiple times in the same workflow,\n" + + "it's recommended you set a unique key for every call,\n" + + "for example: ${wfKey}.run(key: \"foo\")." } - return publishFilesWf + + def publishDir = getPublishDir() + + // generate process string + def procStr = + """nextflow.enable.dsl=2 + | + |process $procKey { + |publishDir path: "$publishDir", mode: "copy" + |tag "\$id" + |input: + | tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + |output: + | tuple val(id), path{outputFiles} + |script: + |def copyCommands = [ + | inputFiles instanceof List ? inputFiles : [inputFiles], + | outputFiles instanceof List ? outputFiles : [outputFiles] + |] + | .transpose() + | .collectMany{infile, outfile -> + | if (infile.toString() != outfile.toString()) { + | [ + | "[ -d \\"\\\$(dirname '\${outfile.toString()}')\\" ] || mkdir -p \\"\\\$(dirname '\${outfile.toString()}')\\"", + | "cp -r '\${infile.toString()}' '\${outfile.toString()}'" + | ] + | } else { + | // no need to copy if infile is the same as outfile + | [] + | } + | } + | + |\"\"\" + |echo "Copying output files to destination folder" + |\${copyCommands.join("\\n ")} + |\"\"\" + |} + |""".stripMargin() + + // write process to temp file + def tempFile = java.nio.file.Files.createTempFile("viash-process-${procKey}-", ".nf") + // addShutdownHook { java.nio.file.Files.deleteIfExists(tempFile) } + tempFile.text = procStr + + // create process from temp file + def binding = new nextflow.script.ScriptBinding([:]) + def session = nextflow.Nextflow.getSession() + def parser = new nextflow.script.ScriptParser(session) + .setModule(true) + .setBinding(binding) + def moduleScript = parser.runScript(tempFile) + .getScript() + + // register module in meta + def module = new nextflow.script.IncludeDef.Module(name: procKey) + scriptMeta.addModule(moduleScript, module.name, module.alias) + + // retrieve and return process from meta + return scriptMeta.getProcess(procKey) } -process publishFilesProc { - // todo: check publishpath? - publishDir path: "${getPublishDir()}/", mode: "copy" - tag "$id" - input: - tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) - output: - tuple val(id), path{outputFiles} - script: - def copyCommands = [ - inputFiles instanceof List ? inputFiles : [inputFiles], - outputFiles instanceof List ? outputFiles : [outputFiles] - ] - .transpose() - .collectMany{infile, outfile -> - if (infile.toString() != outfile.toString()) { - [ - "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", - "cp -r '${infile.toString()}' '${outfile.toString()}'" - ] - } else { - // no need to copy if infile is the same as outfile - [] - } +def processStateForPar(par, id_, key_, state_, origState_) { + def plainName_ = par.plainName + // if the state does not contain the key, it's an + // optional argument for which the component did + // not generate any output OR multiple channels were emitted + // and the output was just not added to using the channel + // that is now being parsed + if (!state_.containsKey(plainName_)) { + return [] + } + def value = state_[plainName_] + // if the orig state does not contain this filename, + // it's an optional argument for which the user specified + // that it should not be returned as a state + if (!origState_.containsKey(plainName_)) { + return [] + } + def filenameTemplate = origState_[plainName_] + // if the pararameter is multiple: true, fetch the template + if (par.multiple && filenameTemplate instanceof List) { + filenameTemplate = filenameTemplate[0] + } + // instantiate the template + def filename = filenameTemplate + .replaceAll('\\$id', id_) + .replaceAll('\\$\\{id\\}', id_) + .replaceAll('\\$key', key_) + .replaceAll('\\$\\{key\\}', key_) + + if (par.multiple) { + // if the parameter is multiple: true, the filename + // should contain a wildcard '*' that is replaced with + // the index of the file + assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" + def outputPerFile = value.withIndex().collect{ val, ix -> + def filename_ix = filename.replace("*", ix.toString()) + def inputPath = val instanceof File ? val.toPath() : val + [inputPath: inputPath, outputFilename: filename_ix] } - """ - echo "Copying output files to destination folder" - ${copyCommands.join("\n ")} - """ -} + def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> + [key, outputPerFile.collect{dic -> dic[key]}] + } + return [[key: plainName_] + transposedOutputs] + } + def value_ = java.nio.file.Paths.get(filename) + def inputPath = value instanceof File ? value.toPath() : value + return [[inputPath: [inputPath], outputFilename: [filename]]] + +} // this assumes that the state contains no other values other than those specified in the config def publishFilesByConfig(Map args) { - def config = args.get("config") - assert config != null : "publishFilesByConfig: config must be specified" - - def key_ = args.get("key", config.name) + def parameter_info = args.get("par") + assert parameter_info != null : "publishFilesByConfig: par must be specified" + def key_ = args.get("key") assert key_ != null : "publishFilesByConfig: key must be specified" workflow publishFilesSimpleWf { @@ -72,79 +141,18 @@ def publishFilesByConfig(Map args) { def id_ = tup[0] def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10] def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad'] - - - // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + + // the processed state is a list of [key, inputPath, outputFilename] tuples, where // - key is a String - // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) // - inputPath is a List[Path] // - outputFilename is a List[String] - // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) - def processedState = - config.allArguments - .findAll { it.direction == "output" } - .collectMany { par -> - def plainName_ = par.plainName - // if the state does not contain the key, it's an - // optional argument for which the component did - // not generate any output OR multiple channels were emitted - // and the output was just not added to using the channel - // that is now being parsed - if (!state_.containsKey(plainName_)) { - return [] - } - def value = state_[plainName_] - // if the parameter is not a file, it should be stored - // in the state as-is, but is not something that needs - // to be copied from the source path to the dest path - if (par.type != "file") { - return [[inputPath: [], outputFilename: []]] - } - // if the orig state does not contain this filename, - // it's an optional argument for which the user specified - // that it should not be returned as a state - if (!origState_.containsKey(plainName_)) { - return [] - } - def filenameTemplate = origState_[plainName_] - // if the pararameter is multiple: true, fetch the template - if (par.multiple && filenameTemplate instanceof List) { - filenameTemplate = filenameTemplate[0] - } - // instantiate the template - def filename = filenameTemplate - .replaceAll('\\$id', id_) - .replaceAll('\\$\\{id\\}', id_) - .replaceAll('\\$key', key_) - .replaceAll('\\$\\{key\\}', key_) - if (par.multiple) { - // if the parameter is multiple: true, the filename - // should contain a wildcard '*' that is replaced with - // the index of the file - assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" - def outputPerFile = value.withIndex().collect{ val, ix -> - def filename_ix = filename.replace("*", ix.toString()) - def inputPath = val instanceof File ? val.toPath() : val - [inputPath: inputPath, outputFilename: filename_ix] - } - def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> - [key, outputPerFile.collect{dic -> dic[key]}] - } - return [[key: plainName_] + transposedOutputs] - } else { - def value_ = java.nio.file.Paths.get(filename) - def inputPath = value instanceof File ? value.toPath() : value - return [[inputPath: [inputPath], outputFilename: [filename]]] - } - } - + // - (inputPath, outputFilename) are the files that will be copied from src to dest + def processedState = processStateForPar(parameter_info, id_, key_, state_, origState_) def inputPaths = processedState.collectMany{it.inputPath} def outputFilenames = processedState.collectMany{it.outputFilename} - - [id_, inputPaths, outputFilenames] } - | publishFilesProc + | _publishingProcessFactory(parameter_info.plainName) emit: input_ch } return publishFilesSimpleWf diff --git a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf index 2361cfa0a..1dda4f6f7 100644 --- a/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf +++ b/src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf @@ -231,14 +231,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { | map{ tup -> tup.take(4) } - - safeJoin(chPublishFiles, chArgsWithDefaults, key_) + + def chPublishFilesWithDefaults = safeJoin(chPublishFiles, chArgsWithDefaults, key_) // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...] // output tuple format: [id, new_state, orig_state] | map { tup -> tup.drop(2).take(3) } - | publishFilesByConfig(key: key_, config: meta.config) + + meta.config.allArguments.findAll { + it.type == "file" && it.direction == "output" + }.each{par -> + chPublishFilesWithDefaults + | publishFilesByConfig(key: key_, par: par) + } + } // Join the state from the events that were emitted from different channels def chJoined = chInitialOutputProcessed