-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaddpvs.py
231 lines (202 loc) · 7.62 KB
/
addpvs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import argparse, chess, chess.engine, concurrent.futures, random, re
from multiprocessing import freeze_support, cpu_count
from time import time
from tqdm import tqdm
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of ``n`` items each.

    The slices are produced in order; the final one is shorter when
    ``len(lst)`` is not a multiple of ``n``. An empty ``lst`` yields nothing.
    """
    starts = range(0, len(lst), n)
    yield from (lst[start : start + n] for start in starts)
def pv_status(fen, mate, pv):
    """Classify whether the move list ``pv`` delivers the announced mate.

    Args:
        fen: position the PV starts from.
        mate: signed mate distance in moves; positive when the side to move
            mates, otherwise the side to move is the one being mated.
        pv: list of moves in UCI notation.

    Returns:
        One of the strings
        ``"draw"``   - ``board.can_claim_draw()`` held at a ply where the
                       side being mated was to move,
        ``'error "<message>"'`` - the FEN or a move could not be parsed, or
                       a move was illegal,
        ``"short"`` / ``"long"`` - the PV has fewer / more plies than the
                       mate distance requires,
        ``"ok"``     - the PV has the exact length and ends in checkmate,
        ``"wrong"``  - the PV has the exact length but does not end in mate.
    """
    # Ply parity (0 = side to move in ``fen``) at which the mated side moves.
    defender_parity = 1 if mate > 0 else 0
    try:
        board = chess.Board(fen)
        for ply, move in enumerate(pv):
            defender_to_move = ply % 2 == defender_parity
            if defender_to_move and board.can_claim_draw():
                return "draw"
            parsed = chess.Move.from_uci(move)
            if parsed not in board.legal_moves:
                raise Exception(f"illegal move {move} at position {board.epd()}")
            board.push(parsed)
    except Exception as ex:
        # Any failure while replaying the PV is reported as a status string.
        return f'error "{ex}"'

    # Mate in m needs 2m-1 plies for the winner, 2|m| plies when being mated.
    expected_plies = 2 * mate - 1 if mate > 0 else -2 * mate
    if len(pv) != expected_plies:
        return "short" if len(pv) < expected_plies else "long"
    return "ok" if board.is_checkmate() else "wrong"
def filtered_analysis(engine, board, limit=None, game=None):
    """Analyse ``board`` and return the last info that carries an exact score.

    Consumes everything produced by
    ``engine.analysis(board, limit, game=game)`` and remembers the most
    recent info mapping that has a ``"score"`` key and neither an
    ``"upperbound"`` nor a ``"lowerbound"`` key.

    Returns:
        That info mapping, or an empty dict when no info qualified.
    """
    latest_exact = {}
    with engine.analysis(board, limit, game=game) as stream:
        for update in stream:
            if "score" not in update:
                continue
            # Skip infos flagged as bound scores; only exact ones are kept.
            if "upperbound" in update or "lowerbound" in update:
                continue
            latest_exact = update
    return latest_exact
class Analyser:
    """Engine settings plus a method that analyses a batch of positions.

    The object only stores plain configuration values and a search limit;
    the engine process itself is started inside ``analyze_fens``.
    """

    def __init__(self, args):
        """Copy engine path, search limits and UCI options from ``args``.

        ``args`` must provide the attributes ``engine``, ``nodes``, ``depth``,
        ``time``, ``mate``, ``hash``, ``threads`` and ``syzygyPath``.
        """
        self.engine = args.engine
        self.limit = chess.engine.Limit(
            nodes=args.nodes, depth=args.depth, time=args.time, mate=args.mate
        )
        self.hash = args.hash
        self.threads = args.threads
        self.syzygyPath = args.syzygyPath

    def analyze_fens(self, fens):
        """Analyse each position in ``fens`` with a freshly started engine.

        Args:
            fens: iterable of ``(fen, bm, pvlength, line)`` tuples, where
                ``bm`` is the known mate score, ``pvlength`` the number of
                moves in the PV already on record, and ``line`` is unused
                here.

        Returns:
            A list of ``(fen, bm, m, pv)`` tuples. ``m`` is the mate score the
            engine reported from the side to move's point of view (``None``
            if it reported no mate). ``pv`` is a list of UCI move strings when
            the engine's mate is no longer than ``bm`` and is either a
            different mate score or comes with a longer PV than the recorded
            one; otherwise ``pv`` is ``None``.
        """
        result_fens = []
        engine = chess.engine.SimpleEngine.popen_uci(self.engine)
        try:
            # Only send the options the user actually set.
            for option, value in (
                ("Hash", self.hash),
                ("Threads", self.threads),
                ("SyzygyPath", self.syzygyPath),
            ):
                if value is not None:
                    engine.configure({option: value})

            for fen, bm, pvlength, _line in fens:
                pv = None
                board = chess.Board(fen)
                info = filtered_analysis(engine, board, self.limit, game=board)
                m = info["score"].pov(board.turn).mate() if "score" in info else None
                # NOTE(review): only magnitudes are compared here; the sign of
                # the engine's mate score is not checked against bm.
                if m is not None and abs(m) <= abs(bm) and "pv" in info:
                    pv = [move.uci() for move in info["pv"]]
                    if m == bm and len(pv) <= pvlength:  # no improvement
                        pv = None
                result_fens.append((fen, bm, m, pv))
        finally:
            # Always shut the engine down, even if configuration or analysis
            # raised, so no engine subprocess is left behind.
            engine.quit()
        return result_fens
if __name__ == "__main__":
    # Script entry point. Reads an EPD file of mate positions, re-analyses
    # those whose recorded PV does not check out as "ok" using a pool of
    # engine processes, and writes a copy of the file in which newly found
    # PVs (and shorter mates) replace the corresponding lines.
    freeze_support()
    parser = argparse.ArgumentParser(
        description="Add PVs for mates found by local analysis for positions in e.g. matetrack.epd.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--engine",
        default="./stockfish",
        help="name of the engine binary",
    )
    parser.add_argument(
        "--nodes",
        type=str,
        help="nodes limit per position, default: 10**6 without other limits, otherwise None",
    )
    parser.add_argument("--depth", type=int, help="depth limit per position")
    parser.add_argument("--mate", type=int, help="mate limit per position")
    parser.add_argument(
        "--time", type=float, help="time limit (in seconds) per position"
    )
    parser.add_argument("--hash", type=int, help="hash table size in MB")
    parser.add_argument(
        "--threads",
        type=int,
        help="number of threads per position",
    )
    parser.add_argument("--syzygyPath", help="path to syzygy EGTBs")
    parser.add_argument(
        "--concurrency",
        type=int,
        default=cpu_count(),
        help="total number of threads script may use, default: cpu_count()",
    )
    parser.add_argument(
        "--epdFile",
        default="matetrack.epd",
        help="file containing the positions, their mate scores, and possibly PVs",
    )
    parser.add_argument(
        "--outFile",
        default="matetrackpv.epd",
        help="output file for mates with their mate scores and existing complete as well as newly found PVs",
    )
    parser.add_argument(
        "--mateType",
        choices=["all", "won", "lost"],
        default="won",
        help="type of positions to find PVs for (WARNING: use all or lost only for reliable engines!)",
    )
    args = parser.parse_args()

    if args.nodes is None and args.depth is None and args.time is None:
        args.nodes = 10**6
    elif args.nodes is not None:
        # SECURITY NOTE: eval() runs arbitrary Python taken from the command
        # line. It is kept so that expressions such as "10**6" keep working,
        # but this script must not be fed --nodes values from untrusted input.
        args.nodes = eval(args.nodes)

    ana = Analyser(args)
    # Raw string: "\-" is not a valid escape in a normal string literal.
    p = re.compile(r"([0-9a-zA-Z/\- ]*) bm #([0-9\-]*);")

    print("Loading FENs ...")
    fens, ana_fens = [], []
    with open(args.epdFile) as f:
        for line in f:
            m = p.match(line)
            if not m:
                print("---------------------> IGNORING : ", line)
                continue
            fen, bm = m.group(1), int(m.group(2))
            # Everything after "; PV: " up to the next ';' is the stored PV.
            # split() already discards the trailing newline, so a final line
            # without '\n' does not lose its last character.
            _, _, pv_text = line.partition("; PV: ")
            pv = pv_text.partition(";")[0].split()
            wanted = (
                args.mateType == "all"
                or (args.mateType == "won" and bm > 0)
                or (args.mateType == "lost" and bm < 0)
            )
            if wanted and pv_status(fen, bm, pv) != "ok":
                ana_fens.append((fen, bm, len(pv), line))
            fens.append((fen, line))

    # Fixed seed keeps the shuffle, and thus the chunking, reproducible.
    random.seed(42)
    random.shuffle(ana_fens)  # try to balance the analysis time across chunks
    print(f"{len(fens)} FENs loaded, {len(ana_fens)} need analysis ...")

    numfen = len(ana_fens)
    workers = args.concurrency // (args.threads if args.threads else 1)
    if workers <= 0:
        # Explicit check instead of assert, which is stripped under -O.
        parser.error(
            f"Need concurrency >= threads, but concurrency = {args.concurrency} and threads = {args.threads}."
        )
    # Chunk size chosen so that each worker gets about four chunks.
    fw_ratio = numfen // (4 * workers)
    fenschunked = list(chunks(ana_fens, max(1, fw_ratio)))

    limits = [
        ("nodes", args.nodes),
        ("depth", args.depth),
        ("time", args.time),
        ("mate", args.mate),
        ("hash", args.hash),
        ("threads", args.threads),
        ("syzygyPath", args.syzygyPath),
    ]
    msg = (
        args.engine
        + " on "
        + args.epdFile
        + " with "
        + " ".join([f"--{k} {v}" for k, v in limits if v is not None])
    )
    print(f"\nMate search started for {msg} ...")

    res = []
    futures = []
    with tqdm(total=len(fenschunked), smoothing=0, miniters=1) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as e:
            for entry in fenschunked:
                futures.append(e.submit(ana.analyze_fens, entry))
            # Collect results in completion order; one progress tick per chunk.
            for future in concurrent.futures.as_completed(futures):
                pbar.update(1)
                res += future.result()
    print("")

    # Map each analysed FEN to the (possibly improved) mate score and new PV.
    d = {}
    count_found = better = 0
    for fen, bm, m, pv in res:
        if m is not None and abs(m) < abs(bm):
            print(f"Found better mate #{m} for FEN {fen} bm #{bm}")
            bm = m
            better += 1
        d[fen] = bm, pv
        if pv is not None:
            count_found += 1

    print(f"\nUsing {msg}")
    print("Total PVs found:  ", count_found)
    if better:
        print("Total better:  ", better)

    with open(args.outFile, "w") as f:
        for fen, line in fens:
            bm, pv = d.get(fen, (0, None))
            if pv is not None:
                f.write(f"{fen} bm #{bm}; PV: {' '.join(pv)};\n")
            else:
                # No new PV: keep the input line exactly as it was read.
                f.write(line)