Skip to content

Commit

Permalink
scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Dec 2, 2023
1 parent c19afd8 commit b189e48
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 113 deletions.
38 changes: 25 additions & 13 deletions docs/schedule/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ scheduler_client = OrkesSchedulerClient(configuration)
```

### Saving Schedule

```python
from conductor.client.http.models.save_schedule_request import SaveScheduleRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
Expand All @@ -28,77 +29,88 @@ startWorkflowRequest = StartWorkflowRequest(
saveScheduleRequest = SaveScheduleRequest(
name="SCHEDULE_NAME",
start_workflow_request=startWorkflowRequest,
cron_expression= "0 */5 * ? * *"
cron_expression="0 */5 * ? * *"
)

scheduler_client.saveSchedule(saveScheduleRequest)
scheduler_client.save_schedule(saveScheduleRequest)
```

### Get Schedule

#### Get a specific schedule

```python
scheduler_client.getSchedule("SCHEDULE_NAME")
scheduler_client.get_schedule("SCHEDULE_NAME")
```

#### Get all schedules

```python
scheduler_client.getAllSchedules()
scheduler_client.get_all_schedules()
```

#### Get all schedules for a workflow

```python
scheduler_client.getAllSchedules("WORKFLOW_NAME")
scheduler_client.get_all_schedules("WORKFLOW_NAME")
```

### Delete Schedule

```python
scheduler_client.deleteSchedule("SCHEDULE_NAME")
scheduler_client.delete_schedule("SCHEDULE_NAME")
```

### Pause and Resume Schedules

#### Pause a schedule

```python
scheduler_client.pauseSchedule("SCHEDULE_NAME")
scheduler_client.pause_schedule("SCHEDULE_NAME")
```

#### Pause all schedules

```python
scheduler_client.pauseAllSchedules()
scheduler_client.pause_all_schedules()
```

#### Resume a scheduler

```python
scheduler_client.resumeSchedule("SCHEDULE_NAME")
scheduler_client.resume_schedule("SCHEDULE_NAME")
```

#### Resume all schedules

```python
scheduler_client.resumeAllSchedules()
scheduler_client.resume_all_schedules()
```

### Scheduler Tag Management

#### Set scheduler tags

```python
from conductor.client.orkes.models.metadata_tag import MetadataTag

tags = [
MetadataTag("sch_tag", "val"), MetadataTag("sch_tag_2", "val2")
]
scheduler_client.setSchedulerTags(tags, "SCHEDULE_NAME")
scheduler_client.set_scheduler_tags(tags, "SCHEDULE_NAME")
```

#### Get scheduler tags

```python
tags = scheduler_client.getSchedulerTags("SCHEDULE_NAME")
tags = scheduler_client.get_scheduler_tags("SCHEDULE_NAME")
```

#### Delete scheduler tags

```python
tags = [
MetadataTag("sch_tag", "val"), MetadataTag("sch_tag_2", "val2")
]
scheduler_client.deleteSchedulerTags(tags, "SCHEDULE_NAME")
scheduler_client.delete_scheduler_tags(tags, "SCHEDULE_NAME")
```
1 change: 1 addition & 0 deletions src/conductor/client/http/models/tag_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import six


class TagObject(object):
"""NOTE: This class is auto generated by the swagger code generator program.
Expand Down
1 change: 1 addition & 0 deletions src/conductor/client/orkes/models/metadata_tag.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from conductor.client.http.models.tag_object import TagObject
from typing_extensions import Self


class MetadataTag(TagObject):
def __init__(self, key: str, value: str) -> Self:
super().__init__(
Expand Down
70 changes: 35 additions & 35 deletions src/conductor/client/orkes/orkes_scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,77 +13,77 @@ class OrkesSchedulerClient(OrkesBaseClient, SchedulerClient):
def __init__(self, configuration: Configuration):
super(OrkesSchedulerClient, self).__init__(configuration)

def saveSchedule(self, saveScheduleRequest: SaveScheduleRequest):
self.schedulerResourceApi.save_schedule(saveScheduleRequest)
def save_schedule(self, save_schedule_request: SaveScheduleRequest):
self.schedulerResourceApi.save_schedule(save_schedule_request)

def getSchedule(self, name: str) -> WorkflowSchedule:
def get_schedule(self, name: str) -> WorkflowSchedule:
return self.schedulerResourceApi.get_schedule(name)

def getAllSchedules(self, workflowName: Optional[str] = None) -> List[WorkflowSchedule]:
def get_all_schedules(self, workflow_name: Optional[str] = None) -> List[WorkflowSchedule]:
kwargs = {}
if workflowName:
kwargs.update({"workflow_name": workflowName})
if workflow_name:
kwargs.update({"workflow_name": workflow_name})

return self.schedulerResourceApi.get_all_schedules(**kwargs)

def getNextFewScheduleExecutionTimes(self,
cronExpression: str,
scheduleStartTime: Optional[int] = None,
scheduleEndTime: Optional[int] = None,
limit: Optional[int] = None,
) -> List[int]:
def get_next_few_schedule_execution_times(self,
cron_expression: str,
schedule_start_time: Optional[int] = None,
schedule_end_time: Optional[int] = None,
limit: Optional[int] = None,
) -> List[int]:
kwargs = {}
if scheduleStartTime:
kwargs.update({"schedule_start_time": scheduleStartTime})
if scheduleEndTime:
kwargs.update({"schedule_end_time": scheduleEndTime})
if schedule_start_time:
kwargs.update({"schedule_start_time": schedule_start_time})
if schedule_end_time:
kwargs.update({"schedule_end_time": schedule_end_time})
if limit:
kwargs.update({"limit": limit})
return self.schedulerResourceApi.get_next_few_schedules(cronExpression, **kwargs)
return self.schedulerResourceApi.get_next_few_schedules(cron_expression, **kwargs)

def deleteSchedule(self, name: str):
def delete_schedule(self, name: str):
self.schedulerResourceApi.delete_schedule(name)

def pauseSchedule(self, name: str):
def pause_schedule(self, name: str):
self.schedulerResourceApi.pause_schedule(name)

def pauseAllSchedules(self):
def pause_all_schedules(self):
self.schedulerResourceApi.pause_all_schedules()

def resumeSchedule(self, name: str):
def resume_schedule(self, name: str):
self.schedulerResourceApi.resume_schedule(name)

def resumeAllSchedules(self):
def resume_all_schedules(self):
self.schedulerResourceApi.resume_all_schedules()

def searchScheduleExecutions(self,
start: Optional[int] = None,
size: Optional[int] = None,
sort: Optional[str] = None,
freeText: Optional[str] = None,
query: Optional[str] = None,
) -> SearchResultWorkflowScheduleExecutionModel:
def search_schedule_executions(self,
start: Optional[int] = None,
size: Optional[int] = None,
sort: Optional[str] = None,
free_text: Optional[str] = None,
query: Optional[str] = None,
) -> SearchResultWorkflowScheduleExecutionModel:
kwargs = {}
if start:
kwargs.update({"start": start})
if size:
kwargs.update({"size": size})
if sort:
kwargs.update({"sort": sort})
if freeText:
kwargs.update({"freeText": freeText})
if free_text:
kwargs.update({"freeText": free_text})
if query:
kwargs.update({"query": query})
return self.schedulerResourceApi.search_v21(**kwargs)

def requeueAllExecutionRecords(self):
def requeue_all_execution_records(self):
self.schedulerResourceApi.requeue_all_execution_records()

def setSchedulerTags(self, tags: List[MetadataTag], name: str):
def set_scheduler_tags(self, tags: List[MetadataTag], name: str):
self.schedulerResourceApi.put_tag_for_schedule(tags, name)

def getSchedulerTags(self, name: str) -> List[MetadataTag]:
def get_scheduler_tags(self, name: str) -> List[MetadataTag]:
return self.schedulerResourceApi.get_tags_for_schedule(name)

def deleteSchedulerTags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]:
def delete_scheduler_tags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]:
self.schedulerResourceApi.delete_tag_for_schedule(tags, name)
71 changes: 36 additions & 35 deletions src/conductor/client/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,75 @@
from typing import Optional, List
from conductor.client.http.models.workflow_schedule import WorkflowSchedule
from conductor.client.http.models.save_schedule_request import SaveScheduleRequest
from conductor.client.http.models.search_result_workflow_schedule_execution_model import SearchResultWorkflowScheduleExecutionModel
from conductor.client.http.models.search_result_workflow_schedule_execution_model import \
SearchResultWorkflowScheduleExecutionModel
from conductor.client.orkes.models.metadata_tag import MetadataTag


class SchedulerClient(ABC):
@abstractmethod
def saveSchedule(self, saveScheduleRequest: SaveScheduleRequest):
def save_schedule(self, save_schedule_request: SaveScheduleRequest):
pass

@abstractmethod
def getSchedule(self, name: str) -> (Optional[WorkflowSchedule], str):
def get_schedule(self, name: str) -> (Optional[WorkflowSchedule], str):
pass

@abstractmethod
def getAllSchedules(self, workflowName: Optional[str] = None) -> List[WorkflowSchedule]:
def get_all_schedules(self, workflow_name: Optional[str] = None) -> List[WorkflowSchedule]:
pass

@abstractmethod
def getNextFewScheduleExecutionTimes(self,
cronExpression: str,
scheduleStartTime: Optional[int] = None,
scheduleEndTime: Optional[int] = None,
limit: Optional[int] = None,
) -> List[int]:
def get_next_few_schedule_execution_times(self,
cron_expression: str,
schedule_start_time: Optional[int] = None,
schedule_end_time: Optional[int] = None,
limit: Optional[int] = None,
) -> List[int]:
pass

@abstractmethod
def deleteSchedule(self, name: str):
def delete_schedule(self, name: str):
pass

@abstractmethod
def pauseSchedule(self, name: str):
def pause_schedule(self, name: str):
pass

@abstractmethod
def pauseAllSchedules(self):
def pause_all_schedules(self):
pass

@abstractmethod
def resumeSchedule(self, name: str):
def resume_schedule(self, name: str):
pass

@abstractmethod
def resumeAllSchedules(self):
def resume_all_schedules(self):
pass

@abstractmethod
def searchScheduleExecutions(self,
start: Optional[int] = None,
size: Optional[int] = None,
sort: Optional[str] = None,
freeText: Optional[str] = None,
query: Optional[str] = None,
) -> SearchResultWorkflowScheduleExecutionModel:
def search_schedule_executions(self,
start: Optional[int] = None,
size: Optional[int] = None,
sort: Optional[str] = None,
free_text: Optional[str] = None,
query: Optional[str] = None,
) -> SearchResultWorkflowScheduleExecutionModel:
pass

@abstractmethod
def requeueAllExecutionRecords(self):
def requeue_all_execution_records(self):
pass

@abstractmethod
def setSchedulerTags(self, tags: List[MetadataTag], name: str):
def set_scheduler_tags(self, tags: List[MetadataTag], name: str):
pass

@abstractmethod
def getSchedulerTags(self, name: str) -> List[MetadataTag]:
def get_scheduler_tags(self, name: str) -> List[MetadataTag]:
pass

@abstractmethod
def deleteSchedulerTags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]:
def delete_scheduler_tags(self, tags: List[MetadataTag], name: str) -> List[MetadataTag]:
pass

Loading

0 comments on commit b189e48

Please sign in to comment.