#!/usr/bin/python3
from datetime import datetime
import contextlib
import os
import re
import time

import psutil


class UnmatchedJobError(Exception):
    pass


def job_phases_for_tmpdir(d, all_jobs):
    '''Return phase 2-tuples for jobs running on tmpdir d'''
    return sorted([j.progress() for j in all_jobs if j.tmpdir == d])


def job_phases_for_dstdir(d, all_jobs):
    '''Return phase 2-tuples for jobs outputting to dstdir d'''
    return sorted([j.progress() for j in all_jobs if j.dstdir == d])


def is_plotting_cmdline(cmdline):
    return (
        len(cmdline) >= 4
        # and 'python' in cmdline[0]
        and 'chia2.exe' in cmdline[0]
        and 'plots' == cmdline[1]
        and 'create' == cmdline[2]
    )
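
# Example (hypothetical path, for illustration only): a command line such as
#   ['C:\\chia\\daemon\\chia2.exe', 'plots', 'create', '-k', '32', '-t', 'E:\\tmp', '-d', 'F:\\plots']
# satisfies all four conditions above and is recognized as a plotting job.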


# TODO: be more principled and explicit about what we cache vs. what we look up
# dynamically from the logfile
class Job:
    'Represents a plotter job'

    # These are constants, not updated during a run.
    k = 0
    r = 0
    u = 0
    b = 0
    n = 0  # probably not used
    tmpdir = ''
    tmp2dir = ''
    dstdir = ''
    logfile = ''
    jobfile = ''
    job_id = 0
    plot_id = '--------'
    proc = None  # will get a psutil.Process

    # These are dynamic, cached, and need to be updated periodically
    phase = (None, None)  # Phase/subphase

    @staticmethod
    def get_running_jobs(logroot, cached_jobs=()):
        '''Return a list of running plot jobs. If a cache of preexisting jobs is
        provided, reuse those previous jobs without updating their information.
        Always look for new jobs not already in the cache.'''
        jobs = []
        cached_jobs_by_pid = {j.proc.pid: j for j in cached_jobs}
        for proc in psutil.process_iter(['pid', 'cmdline']):
            try:
                # Ignore processes which most likely have terminated between the
                # time of iteration and data access.
                with contextlib.suppress(psutil.NoSuchProcess):
                    if is_plotting_cmdline(proc.cmdline()):
                        if proc.pid in cached_jobs_by_pid:
                            jobs.append(cached_jobs_by_pid[proc.pid])  # Copy from cache
                        else:
                            with contextlib.suppress(UnmatchedJobError):
                                jobs.append(Job(proc, logroot))
            except (PermissionError, psutil.AccessDenied):
                # Skip processes we are not permitted to inspect.
                pass
        return jobs
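
    # Usage sketch (illustrative, not part of the original module): callers can
    # poll this periodically, feeding the previous result back in as the cache
    # so existing Job objects are reused across scans:
    #
    #     jobs = []
    #     while True:
    #         jobs = Job.get_running_jobs('/path/to/logs', cached_jobs=jobs)
    #         time.sleep(10)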

    def __init__(self, proc, logroot):
        '''Initialize from an existing psutil.Process object. Requires logroot
        in order to identify the job's logfile among its open files.'''
        self.proc = proc
        with self.proc.oneshot():
            # Parse command line args
            args = self.proc.cmdline()
            assert len(args) > 4
            assert 'chia2.exe' in args[0]
            assert 'plots' == args[1]
            assert 'create' == args[2]
            args_iter = iter(args[3:])
            for arg in args_iter:
                val = None if arg in ['-e'] else next(args_iter)
                if arg == '-k':
                    self.k = val
                elif arg == '-r':
                    self.r = val
                elif arg == '-b':
                    self.b = val
                elif arg == '-u':
                    self.u = val
                elif arg == '-t':
                    self.tmpdir = val
                elif arg == '-2':
                    self.tmp2dir = val
                elif arg == '-d':
                    self.dstdir = val
                elif arg == '-n':
                    self.n = val
                elif arg == '-e':
                    # TODO: keep track of -e
                    pass
                else:
                    print('Warning: unrecognized args: %s %s' % (arg, val))

            # Find logfile (whatever file is open under the log root). The
            # file may be open more than once, e.g. for STDOUT and STDERR.
            for f in self.proc.open_files():
                if logroot in f.path:
                    if self.logfile:
                        assert self.logfile == f.path
                    else:
                        self.logfile = f.path
                    break

        # Initialize data that needs to be loaded from the logfile
        self.init_from_logfile()

    def init_from_logfile(self):
        '''Read the plot ID and job start time from the logfile. Sets plot_id
        and start_time when the expected lines are found; raises
        UnmatchedJobError if there is no logfile to read.'''
        if not self.logfile:
            raise UnmatchedJobError()
        # Try reading for a while; it can take a while for the job to get started as it scans
        # existing plot dirs (especially if they are NFS).
        found_id = False
        found_log = False
        for attempt_number in range(3):
            with open(self.logfile, 'r') as f:
                for line in f:
                    m = re.match('^ID: ([0-9a-f]*)', line)
                    if m:
                        self.plot_id = m.group(1)
                        found_id = True
                    m = re.match(r'^Starting phase 1/4:.*\.\.\. (.*)', line)
                    if m:
                        # e.g. "Mon Nov  2 08:39:53 2020"
                        self.start_time = datetime.strptime(m.group(1), '%a %b %d %H:%M:%S %Y')
                        found_log = True
                        break  # Stop reading lines in file
            if found_id and found_log:
                break  # Stop trying
            else:
                time.sleep(1)  # Sleep and try again

        # If we couldn't find the line in the logfile, the job is probably just getting started
        # (and being slow about it). In this case, use the last metadata change as the start time.
        # TODO: we never come back to this; e.g. plot_id may remain uninitialized.
        if not found_log:
            self.start_time = datetime.fromtimestamp(os.path.getctime(self.logfile))

        # Load things from logfile that are dynamic
        self.update_from_logfile()

    def update_from_logfile(self):
        self.set_phase_from_logfile()

    def set_phase_from_logfile(self):
        assert self.logfile

        # Map from phase number to subphase number reached in that phase.
        # Phase 1 subphases are <started>, table1, table2, ...
        # Phase 2 subphases are <started>, table7, table6, ...
        # Phase 3 subphases are <started>, tables1&2, tables2&3, ...
        # Phase 4 subphases are <started>
        phase_subphases = {}
        with open(self.logfile, 'r') as f:
            for line in f:
                # "Starting phase 1/4: Forward Propagation into tmp files... Sat Oct 31 11:27:04 2020"
                m = re.match(r'^Starting phase (\d).*', line)
                if m:
                    phase = int(m.group(1))
                    phase_subphases[phase] = 0

                # Phase 1: "Computing table 2"
                m = re.match(r'^Computing table (\d).*', line)
                if m:
                    phase_subphases[1] = max(phase_subphases[1], int(m.group(1)))

                # Phase 2: "Backpropagating on table 2"
                m = re.match(r'^Backpropagating on table (\d).*', line)
                if m:
                    phase_subphases[2] = max(phase_subphases[2], 7 - int(m.group(1)))

                # Phase 3: "Compressing tables 4 and 5"
                m = re.match(r'^Compressing tables (\d) and (\d).*', line)
                if m:
                    phase_subphases[3] = max(phase_subphases[3], int(m.group(1)))

                # TODO also collect timing info:
                # "Time for phase 1 = 22796.7 seconds. CPU (98%) Tue Sep 29 17:57:19 2020"
                # for phase in ['1', '2', '3', '4']:
                #     m = re.match(r'^Time for phase ' + phase + r' = (\d+.\d+) seconds..*', line)
                #     data.setdefault....

                # "Total time = 49487.1 seconds. CPU (97.26%) Wed Sep 30 01:22:10 2020"
                # m = re.match(r'^Total time = (\d+.\d+) seconds.*', line)
                # if m:
                #     data.setdefault(key, {}).setdefault('total time', []).append(float(m.group(1)))

        if phase_subphases:
            phase = max(phase_subphases.keys())
            self.phase = (phase, phase_subphases[phase])
        else:
            self.phase = (0, 0)
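
    # Example (illustrative): a logfile whose most recent lines are
    # "Starting phase 2/4: ..." followed by "Backpropagating on table 5"
    # yields self.phase == (2, 2), since phase-2 subphases count down from
    # table 7 (7 - 5 = 2).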

    def progress(self):
        '''Return a 2-tuple with the job phase and subphase (as cached from the logfile)'''
        return self.phase

    def plot_id_prefix(self):
        return self.plot_id[:8]

    # TODO: make this more useful and complete, and/or make it configurable
    def status_str_long(self):
        return '{plot_id}\nk={k} r={r} b={b} u={u}\npid:{pid}\ntmp:{tmp}\ntmp2:{tmp2}\ndst:{dst}\nlogfile:{logfile}'.format(
            plot_id=self.plot_id,
            k=self.k,
            r=self.r,
            b=self.b,
            u=self.u,
            pid=self.proc.pid,
            tmp=self.tmpdir,
            tmp2=self.tmp2dir,
            dst=self.dstdir,
            logfile=self.logfile
        )

    def get_mem_usage(self):
        return self.proc.memory_info().vms  # Total, inc swapped

    def get_tmp_usage(self):
        total_bytes = 0
        with os.scandir(self.tmpdir) as it:
            for entry in it:
                if self.plot_id in entry.name:
                    try:
                        total_bytes += entry.stat().st_size
                    except FileNotFoundError:
                        # The file might disappear; this being an estimate we don't care
                        pass
        return total_bytes

    def get_run_status(self):
        '''Running, suspended, etc.'''
        status = self.proc.status()
        if status == psutil.STATUS_RUNNING:
            return 'RUN'
        elif status == psutil.STATUS_SLEEPING:
            return 'SLP'
        elif status == psutil.STATUS_DISK_SLEEP:
            return 'DSK'
        elif status == psutil.STATUS_STOPPED:
            return 'STP'
        else:
            return status

    def get_time_wall(self):
        return int((datetime.now() - self.start_time).total_seconds())

    def get_time_user(self):
        return int(self.proc.cpu_times().user)

    def get_time_sys(self):
        return int(self.proc.cpu_times().system)

    def get_time_iowait(self):
        # psutil exposes cpu_times().iowait only on Linux, so return a
        # placeholder instead of the commented-out call below.
        return 0
        # return int(self.proc.cpu_times().iowait)

    def suspend(self, reason=''):
        self.proc.suspend()
        self.status_note = reason

    def resume(self):
        self.proc.resume()

    def get_temp_files(self):
        temp_files = []
        for f in self.proc.open_files():
            if self.tmpdir in f.path or self.tmp2dir in f.path or self.dstdir in f.path:
                temp_files.append(f.path)
        return temp_files

    def cancel(self):
        'Cancel an already running job'
        # We typically suspend the job as the first action in killing it, so it
        # doesn't create more tmp files during death. However, terminate() won't
        # complete if the job is suspended, so we also need to resume it.
        # TODO: check that this is best practice for killing a job.
        self.proc.resume()
        self.proc.terminate()

    def check_status(self, expected_status):
        if self.get_run_status() == expected_status:
            return 1
        else:
            print('Expected status %s, actual %s' % (expected_status, self.get_run_status()))
            return 0
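

# A minimal usage sketch (not part of the original module). The log directory
# below is hypothetical; substitute wherever your plotter writes its logs.
if __name__ == '__main__':
    for j in Job.get_running_jobs(r'C:\chia\logs'):
        print('pid=%d plot=%s phase=%s' % (j.proc.pid, j.plot_id_prefix(), str(j.progress())))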