#!/usr/bin/env nextflow
// main.nf -- forked from mtokuyama/ERVmap
// ~~~~~~~~~~~~~~~ PARAMETER VALIDATION ~~~~~~~~~~~~~~~~~~~ //
// All checks run before any channel or process is declared, so a
// misconfigured run stops immediately with a specific message.

// inputDir: folder that holds the input file pairs; must be set and exist.
if (!params.inputDir) {
    exit 1, 'inputDir parameter is missing.'
}
// toString() guards against non-String values (e.g. a bare `--inputDir` flag).
if (!new File(params.inputDir.toString()).exists()) {
    exit 1, "The input folder does not exist: " + params.inputDir + "\n"
}

// inputPattern: glob appended to inputDir when the file-pair channel is built.
// Without this check a missing value would silently become "<inputDir>/null".
if (!params.inputPattern) {
    exit 1, 'inputPattern parameter is missing.'
}

// skipAlignment: optional, defaults to false.
if (!params.skipAlignment) {
    params.skipAlignment = false
}

// outputDir: destination for the published / stored results.
if (!params.outputDir) {
    exit 1, "outputDir parameter is missing."
}

// starTmpDir: must be set and must already exist.
if (!params.starTmpDir) {
    exit 1, "starTmpDir parameter is missing."
}
if (!new File(params.starTmpDir.toString()).exists()) {
    exit 1, 'The STAR temporary folder does not exists. ('+params.starTmpDir+')\n'
}

// localOutputDir: optional, defaults to 'bam'.
if (!params.localOutputDir) {
    params.localOutputDir = 'bam'
}

// debug: mandatory; it is passed as a value into both processes.
if (!params.debug) {
    exit 1, "Debug prefix parameter is missing."
}
// Route the input file pairs to the right entry point of the pipeline:
//   skipAlignment = true  -> pairs feed bamPairs_ch, fastqPairs_ch stays empty
//   skipAlignment = false -> pairs feed fastqPairs_ch, bamPairs_ch stays empty
def inputGlob = params.inputDir + '/' + params.inputPattern
def inputPairs = Channel.fromFilePairs(inputGlob, size: 2, checkIfExists: true)

if (params.skipAlignment) {
    fastqPairs_ch = Channel.empty()
    bamPairs_ch = inputPairs
}
else {
    fastqPairs_ch = inputPairs
    bamPairs_ch = Channel.empty()
}
/*
 * ERValign
 *
 * Runs the `ERValign.sh` template once for every (sample, reads) pair read
 * from fastqPairs_ch. The command itself lives in the template (not visible
 * here; presumably a STAR alignment, judging by the output file name).
 *
 * Inputs : sample id + its pair of read files, plus the localOutputDir,
 *          limitMemory and debug parameters passed through as values.
 * Output : (sample, files matching
 *          "<localOutputDir>/<sample>.Aligned.sortedByCoord.out.bam*")
 *          emitted on star_bam_ch.
 * Skipped entirely when params.skipAlignment is true.
 */
process ERValign {
    tag "${sample}"

    // executor configuration
    time '8h'
    memory '35 GB'
    scratch true
    cpus params.cpus
    publishDir params.outputDir, mode: 'copy'
    stageOutMode 'rsync'

    // other configuration
    echo true
    errorStrategy 'terminate'

    input:
    tuple val(sample), path(reads) from fastqPairs_ch
    val(localOutputDir) from params.localOutputDir
    val(limitMemory) from params.limitMemory
    val(debug) from params.debug

    output:
    // `val(sample)` instead of the bare `sample`: the unqualified form is a
    // deprecated output declaration.
    tuple val(sample), path("${localOutputDir}/${sample}.Aligned.sortedByCoord.out.bam*") into star_bam_ch

    when:
    !params.skipAlignment

    shell:
    template 'ERValign.sh'
}
/*
 * ERVcount
 *
 * Runs the `ERVcount.sh` template once for every (sample, bam) item. Items
 * come either directly from bamPairs_ch (skipAlignment = true) or from the
 * ERValign output channel star_bam_ch (skipAlignment = false).
 *
 * Inputs : sample id + its BAM file(s), plus the debug parameter as a value.
 * Output : "<sample>.ERVresults.txt", emitted on final_results_ch and kept
 *          in params.outputDir through the storeDir directive.
 */
process ERVcount {
    tag "${sample}"

    // executor configuration
    time '3h'
    memory '8 GB'   // same "<number> <unit>" string form as ERValign
    scratch true
    storeDir params.outputDir

    // other configuration
    echo true
    errorStrategy 'terminate'
    stageInMode 'symlink'

    input:
    // mixing with the channel from ERValign if started from FASTQs and skipAlignment=false
    tuple val(sample), path(bam) from bamPairs_ch.mix(star_bam_ch)
    val(debug) from params.debug

    output:
    path("${sample}.ERVresults.txt") into final_results_ch

    shell:
    template 'ERVcount.sh'
}
// ~~~~~~~~~~~~~~~ PIPELINE COMPLETION EVENTS ~~~~~~~~~~~~~~~~~~~ //
workflow.onComplete {
    // Log the completion timestamp and the overall outcome of the run.
    def status = workflow.success ? 'OK' : 'FAILED'
    log.info "Pipeline completed at: $workflow.complete"
    log.info "Execution status: ${status}"

    // Disabled notification hook, kept for reference:
    // getting the default data from the workflow
    // PipelineMessage.completed(workflow).forTopic(pipeline_topic_completed).data(runIDKey, "${runID}").send()
}
workflow.onError {
    // Log the error message first, then the full error report.
    def details = [message: workflow.errorMessage, report: workflow.errorReport]
    details.each { label, text ->
        log.info "Pipeline execution stopped with the following ${label}: ${text}"
    }

    // Disabled notification hooks, kept for reference:
    // getting the default data from the workflow
    // PipelineMessage.progress().forTopic(pipeline_topic_failure).data(runIDKey, "${runID}").data('onError', 'inside').send()
    // PipelineMessage.failed(workflow).forTopic(pipeline_topic_failure).data(runIDKey, "${runID}").send()
}
// ~~~~~~~~~~~~~~~ HELPER FUNCTIONS ~~~~~~~~~~~~~~~~~~~ //
/**
 * Create a directory (including missing parents) and make it readable and
 * writable for everyone, recursively.
 *
 * Runs `mkdir -p <dir>` followed by `chmod -R a+r+w <dir>`. Each command is
 * started with an explicit argument list, so paths containing spaces are
 * passed intact, and is waited for before the next one starts, so chmod can
 * no longer race ahead of mkdir. A non-zero exit status is reported on
 * stderr; no exception is thrown.
 *
 * @param dir path of the directory to create (anything with a usable toString())
 * @return the finished `chmod` Process; its exit status is available through exitValue()
 */
def mkDir(def dir) {
    def target = dir.toString()
    def commands = [
        ['mkdir', '-p', target],
        ['chmod', '-R', 'a+r+w', target]
    ]

    def proc = null
    for (cmd in commands) {
        proc = cmd.execute()
        def stdout = new StringBuilder()
        def stderr = new StringBuilder()
        // Drains both streams and blocks until the command has terminated.
        proc.waitForProcessOutput(stdout, stderr)
        if (proc.exitValue() != 0) {
            System.err.println("mkDir: '${cmd.join(' ')}' exited with status ${proc.exitValue()}: ${stderr.toString().trim()}")
        }
    }
    return proc
}