Skip to content

Commit

Permalink
task cache and http poll
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed May 1, 2024
1 parent 9f48978 commit 0e7caa2
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 14 deletions.
26 changes: 26 additions & 0 deletions examples/orkes/http_poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import uuid

from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_poll_task import HttpPollTask, HttpPollInput


def main():
workflow_executor = OrkesClients().get_workflow_executor()
workflow = ConductorWorkflow(executor=workflow_executor, name='http_poll_example_' + str(uuid.uuid4()))
http_poll = HttpPollTask(task_ref_name='http_poll_ref',
http_input=HttpPollInput(
uri='https://orkes-api-tester.orkesconductor.com/api',
polling_strategy='EXPONENTIAL_BACKOFF',
polling_interval=1000,
termination_condition='(function(){ return $.output.response.body.randomInt < 10;})();'),
)
workflow >> http_poll

# execute the workflow to get the results
result = workflow.execute(workflow_input={}, wait_for_seconds=10)
print(f'result: {result.output}')


if __name__ == '__main__':
main()
57 changes: 49 additions & 8 deletions src/conductor/client/http/models/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,39 @@
from conductor.client.http.models.state_change_event import StateChangeConfig, StateChangeEventType, StateChangeEvent


class WorkflowTask(object):
"""NOTE: This class is auto generated by the swagger code generator program.
class CacheConfig(object):
swagger_types = {
'key': 'str',
'ttl_in_second': 'int'
}

Do not edit the class manually.
"""
attribute_map = {
'key': 'key',
'ttl_in_second': 'ttlInSecond'
}

def __init__(self, key: str, ttl_in_second: int):
self._key = key
self._ttl_in_second = ttl_in_second

@property
def key(self):
return self._key

@key.setter
def key(self, key):
self._key = key

@property
def ttl_in_second(self):
return self._ttl_in_second

@ttl_in_second.setter
def ttl_in_second(self, ttl_in_second):
self._ttl_in_second = ttl_in_second


