Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/dask/distributed into 313
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau committed Nov 14, 2024
2 parents d49e34a + d7eff77 commit bbb96f1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
30 changes: 15 additions & 15 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7443,7 +7443,7 @@ async def retire_workers(
close_workers: bool = False,
remove: bool = True,
stimulus_id: str | None = None,
) -> dict[str, Any]: ...
) -> list[str]: ...

@overload
async def retire_workers(
Expand All @@ -7453,7 +7453,7 @@ async def retire_workers(
close_workers: bool = False,
remove: bool = True,
stimulus_id: str | None = None,
) -> dict[str, Any]: ...
) -> list[str]: ...

@overload
async def retire_workers(
Expand All @@ -7469,7 +7469,7 @@ async def retire_workers(
minimum: int | None = None,
target: int | None = None,
attribute: str = "address",
) -> dict[str, Any]: ...
) -> list[str]: ...

@log_errors
async def retire_workers(
Expand All @@ -7481,7 +7481,7 @@ async def retire_workers(
remove: bool = True,
stimulus_id: str | None = None,
**kwargs: Any,
) -> dict[str, Any]:
) -> list[str]:
"""Gracefully retire workers from cluster. Any key that is in memory exclusively
on the retired workers is replicated somewhere else.
Expand Down Expand Up @@ -7559,7 +7559,7 @@ async def retire_workers(
self.workers[address] for address in self.workers_to_close(**kwargs)
}
if not wss:
return {}
return []

stop_amm = False
amm: ActiveMemoryManagerExtension | None = self.extensions.get("amm")
Expand Down Expand Up @@ -7609,13 +7609,13 @@ async def retire_workers(
# time (depending on interval settings)
amm.run_once()

workers_info_ok = {}
workers_info_abort = {}
for addr, result, info in await asyncio.gather(*coros):
workers_info_ok = []
workers_info_abort = []
for addr, result in await asyncio.gather(*coros):
if result == "OK":
workers_info_ok[addr] = info
workers_info_ok.append(addr)
else:
workers_info_abort[addr] = info
workers_info_abort.append(addr)

finally:
if stop_amm:
Expand All @@ -7625,8 +7625,8 @@ async def retire_workers(
"all",
{
"action": "retire-workers",
"retired": workers_info_ok,
"could-not-retire": workers_info_abort,
"retired": list(workers_info_ok),
"could-not-retire": list(workers_info_abort),
"stimulus_id": stimulus_id,
},
)
Expand All @@ -7649,7 +7649,7 @@ async def _track_retire_worker(
close: bool,
remove: bool,
stimulus_id: str,
) -> tuple[str, Literal["OK", "no-recipients"], dict]:
) -> tuple[str, Literal["OK", "no-recipients"]]:
while not policy.done():
# Sleep 0.01s when there are 4 tasks or less
# Sleep 0.5s when there are 200 or more
Expand All @@ -7671,7 +7671,7 @@ async def _track_retire_worker(
f"Could not retire worker {ws.address!r}: unique data could not be "
f"moved to any other worker ({stimulus_id=!r})"
)
return ws.address, "no-recipients", ws.identity()
return ws.address, "no-recipients"

logger.debug(
f"All unique keys on worker {ws.address!r} have been replicated elsewhere"
Expand All @@ -7685,7 +7685,7 @@ async def _track_retire_worker(
self.close_worker(ws.address)

logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})")
return ws.address, "OK", ws.identity()
return ws.address, "OK"

def add_keys(
self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None
Expand Down
8 changes: 5 additions & 3 deletions distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):

x = await c.scatter("x", workers=[a.address])
out = await c.retire_workers([a.address])
assert out == {}
assert not out
assert not s.extensions["amm"].policies
assert set(s.workers) == {a.address, b.address}

Expand Down Expand Up @@ -1230,7 +1230,7 @@ async def test_RetireWorker_with_actor(c, s, a, b, has_proxy):

with captured_logger("distributed.active_memory_manager", logging.WARNING) as log:
out = await c.retire_workers([a.address])
assert out == {}
assert not out
assert "it holds actor(s)" in log.getvalue()
assert "x" in a.state.actors

Expand All @@ -1250,7 +1250,7 @@ async def test_RetireWorker_with_actor_proxy(c, s, a, b):
assert "y" in b.data

out = await c.retire_workers([b.address])
assert out.keys() == {b.address}
assert out == (b.address,)
assert "x" in a.state.actors
assert "y" in a.data

Expand Down Expand Up @@ -1301,6 +1301,7 @@ async def tensordot_stress(c, s):
assert sum(t.start == "memory" for t in s.transition_log) == expected_tasks


@pytest.mark.slow
@gen_cluster(
client=True,
nthreads=[("", 1)] * 4,
Expand Down Expand Up @@ -1356,6 +1357,7 @@ async def test_ReduceReplicas_stress(c, s, *workers):
await tensordot_stress(c, s)


@pytest.mark.slow
@pytest.mark.parametrize("use_ReduceReplicas", [False, True])
@gen_cluster(
client=True,
Expand Down
1 change: 0 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,6 @@ async def test_retire_workers(c, s, a, b):

workers = await s.retire_workers()
assert list(workers) == [a.address]
assert workers[a.address]["nthreads"] == a.state.nthreads
assert list(s.workers) == [b.address]

assert s.workers_to_close() == []
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3553,7 +3553,7 @@ async def test_execute_preamble_abort_retirement(c, s):

# b has shut down. There's nowhere to replicate x to anymore, so retire_workers
# will give up and reinstate a to running status.
assert await retire_fut == {}
assert not await retire_fut
while a.status != Status.running:
await asyncio.sleep(0.01)

Expand Down

0 comments on commit bbb96f1

Please sign in to comment.