Skip to content

Commit

Permalink
Fixes #33: rewrite processor initialization, fix task queue and synch…
Browse files Browse the repository at this point in the history
…ronization issue on Linux (#40)

* Fixes a bug in the async priority queue when trying to remove a suspended task.

Signed-off-by: rafa-be <[email protected]>

* Fixes a worker agent crash when trying to profile a zombie process.

Signed-off-by: rafa-be <[email protected]>

* Fixes #33: processors can be suspended during the initialization phase.

Signed-off-by: rafa-be <[email protected]>

* The worker's heart-beat manager watches all worker processes, not only the active one.

Signed-off-by: rafa-be <[email protected]>

* Task priorities are now positive numbers.

Signed-off-by: rafa-be <[email protected]>

---------

Signed-off-by: rafa-be <[email protected]>
  • Loading branch information
rafa-be authored Oct 29, 2024
1 parent df9931e commit a770072
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 134 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.8.12"
__version__ = "1.8.13"
2 changes: 1 addition & 1 deletion scaler/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def __get_task_flags(self) -> TaskFlags:
parent_task_priority = self.__get_parent_task_priority()

if parent_task_priority is not None:
task_priority = parent_task_priority - 1
task_priority = parent_task_priority + 1
else:
task_priority = 0

Expand Down
2 changes: 1 addition & 1 deletion scaler/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

# if didn't receive heartbeat for following seconds, then scheduler will treat client as dead and cancel remaining
# tasks for this client
DEFAULT_CLIENT_TIMEOUT_SECONDS = 600
DEFAULT_CLIENT_TIMEOUT_SECONDS = 60

# number of seconds for load balance, if value is 0 means disable load balance
DEFAULT_LOAD_BALANCE_SECONDS = 1
Expand Down
3 changes: 2 additions & 1 deletion scaler/utility/queues/async_priority_queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import heapq
import sys
from asyncio import Queue
from typing import Dict, List, Tuple, Union

Expand Down Expand Up @@ -59,7 +60,7 @@ def __to_lowest_priority(cls, original_priority: PriorityType) -> PriorityType:
if isinstance(original_priority, tuple):
return tuple(cls.__to_lowest_priority(value) for value in original_priority)
else:
return -1
return -sys.maxsize - 1

@classmethod
def __to_lower_priority(cls, original_priority: PriorityType) -> PriorityType:
Expand Down
27 changes: 17 additions & 10 deletions scaler/worker/agent/heartbeat_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
class VanillaHeartbeatManager(Looper, HeartbeatManager):
def __init__(self):
self._agent_process = psutil.Process()
self._worker_process: Optional[psutil.Process] = None

self._connector_external: Optional[AsyncConnector] = None
self._worker_task_manager: Optional[TaskManager] = None
Expand All @@ -36,9 +35,6 @@ def register(
self._timeout_manager = timeout_manager
self._processor_manager = processor_manager

def set_processor_pid(self, process_id: int):
self._worker_process = psutil.Process(process_id)

async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
if self._start_timestamp_ns == 0:
# not handling echo if we didn't send out heartbeat
Expand All @@ -49,17 +45,21 @@ async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
self._timeout_manager.update_last_seen_time()

async def routine(self):
if self._worker_process is None:
processors = self._processor_manager.processors()

if len(processors) == 0:
return

if self._start_timestamp_ns != 0:
# already sent heartbeat, expecting heartbeat echo, so not sending
return

if self._worker_process.status() in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
await self._processor_manager.on_failing_task(self._worker_process.status())
for processor_holder in processors:
status = processor_holder.process().status()
if status in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
await self._processor_manager.on_failing_processor(processor_holder.processor_id(), status)

processors = self._processor_manager.processors()
processors = self._processor_manager.processors() # refreshes for removed dead and zombie processors
num_suspended_processors = self._processor_manager.num_suspended_processors()

await self._connector_external.send(
Expand All @@ -68,7 +68,7 @@ async def routine(self):
psutil.virtual_memory().available,
self._worker_task_manager.get_queued_size() - num_suspended_processors,
self._latency_us,
self._processor_manager.task_lock(),
self._processor_manager.can_accept_task(),
[self.__get_processor_status_from_holder(processor) for processor in processors],
)
)
Expand All @@ -77,10 +77,17 @@ async def routine(self):
@staticmethod
def __get_processor_status_from_holder(processor: ProcessorHolder) -> ProcessorStatus:
process = processor.process()

try:
resource = Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss)
except psutil.ZombieProcess:
# Assumes dead processes do not use any resources
resource = Resource.new_msg(0, 0)

return ProcessorStatus.new_msg(
processor.pid(),
processor.initialized(),
processor.task() is not None,
processor.suspended(),
Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss),
resource,
)
22 changes: 7 additions & 15 deletions scaler/worker/agent/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@


class HeartbeatManager(metaclass=abc.ABCMeta):
@abc.abstractmethod
def set_processor_pid(self, process_id: int):
raise NotImplementedError()

@abc.abstractmethod
async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
raise NotImplementedError()
Expand Down Expand Up @@ -58,31 +54,31 @@ def on_object_response(self, request: ObjectResponse):
raise NotImplementedError()

