-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmultixstar.py
316 lines (252 loc) · 8.93 KB
/
multixstar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#! /usr/bin/env python3
"""
Based on the code pvm_xstar using multiprocessing module instead of pvm.
Written by Michael S. Noble ([email protected])
Copyright (c) 2008-2009, Massachusetts Institute of Technology
multixstar: Manages parallel execution of multiple XSTAR runs,
using the multiprocessing python module. XSTAR is part of the LHEASOFT astronomy toolset from
HEASARC, and is used for calculating the physical conditions and
emission spectra of photoionized gases (Kallman & Bautista 2001).
"""
import argparse
import logging
import multiprocessing as mp
import os
import subprocess
import sys
from distutils.dir_util import copy_tree
from pathlib import Path
from shutil import copy as copy_file
import re
from datetime import datetime
from time import sleep
# Version string of multixstar; it is interpolated into the command-line
# help description built in process_flags().
__version__ = "0.2.1"
def run(cmd, env=None, shell=None, return_stdout=True, **extra_config):
    """Launch ``cmd`` through a shell using ``subprocess.Popen``.

    The shell executable is ``shell`` when given, otherwise the value of the
    ``SHELL`` environment variable. Standard output is piped unless
    ``extra_config`` overrides it.

    Args:
        cmd: Command line string handed to the shell.
        env: Optional environment mapping for the child process.
        shell: Optional path of the shell executable to use.
        return_stdout: When true, wait for the command to finish and return
            its captured standard output; otherwise return the ``Popen``
            object immediately.
        **extra_config: Extra keyword arguments forwarded to ``Popen``; they
            take precedence over the defaults set here.

    Returns:
        The captured standard output, or the ``Popen`` object when
        ``return_stdout`` is false.
    """
    popen_kwargs = dict(
        shell=True,
        executable=shell or os.getenv("SHELL"),
        stdout=subprocess.PIPE,
    )
    if env:
        popen_kwargs["env"] = env
    # Caller-supplied options win over the defaults above.
    popen_kwargs.update(extra_config)

    process = subprocess.Popen(cmd, **popen_kwargs)
    if not return_stdout:
        return process
    captured, _ = process.communicate()
    return captured
def setup_pfiles():
    """Create a local ``pfiles`` directory holding a copy of ``xstar.par``.

    The parameter file is copied from ``$HEADAS/syspfiles/xstar.par`` into a
    ``pfiles`` directory inside the current working directory, and the
    ``PFILES`` environment variable is pointed at that directory.

    Returns:
        The relative ``Path`` of the ``pfiles`` directory.

    Raises:
        OSError: If ``$HEADAS`` is not set, or the parameter file cannot be
            copied.
    """
    headas = os.getenv("HEADAS")
    if not headas:
        # Fail with a readable message instead of the TypeError that
        # Path(None) would raise.
        raise OSError("$HEADAS not set!\n please run heainit and rerun")
    dst = Path("pfiles")
    # Tolerate a directory left behind by a previous (e.g. interrupted) run.
    dst.mkdir(exist_ok=True)
    copy_file(Path(headas).joinpath("syspfiles/xstar.par"), dst)
    os.environ["PFILES"] = str(dst)
    return dst
def get_executeable_dir():
    """Return the ``bin`` directory located directly under ``$FTOOLS``."""
    ftools_root = Path(os.getenv("FTOOLS"))
    return ftools_root / "bin"
def get_xstar_output(xstar):
    """Return an iterator over the process' stdout lines, tagged for the log.

    All lines are read from ``xstar.stdout`` right away; each one is decoded
    as UTF-8 and prefixed with ``XSTAR OUTPUT: `` when the iterator is
    consumed. Line endings are kept as read.
    """

    def tag(raw_line):
        return "XSTAR OUTPUT: {}".format(raw_line.decode("utf-8"))

    raw_lines = xstar.stdout.readlines()
    return map(tag, raw_lines)
