-
Notifications
You must be signed in to change notification settings - Fork 8
/
make_chains.py
executable file
·264 lines (235 loc) · 11.2 KB
/
make_chains.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#!/usr/bin/env python3
"""Make LASTZ chains master script."""
import argparse
import shutil
import sys
import os
import subprocess
from datetime import datetime as dt
from constants import Constants
from modules.step_executables import StepExecutables
from modules.project_paths import ProjectPaths
from modules.project_directory import OutputDirectoryManager
from modules.step_manager import StepManager
from modules.parameters import PipelineParameters
from modules.pipeline_steps import PipelineSteps
from modules.make_chains_logging import setup_logger
from modules.make_chains_logging import to_log
from modules.project_setup_procedures import setup_genome_sequences
from version import __version__
__author__ = "Bogdan M. Kirilenko, Michael Hiller, Virag Sharma, Ekaterina Osipova"
__maintainer__ = "Bogdan M. Kirilenko"
SCRIPT_LOCATION = os.path.abspath(os.path.dirname(__file__))
def parse_args():
    """Build the CLI parser and return the parsed pipeline arguments.

    Prints usage and exits with status 1 when fewer than the four required
    positional arguments appear on the command line.
    """
    parser = argparse.ArgumentParser(description=Constants.DESCRIPTION)
    # --- required positionals ---
    parser.add_argument("target_name", help="Target genome identifier, e.g. hg38, human, etc.")
    parser.add_argument("query_name", help="Query genome identifier, e.g. mm10, mm39, mouse, etc.")
    parser.add_argument("target_genome", help="Target genome. Accepted formats are: fasta and 2bit.")
    parser.add_argument("query_genome", help="Query genome. Accepted formats are: fasta and 2bit.")
    # --- general options ---
    parser.add_argument("--project_dir", "--pd", help="Project directory. By default: pwd")
    parser.add_argument(
        "--continue_from_step", "--cfs",
        choices=PipelineSteps.ORDER,
        default=None,
        help="Continue pipeline execution from this step",
    )
    parser.add_argument(
        "--force", "-f",
        action="store_true", dest="force",
        help="Overwrite output directory if exists",
    )
    parser.add_argument(
        "--cluster_executor", "--executor",
        default="local",
        help="Nextflow executor parameter",
    )
    parser.add_argument(
        "--cluster_queue",
        default="batch",
        help="Queue/Partition label to run cluster jobs",
    )
    parser.add_argument(
        "--keep_temp", "--kt",
        dest="keep_temp", action="store_true",
        help="Do not remove temp files",
    )
    parser.add_argument(
        "--params_from_file",
        default=None,
        help="Read parameters from a specified config file",
    )
    parser.add_argument(
        "--lastz_executable",
        default=None,
        help="Set up the path to the lastz executable manually",
    )
    parser.add_argument(
        "--nextflow_executable",
        default=None,
        help="Set up the path to the nextflow executable manually",
    )
    # --- pipeline tuning parameters ---
    group = parser.add_argument_group('Pipeline Parameters')
    group.add_argument("--skip_fill_chain", dest="skip_fill_chains", action="store_true")
    group.add_argument("--skip_fill_unmask", dest="skip_fill_unmask", action="store_true")
    group.add_argument("--skip_clean_chain", dest="skip_clean_chain", action="store_true")
    # lastz scoring / seeding knobs
    group.add_argument("--lastz_y", default=Constants.DEFAULT_LASTZ_Y, type=int)
    group.add_argument("--lastz_h", default=Constants.DEFAULT_LASTZ_H, type=int)
    group.add_argument("--lastz_l", default=Constants.DEFAULT_LASTZ_L, type=int)
    group.add_argument("--lastz_k", default=Constants.DEFAULT_LASTZ_K, type=int)
    # genome partitioning (chunk/overlap/limit) for both sequences
    group.add_argument("--seq1_chunk", default=Constants.DEFAULT_SEQ1_CHUNK, type=int)
    group.add_argument("--seq1_lap", default=Constants.DEFAULT_SEQ1_LAP, type=int)
    group.add_argument("--seq1_limit", default=Constants.DEFAULT_SEQ1_LIMIT, type=int)
    group.add_argument("--seq2_chunk", default=Constants.DEFAULT_SEQ2_CHUNK, type=int)
    group.add_argument("--seq2_lap", default=Constants.DEFAULT_SEQ2_LAP, type=int)
    group.add_argument("--seq2_limit", default=Constants.DEFAULT_SEQ2_LIMIT, type=int)
    # chaining parameters
    group.add_argument("--min_chain_score", default=Constants.DEFAULT_MIN_CHAIN_SCORE, type=int)
    group.add_argument(
        "--chain_linear_gap",
        default=Constants.DEFAULT_CHAIN_LINEAR_GAP,
        choices=["loose", "medium"],
        type=str,
    )
    # chain-filling parameters
    group.add_argument("--num_fill_jobs", default=Constants.DEFAULT_NUM_FILL_JOBS, type=int)
    group.add_argument("--fill_chain_min_score", default=Constants.DEFAULT_FILL_CHAIN_MIN_SCORE, type=int)
    group.add_argument("--fill_insert_chain_min_score", default=Constants.DEFAULT_INSERT_CHAIN_MIN_SCORE, type=int)
    group.add_argument("--fill_gap_max_size_t", default=Constants.DEFAULT_FILL_GAP_MAX_SIZE_T, type=int)
    group.add_argument("--fill_gap_max_size_q", default=Constants.DEFAULT_FILL_GAP_MAX_SIZE_Q, type=int)
    group.add_argument("--fill_gap_min_size_t", default=Constants.DEFAULT_FILL_GAP_MIN_SIZE_T, type=int)
    group.add_argument("--fill_gap_min_size_q", default=Constants.DEFAULT_FILL_GAP_MIN_SIZE_Q, type=int)
    group.add_argument("--fill_lastz_k", default=Constants.DEFAULT_FILL_LASTZ_K, type=int)
    group.add_argument("--fill_lastz_l", default=Constants.DEFAULT_FILL_LASTZ_L, type=int)
    # memory requests for cluster jobs
    group.add_argument("--fill_memory", default=Constants.DEFAULT_FILL_MEMORY, type=int)
    group.add_argument("--fill_prepare_memory", default=Constants.DEFAULT_FILL_PREPARE_MEMORY, type=int)
    group.add_argument("--chaining_memory", default=Constants.DEFAULT_CHAINING_MEMORY, type=int)
    group.add_argument("--chain_clean_memory", default=Constants.DEFAULT_CHAIN_CLEAN_MEMORY, type=int)
    group.add_argument("--clean_chain_parameters", default=Constants.DEFAULT_CLEAN_CHAIN_PARAMS)
    group.add_argument(
        "--job_time_req",
        default=Constants.DEFAULT_JOB_TIME_REQ, type=str,
        help="Maximum time allocated per Nextflow job",
    )
    # four positionals are mandatory: bail out early with the help text
    if len(sys.argv) < 5:
        parser.print_help()
        sys.exit(1)
    return parser.parse_args()
def log_version():
    """Log pipeline version together with the current git commit and branch.

    Returns:
        The assembled version string.  If the git metadata cannot be read
        (not a git checkout, or git is not installed), commit and branch are
        reported as "unknown".
    """
    def _git_output(*git_args):
        # argv list with shell=False: avoids shell parsing of the command
        return subprocess.check_output(
            ["git", *git_args], cwd=SCRIPT_LOCATION
        ).decode("utf-8").strip()

    try:
        git_hash = _git_output("rev-parse", "HEAD")
        git_branch = _git_output("rev-parse", "--abbrev-ref", "HEAD")
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: not inside a git repository.
        # OSError: the git executable itself is missing -- previously
        # uncaught (the shell=True variant masked it), crashing the run.
        git_hash = "unknown"
        git_branch = "unknown"
    version = f"Version {__version__}\nCommit: {git_hash}\nBranch: {git_branch}\n"
    to_log("# Make Lastz Chains #")
    to_log(version)
    return version
def save_final_chain(parameters: PipelineParameters, project_paths: ProjectPaths):
    """Move the pipeline's last chain file to its final project-root location.

    Uses the filled chain when chain filling ran, otherwise the merged chain.

    Raises:
        ValueError: if the expected output chain file does not exist.
    """
    # NOTE(review): `is True` kept as-is -- fill_chain presumably is always a
    # bool, but it may originate from a parsed config file; confirm before
    # relaxing this to plain truthiness.
    if parameters.fill_chain is True:
        last_chain_file = project_paths.filled_chain
        to_log(f"Chains were filled, using {last_chain_file} as the last output file.")
    else:
        last_chain_file = project_paths.merged_chain
        to_log(f"Chains were NOT filled, using {last_chain_file} as the last output file.")
    if not os.path.isfile(last_chain_file):
        # Fixed: the two f-string fragments were concatenated without a
        # separator, producing "...is absent!Please check...".
        raise ValueError(
            f"Critical! Output chain file {last_chain_file} is absent! "
            f"Please check the logs, probably one of the pipeline steps failed."
        )
    # save it to the root project dir
    shutil.move(last_chain_file, project_paths.final_chain)
    to_log(f"Saved final chains file to {project_paths.final_chain}")