@abc.abstractmethod
async def acquire_task_active_lock(self):
def can_accept_task(self) -> bool:
raise NotImplementedError()

@abc.abstractmethod
async def on_task(self, task: Task) -> bool:
async def wait_until_can_accept_task(self):
raise NotImplementedError()

@abc.abstractmethod
def on_cancel_task(self, task_id: bytes) -> Optional[Task]:
async def on_task(self, task: Task) -> bool:
raise NotImplementedError()

@abc.abstractmethod
async def on_failing_task(self, error: str):
async def on_cancel_task(self, task_id: bytes) -> Optional[Task]:
raise NotImplementedError()

@abc.abstractmethod
def on_suspend_task(self, task_id: bytes) -> bool:
async def on_failing_processor(self, processor_id: bytes, process_status: str):
raise NotImplementedError()

@abc.abstractmethod
def on_resume_task(self, task_id: bytes) -> bool:
async def on_suspend_task(self, task_id: bytes) -> bool:
raise NotImplementedError()

@abc.abstractmethod
def restart_current_processor(self, reason: str):
def on_resume_task(self, task_id: bytes) -> bool:
raise NotImplementedError()

@abc.abstractmethod
Expand All @@ -109,10 +105,6 @@ def processors(self) -> List[ProcessorHolder]:
def num_suspended_processors(self) -> int:
raise NotImplementedError()

@abc.abstractmethod
def task_lock(self) -> bool:
raise NotImplementedError()


class ProfilingManager(metaclass=abc.ABCMeta):
@abc.abstractmethod
Expand Down
8 changes: 8 additions & 0 deletions scaler/worker/agent/processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
event_loop: str,
address: ZMQConfig,
resume_event: Optional[EventType],
resumed_event: Optional[EventType],
garbage_collect_interval_seconds: int,
trim_memory_threshold_bytes: int,
logging_paths: Tuple[str, ...],
Expand All @@ -52,6 +53,7 @@ def __init__(
self._address = address

self._resume_event = resume_event
self._resumed_event = resumed_event

self._garbage_collect_interval_seconds = garbage_collect_interval_seconds
self._trim_memory_threshold_bytes = trim_memory_threshold_bytes
Expand Down Expand Up @@ -108,8 +110,14 @@ def __interrupt(self, *args):

def __suspend(self, *args):
assert self._resume_event is not None
assert self._resumed_event is not None

self._resume_event.wait() # stops any computation in the main thread until the event is triggered

# Ensures the processor agent knows we stopped waiting on `_resume_event`, as to avoid re-entrant wait on the
# event.
self._resumed_event.set()

def __run_forever(self):
try:
self._connector.send(ProcessorInitialized.new_msg())
Expand Down
27 changes: 17 additions & 10 deletions scaler/worker/agent/processor_holder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import asyncio
import logging
import os
import signal
from multiprocessing import Event
from typing import Optional, Tuple

import multiprocessing
import psutil

from scaler.io.config import DEFAULT_PROCESSOR_KILL_DELAY_SECONDS
Expand All @@ -26,19 +25,22 @@ def __init__(
):
self._processor_id: Optional[bytes] = None
self._task: Optional[Task] = None
self._initialized = asyncio.Event()
self._suspended = False

self._hard_suspend = hard_suspend
if hard_suspend:
self._resume_event = None
self._resumed_event = None
else:
self._resume_event = Event()
context = multiprocessing.get_context("spawn")
self._resume_event = context.Event()
self._resumed_event = context.Event()

self._processor = Processor(
event_loop=event_loop,
address=address,
resume_event=self._resume_event,
resumed_event=self._resumed_event,
garbage_collect_interval_seconds=garbage_collect_interval_seconds,
trim_memory_threshold_bytes=trim_memory_threshold_bytes,
logging_paths=logging_paths,
Expand All @@ -59,14 +61,10 @@ def processor_id(self) -> bytes:
return self._processor_id

def initialized(self) -> bool:
return self._initialized.is_set()
return self._processor_id is not None

def wait_initialized(self):
return self._initialized.wait()

def set_initialized(self, processor_id: bytes):
def initialize(self, processor_id: bytes):
self._processor_id = processor_id
self._initialized.set()

def task(self) -> Optional[Task]:
return self._task
Expand All @@ -81,6 +79,7 @@ def suspend(self):
assert self._processor is not None
assert self._task is not None
assert self._suspended is False
assert self.initialized()

if self._hard_suspend:
self.__send_signal("SIGSTOP")
Expand All @@ -92,7 +91,9 @@ def suspend(self):
# See https://github.com/Citi/scaler/issues/14

assert self._resume_event is not None
assert self._resumed_event is not None
self._resume_event.clear()
self._resumed_event.clear()

self.__send_signal(SUSPEND_SIGNAL)

Expand All @@ -106,8 +107,14 @@ def resume(self):
self.__send_signal("SIGCONT")
else:
assert self._resume_event is not None
assert self._resumed_event is not None

self._resume_event.set()

# Waits until the processor resumes processing. This avoids any future call to `suspend()` while the
# processor hasn't returned from the `_resumed_event.wait()` call yet (causes a re-entrant error on Linux).
self._resumed_event.wait()

self._suspended = False

def kill(self):
Expand Down
Loading

0 comments on commit a770072

Please sign in to comment.