def run_xstar(args):
    """Run a single XSTAR job inside its own run directory.

    Args:
        args: ``(run_dir, cmd)`` pair; ``run_dir`` is the directory to run
            in and ``cmd`` the XSTAR command line, which is looked up in the
            directory returned by ``get_executeable_dir()``.

    Returns:
        A newline-joined report: the command, the pfiles directory, the
        process ID and the tagged XSTAR output.
    """
    run_dir, cmd = args
    result = []
    previous_dir = os.getcwd()
    os.chdir(run_dir)
    try:
        result.append("Running: {}".format(cmd))
        pfiles_dir = setup_pfiles()
        result.append("Copied pfiles to local folder: {}".format(pfiles_dir))
        xstar = run(
            "{exe_dir}/{cmd}".format(exe_dir=get_executeable_dir(), cmd=cmd),
            return_stdout=False,
        )
        result.append("Process ID: {}".format(xstar.pid))
        # Drain the pipe BEFORE waiting: calling wait() first can deadlock
        # once the child has filled the OS pipe buffer, because nobody is
        # reading its stdout. Reading to EOF returns when the child closes
        # its end of the pipe.
        output = list(get_xstar_output(xstar))
        xstar.wait()
        result.extend(output)
    finally:
        # Always restore the working directory: pool workers are reused for
        # later jobs, whose run directories are resolved relative to it.
        os.chdir(previous_dir)
    return "\n".join(result)
def get_new_dir(dir, num, extra):
    """Build the path ``dir/mxstar.<num>`` or ``dir/mxstar.<extra>_<num>``.

    The ``<extra>_`` prefix is only used when ``extra`` is truthy.
    """
    if extra:
        suffix = "{0}_{1}".format(extra, num)
    else:
        suffix = num
    return dir.joinpath("mxstar.{}".format(suffix))
def make_new_dir(dir, extra=None):
    """Create and return the first ``mxstar.<n>`` directory not yet in ``dir``.

    Counting starts at 0 and increases until ``get_new_dir`` yields a path
    that does not exist; that directory is then created.
    """
    counter = 0
    while True:
        candidate = get_new_dir(dir, counter, extra)
        if not candidate.exists():
            break
        counter += 1
    candidate.mkdir()
    return candidate
def process_flags(argv=None):
    """Parse the command-line arguments of multixstar.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        An ``(options, args)`` pair: ``options`` is the namespace of
        recognised options (``workdir``, ``keeplog``, ``log_file``,
        ``nproc``) and ``args`` the list of unrecognised arguments, i.e. a
        joblist name or xstinitable parameters.
    """
    usage = "multixstar [options] <joblist|params>"
    description = """
multixstar: manages parallel execution of multiple XSTAR
jobs, with python's multiprocessing module.
Version: {version}
""".format(
        version=__version__
    )
    epilogue = """
Normally xstinitable will be launched to prompt for XSTAR
physical parameters and generate a list of XSTAR jobs to run in parallel.
This can be customized by supplying xstinitable parameters on the command
line (such as mode=h) OR by supplying the name of an existing joblist
file, in which case xstinitable will not be run nor will the generated
spectra be collated into a single table model with xstar2table
"""
    parser = argparse.ArgumentParser(usage=usage, description=description, epilog=epilogue)
    parser.add_argument(
        "-w",
        "--workdir",
        dest="workdir",
        default="./",
        metavar="WorkDir",
        type=lambda x: Path(x).absolute(),
        help="Work directory to save results of the run",
    )
    parser.add_argument(
        "-k", action="store_true", dest="keeplog", default=False, help="keep log file",
    )
    parser.add_argument(
        "-l",
        "--logfile",
        dest="log_file",
        default="mxstar.log",
        type=lambda x: Path(x).absolute(),
        metavar="LOGFILE",
        help="specify file to save log",
    )
    parser.add_argument(
        "-n",
        "--nproc",
        type=int,
        dest="nproc",
        default=mp.cpu_count(),
        metavar="NUMPROC",
        help="Max number of processors per host",
    )
    # options stores known arguments and
    # args stores potential xstinitable arguments.
    # argv must be forwarded here, otherwise the parameter is silently
    # ignored and sys.argv is always parsed.
    options, args = parser.parse_known_args(argv)
    return options, args
