From 2b6315dd9b5349d173e6e5ef1886d7726671bce7 Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Wed, 29 May 2024 11:37:50 +0000 Subject: [PATCH] Presumably a fix for deadlock (see #2041). We use a separate lock to update aggregates. To this end we extend self.active_run_lock() with an optional argument "name" to be able to have different locks associated with the same run_id. --- server/fishtest/rundb.py | 41 +++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 3dfd2c794..772c394ae 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -76,14 +76,14 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): def set_inactive_run(self, run): run_id = run["_id"] - with self.active_run_lock(str(run_id)): + with self.active_run_lock(run_id, name="aggregates"): run["workers"] = run["cores"] = 0 for task in run["tasks"]: task["active"] = False def set_inactive_task(self, task_id, run): run_id = run["_id"] - with self.active_run_lock(str(run_id)): + with self.active_run_lock(run_id, name="aggregates"): task = run["tasks"][task_id] if task["active"]: run["workers"] -= 1 @@ -98,7 +98,8 @@ def update_workers_cores(self): workers = cores = 0 run_id = r["_id"] run = self.get_run(run_id) - with self.active_run_lock(str(run_id)): + changed = False + with self.active_run_lock(run_id, name="aggregates"): for task in run["tasks"]: if task["active"]: workers += 1 @@ -110,7 +111,9 @@ def update_workers_cores(self): flush=True, ) run["workers"], run["cores"] = workers, cores - self.buffer(run, False) + changed = True + if changed: + self.buffer(run, False) def new_run( self, @@ -1015,7 +1018,7 @@ def priority(run): # lower is better # Now we create a new task for this run. run_id = run["_id"] - with self.active_run_lock(str(run_id)): + with self.active_run_lock(run_id): # It may happen that the run we have selected is now finished. # Since this is very rare we just return instead of cluttering the # code with remedial actions. @@ -1038,6 +1041,7 @@ def priority(run): # lower is better ) task_size = min(self.worker_cap(run, worker_info), remaining) + task_id = len(run["tasks"]) task = { "num_games": task_size, "active": True, @@ -1053,12 +1057,11 @@ def priority(run): # lower is better "pentanomial": 5 * [0], }, } - run["tasks"].append(task) - task_id = len(run["tasks"]) - 1 - - run["workers"] += 1 - run["cores"] += task["worker_info"]["concurrency"] + with self.active_run_lock(run_id, name="aggregates"): + run["tasks"].append(task) + run["workers"] += 1 + run["cores"] += task["worker_info"]["concurrency"] self.buffer(run, False) @@ -1081,7 +1084,10 @@ def priority(run): # lower is better active_runs = {} purge_count = 0 - def active_run_lock(self, id): + def active_run_lock(self, id, name="run"): + valid_names = {"run", "aggregates"} + assert name in valid_names + id = str(id) with self.run_lock: self.purge_count = self.purge_count + 1 if self.purge_count > 100000: @@ -1090,12 +1096,13 @@ def active_run_lock(self, id): (k, v) for k, v in self.active_runs.items() if v["time"] >= old ) self.purge_count = 0 - if id in self.active_runs: - active_lock = self.active_runs[id]["lock"] - self.active_runs[id]["time"] = time.time() + key = (id, name) + if key in self.active_runs: + active_lock = self.active_runs[key]["lock"] + self.active_runs[key]["time"] = time.time() else: - active_lock = threading.RLock() - self.active_runs[id] = {"time": time.time(), "lock": active_lock} + active_lock = threading.Lock() + self.active_runs[key] = {"time": time.time(), "lock": active_lock} return active_lock def finished_run_message(self, run): @@ -1168,7 +1175,7 @@ def handle_crash_or_time(self, run, task_id): ) def update_task(self, worker_info, run_id, task_id, stats, spsa): - lock = self.active_run_lock(str(run_id)) + lock = self.active_run_lock(run_id) with lock: return self.sync_update_task(worker_info, run_id, task_id, stats, spsa)