Skip to content

Commit

Permalink
Added tests and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
coderabhigupta committed Nov 29, 2023
1 parent 6bb20ba commit 1b13911
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 14 deletions.
10 changes: 4 additions & 6 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,14 @@ def __set_worker_properties(self) -> None:
# Override polling interval if present in config and not in ENV
if not polling_interval_initialized:
# Setting to fallback poll interval before reading config
polling_interval = self.worker.poll_interval if self.worker.poll_interval else DEFAULT_POLLING_INTERVAL
default_polling_interval = self.worker.poll_interval

try:
# Read polling interval from config
polling_interval = float(section.get("polling_interval", polling_interval))
self.worker.poll_interval = float(section.get("polling_interval", default_polling_interval))
logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval))
except Exception as e:
logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), polling_interval))
finally:
self.worker.poll_interval = polling_interval

logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval))

def __get_property_value_from_env(self, prop, task_type):
prefix = "conductor_worker"
Expand Down
6 changes: 4 additions & 2 deletions src/conductor/client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL
from typing import Any, Callable, Union
from typing_extensions import Self
import inspect
Expand Down Expand Up @@ -37,7 +37,9 @@ def __init__(self,
worker_id: str = None,
) -> Self:
super().__init__(task_definition_name)
if poll_interval:
if poll_interval == None:
self.poll_interval = DEFAULT_POLLING_INTERVAL
else:
self.poll_interval = deepcopy(poll_interval)
self.domain = deepcopy(domain)
if worker_id is None:
Expand Down
4 changes: 2 additions & 2 deletions src/conductor/client/worker/worker_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, task_definition_name: Union[str, list]):
self.next_task_index = 0
self._task_definition_name_cache = None
self._domain = None
self._poll_interval = None
self._poll_interval = DEFAULT_POLLING_INTERVAL

@abc.abstractmethod
def execute(self, task: Task) -> TaskResult:
Expand Down Expand Up @@ -41,7 +41,7 @@ def get_polling_interval_in_seconds(self) -> float:
:return: float
Default: 100ms
"""
return (self.poll_interval if self.poll_interval else DEFAULT_POLLING_INTERVAL) / 1000
return (self.poll_interval if self.poll_interval else DEFAULT_POLLING_INTERVAL) / 1000

def get_task_definition_name(self) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/automator/test_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ def test_initialize_with_worker_config(self):
configParser = ConfigParser()
configParser.add_section('task')
configParser.set('task', 'domain', 'test')
configParser.set('task', 'pollingInterval', '2.0')
configParser.set('task', 'polling_interval', '200.0')
configParser.add_section('task2')
configParser.set('task2', 'domain', 'test2')
configParser.set('task2', 'pollingInterval', '3.0')
configParser.set('task2', 'polling_interval', '300.0')
configParser.write(tf)
tf.seek(0)

Expand All @@ -72,9 +72,9 @@ def get_config_file_path_mock():
self.assertIsInstance(config, ConfigParser)
self.assertEqual(len(config.sections()), 2)
self.assertEqual(config.get('task', 'domain'), "test")
self.assertEqual(config.get('task', 'pollingInterval'), "2.0")
self.assertEqual(config.get('task', 'polling_interval'), "200.0")
self.assertEqual(config.get('task2', 'domain'), "test2")
self.assertEqual(config.get('task2', 'pollingInterval'), "3.0")
self.assertEqual(config.get('task2', 'polling_interval'), "300.0")

def _get_valid_task_handler():
return TaskHandler(
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/automator/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from conductor.client.worker.worker_interface import DEFAULT_POLLING_INTERVAL
from configparser import ConfigParser
from unittest.mock import patch, ANY, Mock
import os
import logging
import time
import unittest
Expand Down Expand Up @@ -72,6 +73,24 @@ def test_initialization_with_specific_domain_in_worker_config(self):
task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed")
self.assertEqual(task_runner.worker.domain, 'test')

@unittest.mock.patch.dict(os.environ, {"CONDUCTOR_WORKER_DOMAIN": "cool"}, clear=True)
def test_initialization_with_generic_domain_in_env_var(self):
config = ConfigParser()
config.set('DEFAULT', 'domain', 'generic')
config.add_section('task')
config.set('task', 'domain', 'test')
task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed")
self.assertEqual(task_runner.worker.domain, 'cool')

@unittest.mock.patch.dict(os.environ, {"conductor_worker_task_domain": "hot"}, clear=True)
def test_initialization_with_specific_domain_in_env_var(self):
config = ConfigParser()
config.set('DEFAULT', 'domain', 'generic')
config.add_section('task')
config.set('task', 'domain', 'test')
task_runner = self.__get_valid_task_runner_with_worker_config_and_domain(config, "passed")
self.assertEqual(task_runner.worker.domain, 'hot')

def test_initialization_with_default_polling_interval(self):
task_runner = self.__get_valid_task_runner()
self.assertEqual(task_runner.worker.get_polling_interval_in_seconds() * 1000, DEFAULT_POLLING_INTERVAL)
Expand All @@ -95,6 +114,24 @@ def test_initialization_with_specific_polling_interval_in_worker_config(self):
task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000)
self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 5.0)

@unittest.mock.patch.dict(os.environ, {"conductor_worker_polling_interval": "1000.0"}, clear=True)
def test_initialization_with_generic_polling_interval_in_env_var(self):
config = ConfigParser()
config.set('DEFAULT', 'polling_interval', '2000')
config.add_section('task')
config.set('task', 'polling_interval', '5000')
task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000)
self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 1.0)

@unittest.mock.patch.dict(os.environ, {"CONDUCTOR_WORKER_task_POLLING_INTERVAL": "250.0"}, clear=True)
def test_initialization_with_specific_polling_interval_in_env_var(self):
config = ConfigParser()
config.set('DEFAULT', 'polling_interval', '2000')
config.add_section('task')
config.set('task', 'polling_interval', '5000')
task_runner = self.__get_valid_task_runner_with_worker_config_and_poll_interval(config, 3000)
self.assertEqual(task_runner.worker.get_polling_interval_in_seconds(), 0.25)

def test_run_once(self):
expected_time = self.__get_valid_worker().get_polling_interval_in_seconds()
with patch.object(
Expand Down
1 change: 1 addition & 0 deletions tests/unit/resources/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_domain(self) -> str:
class ClassWorker(WorkerInterface):
def __init__(self, task_definition_name: str):
super().__init__(task_definition_name)
self.poll_interval = 50.0

def execute(self, task: Task) -> TaskResult:
task_result = self.get_task_result_from_task(task)
Expand Down

0 comments on commit 1b13911

Please sign in to comment.