def _del_file_and_log(path):
    """Delete a single temp file, then record the deletion in the run log."""
    os.remove(path)
    to_log(f"x {path}")
def cleanup(parameters: PipelineParameters, project_paths: ProjectPaths):
    """Remove temporary directories and files created by the pipeline.

    Skipped entirely when --keep_temp was requested.  Paths that were never
    created (e.g. because an optional step such as chain filling was skipped)
    are silently ignored -- previously shutil.rmtree/os.remove raised on a
    missing path and crashed the run at its very end.
    """
    if parameters.keep_temp:
        to_log("Temp files are not deleted because of the --keep_temp flag")
        return  # cleanup is not necessary
    dirs_to_del = [
        project_paths.chain_run_dir,
        project_paths.cat_out_dirname,
        project_paths.lastz_output_dir,
        project_paths.lastz_working_dir,
        project_paths.fill_chain_run_dir,
        project_paths.kent_temp_dir
    ]
    to_log("Cleaning up the following directories")
    for dirname in dirs_to_del:
        if not os.path.isdir(dirname):
            continue  # the step that creates this dir may have been skipped
        to_log(f"x {dirname}")
        shutil.rmtree(dirname)
    # remove individual temp files
    to_log("And the following files:")
    files_to_del = [
        project_paths.reference_genome,
        project_paths.query_genome,
        project_paths.reference_partitions,
        project_paths.query_partitions,
        project_paths.ref_chrom_sizes,
        project_paths.query_chrom_sizes,
    ]
    for filename in files_to_del:
        if os.path.isfile(filename):
            _del_file_and_log(filename)
def run_pipeline(args):
    """Execute the whole make-chains pipeline for the parsed CLI arguments."""
    started_at = dt.now()
    # the project directory and logger must exist before anything is logged
    project_dir = OutputDirectoryManager(args).project_dir
    setup_logger(os.path.join(project_dir, "run.log"))
    log_version()
    parameters = PipelineParameters(args)
    project_paths = ProjectPaths(project_dir, SCRIPT_LOCATION, parameters)
    executables = StepExecutables(SCRIPT_LOCATION, args)
    steps = StepManager(project_paths, args)
    to_log(f"Making chains for {args.target_genome} and {args.query_genome} files, saving results to {project_dir}")
    to_log(f"Pipeline started at {started_at}")
    parameters.dump_to_json(project_dir)
    # prepare sequences and related inputs for both genomes
    for genome_path, genome_name, label in (
        (args.target_genome, args.target_name, Constants.TARGET_LABEL),
        (args.query_genome, args.query_name, Constants.QUERY_LABEL),
    ):
        setup_genome_sequences(genome_path,
                               genome_name,
                               label,
                               project_paths,
                               executables,
                               parameters)
    # now execute steps
    steps.execute_steps(parameters, executables, project_paths)
    # finalise the run
    save_final_chain(parameters, project_paths)
    cleanup(parameters, project_paths)
    to_log(f"make_lastz_chains run done in {dt.now() - started_at}")
def main():
    """CLI entry point: parse the arguments, then run the pipeline."""
    run_pipeline(parse_args())


if __name__ == "__main__":
    main()