diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index c056ddec..f0416143 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -242,16 +242,14 @@ def __set_worker_properties(self) -> None: # Override polling interval if present in config and not in ENV if not polling_interval_initialized: # Setting to fallback poll interval before reading config - polling_interval = self.worker.poll_interval if self.worker.poll_interval else DEFAULT_POLLING_INTERVAL + default_polling_interval = self.worker.poll_interval try: # Read polling interval from config - polling_interval = float(section.get("polling_interval", polling_interval)) + self.worker.poll_interval = float(section.get("polling_interval", default_polling_interval)) + logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval)) except Exception as e: - logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), polling_interval)) - finally: - self.worker.poll_interval = polling_interval - + logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval)) def __get_property_value_from_env(self, prop, task_type): prefix = "conductor_worker" diff --git a/src/conductor/client/worker/worker.py b/src/conductor/client/worker/worker.py index 589a6a6e..f10a41d1 100644 --- a/src/conductor/client/worker/worker.py +++ b/src/conductor/client/worker/worker.py @@ -2,7 +2,7 @@ from conductor.client.http.models.task import Task from conductor.client.http.models.task_result import TaskResult from conductor.client.http.models.task_result_status import TaskResultStatus -from conductor.client.worker.worker_interface import WorkerInterface +from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL from typing import Any, Callable, Union from typing_extensions import Self import inspect @@ -37,7 +37,9 @@ def __init__(self, worker_id: str = None, ) -> Self: super().__init__(task_definition_name) - if poll_interval: + if poll_interval == None: + self.poll_interval = DEFAULT_POLLING_INTERVAL + else: self.poll_interval = deepcopy(poll_interval) self.domain = deepcopy(domain) if worker_id is None: diff --git a/src/conductor/client/worker/worker_interface.py b/src/conductor/client/worker/worker_interface.py index 0d63b748..2ff98331 100644 --- a/src/conductor/client/worker/worker_interface.py +++ b/src/conductor/client/worker/worker_interface.py @@ -13,7 +13,7 @@ def __init__(self, task_definition_name: Union[str, list]): self.next_task_index = 0 self._task_definition_name_cache = None self._domain = None - self._poll_interval = None + self._poll_interval = DEFAULT_POLLING_INTERVAL @abc.abstractmethod def execute(self, task: Task) -> TaskResult: @@ -41,7 +41,7 @@ def get_polling_interval_in_seconds(self) -> float: :return: float Default: 100ms """ - return (self.poll_interval if self.poll_interval else DEFAULT_POLLING_INTERVAL) / 1000 + return (self.poll_interval if self.poll_interval else DEFAULT_POLLING_INTERVAL) / 1000 def get_task_definition_name(self) -> str: """ diff --git a/tests/unit/automator/test_task_handler.py b/tests/unit/automator/test_task_handler.py index 0690a766..16c529c9 100644 --- a/tests/unit/automator/test_task_handler.py +++ b/tests/unit/automator/test_task_handler.py @@ -56,10 +56,10 @@ def test_initialize_with_worker_config(self): configParser = ConfigParser() configParser.add_section('task') configParser.set('task', 'domain', 'test') - configParser.set('task', 'pollingInterval', '2.0') + configParser.set('task', 'polling_interval', '200.0') configParser.add_section('task2') configParser.set('task2', 'domain', 'test2') - configParser.set('task2', 'pollingInterval', '3.0') + configParser.set('task2', 'polling_interval', '300.0') configParser.write(tf) tf.seek(0) @@ -72,9 +72,9 @@ def get_config_file_path_mock(): self.assertIsInstance(config, ConfigParser) self.assertEqual(len(config.sections()), 2) self.assertEqual(config.get('task', 'domain'), "test") - self.assertEqual(config.get('task', 'pollingInterval'), "2.0") + self.assertEqual(config.get('task', 'polling_interval'), "200.0") self.assertEqual(config.get('task2', 'domain'), "test2") - self.assertEqual(config.get('task2', 'pollingInterval'), "3.0") + self.assertEqual(config.get('task2', 'polling_interval'), "300.0") def _get_valid_task_handler(): return TaskHandler( diff --git a/tests/unit/automator/test_task_runner.py b/tests/unit/automator/test_task_runner.py index bd7a2e57..c4ee8872 100644 --- a/tests/unit/automator/test_task_runner.py +++ b/tests/unit/automator/test_task_runner.py @@ -9,6 +9,7 @@ from conductor.client.worker.worker_interface import DEFAULT_POLLING_INTERVAL from configparser import ConfigParser from unittest.mock import patch, ANY, Mock +import os import logging import time import unittest @@ -72,6 +73,24 @@ def test_initialization_with_specific_domain_in_worker_config(self): task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed") self.assertEqual(task_runner.worker.domain, 'test') + @unittest.mock.patch.dict(os.environ, {"CONDUCTOR_WORKER_DOMAIN": "cool"}, clear=True) + def test_initialization_with_generic_domain_in_env_var(self): + config = ConfigParser() + config.set('DEFAULT', 'domain', 'generic') + config.add_section('task') + config.set('task', 'domain', 'test') + task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed") + self.assertEqual(task_runner.worker.domain, 'cool') + + @unittest.mock.patch.dict(os.environ, {"conductor_worker_task_domain": "hot"}, clear=True) + def test_initialization_with_specific_domain_in_env_var(self): + config = ConfigParser() + config.set('DEFAULT', 'domain', 'generic') + config.add_section('task') + config.set('task', 'domain', 'test') + task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed") + self.assertEqual(task_runner.worker.domain, 'hot') + def test_initialization_with_default_polling_interval(self): task_runner = self.__get_valid_task_runner() self.assertEqual(task_runner.worker.get_polling_interval_in_seconds() * 1000, DEFAULT_POLLING_INTERVAL) @@ -95,6 +114,24 @@ def test_initialization_with_specific_polling_interval_in_worker_config(self): task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000) self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 5.0) + @unittest.mock.patch.dict(os.environ, {"conductor_worker_polling_interval": "1000.0"}, clear=True) + def test_initialization_with_generic_polling_interval_in_env_var(self): + config = ConfigParser() + config.set('DEFAULT', 'polling_interval', '2000') + config.add_section('task') + config.set('task', 'polling_interval', '5000') + task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000) + self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 1.0) + + @unittest.mock.patch.dict(os.environ, {"CONDUCTOR_WORKER_task_POLLING_INTERVAL": "250.0"}, clear=True) + def test_initialization_with_specific_polling_interval_in_env_var(self): + config = ConfigParser() + config.set('DEFAULT', 'polling_interval', '2000') + config.add_section('task') + config.set('task', 'polling_interval', '5000') + task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000) + self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 0.25) + def test_run_once(self): expected_time = self.__get_valid_worker().get_polling_interval_in_seconds() with patch.object( diff --git a/tests/unit/resources/workers.py b/tests/unit/resources/workers.py index ae561bea..c2caa604 100644 --- a/tests/unit/resources/workers.py +++ b/tests/unit/resources/workers.py @@ -32,6 +32,7 @@ def get_domain(self) -> str: class ClassWorker(WorkerInterface): def __init__(self, task_definition_name: str): super().__init__(task_definition_name) + self.poll_interval = 50.0 def execute(self, task: Task) -> TaskResult: task_result = self.get_task_result_from_task(task)