Skip to content

Commit

Permalink
Presumably a fix for a deadlock (see official-stockfish#2041).
Browse files Browse the repository at this point in the history
We use a separate lock to update aggregates. To this end, we
extend self.active_run_lock() with an optional argument
"name", so that different locks can be associated with the same
run_id.
  • Loading branch information
vdbergh committed May 29, 2024
1 parent dac6960 commit 2b6315d
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):

def set_inactive_run(self, run):
run_id = run["_id"]
with self.active_run_lock(str(run_id)):
with self.active_run_lock(run_id, name="aggregates"):
run["workers"] = run["cores"] = 0
for task in run["tasks"]:
task["active"] = False

def set_inactive_task(self, task_id, run):
run_id = run["_id"]
with self.active_run_lock(str(run_id)):
with self.active_run_lock(run_id, name="aggregates"):
task = run["tasks"][task_id]
if task["active"]:
run["workers"] -= 1
Expand All @@ -98,7 +98,8 @@ def update_workers_cores(self):
workers = cores = 0
run_id = r["_id"]
run = self.get_run(run_id)
with self.active_run_lock(str(run_id)):
changed = False
with self.active_run_lock(run_id, name="aggregates"):
for task in run["tasks"]:
if task["active"]:
workers += 1
Expand All @@ -110,7 +111,9 @@ def update_workers_cores(self):
flush=True,
)
run["workers"], run["cores"] = workers, cores
self.buffer(run, False)
changed = True
if changed:
self.buffer(run, False)

def new_run(
self,
Expand Down Expand Up @@ -1015,7 +1018,7 @@ def priority(run): # lower is better

# Now we create a new task for this run.
run_id = run["_id"]
with self.active_run_lock(str(run_id)):
with self.active_run_lock(run_id):
# It may happen that the run we have selected is now finished.
# Since this is very rare we just return instead of cluttering the
# code with remedial actions.
Expand All @@ -1038,6 +1041,7 @@ def priority(run): # lower is better
)

task_size = min(self.worker_cap(run, worker_info), remaining)
task_id = len(run["tasks"])
task = {
"num_games": task_size,
"active": True,
Expand All @@ -1053,12 +1057,11 @@ def priority(run): # lower is better
"pentanomial": 5 * [0],
},
}
run["tasks"].append(task)

task_id = len(run["tasks"]) - 1

run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]
with self.active_run_lock(run_id, name="aggregates"):
run["tasks"].append(task)
run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]

self.buffer(run, False)

Expand All @@ -1081,7 +1084,10 @@ def priority(run): # lower is better
active_runs = {}
purge_count = 0

def active_run_lock(self, id):
def active_run_lock(self, id, name="run"):
valid_names = {"run", "aggregates"}
assert name in valid_names
id = str(id)
with self.run_lock:
self.purge_count = self.purge_count + 1
if self.purge_count > 100000:
Expand All @@ -1090,12 +1096,13 @@ def active_run_lock(self, id):
(k, v) for k, v in self.active_runs.items() if v["time"] >= old
)
self.purge_count = 0
if id in self.active_runs:
active_lock = self.active_runs[id]["lock"]
self.active_runs[id]["time"] = time.time()
key = (id, name)
if key in self.active_runs:
active_lock = self.active_runs[key]["lock"]
self.active_runs[key]["time"] = time.time()
else:
active_lock = threading.RLock()
self.active_runs[id] = {"time": time.time(), "lock": active_lock}
active_lock = threading.Lock()
self.active_runs[key] = {"time": time.time(), "lock": active_lock}
return active_lock

def finished_run_message(self, run):
Expand Down Expand Up @@ -1168,7 +1175,7 @@ def handle_crash_or_time(self, run, task_id):
)

def update_task(self, worker_info, run_id, task_id, stats, spsa):
lock = self.active_run_lock(str(run_id))
lock = self.active_run_lock(run_id)
with lock:
return self.sync_update_task(worker_info, run_id, task_id, stats, spsa)

Expand Down

0 comments on commit 2b6315d

Please sign in to comment.