-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathplotman.py
201 lines (159 loc) · 6.58 KB
/
plotman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/python3
from datetime import datetime
from subprocess import call
import argparse
import os
import re
import threading
import random
#import readline
import readline # For nice CLI
import sys
import time
import yaml
import shutil
# Plotman libraries
from job import Job
import analyzer
import archive
import interactive
import manager
import plot_util
import reporting
class PlotmanArgParser:
    """Builds and parses the plotman command-line interface.

    Subcommands: status, dirs, interactive, dsched, plot, archive,
    details, files, kill, suspend, resume, analyze.
    """

    def add_idprefix_arg(self, subparser):
        """Attach the positional 'idprefix' argument (one or more plot-ID
        prefixes) to a subcommand parser.  Used by the job-control
        subcommands that must select a specific job."""
        subparser.add_argument(
                'idprefix',
                type=str,
                nargs='+',
                help='disambiguating prefix of plot ID')

    def parse_args(self):
        """Parse sys.argv and return the argparse Namespace.

        Returns:
            argparse.Namespace with .cmd set to the chosen subcommand,
            plus any subcommand-specific attributes (idprefix, logfile).
        """
        parser = argparse.ArgumentParser(description='Chia plotting manager.')
        sp = parser.add_subparsers(dest='cmd')

        p_status = sp.add_parser('status', help='show current plotting status')

        p_dirs = sp.add_parser('dirs', help='show directories info')

        # Fixed typo in help text: 'montioring' -> 'monitoring'
        p_interactive = sp.add_parser('interactive', help='run interactive control/monitoring mode')

        p_dst_sch = sp.add_parser('dsched', help='print destination dir schedule')

        p_plot = sp.add_parser('plot', help='run plotting loop')

        p_archive = sp.add_parser('archive',
                help='move completed plots to farming location')

        p_details = sp.add_parser('details', help='show details for job')
        self.add_idprefix_arg(p_details)

        p_files = sp.add_parser('files', help='show temp files associated with job')
        self.add_idprefix_arg(p_files)

        p_kill = sp.add_parser('kill', help='kill job (and cleanup temp files)')
        self.add_idprefix_arg(p_kill)

        p_suspend = sp.add_parser('suspend', help='suspend job')
        self.add_idprefix_arg(p_suspend)

        p_resume = sp.add_parser('resume', help='resume suspended job')
        self.add_idprefix_arg(p_resume)

        p_analyze = sp.add_parser('analyze',
                help='analyze timing stats of completed jobs')
        p_analyze.add_argument('logfile', type=str, nargs='+',
                help='logfile(s) to analyze')

        args = parser.parse_args()
        return args
if __name__ == "__main__":
    random.seed()

    pm_parser = PlotmanArgParser()
    args = pm_parser.parse_args()

    # Load configuration.  NOTE(review): yaml.load with FullLoader is safe for
    # trusted local config, but safe_load would be the stricter choice.
    with open('config.yaml', 'r') as ymlfile:
        cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
    dir_cfg = cfg['directories']
    sched_cfg = cfg['scheduling']
    plotting_cfg = cfg['plotting']

    #
    # Stay alive, spawning plot jobs
    #
    if args.cmd == 'plot':
        print('...starting plot loop')
        while True:
            wait_reason = manager.maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg)

            # TODO: report this via a channel that can be polled on demand, so we don't spam the console
            sleep_s = int(sched_cfg['polling_time_s'])
            if wait_reason:
                print('...sleeping %d s: %s' % (sleep_s, wait_reason))
            time.sleep(sleep_s)

    #
    # Analysis of completed jobs
    #
    elif args.cmd == 'analyze':
        # Bug fix: the original assigned to the name 'analyzer', shadowing the
        # imported analyzer module.  Use a distinct local name instead.
        log_analyzer = analyzer.LogAnalyzer()
        log_analyzer.analyze(args.logfile)

    else:
        # Remaining subcommands all need the current set of running jobs.
        jobs = Job.get_running_jobs(dir_cfg['log'])

        # Status report
        if args.cmd == 'status':
            (columns, rows) = shutil.get_terminal_size()
            print(reporting.status_report(jobs, int(columns)))

        # Directories report
        elif args.cmd == 'dirs':
            (columns, rows) = shutil.get_terminal_size()
            print(reporting.dirs_report(jobs, dir_cfg, sched_cfg, int(columns)))

        elif args.cmd == 'interactive':
            interactive.run_interactive()

        # Start running archival
        elif args.cmd == 'archive':
            print('...starting archive loop')
            firstit = True
            while True:
                if not firstit:
                    print('Sleeping 60s until next iteration...')
                    time.sleep(60)
                    # Refresh the job list each iteration so archival sees
                    # newly started/finished jobs.
                    jobs = Job.get_running_jobs(dir_cfg['log'])
                firstit = False
                archive.archive(dir_cfg, jobs)

        # Debugging: show the destination drive usage schedule
        elif args.cmd == 'dsched':
            for (d, ph) in manager.dstdirs_to_furthest_phase(jobs).items():
                print('  %s : %s' % (d, str(ph)))

        #
        # Job control commands
        #
        elif args.cmd in [ 'details', 'files', 'kill', 'suspend', 'resume' ]:
            # Bug fix: 'id_spec' was referenced in the error messages below
            # but never defined, raising NameError on those paths.  Bind it
            # once here and use it consistently.
            # TODO: allow multiple idprefixes, not just take the first
            id_spec = args.idprefix[0]

            selected = []

            # TODO: clean up treatment of wildcard
            if id_spec == 'all':
                selected = jobs
            else:
                selected = manager.select_jobs_by_partial_id(jobs, id_spec)
                if len(selected) == 0:
                    print('Error: %s matched no jobs.' % id_spec)
                elif len(selected) > 1:
                    print('Error: "%s" matched multiple jobs:' % id_spec)
                    for j in selected:
                        print('  %s' % j.plot_id)
                    selected = []

            for job in selected:
                if args.cmd == 'details':
                    print(job.status_str_long())

                elif args.cmd == 'files':
                    temp_files = job.get_temp_files()
                    for f in temp_files:
                        print('  %s' % f)

                elif args.cmd == 'kill':
                    # First suspend so job doesn't create new files
                    print('Pausing PID %d, plot id %s' % (job.proc.pid, job.plot_id))
                    job.suspend()

                    temp_files = job.get_temp_files()
                    print('Will kill pid %d, plot id %s' % (job.proc.pid, job.plot_id))
                    print('Will delete %d temp files' % len(temp_files))
                    conf = input('Are you sure? ("y" to confirm): ')
                    if (conf != 'y'):
                        print('canceled.  If you wish to resume the job, do so manually.')
                    else:
                        print('killing...')
                        job.cancel()
                        # Fixed typo in message: 'cleaing' -> 'cleaning'
                        print('cleaning up temp files...')
                        for f in temp_files:
                            os.remove(f)

                elif args.cmd == 'suspend':
                    print('Suspending ' + job.plot_id)
                    job.suspend()

                elif args.cmd == 'resume':
                    print('Resuming ' + job.plot_id)
                    job.resume()