def check_enviroment(dir):
    """Verify that ``$FTOOLS`` is set and that ``dir`` is a writable directory.

    Writability is probed by creating and then removing the file
    ``write_check.test`` inside ``dir``.

    Raises:
        OSError: If ``$FTOOLS`` is not set, ``dir`` is not a directory, or
            the probe file cannot be created or removed.
    """
    if os.environ.get("FTOOLS") is None:
        raise OSError("$FTOOLS not set!\n please run heainit and rerun")
    if not dir.is_dir():
        raise IOError("{} is not a dir".format(dir))
    probe = dir / "write_check.test"
    probe.touch()
    probe.unlink()
def get_xstar_cmds(args=None, binpath=None):
    """Return the list of XSTAR command lines to run.

    If the first entry of ``args`` names an existing joblist file (looked up
    as given, then relative to the parent directory), that file and its
    ``.fits`` companion are copied into the current directory and the
    joblist is read. Otherwise ``args`` are treated as xstinitable
    parameters: xstinitable is run with them and the generated
    ``xstinitable.lis`` is read.

    Args:
        args: Optional list of extra command-line arguments.
        binpath: Directory containing the ``xstinitable`` executable; only
            needed when no joblist file is found.

    Returns:
        The lines of the joblist, one XSTAR command per entry.
    """
    print("Making XSTAR commands")
    args = list(args) if args else []
    joblist = None
    if args:
        requested = Path(args[0])
        joblist_local = Path(requested.name)
        for candidate in (requested, Path("..").joinpath(requested)):
            if candidate.exists():
                print("Joblist {} found".format(candidate))
                copy_file(candidate, joblist_local)
                copy_file(candidate.with_suffix(".fits"), joblist_local.with_suffix(".fits"))
                joblist = joblist_local
                break
    # joblist stays None when args[0] is not an existing file (for example
    # an xstinitable parameter such as mode=h), so xstinitable is run with
    # the given arguments instead of trying to read a non-existent file.
    if joblist is None:
        print("No joblist found: runing xstinitable to make joblist")
        run(
            "{exe} {args}".format(
                exe=binpath.joinpath("xstinitable"), args=" ".join(map(str, args))
            ),
            os.environ,
            stdout=None,
        )
        joblist = Path("xstinitable.lis")
    return joblist.read_text().splitlines()
def make_jobs(cmds):
    """Map zero-padded job numbers (starting at 1) to their commands.

    Keys are padded to the number of digits in ``len(cmds)`` so that all of
    them have the same width.
    """
    print("generating jobs from commands")
    width = len(str(len(cmds)))
    jobs = {}
    for number, command in enumerate(cmds, start=1):
        jobs["{:0{width}d}".format(number, width=width)] = command
    return jobs
def check_results(result_dirs):
    """Return the directories that do not contain ``xout_spect1.fits``."""
    return [
        run_dir
        for run_dir in result_dirs
        if not run_dir.joinpath("xout_spect1.fits").exists()
    ]
def get_model_name(jobs):
    """Extract the model name from the first job's command line.

    Args:
        jobs: Mapping of job identifiers to XSTAR command strings.

    Returns:
        The text between the quotes of the first ``modelname='...'``
        occurrence in the first command.

    Raises:
        ValueError: If ``jobs`` is empty or the first command carries no
            ``modelname='...'`` parameter.
    """
    first_cmd = next(iter(jobs.values()), None)
    if first_cmd is None:
        raise ValueError("No XSTAR jobs available to read the model name from")
    match = re.search(r"modelname='(.*?)'", first_cmd)
    if match is None:
        raise ValueError("No modelname='...' found in job: {}".format(first_cmd))
    return match.group(1)
def make_run_dirs(run_dirs):
    """Announce and create every directory in ``run_dirs``."""
    names = [str(path) for path in run_dirs]
    print("Making run dirs:", names)
    for path in run_dirs:
        path.mkdir()
def setup_logging(log_file):
    """Configure root logging at INFO level, message-only format.

    Two handlers are passed to ``logging.basicConfig``: a file handler
    writing to ``log_file`` and a default stream handler.
    """
    file_handler = logging.FileHandler(log_file)
    stream_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        handlers=[file_handler, stream_handler],
    )