class WorkflowTask(object):
"""
Attributes:
swagger_types (dict): The key is attribute name
Expand Down Expand Up @@ -50,7 +78,8 @@ class WorkflowTask(object):
'evaluator_type': 'str',
'expression': 'str',
'workflow_task_type': 'str',
'on_state_change': 'dict(str, StateChangeConfig)'
'on_state_change': 'dict(str, StateChangeConfig)',
'cache_config': 'CacheConfig'
}

attribute_map = {
Expand Down Expand Up @@ -84,7 +113,8 @@ class WorkflowTask(object):
'evaluator_type': 'evaluatorType',
'expression': 'expression',
'workflow_task_type': 'workflowTaskType',
'on_state_change': 'onStateChange'
'on_state_change': 'onStateChange',
'cache_config': 'cacheConfig'
}

def __init__(self, name=None, task_reference_name=None, description=None, input_parameters=None, type=None,
Expand All @@ -94,7 +124,8 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_
sub_workflow_param=None, join_on=None, sink=None, optional=None, task_definition=None,
rate_limited=None, default_exclusive_join_task=None, async_complete=None, loop_condition=None,
loop_over=None, retry_count=None, evaluator_type=None, expression=None,
workflow_task_type=None, on_state_change: dict[str, StateChangeConfig] = None): # noqa: E501
workflow_task_type=None, on_state_change: dict[str, StateChangeConfig] = None,
cache_config: CacheConfig = None): # noqa: E501
"""WorkflowTask - a model defined in Swagger""" # noqa: E501
self._name = None
self._task_reference_name = None
Expand Down Expand Up @@ -128,6 +159,7 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_
self._workflow_task_type = None
self.discriminator = None
self._on_state_change = None
self._cache_config = None
self.name = name
self.task_reference_name = task_reference_name
if description is not None:
Expand Down Expand Up @@ -188,6 +220,7 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_
self.workflow_task_type = workflow_task_type
if on_state_change is not None:
self._on_state_change = on_state_change
self._cache_config = cache_config

@property
def name(self):
Expand Down Expand Up @@ -823,9 +856,17 @@ def on_state_change(self) -> dict[str, List[StateChangeEvent]]:
@on_state_change.setter
def on_state_change(self, state_change: StateChangeConfig):
self._on_state_change = {
state_change.type : state_change.events
state_change.type: state_change.events
}

@property
def cache_config(self) -> CacheConfig:
return self._cache_config

@cache_config.setter
def cache_config(self, cache_config: CacheConfig):
self._cache_config = cache_config

def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
Expand Down
14 changes: 13 additions & 1 deletion src/conductor/client/workflow/task/do_while_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import deepcopy
from typing import List
from typing import List, Dict, Any

from typing_extensions import Self

Expand Down Expand Up @@ -43,3 +43,15 @@ def __init__(self, task_ref_name: str, iterations: int, tasks: List[TaskInterfac
),
tasks=tasks,
)


class ForEachTask(DoWhileTask):
def __init__(self, task_ref_name: str, tasks: List[TaskInterface], iterate_over:str, variables: List[str] = None) -> Self:
super().__init__(
task_ref_name=task_ref_name,
termination_condition=get_for_loop_condition(
task_ref_name, 0,
),
tasks=tasks,
)
super().input_parameter("items", iterate_over)
76 changes: 76 additions & 0 deletions src/conductor/client/workflow/task/http_poll_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from copy import deepcopy
from enum import Enum
from typing import Any, Dict, List, Union

from typing_extensions import Self

from conductor.client.workflow.task.http_task import HttpTask, HttpInput, HttpMethod
from conductor.client.workflow.task.task import TaskInterface
from conductor.client.workflow.task.task_type import TaskType


class HttpPollInput():
swagger_types = {
'_uri': 'str',
'_method': 'str',
'_accept': 'list[str]',
'_headers': 'dict[str, list[str]]',
'_content_type': 'str',
'_connection_time_out': 'int',
'_read_timeout': 'int',
'_body': 'str',
'_termination_condition': 'str',
'_polling_interval': 'int',
'_max_poll_count': 'int',
'_polling_strategy': str
}

attribute_map = {
'_uri': 'uri',
'_method': 'method',
'_accept': 'accept',
'_headers': 'headers',
'_content_type': 'contentType',
'_connection_time_out': 'connectionTimeOut',
'_read_timeout': 'readTimeOut',
'_body': 'body',
'_termination_condition': 'terminationCondition',
'_polling_interval': 'pollingInterval',
'_max_poll_count': 'maxPollCount',
'_polling_strategy': 'pollingStrategy'
}

def __init__(self,
termination_condition: str = None,
max_poll_count : int = 100,
polling_interval : int = 100,
polling_strategy: str = 'FIXED',
method: HttpMethod = HttpMethod.GET,
uri: str = None,
headers: Dict[str, List[str]] = None,
accept: str = None,
content_type: str = None,
connection_time_out: int = None,
read_timeout: int = None,
body: Any = None) -> Self:
self._method = deepcopy(method)
self._uri = deepcopy(uri)
self._headers = deepcopy(headers)
self._accept = deepcopy(accept)
self._content_type = deepcopy(content_type)
self._connection_time_out = deepcopy(connection_time_out)
self._read_timeout = deepcopy(read_timeout)
self._body = deepcopy(body)
self._termination_condition = termination_condition
self._max_poll_count = max_poll_count
self._polling_interval = polling_interval
self._polling_strategy = polling_strategy


class HttpPollTask(TaskInterface):
def __init__(self, task_ref_name: str, http_input: HttpPollInput) -> Self:
super().__init__(
task_reference_name=task_ref_name,
task_type=TaskType.HTTP_POLL,
input_parameters={'http_request': http_input}
)
2 changes: 0 additions & 2 deletions src/conductor/client/workflow/task/http_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class HttpInput:
'_method': 'str',
'_accept': 'list[str]',
'_headers': 'dict[str, list[str]]',
'_accept': 'str',
'_content_type': 'str',
'_connection_time_out': 'int',
'_read_timeout': 'int',
Expand All @@ -35,7 +34,6 @@ class HttpInput:
'_method': 'method',
'_accept': 'accept',
'_headers': 'headers',
'_accept': 'accept',
'_content_type': 'contentType',
'_connection_time_out': 'connectionTimeOut',
'_read_timeout': 'readTimeOut',
Expand Down
21 changes: 18 additions & 3 deletions src/conductor/client/workflow/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing_extensions import Self, Union

from conductor.client.http.models.workflow_task import WorkflowTask
from conductor.client.http.models.workflow_task import WorkflowTask, CacheConfig
from conductor.client.workflow.task.task_type import TaskType


Expand All @@ -23,13 +23,17 @@ def __init__(self,
task_name: str = None,
description: str = None,
optional: bool = None,
input_parameters: Dict[str, Any] = None) -> Self:
input_parameters: Dict[str, Any] = None,
cache_key : str = None,
cache_ttl_second : int = 0) -> Self:
self.task_reference_name = task_reference_name
self.task_type = task_type
self.name = task_name or task_reference_name
self.description = description
self.optional = optional
self.input_parameters = input_parameters
self._cache_key = cache_key
self._cache_ttl_second = cache_ttl_second

@property
def task_reference_name(self) -> str:
Expand Down Expand Up @@ -61,6 +65,10 @@ def name(self, name: str) -> None:
raise Exception('invalid type')
self._name = name

def cache(self, cache_key: str, cache_ttl_second: int):
self._cache_key = cache_key
self._cache_ttl_second = cache_ttl_second

@property
def description(self) -> str:
return self._description
Expand Down Expand Up @@ -104,20 +112,27 @@ def input_parameter(self, key: str, value: Any) -> Self:
return self

def to_workflow_task(self) -> WorkflowTask:
cache_config = None
if self._cache_ttl_second > 0 and self._cache_key is not None:
cache_config = CacheConfig(key=self._cache_key, ttl_in_second=self._cache_ttl_second)
return WorkflowTask(
name=self._name,
task_reference_name=self._task_reference_name,
type=self._task_type.value,
description=self._description,
input_parameters=self._input_parameters,
optional=self._optional,
cache_config=cache_config
)

def output(self, json_path: str = None) -> str:
if json_path is None:
return '${' + f'{self.task_reference_name}.output' + '}'
else:
return '${' + f'{self.task_reference_name}.output.{json_path}' + '}'
if json_path.startswith('.'):
return '${' + f'{self.task_reference_name}.output{json_path}' + '}'
else:
return '${' + f'{self.task_reference_name}.output.{json_path}' + '}'

def input(self, json_path: str = None, key : str = None, value : Any = None) -> Union[str, Self]:
if key is not None and value is not None:
Expand Down
1 change: 1 addition & 0 deletions src/conductor/client/workflow/task/task_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class TaskType(str, Enum):
HUMAN = 'HUMAN'
USER_DEFINED = 'USER_DEFINED'
HTTP = 'HTTP'
HTTP_POLL = 'HTTP_POLL'
LAMBDA = 'LAMBDA'
INLINE = 'INLINE'
EXCLUSIVE_JOIN = 'EXCLUSIVE_JOIN'
Expand Down

0 comments on commit 0e7caa2

Please sign in to comment.