From b189e487c2a350524cc2fe011bace2344479ff2e Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sat, 2 Dec 2023 00:10:58 -0800 Subject: [PATCH] scheduler --- docs/schedule/README.md | 38 ++++++---- .../client/http/models/tag_object.py | 1 + .../client/orkes/models/metadata_tag.py | 1 + .../client/orkes/orkes_scheduler_client.py | 70 +++++++++--------- src/conductor/client/scheduler_client.py | 71 ++++++++++--------- .../client/orkes/test_orkes_clients.py | 26 +++---- tests/unit/orkes/test_scheduler_client.py | 34 ++++----- 7 files changed, 128 insertions(+), 113 deletions(-) diff --git a/docs/schedule/README.md b/docs/schedule/README.md index 798a3a68..0eb8ec43 100644 --- a/docs/schedule/README.md +++ b/docs/schedule/README.md @@ -18,6 +18,7 @@ scheduler_client = OrkesSchedulerClient(configuration) ``` ### Saving Schedule + ```python from conductor.client.http.models.save_schedule_request import SaveScheduleRequest from conductor.client.http.models.start_workflow_request import StartWorkflowRequest @@ -28,77 +29,88 @@ startWorkflowRequest = StartWorkflowRequest( saveScheduleRequest = SaveScheduleRequest( name="SCHEDULE_NAME", start_workflow_request=startWorkflowRequest, - cron_expression= "0 */5 * ? * *" + cron_expression="0 */5 * ? * *" ) -scheduler_client.saveSchedule(saveScheduleRequest) +scheduler_client.save_schedule(saveScheduleRequest) ``` ### Get Schedule #### Get a specific schedule + ```python -scheduler_client.getSchedule("SCHEDULE_NAME") +scheduler_client.get_schedule("SCHEDULE_NAME") ``` #### Get all schedules + ```python -scheduler_client.getAllSchedules() +scheduler_client.get_all_schedules() ``` #### Get all schedules for a workflow + ```python -scheduler_client.getAllSchedules("WORKFLOW_NAME") +scheduler_client.get_all_schedules("WORKFLOW_NAME") ``` ### Delete Schedule + ```python -scheduler_client.deleteSchedule("SCHEDULE_NAME") +scheduler_client.delete_schedule("SCHEDULE_NAME") ``` ### Pause and Resume Schedules #### Pause a schedule + ```python -scheduler_client.pauseSchedule("SCHEDULE_NAME") +scheduler_client.pause_schedule("SCHEDULE_NAME") ``` #### Pause all schedules + ```python -scheduler_client.pauseAllSchedules() +scheduler_client.pause_all_schedules() ``` #### Resume a scheduler + ```python -scheduler_client.resumeSchedule("SCHEDULE_NAME") +scheduler_client.resume_schedule("SCHEDULE_NAME") ``` #### Resume all schedules + ```python -scheduler_client.resumeAllSchedules() +scheduler_client.resume_all_schedules() ``` ### Scheduler Tag Management #### Set scheduler tags + ```python from conductor.client.orkes.models.metadata_tag import MetadataTag tags = [ MetadataTag("sch_tag", "val"), MetadataTag("sch_tag_2", "val2") ] -scheduler_client.setSchedulerTags(tags, "SCHEDULE_NAME") +scheduler_client.set_scheduler_tags(tags, "SCHEDULE_NAME") ``` #### Get scheduler tags + ```python -tags = scheduler_client.getSchedulerTags("SCHEDULE_NAME") +tags = scheduler_client.get_scheduler_tags("SCHEDULE_NAME") ``` #### Delete scheduler tags + ```python tags = [ MetadataTag("sch_tag", "val"), MetadataTag("sch_tag_2", "val2") ] -scheduler_client.deleteSchedulerTags(tags, "SCHEDULE_NAME") +scheduler_client.delete_scheduler_tags(tags, "SCHEDULE_NAME") ``` diff --git a/src/conductor/client/http/models/tag_object.py b/src/conductor/client/http/models/tag_object.py index e1bca403..f138ca60 100644 --- a/src/conductor/client/http/models/tag_object.py +++ b/src/conductor/client/http/models/tag_object.py @@ -5,6 +5,7 @@ import six + class TagObject(object): """NOTE: This class is auto generated by the swagger code generator program. diff --git a/src/conductor/client/orkes/models/metadata_tag.py b/src/conductor/client/orkes/models/metadata_tag.py index 4a5a9958..f42e2e34 100644 --- a/src/conductor/client/orkes/models/metadata_tag.py +++ b/src/conductor/client/orkes/models/metadata_tag.py @@ -1,6 +1,7 @@ from conductor.client.http.models.tag_object import TagObject from typing_extensions import Self + class MetadataTag(TagObject): def __init__(self, key: str, value: str) -> Self: super().__init__( diff --git a/src/conductor/client/orkes/orkes_scheduler_client.py b/src/conductor/client/orkes/orkes_scheduler_client.py index f8502d57..8a19ce65 100644 --- a/src/conductor/client/orkes/orkes_scheduler_client.py +++ b/src/conductor/client/orkes/orkes_scheduler_client.py @@ -13,56 +13,56 @@ class OrkesSchedulerClient(OrkesBaseClient, SchedulerClient): def __init__(self, configuration: Configuration): super(OrkesSchedulerClient, self).__init__(configuration) - def saveSchedule(self, saveScheduleRequest: SaveScheduleRequest): - self.schedulerResourceApi.save_schedule(saveScheduleRequest) + def save_schedule(self, save_schedule_request: SaveScheduleRequest): + self.schedulerResourceApi.save_schedule(save_schedule_request) - def getSchedule(self, name: str) -> WorkflowSchedule: + def get_schedule(self, name: str) -> WorkflowSchedule: return self.schedulerResourceApi.get_schedule(name) - def getAllSchedules(self, workflowName: Optional[str] = None) -> List[WorkflowSchedule]: + def get_all_schedules(self, workflow_name: Optional[str] = None) -> List[WorkflowSchedule]: kwargs = {} - if workflowName: - kwargs.update({"workflow_name": workflowName}) + if workflow_name: + kwargs.update({"workflow_name": workflow_name}) return self.schedulerResourceApi.get_all_schedules(**kwargs) - def getNextFewScheduleExecutionTimes(self, - cronExpression: str, - scheduleStartTime: Optional[int] = None, - scheduleEndTime: Optional[int] = None, - limit: Optional[int] = None, - ) -> List[int]: + def get_next_few_schedule_execution_times(self, + cron_expression: str, + schedule_start_time: Optional[int] = None, + schedule_end_time: Optional[int] = None, + limit: Optional[int] = None, + ) -> List[int]: kwargs = {} - if scheduleStartTime: - kwargs.update({"schedule_start_time": scheduleStartTime}) - if scheduleEndTime: - kwargs.update({"schedule_end_time": scheduleEndTime}) + if schedule_start_time: + kwargs.update({"schedule_start_time": schedule_start_time}) + if schedule_end_time: + kwargs.update({"schedule_end_time": schedule_end_time}) if limit: kwargs.update({"limit": limit}) - return self.schedulerResourceApi.get_next_few_schedules(cronExpression, **kwargs) + return self.schedulerResourceApi.get_next_few_schedules(cron_expression, **kwargs) - def deleteSchedule(self, name: str): + def delete_schedule(self, name: str): self.schedulerResourceApi.delete_schedule(name) - def pauseSchedule(self, name: str): + def pause_schedule(self, name: str): self.schedulerResourceApi.pause_schedule(name) - def pauseAllSchedules(self): + def pause_all_schedules(self): self.schedulerResourceApi.pause_all_schedules() - def resumeSchedule(self, name: str): + def resume_schedule(self, name: str): self.schedulerResourceApi.resume_schedule(name) - def resumeAllSchedules(self): + def resume_all_schedules(self): self.schedulerResourceApi.resume_all_schedules() - def searchScheduleExecutions(self, - start: Optional[int] = None, - size: Optional[int] = None, - sort: Optional[str] = None, - freeText: Optional[str] = None, - query: Optional[str] = None, - ) -> SearchResultWorkflowScheduleExecutionModel: + def search_schedule_executions(self, + start: Optional[int] = None, + size: Optional[int] = None, + sort: Optional[str] = None, + free_text: Optional[str] = None, + query: Optional[str] = None, + ) -> SearchResultWorkflowScheduleExecutionModel: kwargs = {} if start: kwargs.update({"start": start}) @@ -70,20 +70,20 @@ def searchScheduleExecutions(self, kwargs.update({"size": size}) if sort: kwargs.update({"sort": sort}) - if freeText: - kwargs.update({"freeText": freeText}) + if free_text: + kwargs.update({"freeText": free_text}) if query: kwargs.update({"query": query}) return self.schedulerResourceApi.search_v21(**kwargs) - def requeueAllExecutionRecords(self): + def requeue_all_execution_records(self): self.schedulerResourceApi.requeue_all_execution_records() - def setSchedulerTags(self, tags: List[MetadataTag], name: str): + def set_scheduler_tags(self, tags: List[MetadataTag], name: str): self.schedulerResourceApi.put_tag_for_schedule(tags, name) - def getSchedulerTags(self, name: str) -> List[MetadataTag]: + def get_scheduler_tags(self, name: str) -> List[MetadataTag]: return self.schedulerResourceApi.get_tags_for_schedule(name) - def deleteSchedulerTags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]: + def delete_scheduler_tags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]: self.schedulerResourceApi.delete_tag_for_schedule(tags, name) diff --git a/src/conductor/client/scheduler_client.py b/src/conductor/client/scheduler_client.py index 85fb7660..f507d78d 100644 --- a/src/conductor/client/scheduler_client.py +++ b/src/conductor/client/scheduler_client.py @@ -2,74 +2,75 @@ from typing import Optional, List from conductor.client.http.models.workflow_schedule import WorkflowSchedule from conductor.client.http.models.save_schedule_request import SaveScheduleRequest -from conductor.client.http.models.search_result_workflow_schedule_execution_model import SearchResultWorkflowScheduleExecutionModel +from conductor.client.http.models.search_result_workflow_schedule_execution_model import \ + SearchResultWorkflowScheduleExecutionModel from conductor.client.orkes.models.metadata_tag import MetadataTag + class SchedulerClient(ABC): @abstractmethod - def saveSchedule(self, saveScheduleRequest: SaveScheduleRequest): + def save_schedule(self, save_schedule_request: SaveScheduleRequest): pass - + @abstractmethod - def getSchedule(self, name: str) -> (Optional[WorkflowSchedule], str): + def get_schedule(self, name: str) -> (Optional[WorkflowSchedule], str): pass - + @abstractmethod - def getAllSchedules(self, workflowName: Optional[str] = None) -> List[WorkflowSchedule]: + def get_all_schedules(self, workflow_name: Optional[str] = None) -> List[WorkflowSchedule]: pass - + @abstractmethod - def getNextFewScheduleExecutionTimes(self, - cronExpression: str, - scheduleStartTime: Optional[int] = None, - scheduleEndTime: Optional[int] = None, - limit: Optional[int] = None, - ) -> List[int]: + def get_next_few_schedule_execution_times(self, + cron_expression: str, + schedule_start_time: Optional[int] = None, + schedule_end_time: Optional[int] = None, + limit: Optional[int] = None, + ) -> List[int]: pass @abstractmethod - def deleteSchedule(self, name: str): + def delete_schedule(self, name: str): pass @abstractmethod - def pauseSchedule(self, name: str): + def pause_schedule(self, name: str): pass - + @abstractmethod - def pauseAllSchedules(self): + def pause_all_schedules(self): pass - + @abstractmethod - def resumeSchedule(self, name: str): + def resume_schedule(self, name: str): pass - + @abstractmethod - def resumeAllSchedules(self): + def resume_all_schedules(self): pass @abstractmethod - def searchScheduleExecutions(self, - start: Optional[int] = None, - size: Optional[int] = None, - sort: Optional[str] = None, - freeText: Optional[str] = None, - query: Optional[str] = None, - ) -> SearchResultWorkflowScheduleExecutionModel: + def search_schedule_executions(self, + start: Optional[int] = None, + size: Optional[int] = None, + sort: Optional[str] = None, + free_text: Optional[str] = None, + query: Optional[str] = None, + ) -> SearchResultWorkflowScheduleExecutionModel: pass - + @abstractmethod - def requeueAllExecutionRecords(self): + def requeue_all_execution_records(self): pass @abstractmethod - def setSchedulerTags(self, tags: List[MetadataTag], name: str): + def set_scheduler_tags(self, tags: List[MetadataTag], name: str): pass @abstractmethod - def getSchedulerTags(self, name: str) -> List[MetadataTag]: + def get_scheduler_tags(self, name: str) -> List[MetadataTag]: pass - + @abstractmethod - def deleteSchedulerTags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]: + def delete_scheduler_tags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]: pass - diff --git a/tests/integration/client/orkes/test_orkes_clients.py b/tests/integration/client/orkes/test_orkes_clients.py index b7461b88..4815b51f 100644 --- a/tests/integration/client/orkes/test_orkes_clients.py +++ b/tests/integration/client/orkes/test_orkes_clients.py @@ -152,41 +152,41 @@ def test_scheduler_lifecycle(self, workflowDef): cron_expression= "0 */5 * ? * *" ) - self.scheduler_client.saveSchedule(saveScheduleRequest) + self.scheduler_client.save_schedule(saveScheduleRequest) - schedule = self.scheduler_client.getSchedule(SCHEDULE_NAME) + schedule = self.scheduler_client.get_schedule(SCHEDULE_NAME) assert schedule['name'] == SCHEDULE_NAME - self.scheduler_client.pauseSchedule(SCHEDULE_NAME) + self.scheduler_client.pause_schedule(SCHEDULE_NAME) - schedules = self.scheduler_client.getAllSchedules(WORKFLOW_NAME) + schedules = self.scheduler_client.get_all_schedules(WORKFLOW_NAME) assert len(schedules) == 1 assert schedules[0].name == SCHEDULE_NAME assert schedules[0].paused - self.scheduler_client.resumeSchedule(SCHEDULE_NAME) - schedule = self.scheduler_client.getSchedule(SCHEDULE_NAME) + self.scheduler_client.resume_schedule(SCHEDULE_NAME) + schedule = self.scheduler_client.get_schedule(SCHEDULE_NAME) assert not schedule['paused'] - times = self.scheduler_client.getNextFewScheduleExecutionTimes("0 */5 * ? * *", limit=1) + times = self.scheduler_client.get_next_few_schedule_execution_times("0 */5 * ? * *", limit=1) assert(len(times) == 1) tags = [ MetadataTag("sch_tag", "val"), MetadataTag("sch_tag_2", "val2") ] - self.scheduler_client.setSchedulerTags(tags, SCHEDULE_NAME) - fetched_tags = self.scheduler_client.getSchedulerTags(SCHEDULE_NAME) + self.scheduler_client.set_scheduler_tags(tags, SCHEDULE_NAME) + fetched_tags = self.scheduler_client.get_scheduler_tags(SCHEDULE_NAME) assert len(fetched_tags) == 2 - self.scheduler_client.deleteSchedulerTags(tags, SCHEDULE_NAME) - fetched_tags = self.scheduler_client.getSchedulerTags(SCHEDULE_NAME) + self.scheduler_client.delete_scheduler_tags(tags, SCHEDULE_NAME) + fetched_tags = self.scheduler_client.get_scheduler_tags(SCHEDULE_NAME) assert len(fetched_tags) == 0 - self.scheduler_client.deleteSchedule(SCHEDULE_NAME) + self.scheduler_client.delete_schedule(SCHEDULE_NAME) try: - schedule = self.scheduler_client.getSchedule(SCHEDULE_NAME) + schedule = self.scheduler_client.get_schedule(SCHEDULE_NAME) except APIError as e: assert e.code == APIErrorCode.NOT_FOUND assert e.message == "Schedule '{0}' not found".format(SCHEDULE_NAME) diff --git a/tests/unit/orkes/test_scheduler_client.py b/tests/unit/orkes/test_scheduler_client.py index cf2ea77c..c0e5ea9d 100644 --- a/tests/unit/orkes/test_scheduler_client.py +++ b/tests/unit/orkes/test_scheduler_client.py @@ -39,14 +39,14 @@ def test_init(self): @patch.object(SchedulerResourceApi, 'save_schedule') def test_saveSchedule(self, mock): - self.scheduler_client.saveSchedule(self.saveScheduleRequest) + self.scheduler_client.save_schedule(self.saveScheduleRequest) self.assertTrue(mock.called) mock.assert_called_with(self.saveScheduleRequest) @patch.object(SchedulerResourceApi, 'get_schedule') def test_getSchedule(self, mock): mock.return_value = self.workflowSchedule - schedule = self.scheduler_client.getSchedule(SCHEDULE_NAME) + schedule = self.scheduler_client.get_schedule(SCHEDULE_NAME) self.assertEqual(schedule, self.workflowSchedule) self.assertTrue(mock.called) mock.assert_called_with(SCHEDULE_NAME) @@ -56,20 +56,20 @@ def test_getSchedule_non_existing(self, mock): error_body = { 'status': 404, 'message': 'Schedule not found' } mock.side_effect = MagicMock(side_effect=ApiException(status=404, body=json.dumps(error_body))) with self.assertRaises(APIError): - self.scheduler_client.getSchedule("WRONG_SCHEDULE") + self.scheduler_client.get_schedule("WRONG_SCHEDULE") mock.assert_called_with("WRONG_SCHEDULE") @patch.object(SchedulerResourceApi, 'get_all_schedules') def test_getAllSchedules(self, mock): mock.return_value = [self.workflowSchedule] - schedules = self.scheduler_client.getAllSchedules() + schedules = self.scheduler_client.get_all_schedules() self.assertEqual(schedules, [self.workflowSchedule]) self.assertTrue(mock.called) @patch.object(SchedulerResourceApi, 'get_all_schedules') def test_getAllSchedules_with_workflow_name(self, mock): mock.return_value = [self.workflowSchedule] - schedules = self.scheduler_client.getAllSchedules(WORKFLOW_NAME) + schedules = self.scheduler_client.get_all_schedules(WORKFLOW_NAME) self.assertEqual(schedules, [self.workflowSchedule]) mock.assert_called_with(workflow_name=WORKFLOW_NAME) @@ -77,7 +77,7 @@ def test_getAllSchedules_with_workflow_name(self, mock): def test_getNextFewScheduleExecutionTimes(self, mock): cronExpression = "0 */5 * ? * *" mock.return_value = [1698093000000, 1698093300000, 1698093600000] - times = self.scheduler_client.getNextFewScheduleExecutionTimes(cronExpression) + times = self.scheduler_client.get_next_few_schedule_execution_times(cronExpression) self.assertEqual(len(times), 3) mock.assert_called_with(cronExpression) @@ -85,7 +85,7 @@ def test_getNextFewScheduleExecutionTimes(self, mock): def test_getNextFewScheduleExecutionTimes_with_optional_params(self, mock): cronExpression = "0 */5 * ? * *" mock.return_value = [1698093300000, 1698093600000] - times = self.scheduler_client.getNextFewScheduleExecutionTimes( + times = self.scheduler_client.get_next_few_schedule_execution_times( cronExpression, 1698093300000, 1698093600000, 2 ) self.assertEqual(len(times), 2) @@ -98,32 +98,32 @@ def test_getNextFewScheduleExecutionTimes_with_optional_params(self, mock): @patch.object(SchedulerResourceApi, 'delete_schedule') def test_deleteSchedule(self, mock): - self.scheduler_client.deleteSchedule(SCHEDULE_NAME) + self.scheduler_client.delete_schedule(SCHEDULE_NAME) mock.assert_called_with(SCHEDULE_NAME) @patch.object(SchedulerResourceApi, 'pause_schedule') def test_pauseSchedule(self, mock): - self.scheduler_client.pauseSchedule(SCHEDULE_NAME) + self.scheduler_client.pause_schedule(SCHEDULE_NAME) mock.assert_called_with(SCHEDULE_NAME) @patch.object(SchedulerResourceApi, 'pause_all_schedules') def test_pauseAllSchedules(self, mock): - self.scheduler_client.pauseAllSchedules() + self.scheduler_client.pause_all_schedules() self.assertTrue(mock.called) @patch.object(SchedulerResourceApi, 'resume_schedule') def test_resumeSchedule(self, mock): - self.scheduler_client.resumeSchedule(SCHEDULE_NAME) + self.scheduler_client.resume_schedule(SCHEDULE_NAME) mock.assert_called_with(SCHEDULE_NAME) @patch.object(SchedulerResourceApi, 'resume_all_schedules') def test_resumeAllSchedules(self, mock): - self.scheduler_client.resumeAllSchedules() + self.scheduler_client.resume_all_schedules() self.assertTrue(mock.called) @patch.object(SchedulerResourceApi, 'requeue_all_execution_records') def test_requeueAllExecutionRecords(self, mock): - self.scheduler_client.requeueAllExecutionRecords() + self.scheduler_client.requeue_all_execution_records() self.assertTrue(mock.called) @patch.object(SchedulerResourceApi, 'search_v21') @@ -134,7 +134,7 @@ def test_searchScheduleExecutions(self, mock): sort = "name&sort=workflowId:DESC" freeText = "abc" query="workflowId=abc" - searchResult = self.scheduler_client.searchScheduleExecutions( + searchResult = self.scheduler_client.search_schedule_executions( start, 2, sort, freeText, query ) mock.assert_called_with( @@ -151,7 +151,7 @@ def test_setSchedulerTags(self, mock): tag1 = MetadataTag("tag1", "val1") tag2 = MetadataTag("tag2", "val2") tags = [tag1, tag2] - self.scheduler_client.setSchedulerTags(tags, SCHEDULE_NAME) + self.scheduler_client.set_scheduler_tags(tags, SCHEDULE_NAME) mock.assert_called_with(tags, SCHEDULE_NAME) @patch.object(SchedulerResourceApi, 'get_tags_for_schedule') @@ -159,7 +159,7 @@ def test_getSchedulerTags(self, mock): tag1 = MetadataTag("tag1", "val1") tag1 = MetadataTag("tag2", "val2") mock.return_value = [tag1, tag1] - tags = self.scheduler_client.getSchedulerTags(SCHEDULE_NAME) + tags = self.scheduler_client.get_scheduler_tags(SCHEDULE_NAME) mock.assert_called_with(SCHEDULE_NAME) self.assertEqual(len(tags), 2) @@ -168,5 +168,5 @@ def test_deleteSchedulerTags(self, mock): tag1 = MetadataTag("tag1", "val1") tag2 = MetadataTag("tag2", "val2") tags = [tag1, tag2] - self.scheduler_client.deleteSchedulerTags(tags, SCHEDULE_NAME) + self.scheduler_client.delete_scheduler_tags(tags, SCHEDULE_NAME) mock.assert_called_with(tags, SCHEDULE_NAME) \ No newline at end of file