diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index 1714320a..5b4ba525 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -4,8 +4,9 @@ from conductor.client.telemetry.metrics_collector import MetricsCollector from conductor.client.worker.worker import Worker from conductor.client.worker.worker_interface import WorkerInterface -from multiprocessing import Process, freeze_support +from multiprocessing import Process, freeze_support, Queue from configparser import ConfigParser +from logging.handlers import QueueHandler from typing import List import ast import astor @@ -20,7 +21,6 @@ ) ) - def get_annotated_workers(): pkg = __get_client_topmost_package_filepath() workers = __get_annotated_workers_from_subtree(pkg) @@ -36,6 +36,7 @@ def __init__( metrics_settings: MetricsSettings = None, scan_for_annotated_workers: bool = None, ): + self.logger_process, self.queue = setup_logging_queue(configuration) self.worker_config = load_worker_config() if workers is None: workers = [] @@ -61,7 +62,10 @@ def __exit__(self, exc_type, exc_value, traceback): def stop_processes(self) -> None: self.__stop_task_runner_processes() self.__stop_metrics_provider_process() - logger.debug('stopped processes') + logger.info('Stopped worker processes...') + logger.info('Stopping logger process...') + self.queue.put(None) + self.logger_process.terminate() def start_processes(self) -> None: logger.info('Starting worker processes...') @@ -71,9 +75,13 @@ def start_processes(self) -> None: logger.info('Started all processes') def join_processes(self) -> None: - self.__join_task_runner_processes() - self.__join_metrics_provider_process() - logger.info('Joined all processes') + try: + self.__join_task_runner_processes() + self.__join_metrics_provider_process() + logger.info('Joined all processes') + except KeyboardInterrupt: + logger.info('KeyboardInterrupt: Stopping all processes') + self.stop_processes() def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None: if metrics_settings == None: @@ -108,7 +116,7 @@ def __create_task_runner_process( worker, configuration, metrics_settings, self.worker_config ) process = Process( - target=task_runner.run + target=task_runner.run, args=(self.queue,) ) self.task_runner_processes.append(process) @@ -145,13 +153,12 @@ def __stop_process(self, process: Process): if process == None: return try: - process.kill() - logger.debug(f'Killed process: {process}') - except Exception as e: - logger.debug(f'Failed to kill process: {process}, reason: {e}') + logger.debug(f'Terminating process: {process.pid}') process.terminate() - logger.debug('Terminated process: {process}') - + except Exception as e: + logger.debug(f'Failed to terminate process: {process.pid}, reason: {e}') + process.kill() + logger.debug(f'Killed process: {process.pid}') def __get_client_topmost_package_filepath(): module = inspect.getmodule(inspect.stack()[-1][0]) @@ -242,4 +249,50 @@ def load_worker_config(): return worker_config def __get_config_file_path() -> str: - return os.getcwd() + "/worker.ini" \ No newline at end of file + return os.getcwd() + "/worker.ini" + +# Setup centralized logging queue +def setup_logging_queue(configuration): + queue = Queue() + logger.addHandler(QueueHandler(queue)) + if configuration: + configuration.apply_logging_config() + log_level = configuration.log_level + logger_format = configuration.logger_format + else: + log_level = logging.DEBUG + logger_format = None + + logger.setLevel(log_level) + + # start the logger process + logger_p = Process(target=__logger_process, args=(queue, log_level, logger_format)) + logger_p.start() + return logger_p, queue + +# This process performs the centralized logging +def __logger_process(queue, log_level, logger_format=None): + c_logger = logging.getLogger( + Configuration.get_logging_formatted_name( + __name__ + ) + ) + + c_logger.setLevel(log_level) + + # configure a stream handler + sh = logging.StreamHandler() + if logger_format: + formatter = logging.Formatter(logger_format) + sh.setFormatter(formatter) + c_logger.addHandler(sh) + + # run forever + while True: + # consume a log message, block until one arrives + message = queue.get() + # check for shutdown + if message is None: + break + # log the message + c_logger.handle(message) \ No newline at end of file
diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index a8da405d..970564f6 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -8,6 +8,8 @@ from conductor.client.telemetry.metrics_collector import MetricsCollector from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL from configparser import ConfigParser +from logging.handlers import QueueHandler +from multiprocessing import Queue import logging import sys import time @@ -20,7 +22,6 @@ ) ) - class TaskRunner: def __init__( self, @@ -48,9 +49,19 @@ def __init__( ) ) - def run(self) -> None: + def run(self, queue) -> None: + # Add a handler that uses the shared queue + if queue: + logger.addHandler(QueueHandler(queue)) + if self.configuration != None: self.configuration.apply_logging_config() + else: + logger.setLevel(logging.DEBUG) + + task_names = ','.join(self.worker.task_definition_names) + logger.info(f'Started worker process for task(s): {task_names}') + while True: try: self.run_once() @@ -209,6 +220,8 @@ def __wait_for_polling_interval(self) -> None: time.sleep(polling_interval) def __set_worker_properties(self) -> None: + # If multiple tasks are supplied to the same worker, then only first + # task will be considered for setting worker properties task_type = self.worker.get_task_definition_name() # Fetch from ENV Variables if present
diff --git a/src/conductor/client/configuration/configuration.py b/src/conductor/client/configuration/configuration.py index 0a312ffd..cde836cb 100644 --- a/src/conductor/client/configuration/configuration.py +++ b/src/conductor/client/configuration/configuration.py @@ -93,6 +93,17 @@ def logger_format(self, value): """ self.__logger_format = value + @property + def log_level(self): + """The log level. + + Read-only property that returns the configured logging level. + + :return: The configured log level. + :rtype: int + """ + return self.__log_level + def apply_logging_config(self): logging.basicConfig( format=self.logger_format,
diff --git a/src/conductor/client/worker/worker_interface.py b/src/conductor/client/worker/worker_interface.py index 2ff98331..72512fc6 100644 --- a/src/conductor/client/worker/worker_interface.py +++ b/src/conductor/client/worker/worker_interface.py @@ -51,6 +51,13 @@ def get_task_definition_name(self) -> str: """ return self.task_definition_name_cache + @property + def task_definition_names(self): + if isinstance(self.task_definition_name, list): + return self.task_definition_name + else: + return [self.task_definition_name] + @property def task_definition_name_cache(self): if self._task_definition_name_cache is None: