Merge pull request #230 from conductor-sdk/sigkill
Added logging and gracefully terminating workers
v1r3n authored Dec 3, 2023
2 parents 5607fe7 + cd7ba20 commit 6ca03fd
Showing 4 changed files with 100 additions and 16 deletions.
81 changes: 67 additions & 14 deletions src/conductor/client/automator/task_handler.py
@@ -4,8 +4,9 @@
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.worker.worker import Worker
from conductor.client.worker.worker_interface import WorkerInterface
from multiprocessing import Process, freeze_support
from multiprocessing import Process, freeze_support, Queue
from configparser import ConfigParser
from logging.handlers import QueueHandler
from typing import List
import ast
import astor
@@ -20,7 +21,6 @@
)
)


def get_annotated_workers():
pkg = __get_client_topmost_package_filepath()
workers = __get_annotated_workers_from_subtree(pkg)
@@ -36,6 +36,7 @@ def __init__(
metrics_settings: MetricsSettings = None,
scan_for_annotated_workers: bool = None,
):
self.logger_process, self.queue = setup_logging_queue(configuration)
self.worker_config = load_worker_config()
if workers is None:
workers = []
@@ -61,7 +62,10 @@ def __exit__(self, exc_type, exc_value, traceback):
def stop_processes(self) -> None:
self.__stop_task_runner_processes()
self.__stop_metrics_provider_process()
logger.debug('stopped processes')
logger.info('Stopped worker processes...')
logger.info('Stopping logger process...')
self.queue.put(None)
self.logger_process.terminate()

def start_processes(self) -> None:
logger.info('Starting worker processes...')
@@ -71,9 +75,13 @@ def start_processes(self) -> None:
logger.info('Started all processes')

def join_processes(self) -> None:
self.__join_task_runner_processes()
self.__join_metrics_provider_process()
logger.info('Joined all processes')
try:
self.__join_task_runner_processes()
self.__join_metrics_provider_process()
logger.info('Joined all processes')
except KeyboardInterrupt:
logger.info('KeyboardInterrupt: Stopping all processes')
self.stop_processes()

def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None:
if metrics_settings == None:
@@ -108,7 +116,7 @@ def __create_task_runner_process(
worker, configuration, metrics_settings, self.worker_config
)
process = Process(
target=task_runner.run
target=task_runner.run, args=(self.queue,)
)
self.task_runner_processes.append(process)

@@ -145,13 +153,12 @@ def __stop_process(self, process: Process):
if process == None:
return
try:
process.kill()
logger.debug(f'Killed process: {process}')
except Exception as e:
logger.debug(f'Failed to kill process: {process}, reason: {e}')
logger.debug(f'Terminating process: {process.pid}')
process.terminate()
logger.debug(f'Terminated process: {process.pid}')

except Exception as e:
logger.debug(f'Failed to terminate process: {process.pid}, reason: {e}')
process.kill()
logger.debug(f'Killed process: {process.pid}')

def __get_client_topmost_package_filepath():
module = inspect.getmodule(inspect.stack()[-1][0])
@@ -242,4 +249,50 @@ def load_worker_config():
return worker_config

def __get_config_file_path() -> str:
return os.getcwd() + "/worker.ini"
return os.getcwd() + "/worker.ini"

# Setup centralized logging queue
def setup_logging_queue(configuration):
queue = Queue()
logger.addHandler(QueueHandler(queue))
if configuration:
configuration.apply_logging_config()
log_level = configuration.log_level
logger_format = configuration.logger_format
else:
log_level = logging.DEBUG
logger_format = None

logger.setLevel(log_level)

# start the logger process
logger_p = Process(target=__logger_process, args=(queue, log_level, logger_format))
logger_p.start()
return logger_p, queue

# This process performs the centralized logging
def __logger_process(queue, log_level, logger_format=None):
c_logger = logging.getLogger(
Configuration.get_logging_formatted_name(
__name__
)
)

c_logger.setLevel(log_level)

# configure a stream handler
sh = logging.StreamHandler()
if logger_format:
formatter = logging.Formatter(logger_format)
sh.setFormatter(formatter)
c_logger.addHandler(sh)

# run forever
while True:
# consume a log message, block until one arrives
message = queue.get()
# check for shutdown
if message is None:
break
# log the message
c_logger.handle(message)
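
Note: setup_logging_queue and __logger_process above follow the standard multiprocessing logging recipe: producer processes publish LogRecords onto a shared Queue through a QueueHandler, and one dedicated process drains the queue and writes the output. A minimal, self-contained sketch of that recipe using only the standard library (the names central_logger_process and main are illustrative, not part of the SDK):

import logging
from logging.handlers import QueueHandler
from multiprocessing import Process, Queue

def central_logger_process(queue: Queue, log_level: int) -> None:
    # Dedicated consumer: the only process that owns a StreamHandler.
    consumer = logging.getLogger('central')
    consumer.setLevel(log_level)
    consumer.addHandler(logging.StreamHandler())
    while True:
        record = queue.get()      # block until a LogRecord (or the sentinel) arrives
        if record is None:        # None is the shutdown sentinel, as in stop_processes()
            break
        consumer.handle(record)

def main() -> None:
    queue = Queue()
    logger_p = Process(target=central_logger_process, args=(queue, logging.DEBUG))
    logger_p.start()

    # Producer side: route this process's records through the shared queue.
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(QueueHandler(queue))
    logger.info('hello from the producer side')

    queue.put(None)               # ask the logger process to stop...
    logger_p.join()               # ...and wait for it to drain and exit

if __name__ == '__main__':
    main()

The SDK variant additionally calls logger_process.terminate() right after enqueuing the sentinel, as a fallback in case the queue is never drained.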
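
The reworked __stop_process also changes the shutdown order: it first asks each worker process to exit with terminate() (a catchable SIGTERM) and only falls back to kill() (SIGKILL) if terminate() raises, while join_processes treats Ctrl-C as a request to stop everything. A hedged sketch of that shape (stop_worker and join_workers are illustrative names, not SDK functions):

import logging
from multiprocessing import Process

logger = logging.getLogger(__name__)

def stop_worker(process: Process) -> None:
    if process is None:
        return
    try:
        logger.debug(f'Terminating process: {process.pid}')
        process.terminate()       # SIGTERM: catchable, unlike SIGKILL
    except Exception as e:
        logger.debug(f'Failed to terminate process: {process.pid}, reason: {e}')
        process.kill()            # SIGKILL: last resort

def join_workers(processes: list) -> None:
    try:
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        logger.info('KeyboardInterrupt: stopping all processes')
        for p in processes:
            stop_worker(p)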
17 changes: 15 additions & 2 deletions src/conductor/client/automator/task_runner.py
@@ -8,6 +8,8 @@
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL
from configparser import ConfigParser
from logging.handlers import QueueHandler
from multiprocessing import Queue
import logging
import sys
import time
@@ -20,7 +22,6 @@
)
)


class TaskRunner:
def __init__(
self,
@@ -48,9 +49,19 @@ def __init__(
)
)

def run(self) -> None:
def run(self, queue) -> None:
# Add a handler that uses the shared queue
if queue:
logger.addHandler(QueueHandler(queue))

if self.configuration != None:
self.configuration.apply_logging_config()
else:
logger.setLevel(logging.DEBUG)

task_names = ','.join(self.worker.task_definition_names)
logger.info(f'Started worker process for task(s): {task_names}')

while True:
try:
self.run_once()
@@ -209,6 +220,8 @@ def __wait_for_polling_interval(self) -> None:
time.sleep(polling_interval)

def __set_worker_properties(self) -> None:
# If multiple tasks are supplied to the same worker, only the first
# task is considered when setting worker properties
task_type = self.worker.get_task_definition_name()

# Fetch from ENV Variables if present
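
On the worker side, the diff above passes the shared queue into TaskRunner.run so each spawned process attaches its own QueueHandler before it starts polling. The per-process setup reduces to roughly the following sketch (run_in_child is an illustrative name; configuration stands in for the SDK's Configuration object):

import logging
from logging.handlers import QueueHandler
from multiprocessing import Queue

logger = logging.getLogger(__name__)

def run_in_child(queue: Queue, configuration=None) -> None:
    # Executed inside the child process created by TaskHandler.
    if queue:
        logger.addHandler(QueueHandler(queue))   # forward records to the logger process
    if configuration is not None:
        configuration.apply_logging_config()     # honor the user-supplied level/format
    else:
        logger.setLevel(logging.DEBUG)           # default when no configuration is given
    logger.info('worker process started')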
11 changes: 11 additions & 0 deletions src/conductor/client/configuration/configuration.py
@@ -93,6 +93,17 @@ def logger_format(self, value):
"""
self.__logger_format = value

@property
def log_level(self):
"""The log level.
The log level configured for the client.
:return: The log level.
"""
return self.__log_level

def apply_logging_config(self):
logging.basicConfig(
format=self.logger_format,
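
The new read-only log_level property exists so that setup_logging_queue (in task_handler.py above) can hand the configured level to the logger process without reapplying the whole logging config. A minimal sketch of how the two properties fit together (this is not the SDK's full Configuration class):

import logging

class MinimalConfiguration:
    def __init__(self, logger_format: str = '%(asctime)s %(levelname)s %(message)s',
                 log_level: int = logging.INFO):
        self.__logger_format = logger_format
        self.__log_level = log_level

    @property
    def logger_format(self) -> str:
        return self.__logger_format

    @property
    def log_level(self) -> int:
        return self.__log_level

    def apply_logging_config(self) -> None:
        logging.basicConfig(format=self.logger_format, level=self.log_level)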
7 changes: 7 additions & 0 deletions src/conductor/client/worker/worker_interface.py
@@ -51,6 +51,13 @@ def get_task_definition_name(self) -> str:
"""
return self.task_definition_name_cache

@property
def task_definition_names(self):
if isinstance(self.task_definition_name, list):
return self.task_definition_name
else:
return [self.task_definition_name]

@property
def task_definition_name_cache(self):
if self._task_definition_name_cache is None:
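
The task_definition_names property added above normalizes the single-task and multi-task cases so callers (such as the log line in TaskRunner.run) can always join a list. For example, with hypothetical task names used purely for illustration:

# task_definition_name may be a single string or a list of strings
task_definition_name = 'order_task'
names = task_definition_name if isinstance(task_definition_name, list) else [task_definition_name]
print(','.join(names))        # -> order_task

task_definition_name = ['order_task', 'billing_task']
names = task_definition_name if isinstance(task_definition_name, list) else [task_definition_name]
print(','.join(names))        # -> order_task,billing_task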