def make_xstable(args, run_dirs, model_dir):
    """Build table models from xstar runs.

    The base FITS table is copied to ``xout_ain.fits``, ``xout_aout.fits``
    and ``xout_mtable.fits`` inside ``model_dir``; afterwards ``xstar2table``
    is run once per run directory (in sorted order) on its
    ``xout_spect1.fits``.

    Args:
        args: Extra command-line arguments. When the first one names a
            joblist, the ``.fits`` file with the same stem in the current
            directory is used as base table; otherwise ``xstinitable.fits``.
        run_dirs: Directories of the finished XSTAR runs.
        model_dir: Directory receiving the three table copies.
    """
    base_file = Path("xstinitable.fits")
    if args:
        # with_suffix() needs the leading dot; a bare "fits" raises
        # ValueError. Only the file name is kept so that the local copy in
        # the current directory is used.
        candidate = Path(Path(args[0]).with_suffix(".fits").name)
        # args may hold xstinitable parameters instead of a joblist name, in
        # which case no such file exists and the xstinitable output is used.
        if candidate.exists():
            base_file = candidate
    for dest_file in ["xout_ain.fits", "xout_aout.fits", "xout_mtable.fits"]:
        copy_file(base_file, model_dir.joinpath(dest_file))
    # NOTE(review): xstar2table is launched from the current working
    # directory, not from model_dir -- confirm that it finds the xout_*
    # tables copied above.
    for run_dir in sorted(run_dirs):
        run(
            "{exe} xstarspec={spec}".format(
                exe=get_executeable_dir().joinpath("xstar2table"),
                spec=run_dir.joinpath("xout_spect1.fits"),
            ),
            os.environ,
        )
def process_jobs(pool, jobs, chunksize=1):
    """Run every job through ``run_xstar`` on ``pool`` and log the reports.

    The working directory, start time, each job's stripped report, end time
    and total duration are written with ``logging.info``.
    """
    logging.info("Using Dir " + os.getcwd())
    started = datetime.now()
    logging.info("Start time: {}".format(started))
    for report in pool.map(run_xstar, jobs.items(), chunksize):
        logging.info(report.strip())
    finished = datetime.now()
    logging.info("End time: {}".format(finished))
    logging.info("Duration {}".format(finished - started))
def main(options, args):
    """Run a complete multixstar session.

    Steps: check the environment, create a fresh ``mxstar.<n>`` work
    directory, obtain the XSTAR commands, create the model and run
    directories, run all jobs in a process pool, then either exit with
    status 1 if a run produced no ``xout_spect1.fits`` or build the table
    models. The log file is removed afterwards unless ``options.keeplog``
    is set.

    Args:
        options: Parsed options providing ``workdir``, ``log_file``,
            ``nproc`` and ``keeplog``.
        args: Remaining command-line arguments (joblist name or xstinitable
            parameters).
    """
    print("Checking enviroment")
    check_enviroment(options.workdir)
    workdir = make_new_dir(options.workdir)
    print("New dir:", workdir)
    os.chdir(workdir)
    print("Getting jobs")
    jobs = make_jobs(get_xstar_cmds(args, get_executeable_dir()))
    model_dir = workdir.joinpath(get_model_name(jobs))
    print("Model dir {}".format(model_dir))
    model_dir.mkdir(exist_ok=True)
    os.chdir(model_dir)
    run_dirs = [model_dir.joinpath(run_dir) for run_dir in jobs]
    make_run_dirs(run_dirs)
    # setup logging
    setup_logging(options.log_file)
    print("Starting jobs")
    # The context manager shuts the worker processes down once all jobs are
    # done; pool.map() inside process_jobs blocks until every job returns.
    with mp.Pool(processes=options.nproc) as pool:
        process_jobs(pool, jobs)
    failed = check_results(run_dirs)
    if failed:
        logging.info("Somethings not right in {}".format(",".join(map(str, failed))))
        # Exit with non-zero code
        sys.exit(1)
    else:
        os.chdir(workdir)
        make_xstable(args, run_dirs, model_dir)
    if not options.keeplog:
        options.log_file.unlink()
# Script entry point: parse the command line, then run the session.
if __name__ == "__main__":
    parsed_options, remaining_args = process_flags()
    main(parsed_options, remaining_args)