From cf0546b1006857f040e52b14a35d3e2152d5ed35 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 17 May 2024 05:12:30 -0500 Subject: [PATCH] Workflow output definition (#4784) Signed-off-by: Ben Sherman Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso --- docs/channel.md | 2 +- docs/cli.md | 2 + docs/config.md | 9 + docs/operator.md | 4 + docs/process.md | 3 +- docs/workflow.md | 520 +++++++++++++----- .../src/main/groovy/nextflow/NF.groovy | 4 + .../main/groovy/nextflow/NextflowMeta.groovy | 24 +- .../src/main/groovy/nextflow/Session.groovy | 3 + .../nextflow/ast/NextflowDSLImpl.groovy | 114 +++- .../nextflow/extension/PublishOp.groovy | 161 ++++-- .../nextflow/processor/PublishDir.groovy | 23 +- .../nextflow/processor/TaskProcessor.groovy | 3 +- .../groovy/nextflow/script/BaseScript.groovy | 20 + .../nextflow/script/BaseScriptConsts.groovy | 2 +- .../groovy/nextflow/script/OutputDef.groovy | 47 ++ .../groovy/nextflow/script/OutputDsl.groovy | 254 +++++++++ .../nextflow/script/ProcessConfig.groovy | 19 + .../groovy/nextflow/script/ProcessDef.groovy | 13 +- .../nextflow/script/WorkflowBinding.groovy | 10 + .../groovy/nextflow/script/WorkflowDef.groovy | 27 +- .../groovy/nextflow/util/CsvWriter.groovy | 70 +++ .../nextflow/extension/PublishOpTest.groovy | 67 --- .../nextflow/processor/PublishDirTest.groovy | 24 +- .../nextflow/script/OutputDslTest.groovy | 85 +++ .../nextflow/script/WorkflowDefTest.groovy | 48 -- .../src/main/nextflow/util/CacheHelper.java | 402 +------------- .../src/main/nextflow/util/HashBuilder.java | 480 ++++++++++++++++ .../test/nextflow/util/CacheHelperTest.groovy | 98 ---- .../test/nextflow/util/HashBuilderTest.groovy | 130 +++++ tests/output-dsl.nf | 90 +++ 31 files changed, 1919 insertions(+), 839 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/script/OutputDef.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/util/CsvWriter.groovy delete mode 100644 modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy create mode 100644 modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy create mode 100644 modules/nf-commons/src/main/nextflow/util/HashBuilder.java create mode 100644 modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy create mode 100644 tests/output-dsl.nf diff --git a/docs/channel.md b/docs/channel.md index 7d175c5fe1..5d615d1371 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -398,7 +398,7 @@ The `interval` method emits an incrementing index (starting from zero) at a peri Channel.interval('1s').view() ``` -The above snippet will emit 0, 1, 2, and so on, every second, forever. You can use an operator such as {ref}`operator-take`, {ref}`operator-timeout`, or {ref}`operator-until` to close the channel based on a stopping condition. +The above snippet will emit 0, 1, 2, and so on, every second, forever. You can use an operator such as {ref}`operator-take` or {ref}`operator-until` to close the channel based on a stopping condition. An optional closure can be used to transform the index. Additionally, returning `Channel.STOP` will close the channel. For example: diff --git a/docs/cli.md b/docs/cli.md index 928d739e3b..df93409d7d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -1109,6 +1109,8 @@ Checking nextflow-io/hello ... checkout-out at AnyObjectId[1c3e9e7404127514d69369cd87f8036830f5cf64] - revision: 1c3e9e7404 [v1.1] ``` +(cli-run)= + ### run Execute a pipeline. 
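+
+For example, the following command runs the `nextflow-io/hello` pipeline shown in the `pull` example above; any project name or local script path can be passed in the same way:
+
+```console
+$ nextflow run nextflow-io/hello
+```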
diff --git a/docs/config.md b/docs/config.md index e96c5ff48f..309f2ca59b 100644 --- a/docs/config.md +++ b/docs/config.md @@ -2101,6 +2101,15 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview` - Nextflow will fail if multiple functions and/or processes with the same name are defined in a module script +`nextflow.preview.output` + +: :::{versionadded} 24.04.0 + ::: + +: *Experimental: may change in a future release.* + +: When `true`, enables the use of the {ref}`workflow output definition `. + `nextflow.preview.recursion` : :::{versionadded} 21.11.0-edge diff --git a/docs/operator.md b/docs/operator.md index 276e197d8b..6eab124a35 100644 --- a/docs/operator.md +++ b/docs/operator.md @@ -1472,6 +1472,8 @@ An optional {ref}`closure ` can be used to transform each item b :language: console ``` +(operator-take)= + ## take *Returns: queue channel* @@ -1675,6 +1677,8 @@ The difference between `unique` and `distinct` is that `unique` removes *all* du See also: [distinct](#distinct) +(operator-until)= + ## until *Returns: queue channel or value channel, matching the source type* diff --git a/docs/process.md b/docs/process.md index 7ef148af09..40be0a6135 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1008,7 +1008,7 @@ Some caveats on glob pattern behavior: Although the input files matching a glob output declaration are not included in the resulting output channel, these files may still be transferred from the task scratch directory to the original task work directory. Therefore, to avoid unnecessary file copies, avoid using loose wildcards when defining output files, e.g. `path '*'`. Instead, use a prefix or a suffix to restrict the set of matching files to only the expected ones, e.g. `path 'prefix_*.sorted.bam'`. ::: -Read more about glob syntax at the following link [What is a glob?][what is a glob?] +Read more about glob syntax at the following link [What is a glob?][glob] ### Dynamic output file names @@ -2791,4 +2791,3 @@ process foo { ``` [glob]: http://docs.oracle.com/javase/tutorial/essential/io/fileOps.html#glob -[what is a glob?]: http://docs.oracle.com/javase/tutorial/essential/io/fileOps.html#glob diff --git a/docs/workflow.md b/docs/workflow.md index 6bc45f1015..e9450db7fd 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -41,9 +41,115 @@ The `main:` label can be omitted if there are no `take:` or `emit:` blocks. Workflows were introduced in DSL2. If you are still using DSL1, see the {ref}`dsl1-page` page to learn how to migrate your Nextflow pipelines to DSL2. ::: +## Implicit workflow + +A script can define a single workflow without a name (also known as the *implicit workflow*), which is the default entrypoint of the script. The `-entry` command line option can be used to execute a different workflow as the entrypoint at runtime. + +:::{note} +Implicit workflow definitions are ignored when a script is included as a module. This way, a script can be written such that it can be either imported as a module or executed as a pipeline. +::: + +## Named workflows + +A named workflow is a workflow that can be invoked from other workflows. For example: + +```groovy +workflow my_pipeline { + foo() + bar( foo.out.collect() ) +} + +workflow { + my_pipeline() +} +``` + +The above snippet defines a workflow named `my_pipeline`, that can be invoked from another workflow as `my_pipeline()`, just like any other function or process. 
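+
+A named workflow can also be imported into another script with an `include` statement. A minimal sketch, assuming `my_pipeline` is defined in a separate script `my-pipeline.nf`:
+
+```groovy
+include { my_pipeline } from './my-pipeline.nf'
+
+workflow {
+    my_pipeline()
+}
+```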
+ +## Using variables and params + +A workflow can access any variable or parameter defined in the global scope: + +```groovy +params.data = '/some/data/file' + +workflow { + if( params.data ) + bar(params.data) + else + bar(foo()) +} +``` + +:::{tip} +The use of global variables and params in named workflows is discouraged because it breaks the modularity of the workflow. As a best practice, every workflow input should be explicitly defined as such in the `take:` block, and params should only be used in the implicit workflow. +::: + +## Workflow inputs (`take`) + +A workflow can declare one or more input channels using the `take` keyword. For example: + +```groovy +workflow my_pipeline { + take: + data1 + data2 + + main: + foo(data1, data2) + bar(foo.out) +} +``` + +:::{warning} +When the `take` keyword is used, the beginning of the workflow body must be defined with the `main` keyword. +::: + +Inputs can be specified like arguments when invoking the workflow: + +```groovy +workflow { + my_pipeline( channel.from('/some/data') ) +} +``` + +## Workflow outputs (`emit`) + +A workflow can declare one or more output channels using the `emit` keyword. For example: + +```groovy +workflow my_pipeline { + main: + foo(data) + bar(foo.out) + + emit: + bar.out +} +``` + +When invoking the workflow, the output channel(s) can be accessed using the `out` property, i.e. `my_pipeline.out`. When multiple output channels are declared, use the array bracket notation or the assignment syntax to access each output channel as described for [process outputs](#process-outputs). + +### Named outputs + +If an output channel is assigned to an identifier in the `emit` block, the identifier can be used to reference the channel from the calling workflow. For example: + +```groovy +workflow my_pipeline { + main: + foo(data) + bar(foo.out) + + emit: + my_data = bar.out +} +``` + +The result of the above workflow can be accessed using `my_pipeline.out.my_data`. + (workflow-process-invocation)= -## Process invocation +## Invoking processes A process can be invoked like a function in a workflow definition, passing the expected input channels like function arguments. For example: @@ -116,7 +222,7 @@ workflow { } ``` -### Process named outputs +#### Named outputs The `emit` option can be added to the process output definition to assign a name identifier. This name can be used to reference the channel from the calling workflow. For example: @@ -146,9 +252,10 @@ workflow { See {ref}`process outputs ` for more details. -### Process named stdout +#### Named stdout + +The `emit` option can also be used to name a `stdout` output. However, while process output options are usually prefixed with a comma, this is not the case for `stdout`. This is because `stdout` does not have an argument like other types. -The `emit` option can also be used to name a `stdout` output: ```groovy process sayHello { @@ -171,125 +278,7 @@ workflow { } ``` -:::{note} -Optional params for a process input/output are always prefixed with a comma, except for `stdout`. Because `stdout` does not have an associated name or value like other types, the first param should not be prefixed. -::: - -## Subworkflows - -A named workflow is a "subworkflow" that can be invoked from other workflows. 
For example: - -```groovy -workflow my_pipeline { - foo() - bar( foo.out.collect() ) -} - -workflow { - my_pipeline() -} -``` - -The above snippet defines a workflow named `my_pipeline`, that can be invoked from another workflow as `my_pipeline()`, just like any other function or process. - -### Workflow parameters - -A workflow component can access any variable or parameter defined in the global scope: - -```groovy -params.data = '/some/data/file' - -workflow my_pipeline { - if( params.data ) - bar(params.data) - else - bar(foo()) -} -``` - -### Workflow inputs - -A workflow can declare one or more input channels using the `take` keyword. For example: - -```groovy -workflow my_pipeline { - take: data - - main: - foo(data) - bar(foo.out) -} -``` - -Multiple inputs must be specified on separate lines: - -```groovy -workflow my_pipeline { - take: - data1 - data2 - - main: - foo(data1, data2) - bar(foo.out) -} -``` - -:::{warning} -When the `take` keyword is used, the beginning of the workflow body must be defined with the `main` keyword. -::: - -Inputs can be specified like arguments when invoking the workflow: - -```groovy -workflow { - my_pipeline( channel.from('/some/data') ) -} -``` - -### Workflow outputs - -A workflow can declare one or more output channels using the `emit` keyword. For example: - -```groovy -workflow my_pipeline { - main: - foo(data) - bar(foo.out) - - emit: - bar.out -} -``` - -When invoking the workflow, the output channel(s) can be accessed using the `out` property, i.e. `my_pipeline.out`. When multiple output channels are declared, use the array bracket notation or the assignment syntax to access each output channel as described for [process outputs](#process-outputs). - -### Workflow named outputs - -If an output channel is assigned to an identifier in the `emit` block, the identifier can be used to reference the channel from the calling workflow. For example: - -```groovy -workflow my_pipeline { - main: - foo(data) - bar(foo.out) - - emit: - my_data = bar.out -} -``` - -The result of the above workflow can be accessed using `my_pipeline.out.my_data`. - -### Workflow entrypoint - -A workflow with no name (also known as the *implicit workflow*) is the default entrypoint of the Nextflow pipeline. A different workflow entrypoint can be specified using the `-entry` command line option. - -:::{note} -Implicit workflow definitions are ignored when a script is included as a module. This way, a workflow script can be written in such a way that it can be used either as a library module or an application script. -::: - -### Workflow composition +## Invoking workflows Named workflows can be invoked and composed just like any other process or function. @@ -403,3 +392,288 @@ workflow { ``` In the above snippet, the initial channel is piped to the {ref}`operator-map` operator, which reverses the string value. Then, the result is passed to the processes `foo` and `bar`, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the {ref}`operator-mix` operator. Finally, the result is printed using the {ref}`operator-view` operator. + +(workflow-output-def)= + +## Publishing outputs + +:::{versionadded} 24.04.0 +::: + +:::{note} +This feature requires the `nextflow.preview.output` feature flag to be enabled. 
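+For example, it can be enabled by adding the following line to the pipeline script or configuration file:
+
+```groovy
+nextflow.preview.output = true
+```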
+:::
+
+A script may define the set of outputs that should be published by the implicit workflow, known as the workflow output definition:
+
+```groovy
+workflow {
+    foo(bar())
+}
+
+output {
+    directory 'results'
+}
+```
+
+The output definition must appear after the implicit workflow.
+
+### Publishing channels
+
+Processes and workflows can each define a `publish` section that maps channels to publish targets. For example:
+
+```groovy
+process foo {
+    // ...
+
+    output:
+    path 'result.txt', emit: results
+
+    publish:
+    results >> 'foo/'
+
+    // ...
+}
+
+workflow foobar {
+    main:
+    foo(data)
+    bar(foo.out)
+
+    publish:
+    foo.out >> 'foobar/foo/'
+
+    emit:
+    bar.out
+}
+```
+
+In the above example, the output `results` of process `foo` is published to the target `foo/` by default. However, when the workflow `foobar` invokes process `foo`, it publishes `foo.out` (i.e. `foo.out.results`) to the target `foobar/foo/`, overriding the default target defined by `foo`.
+
+In a process, any output with an `emit` name can be published. In a workflow, any channel defined in the workflow, including process and subworkflow outputs, can be published.
+
+:::{note}
+If the publish source is a process/workflow output (e.g. `foo.out`) with multiple channels, each channel will be published. Individual output channels can also be published by index or name (e.g. `foo.out[0]` or `foo.out.results`).
+:::
+
+As shown in the example, workflows can override the publish targets of process and subworkflow outputs. This way, each process and workflow can define some sensible defaults for publishing, which can be overridden by calling workflows as needed.
+
+By default, all files emitted by the channel will be published into the specified directory. If a channel emits list values, any files in the list (including nested lists) will also be published. For example:
+
+```groovy
+workflow {
+    ch_samples = Channel.of(
+        [ [id: 'sample1'], file('sample1.txt') ]
+    )
+
+    publish:
+    ch_samples >> 'samples/' // sample1.txt will be published
+}
+```
+
+### Publish directory
+
+The `directory` statement is used to set the top-level publish directory of the workflow:
+
+```groovy
+output {
+    directory 'results'
+
+    // ...
+}
+```
+
+It is optional, and it defaults to the launch directory (`workflow.launchDir`). Published files will be saved within this directory.
+
+### Publish targets
+
+A publish target is a name with a specific publish configuration. By default, when a channel is published to a target in the `publish:` section of a process or workflow, the target name is used as the publish path.
+
+For example, given the following output definition:
+
+```groovy
+workflow {
+    ch_foo = foo()
+    ch_bar = bar(ch_foo)
+
+    publish:
+    ch_foo >> 'foo/'
+    ch_bar >> 'bar/'
+}
+
+output {
+    directory 'results'
+}
+```
+
+The following directory structure will be created:
+
+```
+results/
+├── foo/
+│   └── ...
+└── bar/
+    └── ...
+```
+
+:::{note}
+The trailing slash in the target name is not required; it is only used to denote that the target name is intended to be used as the publish path.
+:::
+
+:::{warning}
+The target name must not begin with a slash (`/`); it should be a relative path.
+:::
+
+Workflows can also disable publishing for specific channels by redirecting them to `null`:
+
+```groovy
+workflow {
+    ch_foo = foo()
+
+    publish:
+    ch_foo >> (params.save_foo ? 'foo/' : null)
+}
+```
+
+Publish targets can be customized in the output definition using a set of options similar to the {ref}`process-publishdir` directive.
+
+For example:
+
+```groovy
+output {
+    directory 'results'
+    mode 'copy'
+
+    'foo/' {
+        mode 'link'
+    }
+}
+```
+
+In this example, all files will be copied by default, and files published to `foo/` will be hard-linked, overriding the default option.
+
+Available options:
+
+`contentType`
+: *Currently only supported for S3.*
+: Specify the media type, also known as the [MIME type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_Types), of published files (default: `false`). Can be a string (e.g. `'text/html'`), or `true` to infer the content type from the file extension.
+
+`ignoreErrors`
+: When `true`, the workflow will not fail if a file can't be published for some reason (default: `false`).
+
+`mode`
+: The file publishing method (default: `'symlink'`). The following options are available:
+
+  `'copy'`
+  : Copy each file into the output directory.
+
+  `'copyNoFollow'`
+  : Copy each file into the output directory without following symlinks, i.e. only the link is copied.
+
+  `'link'`
+  : Create a hard link in the output directory for each file.
+
+  `'move'`
+  : Move each file into the output directory.
+  : Should only be used for files that are not used by downstream processes in the workflow.
+
+  `'rellink'`
+  : Create a relative symbolic link in the output directory for each file.
+
+  `'symlink'`
+  : Create an absolute symbolic link in the output directory for each file.
+
+`overwrite`
+: Specify when existing files in the output directory should be overwritten (default: `'standard'`). The following options are available:
+
+  `false`
+  : Never overwrite existing files.
+
+  `true`
+  : Always overwrite existing files.
+
+  `'deep'`
+  : Overwrite existing files when the file content is different.
+
+  `'lenient'`
+  : Overwrite existing files when the file size is different.
+
+  `'standard'`
+  : Overwrite existing files when the file size or last modified timestamp is different.
+
+`path`
+: Specify the publish path relative to the output directory (default: the target name). Can only be specified within a target definition.
+
+`storageClass`
+: *Currently only supported for S3.*
+: Specify the storage class for published files.
+
+`tags`
+: *Currently only supported for S3.*
+: Specify arbitrary tags for published files. For example:
+  ```groovy
+  tags FOO: 'hello', BAR: 'world'
+  ```
+
+### Index files
+
+A publish target can create an index file of the values that were published. An index file is a useful way to save the metadata associated with files, and is more flexible than encoding metadata in the file path. Currently only CSV files are supported. 
+ +For example: + +```groovy +workflow { + ch_foo = Channel.of( + [id: 1, name: 'foo 1'], + [id: 2, name: 'foo 2'], + [id: 3, name: 'foo 3'] + ) + + publish: + ch_foo >> 'foo/' +} + +output { + directory 'results' + + 'foo/' { + index { + path 'index.csv' + } + } +} +``` + +The above example will write the following CSV file to `results/foo/index.csv`: + +```csv +"id","name" +"1","foo 1" +"2","foo 2" +"3","foo 3" +``` + +You can customize the index file by specifying options in a block, for example: + +```groovy +index { + path 'index.csv' + header ['name', 'extra_option'] + sep '\t' + mapper { val -> val + [extra_option: 'bar'] } +} +``` + +The following options are available: + +`header` +: When `true`, the keys of the first record are used as the column names (default: `false`). Can also be a list of column names. + +`mapper` +: Closure which defines how to transform each published value into a CSV record. The closure should return a list or map. By default, no transformation is applied. + +`path` +: The name of the index file relative to the target path (required). + +`sep` +: The character used to separate values (default: `','`). diff --git a/modules/nextflow/src/main/groovy/nextflow/NF.groovy b/modules/nextflow/src/main/groovy/nextflow/NF.groovy index 86b24b0144..4767e41d56 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NF.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NF.groovy @@ -64,6 +64,10 @@ class NF { NextflowMeta.instance.isStrictModeEnabled() } + static boolean isOutputDefinitionEnabled() { + NextflowMeta.instance.preview.output + } + static boolean isRecurseEnabled() { NextflowMeta.instance.preview.recursion } diff --git a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy index bb836ab61a..c45d8cecfe 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy @@ -37,14 +37,15 @@ class NextflowMeta { abstract boolean strict } - @Deprecated @Slf4j static class Preview implements Flags { - volatile float dsl - boolean strict + @Deprecated volatile float dsl + @Deprecated boolean strict + boolean output boolean recursion boolean topic + @Deprecated void setDsl( float num ) { if( num == 1 ) throw new IllegalArgumentException(DSL1_EOL_MESSAGE) @@ -55,16 +56,22 @@ class NextflowMeta { dsl = num } - void setRecursion(Boolean recurse) { - if( recurse ) + void setOutput(Boolean output) { + if( output ) + log.warn "WORKFLOW OUTPUT DSL IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" + this.output = output + } + + void setRecursion(Boolean recursion) { + if( recursion ) log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" - this.recursion = recurse + this.recursion = recursion } - void setTopic(Boolean value) { + void setTopic(Boolean topic) { if( topic ) log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" - this.topic = value + this.topic = topic } } @@ -81,7 +88,6 @@ class NextflowMeta { */ final String timestamp - @Deprecated final Preview preview = new Preview() final Features enable = new Features() diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index fc52baf840..bb4f6651fc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ 
b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -31,6 +31,7 @@ import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.util.logging.Slf4j import groovyx.gpars.GParsConfig +import groovyx.gpars.dataflow.DataflowWriteChannel import groovyx.gpars.dataflow.operator.DataflowProcessor import nextflow.cache.CacheDB import nextflow.cache.CacheFactory @@ -93,6 +94,8 @@ class Session implements ISession { final List igniters = new ArrayList<>(20) + final Map publishTargets = [:] + /** * Creates process executors */ diff --git a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy index 739c554f2d..b3c16b4af7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy @@ -82,7 +82,8 @@ class NextflowDSLImpl implements ASTTransformation { final static private String WORKFLOW_TAKE = 'take' final static private String WORKFLOW_EMIT = 'emit' final static private String WORKFLOW_MAIN = 'main' - final static private List SCOPES = [WORKFLOW_TAKE, WORKFLOW_EMIT, WORKFLOW_MAIN] + final static private String WORKFLOW_PUBLISH = 'publish' + final static private List SCOPES = [WORKFLOW_TAKE, WORKFLOW_EMIT, WORKFLOW_MAIN, WORKFLOW_PUBLISH] final static public String PROCESS_WHEN = 'when' final static public String PROCESS_STUB = 'stub' @@ -172,11 +173,17 @@ class NextflowDSLImpl implements ASTTransformation { currentTaskName = null } } + else if( methodName == 'workflow' && preCondition ) { convertWorkflowDef(methodCall,sourceUnit) super.visitMethodCallExpression(methodCall) } + else if( methodName == 'output' && preCondition ) { + convertOutputDef(methodCall,sourceUnit) + super.visitMethodCallExpression(methodCall) + } + // just apply the default behavior else { super.visitMethodCallExpression(methodCall) @@ -423,6 +430,21 @@ class NextflowDSLImpl implements ASTTransformation { return result } + protected Statement normWorkflowPublish(ExpressionStatement stm) { + if( stm.expression !instanceof BinaryExpression ) { + syntaxError(stm, "Invalid workflow publish statement") + return stm + } + + final binaryX = (BinaryExpression)stm.expression + if( binaryX.operation.type != Types.RIGHT_SHIFT ) { + syntaxError(stm, "Invalid workflow publish statement") + return stm + } + + return stmt( callThisX('_publish_target', args(binaryX.leftExpression, binaryX.rightExpression)) ) + } + protected Expression makeWorkflowDefWrapper( ClosureExpression closure, boolean anonymous ) { final codeBlock = (BlockStatement) closure.code @@ -462,6 +484,14 @@ class NextflowDSLImpl implements ASTTransformation { body.add(stm) break + case WORKFLOW_PUBLISH: + if( !(stm instanceof ExpressionStatement) ) { + syntaxError(stm, "Invalid workflow publish statement") + break + } + body.add(normWorkflowPublish(stm as ExpressionStatement)) + break + default: if( context ) { def opts = SCOPES.closest(context) @@ -488,6 +518,62 @@ class NextflowDSLImpl implements ASTTransformation { unit.addError( new SyntaxException(message,line,coln)) } + /** + * Transform targets in the workflow output definition: + * + * output { + * 'foo' { ... } + * } + * + * becomes: + * + * output { + * target('foo') { ... 
} + * } + * + * @param methodCall + * @param unit + */ + protected void convertOutputDef(MethodCallExpression methodCall, SourceUnit unit) { + log.trace "Convert 'output' ${methodCall.arguments}" + + assert methodCall.arguments instanceof ArgumentListExpression + final arguments = (ArgumentListExpression)methodCall.arguments + + if( arguments.size() != 1 || arguments[0] !instanceof ClosureExpression ) { + syntaxError(methodCall, "Invalid output definition") + return + } + + final closure = (ClosureExpression)arguments[0] + final block = (BlockStatement)closure.code + for( Statement stmt : block.statements ) { + if( stmt !instanceof ExpressionStatement ) { + syntaxError(stmt, "Invalid publish target definition") + return + } + + final stmtExpr = (ExpressionStatement)stmt + if( stmtExpr.expression !instanceof MethodCallExpression ) { + syntaxError(stmt, "Invalid publish target definition") + return + } + + final call = (MethodCallExpression)stmtExpr.expression + assert call.arguments instanceof ArgumentListExpression + + // HACK: target definition is a method call with single closure argument + // custom parser will be able to detect more elegantly + final targetArgs = (ArgumentListExpression)call.arguments + if( targetArgs.size() != 1 || targetArgs[0] !instanceof ClosureExpression ) + continue + + final targetName = call.method + final targetBody = (ClosureExpression)targetArgs[0] + stmtExpr.expression = callThisX('target', args(targetName, targetBody)) + } + } + /** * Transform a DSL `process` definition into a proper method invocation * @@ -547,6 +633,11 @@ class NextflowDSLImpl implements ASTTransformation { } break + case 'publish': + if( stm instanceof ExpressionStatement ) + convertPublishMethod( stm ) + break + case 'exec': bodyLabel = currentLabel iterator.remove() @@ -1208,6 +1299,27 @@ class NextflowDSLImpl implements ASTTransformation { return false } + protected void convertPublishMethod(ExpressionStatement stmt) { + if( stmt.expression !instanceof BinaryExpression ) { + syntaxError(stmt, "Invalid process publish statement") + return + } + + final binaryX = (BinaryExpression)stmt.expression + if( binaryX.operation.type != Types.RIGHT_SHIFT ) { + syntaxError(stmt, "Invalid process publish statement") + return + } + + final left = binaryX.leftExpression + if( left !instanceof VariableExpression ) { + syntaxError(stmt, "Invalid process publish statement") + return + } + + stmt.expression = callThisX('_publish_target', args(constX(((VariableExpression)left).name), binaryX.rightExpression)) + } + protected boolean isIllegalName(String name, ASTNode node) { if( name in RESERVED_NAMES ) { unit.addError( new SyntaxException("Identifier `$name` is reserved for internal use", node.lineNumber, node.columnNumber+8) ) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index aa4bda7c17..5e1e7ae181 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -24,7 +24,9 @@ import groovyx.gpars.dataflow.DataflowReadChannel import nextflow.Global import nextflow.Session import nextflow.processor.PublishDir +import nextflow.util.CsvWriter /** + * Publish files from a source channel. 
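+ * Each value received from the source channel is scanned for files, which
+ * are published to the target directory. When an index is defined, a record
+ * is also collected for each value and written out on completion.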
* * @author Paolo Di Tommaso */ @@ -34,11 +36,13 @@ class PublishOp { private DataflowReadChannel source - private Map opts - private PublishDir publisher - private Path sourceDir + private Path targetDir + + private IndexOpts indexOpts + + private List indexRecords = [] private volatile boolean complete @@ -46,80 +50,151 @@ class PublishOp { PublishOp(DataflowReadChannel source, Map opts) { this.source = source - this.opts = opts ? new LinkedHashMap(opts) : Collections.emptyMap() - - // adapt `to` option - if( this.opts.containsKey('to') ) { - this.opts.path = this.opts.to - this.opts.remove('to') - } - - this.publisher = PublishDir.create(this.opts) + this.publisher = PublishDir.create(opts) + this.targetDir = opts.path as Path + if( opts.index ) + this.indexOpts = new IndexOpts(targetDir, opts.index as Map) } - protected boolean getComplete() { complete } + boolean getComplete() { complete } PublishOp apply() { final events = new HashMap(2) - events.onNext = this.&publish0 - events.onComplete = this.&done0 + events.onNext = this.&onNext + events.onComplete = this.&onComplete DataflowHelper.subscribeImpl(source, events) return this } - protected void publish0(entry) { - log.debug "Publish operator got: $entry" - sourceDir = null - // use a set to avoid duplicates - final result = new HashSet(10) - collectFiles(entry, result) - publisher.apply(result, sourceDir) + protected void onNext(value) { + log.trace "Publish operator received: $value" + final result = collectFiles([:], value) + for( final entry : result ) { + final sourceDir = entry.key + final files = entry.value + publisher.apply(files, sourceDir) + } + + if( indexOpts ) { + final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value + final normalized = normalizePaths(record) + log.trace "Normalized record for index file: ${normalized}" + indexRecords << normalized + } } - protected void done0(nope) { - log.debug "Publish operator complete" + protected void onComplete(nope) { + if( indexOpts && indexRecords.size() > 0 ) { + log.trace "Saving records to index file: ${indexRecords}" + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path) + session.notifyFilePublish(indexOpts.path) + } + + log.trace "Publish operator complete" this.complete = true } - protected void collectFiles(entry, Collection result) { - if( entry instanceof Path ) { - result.add(entry) - if( sourceDir == null ) - sourceDir = getTaskDir(entry) + /** + * Extract files from a received value for publishing. + * + * @param result + * @param value + */ + protected Map> collectFiles(Map> result, value) { + if( value instanceof Path ) { + final sourceDir = getTaskDir(value) + if( sourceDir !in result ) + result[sourceDir] = new HashSet(10) + result[sourceDir] << value + } + else if( value instanceof Collection ) { + for( final el : value ) + collectFiles(result, el) } - else if( entry instanceof List ) { - for( def x : entry ) { - collectFiles(x, result) + return result + } + + /** + * Normalize the paths in a record by converting + * work directory paths to publish paths. 
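+     * Path values are converted individually, while collections and maps
+     * are traversed recursively.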
+     *
+     * @param value
+     */
+    protected Object normalizePaths(value) {
+        if( value instanceof Path ) {
+            return List.of(value.getBaseName(), normalizePath(value))
+        }
+
+        if( value instanceof Collection ) {
+            return value.collect { el ->
+                if( el instanceof Path )
+                    return normalizePath(el)
+                if( el instanceof Collection )
+                    return normalizePaths(el)
+                return el
+            }
+        }
+
+        if( value instanceof Map ) {
+            return value.collectEntries { k, v ->
+                if( v instanceof Path )
+                    return List.of(k, normalizePath(v))
+                if( v instanceof Collection )
+                    return List.of(k, normalizePaths(v))
+                return List.of(k, v)
+            }
+        }
+
+        throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} [${value.class.simpleName}]")
+    }
+
+    private Path normalizePath(Path path) {
+        final sourceDir = getTaskDir(path)
+        return targetDir.resolve(sourceDir.relativize(path))
+    }
 
     /**
-     * Given a path try to infer the task directory to which the path below
-     * ie. the directory starting with a workflow work dir and having at lest
-     * two sub-directories eg work-dir/xx/yyyyyy/etc
+     * Try to infer the parent task directory to which a path belongs. It
+     * should be a directory starting with a session work dir and having
+     * at least two sub-directories, e.g. work/ab/cdef/etc
      *
      * @param path
-     * @return
      */
     protected Path getTaskDir(Path path) {
-        if( path==null )
+        if( path == null )
             return null
-        def result = getTaskDir0(path, session.workDir)
-        if( result == null )
-            result = getTaskDir0(path, session.bucketDir)
-        return result
+        return getTaskDir0(path, session.workDir.resolve('tmp'))
+            ?: getTaskDir0(path, session.workDir)
+            ?: getTaskDir0(path, session.bucketDir)
     }
 
     private Path getTaskDir0(Path file, Path base) {
-        if( base==null )
+        if( base == null )
             return null
         if( base.fileSystem != file.fileSystem )
             return null
         final len = base.nameCount
-        if( file.startsWith(base) && file.getNameCount()>len+2 )
+        if( file.startsWith(base) && file.getNameCount() > len+2 )
             return base.resolve(file.subpath(len,len+2))
         return null
     }
 
+    static class IndexOpts {
+        Path path
+        Closure mapper
+        def /* boolean | List */ header = false
+        String sep = ','
+
+        IndexOpts(Path targetDir, Map opts) {
+            this.path = targetDir.resolve(opts.path as String)
+
+            if( opts.mapper )
+                this.mapper = opts.mapper as Closure
+            if( opts.header != null )
+                this.header = opts.header
+            if( opts.sep )
+                this.sep = opts.sep as String
+        }
+    }
+
 }
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy
index 6c7aed3735..354e42ab9a 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy
@@ -46,7 +46,11 @@ import nextflow.extension.FilesEx
 import nextflow.file.FileHelper
 import nextflow.file.TagAwareFile
 import nextflow.fusion.FusionHelper
+import nextflow.util.HashBuilder
 import nextflow.util.PathTrie
+
+import static nextflow.util.CacheHelper.HashMode
+
 /**
  * Implements the {@code publishDir} directory. It creates links or copies the output
  * files of a given task to a user specified directory. 
@@ -73,9 +77,9 @@ class PublishDir { Path path /** - * Whenever overwrite existing files + * Whether to overwrite existing files */ - Boolean overwrite + def /* Boolean | String */ overwrite /** * The publish {@link Mode} @@ -199,7 +203,7 @@ class PublishDir { result.pattern = params.pattern if( params.overwrite != null ) - result.overwrite = Boolean.parseBoolean(params.overwrite.toString()) + result.overwrite = params.overwrite if( params.saveAs ) result.saveAs = (Closure) params.saveAs @@ -427,7 +431,7 @@ class PublishDir { if( !sameRealPath && checkSourcePathConflicts(destination)) return - if( !sameRealPath && overwrite ) { + if( !sameRealPath && shouldOverwrite(source, destination) ) { FileHelper.deletePath(destination) processFileImpl(source, destination) } @@ -511,6 +515,17 @@ class PublishDir { return !mode || mode == Mode.SYMLINK || mode == Mode.RELLINK } + protected boolean shouldOverwrite(Path source, Path target) { + if( overwrite instanceof Boolean ) + return overwrite + + final hashMode = HashMode.of(overwrite) ?: HashMode.DEFAULT() + final sourceHash = HashBuilder.hashPath(source, source.parent, hashMode) + final targetHash = HashBuilder.hashPath(target, target.parent, hashMode) + log.trace "comparing source and target with mode=${overwrite}, source=${sourceHash}, target=${targetHash}, should overwrite=${sourceHash != targetHash}" + return sourceHash != targetHash + } + protected void processFileImpl( Path source, Path destination ) { log.trace "publishing file: $source -[$mode]-> $destination" diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 302955617a..19f4f1888f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -109,6 +109,7 @@ import nextflow.util.ArrayBag import nextflow.util.BlankSeparatedList import nextflow.util.CacheHelper import nextflow.util.Escape +import nextflow.util.HashBuilder import nextflow.util.LockManager import nextflow.util.LoggerHelper import nextflow.util.TestOnly @@ -803,7 +804,7 @@ class TaskProcessor { int tries = task.failCount +1 while( true ) { - hash = CacheHelper.defaultHasher().newHasher().putBytes(hash.asBytes()).putInt(tries).hash() + hash = HashBuilder.defaultHasher().putBytes(hash.asBytes()).putInt(tries).hash() Path resumeDir = null boolean exists = false diff --git a/modules/nextflow/src/main/groovy/nextflow/script/BaseScript.groovy b/modules/nextflow/src/main/groovy/nextflow/script/BaseScript.groovy index d7421d3345..5ec823b5a8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/BaseScript.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/BaseScript.groovy @@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException import java.nio.file.Paths import groovy.util.logging.Slf4j +import nextflow.NF import nextflow.NextflowMeta import nextflow.Session import nextflow.exception.AbortOperationException @@ -41,6 +42,8 @@ abstract class BaseScript extends Script implements ExecutionContext { private WorkflowDef entryFlow + private OutputDef publisher + @Lazy InputStream stdin = { System.in }() BaseScript() { @@ -57,6 +60,10 @@ abstract class BaseScript extends Script implements ExecutionContext { (ScriptBinding)super.getBinding() } + Session getSession() { + session + } + /** * Holds the configuration object which will used to execution the user tasks */ @@ -116,6 +123,17 @@ abstract 
class BaseScript extends Script implements ExecutionContext { meta.addDefinition(workflow) } + protected output(Closure closure) { + if( !NF.outputDefinitionEnabled ) + throw new IllegalStateException("Workflow output definition requires the `nextflow.preview.output` feature flag") + if( !entryFlow ) + throw new IllegalStateException("Workflow output definition must be defined after the anonymous workflow") + if( ExecutionStack.withinWorkflow() ) + throw new IllegalStateException("Workflow output definition is not allowed within a workflow") + + publisher = new OutputDef(closure) + } + protected IncludeDef include( IncludeDef include ) { if(ExecutionStack.withinWorkflow()) throw new IllegalStateException("Include statement is not allowed within a workflow definition") @@ -178,6 +196,8 @@ abstract class BaseScript extends Script implements ExecutionContext { // invoke the entry workflow session.notifyBeforeWorkflowExecution() final ret = entryFlow.invoke_a(BaseScriptConsts.EMPTY_ARGS) + if( publisher ) + publisher.run(session.publishTargets) session.notifyAfterWorkflowExecution() return ret } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/BaseScriptConsts.groovy b/modules/nextflow/src/main/groovy/nextflow/script/BaseScriptConsts.groovy index 3de58346dc..6e69964e9f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/BaseScriptConsts.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/BaseScriptConsts.groovy @@ -25,5 +25,5 @@ class BaseScriptConsts { public static Object[] EMPTY_ARGS = [] as Object[] - public static List PRIVATE_NAMES = ['session','processFactory','taskProcessor','meta','entryFlow'] + public static List PRIVATE_NAMES = ['session','processFactory','taskProcessor','meta','entryFlow', 'publisher'] } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/OutputDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/OutputDef.groovy new file mode 100644 index 0000000000..73de6e16ee --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/script/OutputDef.groovy @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nextflow.script + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowWriteChannel +/** + * Models the workflow output definition + * + * @author Ben Sherman + */ +@Slf4j +@CompileStatic +class OutputDef { + + private Closure closure + + OutputDef(Closure closure) { + this.closure = closure + } + + void run(Map targets) { + final dsl = new OutputDsl() + final cl = (Closure)closure.clone() + cl.setDelegate(dsl) + cl.setResolveStrategy(Closure.DELEGATE_FIRST) + cl.call() + + dsl.build(targets) + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy new file mode 100644 index 0000000000..5a13db1539 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy @@ -0,0 +1,254 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.script + +import java.nio.file.Path +import java.nio.file.Paths + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.exception.ScriptRuntimeException +import nextflow.extension.CH +import nextflow.extension.MixOp +import nextflow.extension.PublishOp +import nextflow.file.FileHelper + +/** + * Implements the DSL for publishing workflow outputs + * + * @author Ben Sherman + */ +@Slf4j +@CompileStatic +class OutputDsl { + + private Map publishConfigs = [:] + + private Path directory + + private Map defaults = [:] + + private volatile List ops = [] + + void directory(String directory) { + if( this.directory ) + throw new ScriptRuntimeException("Publish directory cannot be defined more than once in the workflow publish definition") + this.directory = FileHelper.toCanonicalPath(directory) + } + + void contentType(String value) { + setDefault('contentType', value) + } + + void contentType(boolean value) { + setDefault('contentType', value) + } + + void ignoreErrors(boolean value) { + setDefault('ignoreErrors', value) + } + + void mode(String value) { + setDefault('mode', value) + } + + void overwrite(boolean value) { + setDefault('overwrite', value) + } + + void overwrite(String value) { + setDefault('overwrite', value) + } + + void storageClass(String value) { + setDefault('storageClass', value) + } + + void tags(Map value) { + setDefault('tags', value) + } + + private void setDefault(String name, Object value) { + if( defaults.containsKey(name) ) + throw new ScriptRuntimeException("Default `${name}` option cannot be defined more than once in the workflow publish definition") + defaults[name] = value + } + + void target(String name, Closure closure) { + if( publishConfigs.containsKey(name) ) + throw new ScriptRuntimeException("Target '${name}' is defined more than once in the workflow publish definition") + + final dsl = new TargetDsl() + final cl = (Closure)closure.clone() + cl.setResolveStrategy(Closure.DELEGATE_FIRST) 
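+        // dispatch the target options to the TargetDsl delegate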
+ cl.setDelegate(dsl) + cl.call() + + publishConfigs[name] = dsl.getOptions() + } + + void build(Map targets) { + // construct mapping of target name -> source channels + final Map> publishSources = [:] + for( final source : targets.keySet() ) { + final name = targets[source] + if( !name ) + continue + if( name !in publishSources ) + publishSources[name] = [] + publishSources[name] << source + } + + // create publish op (and optional index op) for each target + for( final name : publishSources.keySet() ) { + final sources = publishSources[name] + final mixed = sources.size() > 1 + ? new MixOp(sources.collect( ch -> CH.getReadChannel(ch) )).apply() + : sources.first() + final opts = publishOptions(name, publishConfigs[name] ?: [:]) + + ops << new PublishOp(CH.getReadChannel(mixed), opts).apply() + } + } + + private Map publishOptions(String name, Map overrides) { + if( !directory ) + directory = FileHelper.toCanonicalPath('.') + + final opts = defaults + overrides + if( opts.containsKey('ignoreErrors') ) + opts.failOnError = !opts.remove('ignoreErrors') + if( !opts.containsKey('overwrite') ) + opts.overwrite = 'standard' + + final path = opts.path as String ?: name + if( path.startsWith('/') ) + throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should be a relative path") + opts.path = directory.resolve(path) + + if( opts.index && !(opts.index as Map).path ) + throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option") + + return opts + } + + boolean getComplete() { + for( final op : ops ) + if( !op.complete ) + return false + return true + } + + static class TargetDsl { + + private Map opts = [:] + + void contentType(String value) { + setOption('contentType', value) + } + + void contentType(boolean value) { + setOption('contentType', value) + } + + void ignoreErrors(boolean value) { + setOption('ignoreErrors', value) + } + + void index(Closure closure) { + final dsl = new IndexDsl() + final cl = (Closure)closure.clone() + cl.setResolveStrategy(Closure.DELEGATE_FIRST) + cl.setDelegate(dsl) + cl.call() + setOption('index', dsl.getOptions()) + } + + void mode(String value) { + setOption('mode', value) + } + + void overwrite(boolean value) { + setOption('overwrite', value) + } + + void overwrite(String value) { + setOption('overwrite', value) + } + + void path(String value) { + setOption('path', value) + } + + void storageClass(String value) { + setOption('storageClass', value) + } + + void tags(Map value) { + setOption('tags', value) + } + + private void setOption(String name, Object value) { + if( opts.containsKey(name) ) + throw new ScriptRuntimeException("Publish option `${name}` cannot be defined more than once for a given target") + opts[name] = value + } + + Map getOptions() { + opts + } + + } + + static class IndexDsl { + + private Map opts = [:] + + void header(boolean value) { + setOption('header', value) + } + + void header(List value) { + setOption('header', value) + } + + void mapper(Closure value) { + setOption('mapper', value) + } + + void path(String value) { + setOption('path', value) + } + + void sep(String value) { + setOption('sep', value) + } + + private void setOption(String name, Object value) { + if( opts.containsKey(name) ) + throw new ScriptRuntimeException("Index option `${name}` cannot be defined more than once for a given index definition") + opts[name] = value + } + + Map getOptions() { + opts + } + + } + +} diff --git 
a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy index df7ce27ec6..a17e80eb5a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy @@ -164,6 +164,11 @@ class ProcessConfig implements Map, Cloneable { */ private outputs = new OutputsList() + /** + * Map of default publish targets + */ + private Map publishTargets = [:] + /** * Initialize the taskConfig object with the defaults values * @@ -514,6 +519,13 @@ class ProcessConfig implements Map, Cloneable { outputs } + /** + * Typed shortcut to {@code #publishTargets} + */ + Map getPublishTargets() { + publishTargets + } + /** * Implements the process {@code debug} directive. */ @@ -651,6 +663,13 @@ class ProcessConfig implements Map, Cloneable { result } + void _publish_target(String emit, String name) { + final emitNames = outputs.collect { param -> param.channelEmitName } + if( emit !in emitNames ) + throw new IllegalArgumentException("Invalid emit name '${emit}' in publish statement, valid emits are: ${emitNames.join(', ')}") + publishTargets[emit] = name + } + /** * Defines a special *dummy* input parameter, when no inputs are * provided by the user for the current task diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 3c54f0e426..f7e59b371e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -18,6 +18,7 @@ package nextflow.script import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Const import nextflow.Global import nextflow.Session @@ -206,7 +207,15 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { } // make a copy of the output list because execution can change it - final copyOuts = declaredOutputs.clone() + output = new ChannelOut(declaredOutputs.clone()) + + // register process publish targets + for( final entry : processConfig.getPublishTargets() ) { + final emit = entry.key + final name = entry.value + final source = (DataflowWriteChannel)output.getProperty(emit) + session.publishTargets[source] = name + } // create the executor final executor = session @@ -221,7 +230,7 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { // the result channels assert declaredOutputs.size()>0, "Process output should contains at least one channel" - return output = new ChannelOut(copyOuts) + return output } } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowBinding.groovy b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowBinding.groovy index 9b45ee6c59..105d67a246 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowBinding.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowBinding.groovy @@ -19,6 +19,7 @@ package nextflow.script import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.NF import nextflow.exception.IllegalInvocationException import nextflow.extension.OpCall @@ -155,4 +156,13 @@ class WorkflowBinding extends Binding { } } + void _publish_target(DataflowWriteChannel source, String name) { + 
owner.session.publishTargets[source] = name + } + + void _publish_target(ChannelOut out, String name) { + for( final ch : out ) + _publish_target(ch, name) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowDef.groovy index b540a53451..7a74cc4fff 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowDef.groovy @@ -57,7 +57,7 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec this.name = name // invoke the body resolving in/out params final copy = (Closure)rawBody.clone() - final resolver = new WorkflowParamsResolver() + final resolver = new WorkflowParamsDsl() copy.setResolveStrategy(Closure.DELEGATE_FIRST) copy.setDelegate(resolver) this.body = copy.call() @@ -199,7 +199,7 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec collectInputs(binding, args) // invoke the workflow execution final closure = body.closure - closure.delegate = binding + closure.setDelegate(binding) closure.setResolveStrategy(Closure.DELEGATE_FIRST) closure.call() // collect the workflow outputs @@ -210,11 +210,11 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec } /** - * Hold workflow parameters + * Implements the DSL for defining workflow takes and emits */ @Slf4j @CompileStatic -class WorkflowParamsResolver { +class WorkflowParamsDsl { static final private String TAKE_PREFIX = '_take_' static final private String EMIT_PREFIX = '_emit_' @@ -234,23 +234,4 @@ class WorkflowParamsResolver { else throw new MissingMethodException(name, WorkflowDef, args) } - - private Map argsToMap(Object args) { - if( args && args.getClass().isArray() ) { - if( ((Object[])args)[0] instanceof Map ) { - def map = (Map)((Object[])args)[0] - return new HashMap(map) - } - } - Collections.emptyMap() - } - - private Map argToPublishOpts(Object args) { - final opts = argsToMap(args) - if( opts.containsKey('saveAs')) { - log.warn "Workflow publish does not support `saveAs` option" - opts.remove('saveAs') - } - return opts - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/CsvWriter.groovy b/modules/nextflow/src/main/groovy/nextflow/util/CsvWriter.groovy new file mode 100644 index 0000000000..b2d4736bbc --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/util/CsvWriter.groovy @@ -0,0 +1,70 @@ +/* + * Copyright 2024, Ben Sherman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nextflow.util + +import java.nio.file.Path + +import groovy.transform.CompileStatic + +@CompileStatic +class CsvWriter { + + private /* boolean | List */ header = false + + private String sep = ',' + + CsvWriter(Map opts) { + if( opts.header ) + this.header = opts.header + + if( opts.sep ) + this.sep = opts.sep.toString() + } + + void apply(List records, Path path) { + Collection columns + if( header == true ) { + final first = records.first() + if( first !instanceof Map ) + throw new IllegalArgumentException('Records must be map objects when header=true') + columns = ((Map)first).keySet() + } + else if( header instanceof List ) { + columns = header + } + + path.delete() + + if( columns ) + path << columns.collect(it -> '"' + it + '"').join(sep) << '\n' + + for( final record : records ) { + Collection values + if( record instanceof List ) + values = record + else if( record instanceof Map ) + values = columns + ? record.subMap(columns).values() + : record.values() + else + throw new IllegalArgumentException('Records must be list or map objects') + + path << values.collect(it -> '"' + it + '"').join(sep) << '\n' + } + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy deleted file mode 100644 index 4923606887..0000000000 --- a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2013-2024, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package nextflow.extension - -import java.nio.file.Files -import java.util.concurrent.TimeoutException - -import groovyx.gpars.dataflow.DataflowQueue -import nextflow.Channel -import nextflow.Global -import nextflow.Session -import test.BaseSpec -/** - * - * @author Paolo Di Tommaso - */ -class PublishOpTest extends BaseSpec { - - - def 'should publish files' () { - given: - def folder = Files.createTempDirectory('test') - def file1 = folder.resolve('file1.txt'); file1.text = 'Hello' - def file2 = folder.resolve('file2.txt'); file2.text = 'world' - def target = folder.resolve('target/dir') - - - def BASE = folder - def sess = Mock(Session) { - getWorkDir() >> BASE - getConfig() >> [:] - } - Global.session = sess - - and: - def ch = new DataflowQueue() - ch.bind(file1) - ch.bind(file2) - ch.bind(Channel.STOP) - - when: - def now = System.currentTimeMillis() - def op = new PublishOp(ch, [to:target, mode:'symlink']) .apply() - while( !op.complete ) { sleep 100; if( System.currentTimeMillis()-now>5_000) throw new TimeoutException() } - then: - target.resolve('file1.txt').text == 'Hello' - target.resolve('file2.txt').text == 'world' - - cleanup: - folder?.deleteDir() - } - -} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index 74ba70740f..60f183fb23 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -44,17 +44,17 @@ class PublishDirTest extends Specification { publish.path == Paths.get('/data') when: - publish = PublishDir.create(path: 'data') + publish = PublishDir.create(path: 'data') then: publish.path == Paths.get('data').complete() when: - publish = PublishDir.create( path: Paths.get('data') ) + publish = PublishDir.create( path: Paths.get('data') ) then: publish.path == Paths.get('data').complete() when: - publish = PublishDir.create( [path: '/some/dir', overwrite: true, pattern: '*.bam', mode: 'link'] ) + publish = PublishDir.create( [path: '/some/dir', overwrite: true, pattern: '*.bam', mode: 'link'] ) then: publish.path == Paths.get('/some/dir') publish.mode == PublishDir.Mode.LINK @@ -63,7 +63,7 @@ class PublishDirTest extends Specification { publish.enabled when: - publish = PublishDir.create( [path: '/some/data', mode: 'copy', enabled: false] ) + publish = PublishDir.create( [path: '/some/data', mode: 'copy', enabled: false] ) then: publish.path == Paths.get('/some/data') publish.mode == PublishDir.Mode.COPY @@ -72,7 +72,7 @@ class PublishDirTest extends Specification { !publish.enabled when: - publish = PublishDir.create( [path: '/some/data', mode: 'copy', enabled: 'false'] ) + publish = PublishDir.create( [path: '/some/data', mode: 'copy', enabled: 'false'] ) then: publish.path == Paths.get('/some/data') publish.mode == PublishDir.Mode.COPY @@ -81,15 +81,7 @@ class PublishDirTest extends Specification { !publish.enabled when: - publish = PublishDir.create( [path:'this/folder', overwrite: false, pattern: '*.txt', mode: 'copy'] ) - then: - publish.path == Paths.get('this/folder').complete() - publish.mode == PublishDir.Mode.COPY - publish.pattern == '*.txt' - publish.overwrite == false - - when: - publish = PublishDir.create( [path:'this/folder', overwrite: 'false', pattern: '*.txt', mode: 'copy'] ) + publish = PublishDir.create( [path:'this/folder', overwrite: false, pattern: '*.txt', mode: 'copy'] ) then: publish.path == 
Paths.get('this/folder').complete() publish.mode == PublishDir.Mode.COPY @@ -132,7 +124,7 @@ class PublishDirTest extends Specification { def task = new TaskRun(workDir: workDir, config: new TaskConfig(), name: 'foo') when: - def outputs = [ + def outputs = [ workDir.resolve('file1.txt'), workDir.resolve('file2.bam'), workDir.resolve('file3.fastq') @@ -326,7 +318,7 @@ class PublishDirTest extends Specification { def task = new TaskRun(workDir: workDir, config: Mock(TaskConfig)) when: - def outputs = [ + def outputs = [ workDir.resolve('file1.txt'), ] as Set def publisher = new PublishDir(path: publishDir, enabled: false) diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy new file mode 100644 index 0000000000..dabf4c6212 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -0,0 +1,85 @@ +package nextflow.script + +import java.nio.file.Files +import java.util.concurrent.TimeoutException + +import groovyx.gpars.dataflow.DataflowQueue +import nextflow.Channel +import nextflow.Global +import nextflow.Session +import nextflow.SysEnv +import spock.lang.Specification +/** + * + * @author Ben Sherman + */ +class OutputDslTest extends Specification { + + def 'should publish workflow outputs'() { + given: + def root = Files.createTempDirectory('test') + def workDir = root.resolve('work') + def work1 = workDir.resolve('ab/1234'); Files.createDirectories(work1) + def work2 = workDir.resolve('cd/5678'); Files.createDirectories(work2) + def file1 = work1.resolve('file1.txt'); file1.text = 'Hello' + def file2 = work2.resolve('file2.txt'); file2.text = 'world' + def target = root.resolve('results') + and: + def session = Mock(Session) { + getConfig() >> [:] + getWorkDir() >> workDir + } + Global.session = session + and: + def ch1 = new DataflowQueue() + ch1.bind(file1) + ch1.bind(Channel.STOP) + and: + def ch2 = new DataflowQueue() + ch2.bind(file2) + ch2.bind(Channel.STOP) + and: + def targets = [ + (ch1): 'foo', + (ch2): 'bar' + ] + def dsl = new OutputDsl() + and: + SysEnv.push(NXF_FILE_ROOT: root.toString()) + + when: + dsl.directory('results') + dsl.mode('symlink') + dsl.overwrite(true) + dsl.target('bar') { + path('barbar') + index { + path 'index.csv' + } + } + dsl.build(targets) + + def now = System.currentTimeMillis() + while( !dsl.complete ) { + sleep 100 + if( System.currentTimeMillis() - now > 5_000 ) + throw new TimeoutException() + } + + then: + target.resolve('foo/file1.txt').text == 'Hello' + target.resolve('barbar/file2.txt').text == 'world' + target.resolve('barbar/index.csv').text == """\ + "file2","${target}/barbar/file2.txt" + """.stripIndent() + and: + 1 * session.notifyFilePublish(target.resolve('foo/file1.txt'), file1) + 1 * session.notifyFilePublish(target.resolve('barbar/file2.txt'), file2) + 1 * session.notifyFilePublish(target.resolve('barbar/index.csv')) + + cleanup: + SysEnv.pop() + root?.deleteDir() + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/script/WorkflowDefTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/WorkflowDefTest.groovy index 00a0f9dedf..4607a45619 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/WorkflowDefTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/WorkflowDefTest.groovy @@ -161,54 +161,6 @@ class WorkflowDefTest extends Dsl2Spec { } - - def 'should capture publish defs' () { - - given: - def config = new CompilerConfiguration() - 
config.setScriptBaseClass(TestScript.class.name) - config.addCompilationCustomizers( new ASTTransformationCustomizer(NextflowDSL)) - - def SCRIPT = ''' - - workflow { - publish: - foo - bar to: 'some/path' - baz.out to: 'other/path' - main: - x = 1 - } - ''' - - when: - def script = (TestScript)new GroovyShell(new ScriptBinding(), config).parse(SCRIPT).run() - then: - thrown(MultipleCompilationErrorsException) - } - - def 'should not allow publish is sub-workflow' () { - - given: - def config = new CompilerConfiguration() - config.setScriptBaseClass(TestScript.class.name) - config.addCompilationCustomizers( new ASTTransformationCustomizer(NextflowDSL)) - - def SCRIPT = ''' - - workflow alpha { - publish: foo - main: - x = 1 - } - ''' - - when: - new GroovyShell(config).parse(SCRIPT) - then: - thrown(MultipleCompilationErrorsException) - } - def 'should report malformed workflow block' () { given: diff --git a/modules/nf-commons/src/main/nextflow/util/CacheHelper.java b/modules/nf-commons/src/main/nextflow/util/CacheHelper.java index 95bf2f5683..b06c1e3203 100644 --- a/modules/nf-commons/src/main/nextflow/util/CacheHelper.java +++ b/modules/nf-commons/src/main/nextflow/util/CacheHelper.java @@ -16,40 +16,10 @@ package nextflow.util; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.ProviderMismatchException; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutionException; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.hash.Funnels; -import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import com.google.common.io.ByteStreams; -import nextflow.Global; -import nextflow.ISession; -import nextflow.extension.Bolts; -import nextflow.extension.FilesEx; -import nextflow.file.FileHolder; -import nextflow.io.SerializableMarker; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Provide helper method to handle caching * @@ -92,31 +62,12 @@ public static HashMode of( Object obj ) { } } - private static final Logger log = LoggerFactory.getLogger(CacheHelper.class); - - private static HashFunction DEFAULT_HASHING = Hashing.murmur3_128(); - - private static int HASH_BITS = DEFAULT_HASHING.bits(); - - private static int HASH_BYTES = HASH_BITS / 8; - - private static final Map FIRST_ONLY; - - static { - FIRST_ONLY = new HashMap<>(1); - FIRST_ONLY.put("firstOnly", Boolean.TRUE); - } - - public static HashFunction defaultHasher() { - return DEFAULT_HASHING; - } - public static Hasher hasher( Object value ) { return hasher(value, HashMode.STANDARD); } public static Hasher hasher( Object value, HashMode mode ) { - return hasher( DEFAULT_HASHING, value, mode ); + return hasher( HashBuilder.defaultHasher(), value, mode ); } public static Hasher hasher( HashFunction function, Object value, HashMode mode ) { @@ -124,356 +75,7 @@ public static Hasher hasher( HashFunction function, Object value, HashMode mode } public static Hasher hasher( Hasher hasher, Object value, HashMode mode ) { - - if( value == null ) - return hasher; - - if( value instanceof 
Boolean ) - return hasher.putBoolean((Boolean) value); - - if( value instanceof Short ) - return hasher.putShort((Short) value); - - if( value instanceof Integer) - return hasher.putInt((Integer) value); - - if( value instanceof Long ) - return hasher.putLong((Long) value); - - if( value instanceof Float ) - return hasher.putFloat((Float) value); - - if( value instanceof Double ) - return hasher.putDouble( (Double)value ); - - if( value instanceof Byte ) - return hasher.putByte( (Byte)value ); - - if( value instanceof Number ) - // reduce all other number types (BigInteger, BigDecimal, AtomicXxx, etc) to string equivalent - return hasher.putUnencodedChars(value.toString()); - - if( value instanceof Character ) - return hasher.putChar( (Character)value ); - - if( value instanceof CharSequence ) - return hasher.putUnencodedChars( (CharSequence)value ); - - if( value instanceof byte[] ) - return hasher.putBytes( (byte[])value ); - - if( value instanceof Object[]) { - for( Object item: ((Object[])value) ) - hasher = CacheHelper.hasher( hasher, item, mode ); - return hasher; - } - - if( value instanceof Map ) { - // note: should map be order invariant as Set ? - for( Object item : ((Map)value).values() ) - hasher = CacheHelper.hasher( hasher, item, mode ); - return hasher; - } - - if( value instanceof Map.Entry ) { - Map.Entry entry = (Map.Entry)value; - hasher = CacheHelper.hasher( hasher, entry.getKey(), mode ); - hasher = CacheHelper.hasher( hasher, entry.getValue(), mode ); - return hasher; - } - - if( value instanceof Bag || value instanceof Set ) - return hashUnorderedCollection(hasher, (Collection) value, mode); - - if( value instanceof Collection) { - for( Object item: ((Collection)value) ) - hasher = CacheHelper.hasher( hasher, item, mode ); - return hasher; - } - - if( value instanceof FileHolder ) - return CacheHelper.hasher(hasher, ((FileHolder) value).getSourceObj(), mode ); - - if( value instanceof Path ) - return hashFile(hasher, (Path)value, mode); - - if( value instanceof java.io.File ) - return hashFile(hasher, (java.io.File)value, mode); - - if( value instanceof UUID ) { - UUID uuid = (UUID)value; - return hasher.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits()); - } - - if( value instanceof VersionNumber ) { - return hasher.putInt( value.hashCode() ); - } - - if( value instanceof SerializableMarker) { - return hasher.putInt( value.hashCode() ); - } - - if( value instanceof CacheFunnel ) { - return ((CacheFunnel) value).funnel(hasher,mode); - } - - if( value instanceof Enum ) { - return hasher.putUnencodedChars( value.getClass().getName() + "." 
+ value ); - } - - Bolts.debug1(log, FIRST_ONLY, "[WARN] Unknown hashing type: "+value.getClass()); - return hasher.putInt( value.hashCode() ); - } - - /** - * Hashes the specified file - * - * @param hasher The current {@code Hasher} object - * @param file The {@code File} object to hash - * @param mode When {@code mode} is equals to the string {@code deep} is used the file content - * in order to create the hash key for this file, otherwise just the file metadata information - * (full name, size and last update timestamp) - * @return The updated {@code Hasher} object - */ - static private Hasher hashFile( Hasher hasher, java.io.File file, HashMode mode ) { - return hashFile(hasher, file.toPath(), mode); - } - - /** - * Hashes the specified file - * - * @param hasher The current {@code Hasher} object - * @param path The {@code Path} object to hash - * @param mode When {@code mode} is equals to the string {@code deep} is used the file content - * in order to create the hash key for this file, otherwise just the file metadata information - * (full name, size and last update timestamp) - * @return The updated {@code Hasher} object - */ - static private Hasher hashFile( Hasher hasher, Path path, HashMode mode ) { - BasicFileAttributes attrs=null; - try { - attrs = Files.readAttributes(path, BasicFileAttributes.class); - } - catch(IOException e) { - log.debug("Unable to get file attributes file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString()); - } - catch(ProviderMismatchException e) { - // see https://github.com/nextflow-io/nextflow/pull/1382 - log.warn("File system is unable to get file attributes file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString()); - } - catch(Exception e) { - log.warn("Unable to get file attributes file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString()); - } - - if( (mode==HashMode.STANDARD || mode==HashMode.LENIENT) && isAssetFile(path) ) { - if( attrs==null ) { - // when file attributes are not avail or it's a directory - // hash the file using the file name path and the repository - log.warn("Unable to fetch attribute for file: {} - Hash is inferred from Git repository commit Id", FilesEx.toUriString(path)); - return hashFileAsset(hasher, path); - } - final Path base = Global.getSession().getBaseDir(); - if( attrs.isDirectory() ) { - // hash all the directory content - return hashDirSha256(hasher, path, base); - } - else { - // hash the content being an asset file - // (i.e. 
included in the project repository) it's expected to small file - // which makes the content hashing doable - return hashFileSha256(hasher, path, base); - } - } - - if( mode==HashMode.DEEP && attrs!=null && attrs.isRegularFile() ) - return hashFileContent(hasher, path); - if( mode==HashMode.SHA256 && attrs!=null && attrs.isRegularFile() ) - return hashFileSha256(hasher, path, null); - // default - return hashFileMetadata(hasher, path, attrs, mode); - } - - - static private LoadingCache sha256Cache = CacheBuilder - .newBuilder() - .maximumSize(10_000) - .build(new CacheLoader() { - @Override - public String load(Path key) throws Exception { - return hashFileSha256Impl0(key); - } - }); - - static protected Hasher hashFileSha256( Hasher hasher, Path path, Path base ) { - try { - log.trace("Hash sha-256 file content path={} - base={}", path, base); - // the file relative base - if( base!=null ) - hasher.putUnencodedChars(base.relativize(path).toString()); - // file content hash - String sha256 = sha256Cache.get(path); - hasher.putUnencodedChars(sha256); - } - catch (ExecutionException t) { - Throwable err = t.getCause()!=null ? t.getCause() : t; - String msg = err.getMessage()!=null ? err.getMessage() : err.toString(); - log.warn("Unable to compute sha-256 hashing for file: {} - Cause: {}", FilesEx.toUriString(path), msg); - } - return hasher; - } - - static protected Hasher hashDirSha256( Hasher hasher, Path dir, Path base ) { - try { - Files.walkFileTree(dir, new SimpleFileVisitor() { - public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException { - log.trace("Hash sha-256 dir content [FILE] path={} - base={}", path, base); - try { - // the file relative base - if( base!=null ) - hasher.putUnencodedChars(base.relativize(path).toString()); - // the file content sha-256 checksum - String sha256 = sha256Cache.get(path); - hasher.putUnencodedChars(sha256); - return FileVisitResult.CONTINUE; - } - catch (ExecutionException t) { - throw new IOException(t); - } - } - - public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) { - log.trace("Hash sha-256 dir content [DIR] path={} - base={}", path, base); - // the file relative base - if( base!=null ) - hasher.putUnencodedChars(base.relativize(path).toString()); - hasher.putUnencodedChars(base.relativize(path).toString()); - return FileVisitResult.CONTINUE; - } - }); - } - catch (IOException t) { - Throwable err = t.getCause()!=null ? t.getCause() : t; - String msg = err.getMessage()!=null ? 
err.getMessage() : err.toString(); - log.warn("Unable to compute sha-256 hashing for directory: {} - Cause: {}", FilesEx.toUriString(dir), msg); - } - return hasher; - } - - static protected String hashFileSha256Impl0(Path path) throws IOException { - log.debug("Hash asset file sha-256: {}", path); - Hasher hasher = Hashing.sha256().newHasher(); - ByteStreams.copy(Files.newInputStream(path), Funnels.asOutputStream(hasher)); - return hasher.hash().toString(); - } - - static private Hasher hashFileAsset( Hasher hasher, Path path ) { - log.debug("Hash asset file: {}", path); - hasher.putUnencodedChars( Global.getSession().getCommitId() ); - return hasher; - } - - /** - * Hashes the file by using the metadata information: full path string, size and last update timestamp - * - * @param hasher The current {@code Hasher} object - * @param file file The {@code Path} object to hash - * @return The updated {@code Hasher} object - */ - static private Hasher hashFileMetadata( Hasher hasher, Path file, BasicFileAttributes attrs, HashMode mode ) { - - hasher = hasher.putUnencodedChars( file.toAbsolutePath().toString() ); - if( attrs != null ) { - hasher = hasher.putLong(attrs.size()); - if( attrs.lastModifiedTime() != null && mode != HashMode.LENIENT ) { - hasher = hasher.putLong( attrs.lastModifiedTime().toMillis() ); - } - } - - if( log.isTraceEnabled() ) { - log.trace("Hashing file meta: path={}; size={}, lastModified={}, mode={}", - file.toAbsolutePath().toString(), - attrs!=null ? attrs.size() : "--", - attrs!=null && attrs.lastModifiedTime() != null && mode != HashMode.LENIENT ? attrs.lastModifiedTime().toMillis() : "--", - mode - ); - } - return hasher; - } - - - /** - * Hashes the file by reading file content - * - * @param hasher The current {@code Hasher} object - * @param path file The {@code Path} object to hash - * @return The updated {@code Hasher} object - */ - - static private Hasher hashFileContent( Hasher hasher, Path path ) { - - OutputStream output = Funnels.asOutputStream(hasher); - try { - Files.copy(path, output); - } - catch( IOException e ) { - throw new IllegalStateException("Unable to hash content: " + FilesEx.toUriString(path), e); - } - finally { - FilesEx.closeQuietly(output); - } - - return hasher; - } - - static HashCode hashContent( Path file ) { - return hashContent(file, null); - } - - static HashCode hashContent( Path file, HashFunction function ) { - - if( function == null ) - function = DEFAULT_HASHING; - - Hasher hasher = function.newHasher(); - return hashFileContent(hasher, file).hash(); - } - - static private Hasher hashUnorderedCollection(Hasher hasher, Collection collection, HashMode mode) { - - byte[] resultBytes = new byte[HASH_BYTES]; - for (Object item : collection) { - byte[] nextBytes = CacheHelper.hasher(item,mode).hash().asBytes(); - if( nextBytes.length != resultBytes.length ) - throw new IllegalStateException("All hash codes must have the same bit length"); - - for (int i = 0; i < nextBytes.length; i++) { - resultBytes[i] += nextBytes[i]; - } - } - - return hasher.putBytes(resultBytes); - - } - - /** - * Check if the argument is an asset file i.e. 
a file that makes part of the - * pipeline Git repository - * - * @param path - * @return - */ - static protected boolean isAssetFile(Path path) { - final ISession session = Global.getSession(); - if( session==null ) - return false; - // if the commit ID is null the current run is not launched from a repo - if( session.getCommitId()==null ) - return false; - // if the file belong to different file system, cannot be a file belonging to the repo - if( session.getBaseDir().getFileSystem()!=path.getFileSystem() ) - return false; - // if the file is in the same directory as the base dir it's a asset by definition - return path.startsWith(session.getBaseDir()); + return HashBuilder.hasher(hasher, value, mode); } } diff --git a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java new file mode 100644 index 0000000000..46c3fedf84 --- /dev/null +++ b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java @@ -0,0 +1,480 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.ProviderMismatchException; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.hash.Funnels; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; +import nextflow.Global; +import nextflow.ISession; +import nextflow.extension.Bolts; +import nextflow.extension.FilesEx; +import nextflow.file.FileHolder; +import nextflow.io.SerializableMarker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static nextflow.util.CacheHelper.HashMode; + + +/** + * Implements the hashing of objects + * + * @author Paolo Di Tommaso + */ +public class HashBuilder { + + private static final Logger log = LoggerFactory.getLogger(HashBuilder.class); + + private static final HashFunction DEFAULT_HASHING = Hashing.murmur3_128(); + + private static final int HASH_BITS = DEFAULT_HASHING.bits(); + + private static final int HASH_BYTES = HASH_BITS / 8; + + private static final Map FIRST_ONLY; + + static { + FIRST_ONLY = new HashMap<>(1); + FIRST_ONLY.put("firstOnly", Boolean.TRUE); + } + + public static Hasher defaultHasher() { + return DEFAULT_HASHING.newHasher(); + } + + private Hasher hasher = defaultHasher(); + + private HashMode mode = HashMode.STANDARD; + + private Path basePath; + + public 
HashBuilder() {} + + public HashBuilder withHasher(Hasher hasher) { + this.hasher = hasher; + return this; + } + + public HashBuilder withMode(HashMode mode) { + this.mode = mode; + return this; + } + + public HashBuilder withBasePath(Path basePath) { + this.basePath = basePath; + return this; + } + + public HashBuilder with(Object value) { + + if( value == null ) + return this; + + else if( value instanceof Boolean ) + hasher.putBoolean((Boolean) value); + + else if( value instanceof Short ) + hasher.putShort((Short) value); + + else if( value instanceof Integer) + hasher.putInt((Integer) value); + + else if( value instanceof Long ) + hasher.putLong((Long) value); + + else if( value instanceof Float ) + hasher.putFloat((Float) value); + + else if( value instanceof Double ) + hasher.putDouble( (Double)value ); + + else if( value instanceof Byte ) + hasher.putByte( (Byte)value ); + + else if( value instanceof Number ) + // reduce all other number types (BigInteger, BigDecimal, AtomicXxx, etc) to string equivalent + hasher.putUnencodedChars(value.toString()); + + else if( value instanceof Character ) + hasher.putChar( (Character)value ); + + else if( value instanceof CharSequence ) + hasher.putUnencodedChars( (CharSequence)value ); + + else if( value instanceof byte[] ) + hasher.putBytes( (byte[])value ); + + else if( value instanceof Object[]) + for( Object item : ((Object[])value) ) + with(item); + + // note: should map be order invariant as Set ? + else if( value instanceof Map ) + for( Object item : ((Map)value).values() ) + with(item); + + else if( value instanceof Map.Entry ) { + Map.Entry entry = (Map.Entry)value; + with(entry.getKey()); + with(entry.getValue()); + } + + else if( value instanceof Bag || value instanceof Set ) + hashUnorderedCollection(hasher, (Collection) value, mode); + + else if( value instanceof Collection) + for( Object item : ((Collection)value) ) + with(item); + + else if( value instanceof FileHolder ) + with(((FileHolder) value).getSourceObj()); + + else if( value instanceof Path ) + hashFile(hasher, (Path)value, mode, basePath); + + else if( value instanceof java.io.File ) + hashFile(hasher, (java.io.File)value, mode, basePath); + + else if( value instanceof UUID ) { + UUID uuid = (UUID)value; + hasher.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits()); + } + + else if( value instanceof VersionNumber ) + hasher.putInt( value.hashCode() ); + + else if( value instanceof SerializableMarker) + hasher.putInt( value.hashCode() ); + + else if( value instanceof CacheFunnel ) + ((CacheFunnel)value).funnel(hasher, mode); + + else if( value instanceof Enum ) + hasher.putUnencodedChars( value.getClass().getName() + "." + value ); + + else { + Bolts.debug1(log, FIRST_ONLY, "[WARN] Unknown hashing type: " + value.getClass()); + hasher.putInt( value.hashCode() ); + } + + return this; + } + + public Hasher getHasher() { + return hasher; + } + + public HashCode build() { + return hasher.hash(); + } + + public static Hasher hasher( Hasher hasher, Object value, HashMode mode ) { + + return new HashBuilder() + .withHasher(hasher) + .withMode(mode) + .with(value) + .getHasher(); + } + + /** + * Hash a file using only the relative file name instead of + * the absolute file path. 
+     *
+     * @param path
+     * @param basePath
+     * @param mode
+     */
+    public static HashCode hashPath(Path path, Path basePath, HashMode mode) {
+        return new HashBuilder().withMode(mode).withBasePath(basePath).with(path).build();
+    }
+
+    /**
+     * Hashes the specified file
+     *
+     * @param hasher The current {@code Hasher} object
+     * @param file The {@code File} object to hash
+     * @param mode When {@code mode} is {@code DEEP}, the file content is used to create
+     * the hash key for this file; otherwise just the file metadata information
+     * (full name, size and last update timestamp) is used
+     * @param basePath When given, the file path is relativized against it before hashing
+     * @return The updated {@code Hasher} object
+     */
+    static private Hasher hashFile( Hasher hasher, java.io.File file, HashMode mode, Path basePath ) {
+        return hashFile(hasher, file.toPath(), mode, basePath);
+    }
+
+    /**
+     * Hashes the specified file
+     *
+     * @param hasher The current {@code Hasher} object
+     * @param path The {@code Path} object to hash
+     * @param mode When {@code mode} is {@code DEEP}, the file content is used to create
+     * the hash key for this file; otherwise just the file metadata information
+     * (full name, size and last update timestamp) is used
+     * @param basePath When given, the file path is relativized against it before hashing
+     * @return The updated {@code Hasher} object
+     */
+    static private Hasher hashFile( Hasher hasher, Path path, HashMode mode, Path basePath ) {
+        BasicFileAttributes attrs=null;
+        try {
+            attrs = Files.readAttributes(path, BasicFileAttributes.class);
+        }
+        catch(IOException e) {
+            log.debug("Unable to get attributes for file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString());
+        }
+        catch(ProviderMismatchException e) {
+            // see https://github.com/nextflow-io/nextflow/pull/1382
+            log.warn("File system is unable to get attributes for file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString());
+        }
+        catch(Exception e) {
+            log.warn("Unable to get attributes for file: {} -- Cause: {}", FilesEx.toUriString(path), e.toString());
+        }
+
+        if( (mode==HashMode.STANDARD || mode==HashMode.LENIENT) && isAssetFile(path) ) {
+            if( attrs==null ) {
+                // when file attributes are not available or it's a directory
+                // hash the file using the file name path and the repository
+                log.warn("Unable to fetch attributes for file: {} - Hash is inferred from Git repository commit Id", FilesEx.toUriString(path));
+                return hashFileAsset(hasher, path);
+            }
+            final Path base = Global.getSession().getBaseDir();
+            if( attrs.isDirectory() ) {
+                // hash all the directory content
+                return hashDirSha256(hasher, path, base);
+            }
+            else {
+                // hash the content being an asset file
+                // (i.e.
included in the project repository) it's expected to be a small file
+                // which makes the content hashing doable
+                return hashFileSha256(hasher, path, base);
+            }
+        }
+
+        if( mode==HashMode.DEEP && attrs!=null && attrs.isRegularFile() )
+            return hashFileContent(hasher, path);
+        if( mode==HashMode.SHA256 && attrs!=null && attrs.isRegularFile() )
+            return hashFileSha256(hasher, path, null);
+        // default
+        return hashFileMetadata(hasher, path, attrs, mode, basePath);
+    }
+
+
+    static private LoadingCache<Path,String> sha256Cache = CacheBuilder
+            .newBuilder()
+            .maximumSize(10_000)
+            .build(new CacheLoader<Path,String>() {
+                @Override
+                public String load(Path key) throws Exception {
+                    return hashFileSha256Impl0(key);
+                }
+            });
+
+    static protected Hasher hashFileSha256( Hasher hasher, Path path, Path base ) {
+        try {
+            log.trace("Hash sha-256 file content path={} - base={}", path, base);
+            // the file relative base
+            if( base!=null )
+                hasher.putUnencodedChars(base.relativize(path).toString());
+            // file content hash
+            String sha256 = sha256Cache.get(path);
+            hasher.putUnencodedChars(sha256);
+        }
+        catch (ExecutionException t) {
+            Throwable err = t.getCause()!=null ? t.getCause() : t;
+            String msg = err.getMessage()!=null ? err.getMessage() : err.toString();
+            log.warn("Unable to compute sha-256 hashing for file: {} - Cause: {}", FilesEx.toUriString(path), msg);
+        }
+        return hasher;
+    }
+
+    static protected Hasher hashDirSha256( Hasher hasher, Path dir, Path base ) {
+        try {
+            Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
+                public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
+                    log.trace("Hash sha-256 dir content [FILE] path={} - base={}", path, base);
+                    try {
+                        // the file relative base
+                        if( base!=null )
+                            hasher.putUnencodedChars(base.relativize(path).toString());
+                        // the file content sha-256 checksum
+                        String sha256 = sha256Cache.get(path);
+                        hasher.putUnencodedChars(sha256);
+                        return FileVisitResult.CONTINUE;
+                    }
+                    catch (ExecutionException t) {
+                        throw new IOException(t);
+                    }
+                }
+
+                public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) {
+                    log.trace("Hash sha-256 dir content [DIR] path={} - base={}", path, base);
+                    // the directory relative base (hashed once)
+                    if( base!=null )
+                        hasher.putUnencodedChars(base.relativize(path).toString());
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+        }
+        catch (IOException t) {
+            Throwable err = t.getCause()!=null ? t.getCause() : t;
+            String msg = err.getMessage()!=null ?
err.getMessage() : err.toString();
+            log.warn("Unable to compute sha-256 hashing for directory: {} - Cause: {}", FilesEx.toUriString(dir), msg);
+        }
+        return hasher;
+    }
+
+    static protected String hashFileSha256Impl0(Path path) throws IOException {
+        log.debug("Hash asset file sha-256: {}", path);
+        Hasher hasher = Hashing.sha256().newHasher();
+        ByteStreams.copy(Files.newInputStream(path), Funnels.asOutputStream(hasher));
+        return hasher.hash().toString();
+    }
+
+    static private Hasher hashFileAsset( Hasher hasher, Path path ) {
+        log.debug("Hash asset file: {}", path);
+        hasher.putUnencodedChars( Global.getSession().getCommitId() );
+        return hasher;
+    }
+
+    /**
+     * Hashes the file by using its metadata information: the file path string (relative
+     * to {@code basePath} when given, absolute otherwise), its size and last update timestamp
+     *
+     * @param hasher The current {@code Hasher} object
+     * @param file The {@code Path} object to hash
+     * @return The updated {@code Hasher} object
+     */
+    static private Hasher hashFileMetadata( Hasher hasher, Path file, BasicFileAttributes attrs, HashMode mode, Path basePath ) {
+
+        String filename = basePath != null && file.startsWith(basePath)
+            ? basePath.relativize(file).toString()
+            : file.toAbsolutePath().toString();
+
+        hasher = hasher.putUnencodedChars( filename );
+        if( attrs != null ) {
+            hasher = hasher.putLong(attrs.size());
+            if( attrs.lastModifiedTime() != null && mode != HashMode.LENIENT ) {
+                hasher = hasher.putLong( attrs.lastModifiedTime().toMillis() );
+            }
+        }
+
+        if( log.isTraceEnabled() ) {
+            log.trace("Hashing file meta: path={}; size={}, lastModified={}, mode={}",
+                    file.toAbsolutePath().toString(),
+                    attrs!=null ? attrs.size() : "--",
+                    attrs!=null && attrs.lastModifiedTime() != null && mode != HashMode.LENIENT ? attrs.lastModifiedTime().toMillis() : "--",
+                    mode
+            );
+        }
+        return hasher;
+    }
+
+    /**
+     * Hashes the file by reading the file content
+     *
+     * @param hasher The current {@code Hasher} object
+     * @param path The {@code Path} object to hash
+     * @return The updated {@code Hasher} object
+     */
+    static private Hasher hashFileContent( Hasher hasher, Path path ) {
+
+        OutputStream output = Funnels.asOutputStream(hasher);
+        try {
+            Files.copy(path, output);
+        }
+        catch( IOException e ) {
+            throw new IllegalStateException("Unable to hash content: " + FilesEx.toUriString(path), e);
+        }
+        finally {
+            FilesEx.closeQuietly(output);
+        }
+
+        return hasher;
+    }
+
+    static HashCode hashContent( Path file ) {
+        return hashContent(file, null);
+    }
+
+    static HashCode hashContent( Path file, HashFunction function ) {
+
+        if( function == null )
+            function = DEFAULT_HASHING;
+
+        Hasher hasher = function.newHasher();
+        return hashFileContent(hasher, file).hash();
+    }
+
+    static private Hasher hashUnorderedCollection(Hasher hasher, Collection collection, HashMode mode) {
+
+        byte[] resultBytes = new byte[HASH_BYTES];
+        for (Object item : collection) {
+            byte[] nextBytes = HashBuilder.hasher(defaultHasher(), item, mode).hash().asBytes();
+            if( nextBytes.length != resultBytes.length )
+                throw new IllegalStateException("All hash codes must have the same bit length");
+
+            for (int i = 0; i < nextBytes.length; i++) {
+                resultBytes[i] += nextBytes[i];
+            }
+        }
+
+        return hasher.putBytes(resultBytes);
+    }
+
+    /**
+     * Check if the argument is an asset file i.e.
a file that makes part of the + * pipeline Git repository + * + * @param path + * @return + */ + static protected boolean isAssetFile(Path path) { + final ISession session = Global.getSession(); + if( session==null ) + return false; + // if the commit ID is null the current run is not launched from a repo + if( session.getCommitId()==null ) + return false; + // if the file belong to different file system, cannot be a file belonging to the repo + if( session.getBaseDir().getFileSystem()!=path.getFileSystem() ) + return false; + // if the file is in the same directory as the base dir it's a asset by definition + return path.startsWith(session.getBaseDir()); + } + +} diff --git a/modules/nf-commons/src/test/nextflow/util/CacheHelperTest.groovy b/modules/nf-commons/src/test/nextflow/util/CacheHelperTest.groovy index ea1e2dc479..9ccaef47b4 100644 --- a/modules/nf-commons/src/test/nextflow/util/CacheHelperTest.groovy +++ b/modules/nf-commons/src/test/nextflow/util/CacheHelperTest.groovy @@ -17,7 +17,6 @@ package nextflow.util import java.nio.file.Files -import java.nio.file.Paths import java.nio.file.attribute.FileTime import com.google.common.hash.Hashing @@ -85,45 +84,6 @@ class CacheHelperTest extends Specification { } - def testHashContent() { - setup: - def path1 = Files.createTempFile('test-hash-content',null) - def path2 = Files.createTempFile('test-hash-content',null) - def path3 = Files.createTempFile('test-hash-content',null) - - path1.text = ''' - line 1 - line 2 - line 3 the file content - ''' - - - path2.text = ''' - line 1 - line 2 - line 3 the file content - ''' - - path3.text = ''' - line 1 - line 1 - line 1 the file content - ''' - - expect: - CacheHelper.hashContent(path1) == CacheHelper.hashContent(path2) - CacheHelper.hashContent(path1) != CacheHelper.hashContent(path3) - CacheHelper.hashContent(path1, Hashing.md5()) == CacheHelper.hashContent(path2,Hashing.md5()) - CacheHelper.hashContent(path1, Hashing.md5()) != CacheHelper.hashContent(path3,Hashing.md5()) - - cleanup: - path1.delete() - path2.delete() - path3.delete() - - } - - def testHashOrder () { when: @@ -236,37 +196,6 @@ class CacheHelperTest extends Specification { 'lenient' | CacheHelper.HashMode.LENIENT 'sha256' | CacheHelper.HashMode.SHA256 } - - def 'should validate is asset file'() { - when: - def BASE = Paths.get("/some/pipeline/dir") - and: - Global.session = Mock(Session) { getBaseDir() >> BASE } - then: - !CacheHelper.isAssetFile(BASE.resolve('foo')) - - - when: - Global.session = Mock(Session) { - getBaseDir() >> BASE - getCommitId() >> '123456' - } - then: - CacheHelper.isAssetFile(BASE.resolve('foo')) - and: - !CacheHelper.isAssetFile(Paths.get('/other/dir')) - } - - - def 'should hash file content'() { - given: - def EXPECTED = '64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c' - def file = TestHelper.createInMemTempFile('foo', 'Hello world') - expect: - CacheHelper.hashFileSha256Impl0(file) == EXPECTED - and: - CacheHelper.hashFileSha256Impl0(file) == DigestUtils.sha256Hex(file.bytes) - } def 'should hash content with sha256' () { given: @@ -283,31 +212,4 @@ class CacheHelperTest extends Specification { CacheHelper.hasher(file, CacheHelper.HashMode.SHA256).hash().toString() == 'd29e7ba0fbcc617ab8e1e44e81381aed' } - def 'should hash dir content with sha256'() { - given: - def folder = TestHelper.createInMemTempDir() - folder.resolve('dir1').mkdir() - folder.resolve('dir2').mkdir() - and: - folder.resolve('dir1/foo').text = "I'm foo" - folder.resolve('dir1/bar').text = "I'm bar" - 
folder.resolve('dir1/xxx/yyy').mkdirs() - folder.resolve('dir1/xxx/foo1').text = "I'm foo within xxx" - folder.resolve('dir1/xxx/yyy/bar1').text = "I'm bar within yyy" - and: - folder.resolve('dir2/foo').text = "I'm foo" - folder.resolve('dir2/bar').text = "I'm bar" - folder.resolve('dir2/xxx/yyy').mkdirs() - folder.resolve('dir2/xxx/foo1').text = "I'm foo within xxx" - folder.resolve('dir2/xxx/yyy/bar1').text = "I'm bar within yyy" - - when: - def hash1 = CacheHelper.hashDirSha256(CacheHelper.defaultHasher().newHasher(), folder.resolve('dir1'), folder.resolve('dir1')) - and: - def hash2 = CacheHelper.hashDirSha256(CacheHelper.defaultHasher().newHasher(), folder.resolve('dir2'), folder.resolve('dir2')) - - then: - hash1.hash() == hash2.hash() - - } } diff --git a/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy b/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy new file mode 100644 index 0000000000..79c380ae94 --- /dev/null +++ b/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.util + +import java.nio.file.Files +import java.nio.file.Paths + +import com.google.common.hash.Hashing +import nextflow.Global +import nextflow.Session +import org.apache.commons.codec.digest.DigestUtils +import spock.lang.Specification +import test.TestHelper +/** + * + * @author Paolo Di Tommaso + */ +class HashBuilderTest extends Specification { + + + def testHashContent() { + setup: + def path1 = Files.createTempFile('test-hash-content',null) + def path2 = Files.createTempFile('test-hash-content',null) + def path3 = Files.createTempFile('test-hash-content',null) + + path1.text = ''' + line 1 + line 2 + line 3 the file content + ''' + + + path2.text = ''' + line 1 + line 2 + line 3 the file content + ''' + + path3.text = ''' + line 1 + line 1 + line 1 the file content + ''' + + expect: + HashBuilder.hashContent(path1) == HashBuilder.hashContent(path2) + HashBuilder.hashContent(path1) != HashBuilder.hashContent(path3) + HashBuilder.hashContent(path1, Hashing.md5()) == HashBuilder.hashContent(path2,Hashing.md5()) + HashBuilder.hashContent(path1, Hashing.md5()) != HashBuilder.hashContent(path3,Hashing.md5()) + + cleanup: + path1.delete() + path2.delete() + path3.delete() + + } + + def 'should validate is asset file'() { + when: + def BASE = Paths.get("/some/pipeline/dir") + and: + Global.session = Mock(Session) { getBaseDir() >> BASE } + then: + !HashBuilder.isAssetFile(BASE.resolve('foo')) + + + when: + Global.session = Mock(Session) { + getBaseDir() >> BASE + getCommitId() >> '123456' + } + then: + HashBuilder.isAssetFile(BASE.resolve('foo')) + and: + !HashBuilder.isAssetFile(Paths.get('/other/dir')) + } + + def 'should hash file content'() { + given: + def EXPECTED = '64ec88ca00b268e5ba1a35678a1b5316d212f4f366b2477232534a8aeca37f3c' + def file = TestHelper.createInMemTempFile('foo', 'Hello world') + expect: + 
HashBuilder.hashFileSha256Impl0(file) == EXPECTED + and: + HashBuilder.hashFileSha256Impl0(file) == DigestUtils.sha256Hex(file.bytes) + } + + def 'should hash dir content with sha256'() { + given: + def folder = TestHelper.createInMemTempDir() + folder.resolve('dir1').mkdir() + folder.resolve('dir2').mkdir() + and: + folder.resolve('dir1/foo').text = "I'm foo" + folder.resolve('dir1/bar').text = "I'm bar" + folder.resolve('dir1/xxx/yyy').mkdirs() + folder.resolve('dir1/xxx/foo1').text = "I'm foo within xxx" + folder.resolve('dir1/xxx/yyy/bar1').text = "I'm bar within yyy" + and: + folder.resolve('dir2/foo').text = "I'm foo" + folder.resolve('dir2/bar').text = "I'm bar" + folder.resolve('dir2/xxx/yyy').mkdirs() + folder.resolve('dir2/xxx/foo1').text = "I'm foo within xxx" + folder.resolve('dir2/xxx/yyy/bar1').text = "I'm bar within yyy" + + when: + def hash1 = HashBuilder.hashDirSha256(HashBuilder.defaultHasher(), folder.resolve('dir1'), folder.resolve('dir1')) + and: + def hash2 = HashBuilder.hashDirSha256(HashBuilder.defaultHasher(), folder.resolve('dir2'), folder.resolve('dir2')) + + then: + hash1.hash() == hash2.hash() + + } +} diff --git a/tests/output-dsl.nf b/tests/output-dsl.nf new file mode 100644 index 0000000000..22d9cea365 --- /dev/null +++ b/tests/output-dsl.nf @@ -0,0 +1,90 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +nextflow.preview.output = true + +params.save_foo = true + +process align { + input: + val(x) + + output: + path("*.bam") + path("${x}.bai") + + """ + echo ${x} > ${x}.bam + echo ${x} | rev > ${x}.bai + """ +} + +process my_combine { + input: + path(bamfile) + path(baifile) + + output: + path 'result.txt' + + """ + cat $bamfile > result.txt + cat $baifile >> result.txt + """ +} + +process foo { + output: + path 'xxx' + + ''' + mkdir xxx + touch xxx/A + touch xxx/B + touch xxx/C + ''' +} + +workflow { + def input = Channel.of('alpha','beta','delta') + align(input) + + def bam = align.out[0].toSortedList { it.name } + def bai = align.out[1].toSortedList { it.name } + my_combine( bam, bai ) + my_combine.out.view{ it.text } + + foo() + + publish: + align.out >> 'data' + my_combine.out >> 'more/data' + foo.out >> (params.save_foo ? 'data' : null) +} + +output { + directory 'results' + mode 'copy' + + 'data' { + index { + path 'index.csv' + mapper { val -> [filename: val] } + header true + sep ',' + } + } +}
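
For illustration only, here is a minimal Groovy sketch of how the fluent HashBuilder API introduced by this patch can be composed; the values being hashed and the println are arbitrary examples, while the class and method names come from the code above:

    import nextflow.util.CacheHelper
    import nextflow.util.HashBuilder

    // compose a murmur3-128 hash with the default hasher; STANDARD mode
    // hashes files by metadata (path, size, last-modified timestamp)
    def hash = new HashBuilder()
            .withMode(CacheHelper.HashMode.STANDARD)
            .with('sample-id')           // CharSequence: hashed as unencoded chars
            .with(42)                    // Integer: hashed as primitive int
            .with(['a', 'b'] as Set)     // Set: hashed order-invariantly
            .build()                     // returns a com.google.common.hash.HashCode

    println hash.toString()

The existing one-shot entry point, CacheHelper.hasher(value, mode), keeps working and now delegates to HashBuilder.hasher under the hood, so callers of the old API are unaffected.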