From ff7f24e5b4c86e2e51720dd8c1174d8f9cd9f685 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 1 May 2024 22:28:39 -0700 Subject: [PATCH] Improvements and features (#260) --- examples/helloworld/greetings_worker.py | 2 +- examples/{ => orkes}/copilot/README.md | 0 examples/{ => orkes}/copilot/__init__.py | 0 examples/{ => orkes}/copilot/customer.py | 0 .../{ => orkes}/copilot/open_ai_copilot.py | 0 examples/orkes/http_poll.py | 26 +++ examples/orkes/multiagent_chat.py | 218 ++++++++++++++++++ examples/shell_worker.py | 36 +++ examples/untrusted_host.py | 48 ++++ src/conductor/client/http/models/__init__.py | 1 + .../client/http/models/workflow_status.py | 18 ++ .../client/http/models/workflow_task.py | 57 ++++- .../client/workflow/conductor_workflow.py | 4 +- .../client/workflow/task/do_while_task.py | 14 +- .../client/workflow/task/http_poll_task.py | 76 ++++++ .../client/workflow/task/http_task.py | 2 - src/conductor/client/workflow/task/task.py | 21 +- .../client/workflow/task/task_type.py | 1 + 18 files changed, 508 insertions(+), 16 deletions(-) rename examples/{ => orkes}/copilot/README.md (100%) rename examples/{ => orkes}/copilot/__init__.py (100%) rename examples/{ => orkes}/copilot/customer.py (100%) rename examples/{ => orkes}/copilot/open_ai_copilot.py (100%) create mode 100644 examples/orkes/http_poll.py create mode 100644 examples/orkes/multiagent_chat.py create mode 100644 examples/shell_worker.py create mode 100644 examples/untrusted_host.py create mode 100644 src/conductor/client/workflow/task/http_poll_task.py diff --git a/examples/helloworld/greetings_worker.py b/examples/helloworld/greetings_worker.py index 5747670b..2d2437a4 100644 --- a/examples/helloworld/greetings_worker.py +++ b/examples/helloworld/greetings_worker.py @@ -1,7 +1,7 @@ """ This file contains a Simple Worker that can be used in any workflow. For detailed information https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-2-write-worker -"""" +""" from conductor.client.worker.worker_task import worker_task diff --git a/examples/copilot/README.md b/examples/orkes/copilot/README.md similarity index 100% rename from examples/copilot/README.md rename to examples/orkes/copilot/README.md diff --git a/examples/copilot/__init__.py b/examples/orkes/copilot/__init__.py similarity index 100% rename from examples/copilot/__init__.py rename to examples/orkes/copilot/__init__.py diff --git a/examples/copilot/customer.py b/examples/orkes/copilot/customer.py similarity index 100% rename from examples/copilot/customer.py rename to examples/orkes/copilot/customer.py diff --git a/examples/copilot/open_ai_copilot.py b/examples/orkes/copilot/open_ai_copilot.py similarity index 100% rename from examples/copilot/open_ai_copilot.py rename to examples/orkes/copilot/open_ai_copilot.py diff --git a/examples/orkes/http_poll.py b/examples/orkes/http_poll.py new file mode 100644 index 00000000..83dfd921 --- /dev/null +++ b/examples/orkes/http_poll.py @@ -0,0 +1,26 @@ +import uuid + +from conductor.client.orkes_clients import OrkesClients +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.http_poll_task import HttpPollTask, HttpPollInput + + +def main(): + workflow_executor = OrkesClients().get_workflow_executor() + workflow = ConductorWorkflow(executor=workflow_executor, name='http_poll_example_' + str(uuid.uuid4())) + http_poll = HttpPollTask(task_ref_name='http_poll_ref', + http_input=HttpPollInput( + uri='https://orkes-api-tester.orkesconductor.com/api', + polling_strategy='EXPONENTIAL_BACKOFF', + polling_interval=1000, + termination_condition='(function(){ return $.output.response.body.randomInt < 10;})();'), + ) + workflow >> http_poll + + # execute the workflow to get the results + result = workflow.execute(workflow_input={}, wait_for_seconds=10) + print(f'result: {result.output}') + + +if __name__ == '__main__': + main() diff --git a/examples/orkes/multiagent_chat.py b/examples/orkes/multiagent_chat.py new file mode 100644 index 00000000..41714a1a --- /dev/null +++ b/examples/orkes/multiagent_chat.py @@ -0,0 +1,218 @@ +import time +import uuid +from typing import List + +from conductor.client.ai.orchestrator import AIOrchestrator +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.do_while_task import LoopTask +from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete, ChatMessage +from conductor.client.workflow.task.set_variable_task import SetVariableTask +from conductor.client.workflow.task.simple_task import SimpleTask +from conductor.client.workflow.task.switch_task import SwitchTask +from conductor.client.workflow.task.timeout_policy import TimeoutPolicy + + +def main(): + agent1_provider = 'openai_v1' + agent1_model = 'gpt-4' + + agent1_provider = 'mistral' + agent1_model = 'mistral-large-latest' + + agent2_provider = 'anthropic_cloud' + agent2_model = 'claude-3-sonnet-20240229' + # anthropic_model = 'claude-3-opus-20240229' + + moderator_provider = 'cohere_saas' + moderator_model = 'command-r' + + mistral = 'mistral' + mistral_model = 'mistral-large-latest' + + api_config = Configuration() + + clients = OrkesClients(configuration=api_config) + workflow_executor = clients.get_workflow_executor() + workflow_client = clients.get_workflow_client() + + moderator = 'moderator' + moderator_text = """You are very good at moderating the debates and discussions. In this discussion, there are 2 panelists, ${ua1} and ${ua2}. + As a moderator, you summarize the discussion so far, pick one of the panelist ${ua1} or ${ua2} and ask them a relevant question to continue the discussion. + You are also an expert in formatting the results into structured json format. You only output a valid JSON as a response. + You answer in RFC8259 compliant + JSON format ONLY with two fields result and user. You can effectively manage a hot discussion while keeping it + quite civil and also at the same time continue the discussion forward encouraging participants and their views. + Your answer MUST be in a JSON dictionary with keys "result" and "user". Before answer, check the output for correctness of the JSON format. + The values MUST not have new lines or special characters that are not escaped. The JSON must be RFC8259 compliant. + + You produce the output in the following JSON keys: + + { + "result": ACTUAL_MESSAGE + "user": USER_WHO_SOULD_RESPOND_NEXT --> One of ${ua1} or ${ua2} + } + + "result" should summarize the conversation so far and add the last message in the conversation. + "user" should be the one who should respond next. + You be fair in giving chance to all participants, alternating between ${ua1} and ${ua2}. + the last person to talk was ${last_user} + Do not repeat what you have said before and do not summarize the discussion each time, + just use first person voice to ask questions to move discussion forward. + Do not use filler sentences like 'in this discussion....' + JSON: + + """ + + agent1 = 'agent_1' + agent1_text = """ + You are ${ua1} and you reason and think like ${ua1}. Your language reflects your persona. + You are very good at analysis of the content and coming up with insights and questions on the subject and the context. + You are in a panel with other participants discussing a specific event/topic as set in the context. + You avoid any repetitive argument, discussion that you have already talked about. + Here is the context on the conversation, add a follow up with your insights and questions to the conversation: + Do not mention that you are an AI model. + ${context} + + You answer in a very clear way, do not add any preamble to the response: + """ + + agent2 = 'agent_2' + agent2_text = """ + You are ${ua2} and you reason and think like ${ua2}. Your language reflects your persona. + You are very good at continuing the conversation with more insightful question. + You are in a panel with other participants discussing a specific event/topic as set in the context. + You bring in your contrarian views to the conversation and always challenge the norms. + You avoid any repetitive argument, discussion that you have already talked about. + Your responses are times extreme and a bit hyperbolic. + When given the history of conversation, you ask a meaningful followup question that continues to conversation + and dives deeper into the topic. + Do not mention that you are an AI model. + Here is the context on the conversation: + ${context} + + You answer in a very clear way, do not add any preamble to the response: + """ + + orchestrator = AIOrchestrator(api_configuration=api_config) + + orchestrator.add_prompt_template(moderator, moderator_text, 'moderator instructions') + orchestrator.associate_prompt_template(moderator, moderator_provider, [moderator_model]) + + orchestrator.add_prompt_template(agent1, agent1_text, 'agent1 instructions') + orchestrator.associate_prompt_template(agent1, agent1_provider, [agent1_model]) + + orchestrator.add_prompt_template(agent2, agent2_text, 'agent2 instructions') + orchestrator.associate_prompt_template(agent2, agent2_provider, [agent2_model]) + + get_context = SimpleTask(task_reference_name='get_document', task_def_name='GET_DOCUMENT') + get_context.input_parameter('url','${workflow.input.url}') + + wf_input = {'ua1': 'donald trump', 'ua2': 'joe biden', 'last_user': '${workflow.variables.last_user}', + 'url': 'https://www.foxnews.com/media/billionaire-mark-cuban-dodges-question-asking-pays-fair-share-taxes-pay-owe'} + + template_vars = { + 'context': get_context.output('result'), + 'ua1': '${workflow.input.ua1}', + 'ua2': '${workflow.input.ua2}', + } + + max_tokens = 500 + moderator_task = LlmChatComplete(task_ref_name='moderator_ref', + max_tokens=2000, + llm_provider=moderator_provider, model=moderator_model, + instructions_template=moderator, + messages='${workflow.variables.history}', + template_variables={ + 'ua1': '${workflow.input.ua1}', + 'ua2': '${workflow.input.ua2}', + 'last_user': '${workflow.variables.last_user}' + }) + + agent1_task = LlmChatComplete(task_ref_name='agent1_ref', + max_tokens=max_tokens, + llm_provider=agent1_provider, model=agent1_model, + instructions_template=agent1, + messages=[ChatMessage(role='user', message=moderator_task.output('result'))], + template_variables=template_vars) + + set_variable1 = (SetVariableTask(task_ref_name='task_ref_name1') + .input_parameter('history', + [ + ChatMessage(role='assistant', message=moderator_task.output('result')), + ChatMessage(role='user', + message='[' + '${workflow.input.ua1}] ' + f'{agent1_task.output("result")}') + ]) + .input_parameter('_merge', True) + .input_parameter('last_user', "${workflow.input.ua1}")) + + agent2_task = LlmChatComplete(task_ref_name='agent2_ref', + max_tokens=max_tokens, + llm_provider=agent2_provider, model=agent2_model, + instructions_template=agent2, + messages=[ChatMessage(role='user', message=moderator_task.output('result'))], + template_variables=template_vars) + + set_variable2 = (SetVariableTask(task_ref_name='task_ref_name2') + .input_parameter('history', [ + ChatMessage(role='assistant', message=moderator_task.output('result')), + ChatMessage(role='user', message='[' + '${workflow.input.ua2}] ' + f'{agent2_task.output("result")}') + ]) + .input_parameter('_merge', True) + .input_parameter('last_user', "${workflow.input.ua2}")) + + init = SetVariableTask(task_ref_name='init_ref') + init.input_parameter('history', + [ChatMessage(role='user', + message="""analyze the following context: + BEGIN + ${get_document.output.result} + END """)] + ) + init.input_parameter('last_user', '') + + wf = ConductorWorkflow(name='multiparty_chat_tmp', version=1, executor=workflow_executor) + + script = """ + (function(){ + if ($.user == $.ua1) return 'ua1'; + if ($.user == $.ua2) return 'ua2'; + return 'ua1'; + })(); + """ + next_up = SwitchTask(task_ref_name='next_up_ref', case_expression=script, use_javascript=True) + next_up.switch_case('ua1', [agent1_task, set_variable1]) + next_up.switch_case('ua2', [agent2_task, set_variable2]) + next_up.input_parameter('user', moderator_task.output('user')) + next_up.input_parameter('ua1', '${workflow.input.ua1}') + next_up.input_parameter('ua2', '${workflow.input.ua2}') + + loop_tasks = [moderator_task, next_up] + chat_loop = LoopTask(task_ref_name='loop', iterations=6, tasks=loop_tasks) + wf >> get_context >> init >> chat_loop + + + + wf.timeout_seconds(1200).timeout_policy(timeout_policy=TimeoutPolicy.TIME_OUT_WORKFLOW) + wf.register(overwrite=True) + + result = wf.execute(wait_until_task_ref=agent1_task.task_reference_name, wait_for_seconds=1, + workflow_input=wf_input) + + result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True) + print(f'started workflow {api_config.ui_host}/{result.workflow_id}') + while result.is_running(): + time.sleep(10) # wait for 10 seconds LLMs are slow! + result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True) + op = result.variables['history'] + if len(op) > 1: + print('=======================================') + print(f'{op[len(op) - 1]["message"]}') + print('\n') + + +if __name__ == '__main__': + main() diff --git a/examples/shell_worker.py b/examples/shell_worker.py new file mode 100644 index 00000000..24b122f7 --- /dev/null +++ b/examples/shell_worker.py @@ -0,0 +1,36 @@ +import subprocess +from typing import List + +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.worker.worker_task import worker_task + + +# @worker_task(task_definition_name='shell') +def execute_shell(command: str, args: List[str]) -> str: + full_command = [command] + full_command = full_command + args + result = subprocess.run(full_command, stdout=subprocess.PIPE) + + return str(result.stdout) + +@worker_task(task_definition_name='task_with_retries2') +def execute_shell() -> str: + return "hello" + +def main(): + # defaults to reading the configuration using following env variables + # CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api + # CONDUCTOR_AUTH_KEY : API Authentication Key + # CONDUCTOR_AUTH_SECRET: API Auth Secret + api_config = Configuration() + + + task_handler = TaskHandler(configuration=api_config) + task_handler.start_processes() + + task_handler.join_processes() + + +if __name__ == '__main__': + main() diff --git a/examples/untrusted_host.py b/examples/untrusted_host.py new file mode 100644 index 00000000..002c81b9 --- /dev/null +++ b/examples/untrusted_host.py @@ -0,0 +1,48 @@ +import urllib3 + +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings +from conductor.client.http.api_client import ApiClient +from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient +from conductor.client.orkes.orkes_task_client import OrkesTaskClient +from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor +from greetings_workflow import greetings_workflow +import requests + + +def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow: + workflow = greetings_workflow(workflow_executor=workflow_executor) + workflow.register(True) + return workflow + + +@worker_task(task_definition_name='hello') +def hello(name: str) -> str: + print(f'executing.... {name}') + return f'Hello {name}' + + +def main(): + urllib3.disable_warnings() + + # points to http://localhost:8080/api by default + api_config = Configuration() + api_config.http_connection = requests.Session() + api_config.http_connection.verify = False + + metadata_client = OrkesMetadataClient(api_config) + task_client = OrkesTaskClient(api_config) + workflow_client = OrkesWorkflowClient(api_config) + + task_handler = TaskHandler(configuration=api_config) + task_handler.start_processes() + + # task_handler.stop_processes() + + +if __name__ == '__main__': + main() diff --git a/src/conductor/client/http/models/__init__.py b/src/conductor/client/http/models/__init__.py index 1d430fd5..68146aa4 100644 --- a/src/conductor/client/http/models/__init__.py +++ b/src/conductor/client/http/models/__init__.py @@ -52,3 +52,4 @@ from conductor.client.http.models.integration import Integration from conductor.client.http.models.integration_api import IntegrationApi from conductor.client.http.models.state_change_event import StateChangeEvent, StateChangeConfig, StateChangeEventType +from conductor.client.http.models.workflow_task import CacheConfig diff --git a/src/conductor/client/http/models/workflow_status.py b/src/conductor/client/http/models/workflow_status.py index 3fd60ba5..7f9dbf89 100644 --- a/src/conductor/client/http/models/workflow_status.py +++ b/src/conductor/client/http/models/workflow_status.py @@ -3,6 +3,9 @@ import six +terminal_status = ('COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED') +successful_status = ('PAUSED', 'COMPLETED') +running_status = ('RUNNING', 'PAUSED') class WorkflowStatus(object): """NOTE: This class is auto generated by the swagger code generator program. @@ -189,6 +192,21 @@ def to_dict(self): return result + def is_completed(self) -> bool: + """Checks if the workflow has completed + :return: True if the workflow status is COMPLETED, FAILED or TERMINATED + """ + return self._status in terminal_status + + def is_successful(self) -> bool: + """Checks if the workflow has completed in successful state (ie COMPLETED) + :return: True if the workflow status is COMPLETED + """ + return self._status in successful_status + + def is_running(self) -> bool: + return self.status in running_status + def to_str(self): """Returns the string representation of the model""" return pprint.pformat(self.to_dict()) diff --git a/src/conductor/client/http/models/workflow_task.py b/src/conductor/client/http/models/workflow_task.py index 9c8ea72c..35b00571 100644 --- a/src/conductor/client/http/models/workflow_task.py +++ b/src/conductor/client/http/models/workflow_task.py @@ -7,11 +7,39 @@ from conductor.client.http.models.state_change_event import StateChangeConfig, StateChangeEventType, StateChangeEvent -class WorkflowTask(object): - """NOTE: This class is auto generated by the swagger code generator program. +class CacheConfig(object): + swagger_types = { + 'key': 'str', + 'ttl_in_second': 'int' + } - Do not edit the class manually. - """ + attribute_map = { + 'key': 'key', + 'ttl_in_second': 'ttlInSecond' + } + + def __init__(self, key: str, ttl_in_second: int): + self._key = key + self._ttl_in_second = ttl_in_second + + @property + def key(self): + return self._key + + @key.setter + def key(self, key): + self._key = key + + @property + def ttl_in_second(self): + return self._ttl_in_second + + @ttl_in_second.setter + def ttl_in_second(self, ttl_in_second): + self._ttl_in_second = ttl_in_second + + +class WorkflowTask(object): """ Attributes: swagger_types (dict): The key is attribute name @@ -50,7 +78,8 @@ class WorkflowTask(object): 'evaluator_type': 'str', 'expression': 'str', 'workflow_task_type': 'str', - 'on_state_change': 'dict(str, StateChangeConfig)' + 'on_state_change': 'dict(str, StateChangeConfig)', + 'cache_config': 'CacheConfig' } attribute_map = { @@ -84,7 +113,8 @@ class WorkflowTask(object): 'evaluator_type': 'evaluatorType', 'expression': 'expression', 'workflow_task_type': 'workflowTaskType', - 'on_state_change': 'onStateChange' + 'on_state_change': 'onStateChange', + 'cache_config': 'cacheConfig' } def __init__(self, name=None, task_reference_name=None, description=None, input_parameters=None, type=None, @@ -94,7 +124,8 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_ sub_workflow_param=None, join_on=None, sink=None, optional=None, task_definition=None, rate_limited=None, default_exclusive_join_task=None, async_complete=None, loop_condition=None, loop_over=None, retry_count=None, evaluator_type=None, expression=None, - workflow_task_type=None, on_state_change: dict[str, StateChangeConfig] = None): # noqa: E501 + workflow_task_type=None, on_state_change: dict[str, StateChangeConfig] = None, + cache_config: CacheConfig = None): # noqa: E501 """WorkflowTask - a model defined in Swagger""" # noqa: E501 self._name = None self._task_reference_name = None @@ -128,6 +159,7 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_ self._workflow_task_type = None self.discriminator = None self._on_state_change = None + self._cache_config = None self.name = name self.task_reference_name = task_reference_name if description is not None: @@ -188,6 +220,7 @@ def __init__(self, name=None, task_reference_name=None, description=None, input_ self.workflow_task_type = workflow_task_type if on_state_change is not None: self._on_state_change = on_state_change + self._cache_config = cache_config @property def name(self): @@ -823,9 +856,17 @@ def on_state_change(self) -> dict[str, List[StateChangeEvent]]: @on_state_change.setter def on_state_change(self, state_change: StateChangeConfig): self._on_state_change = { - state_change.type : state_change.events + state_change.type: state_change.events } + @property + def cache_config(self) -> CacheConfig: + return self._cache_config + + @cache_config.setter + def cache_config(self, cache_config: CacheConfig): + self._cache_config = cache_config + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/src/conductor/client/workflow/conductor_workflow.py b/src/conductor/client/workflow/conductor_workflow.py index d99afe12..9a973958 100644 --- a/src/conductor/client/workflow/conductor_workflow.py +++ b/src/conductor/client/workflow/conductor_workflow.py @@ -194,7 +194,7 @@ def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str: def execute(self, workflow_input: Any = {}, wait_until_task_ref: str = '', wait_for_seconds: int = 10, request_id: str = None, - idempotency_key: str = None, idempotency_strategy : IdempotencyStrategy = IdempotencyStrategy.FAIL) -> WorkflowRun: + idempotency_key: str = None, idempotency_strategy : IdempotencyStrategy = IdempotencyStrategy.FAIL, task_to_domain: dict[str, str] = None) -> WorkflowRun: """ Executes a workflow synchronously. Useful for short duration workflow (e.g. < 20 seconds) Parameters @@ -216,6 +216,8 @@ def execute(self, workflow_input: Any = {}, wait_until_task_ref: str = '', wait_ if idempotency_key is not None: request.idempotency_key = idempotency_key request.idempotency_strategy = idempotency_strategy + if task_to_domain is not None: + request.task_to_domain = task_to_domain run = self._executor.execute_workflow(request, wait_until_task_ref=wait_until_task_ref, wait_for_seconds=wait_for_seconds, request_id=request_id) diff --git a/src/conductor/client/workflow/task/do_while_task.py b/src/conductor/client/workflow/task/do_while_task.py index ef7331c7..a5422cdb 100644 --- a/src/conductor/client/workflow/task/do_while_task.py +++ b/src/conductor/client/workflow/task/do_while_task.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import List +from typing import List, Dict, Any from typing_extensions import Self @@ -43,3 +43,15 @@ def __init__(self, task_ref_name: str, iterations: int, tasks: List[TaskInterfac ), tasks=tasks, ) + + +class ForEachTask(DoWhileTask): + def __init__(self, task_ref_name: str, tasks: List[TaskInterface], iterate_over:str, variables: List[str] = None) -> Self: + super().__init__( + task_ref_name=task_ref_name, + termination_condition=get_for_loop_condition( + task_ref_name, 0, + ), + tasks=tasks, + ) + super().input_parameter("items", iterate_over) diff --git a/src/conductor/client/workflow/task/http_poll_task.py b/src/conductor/client/workflow/task/http_poll_task.py new file mode 100644 index 00000000..a958ef9c --- /dev/null +++ b/src/conductor/client/workflow/task/http_poll_task.py @@ -0,0 +1,76 @@ +from copy import deepcopy +from enum import Enum +from typing import Any, Dict, List, Union + +from typing_extensions import Self + +from conductor.client.workflow.task.http_task import HttpTask, HttpInput, HttpMethod +from conductor.client.workflow.task.task import TaskInterface +from conductor.client.workflow.task.task_type import TaskType + + +class HttpPollInput(): + swagger_types = { + '_uri': 'str', + '_method': 'str', + '_accept': 'list[str]', + '_headers': 'dict[str, list[str]]', + '_content_type': 'str', + '_connection_time_out': 'int', + '_read_timeout': 'int', + '_body': 'str', + '_termination_condition': 'str', + '_polling_interval': 'int', + '_max_poll_count': 'int', + '_polling_strategy': str + } + + attribute_map = { + '_uri': 'uri', + '_method': 'method', + '_accept': 'accept', + '_headers': 'headers', + '_content_type': 'contentType', + '_connection_time_out': 'connectionTimeOut', + '_read_timeout': 'readTimeOut', + '_body': 'body', + '_termination_condition': 'terminationCondition', + '_polling_interval': 'pollingInterval', + '_max_poll_count': 'maxPollCount', + '_polling_strategy': 'pollingStrategy' + } + + def __init__(self, + termination_condition: str = None, + max_poll_count : int = 100, + polling_interval : int = 100, + polling_strategy: str = 'FIXED', + method: HttpMethod = HttpMethod.GET, + uri: str = None, + headers: Dict[str, List[str]] = None, + accept: str = None, + content_type: str = None, + connection_time_out: int = None, + read_timeout: int = None, + body: Any = None) -> Self: + self._method = deepcopy(method) + self._uri = deepcopy(uri) + self._headers = deepcopy(headers) + self._accept = deepcopy(accept) + self._content_type = deepcopy(content_type) + self._connection_time_out = deepcopy(connection_time_out) + self._read_timeout = deepcopy(read_timeout) + self._body = deepcopy(body) + self._termination_condition = termination_condition + self._max_poll_count = max_poll_count + self._polling_interval = polling_interval + self._polling_strategy = polling_strategy + + +class HttpPollTask(TaskInterface): + def __init__(self, task_ref_name: str, http_input: HttpPollInput) -> Self: + super().__init__( + task_reference_name=task_ref_name, + task_type=TaskType.HTTP_POLL, + input_parameters={'http_request': http_input} + ) diff --git a/src/conductor/client/workflow/task/http_task.py b/src/conductor/client/workflow/task/http_task.py index 7dd0729f..bdc459b5 100644 --- a/src/conductor/client/workflow/task/http_task.py +++ b/src/conductor/client/workflow/task/http_task.py @@ -23,7 +23,6 @@ class HttpInput: '_method': 'str', '_accept': 'list[str]', '_headers': 'dict[str, list[str]]', - '_accept': 'str', '_content_type': 'str', '_connection_time_out': 'int', '_read_timeout': 'int', @@ -35,7 +34,6 @@ class HttpInput: '_method': 'method', '_accept': 'accept', '_headers': 'headers', - '_accept': 'accept', '_content_type': 'contentType', '_connection_time_out': 'connectionTimeOut', '_read_timeout': 'readTimeOut', diff --git a/src/conductor/client/workflow/task/task.py b/src/conductor/client/workflow/task/task.py index 397fd885..8a55ff0a 100644 --- a/src/conductor/client/workflow/task/task.py +++ b/src/conductor/client/workflow/task/task.py @@ -4,7 +4,7 @@ from typing_extensions import Self, Union -from conductor.client.http.models.workflow_task import WorkflowTask +from conductor.client.http.models.workflow_task import WorkflowTask, CacheConfig from conductor.client.workflow.task.task_type import TaskType @@ -23,13 +23,17 @@ def __init__(self, task_name: str = None, description: str = None, optional: bool = None, - input_parameters: Dict[str, Any] = None) -> Self: + input_parameters: Dict[str, Any] = None, + cache_key : str = None, + cache_ttl_second : int = 0) -> Self: self.task_reference_name = task_reference_name self.task_type = task_type self.name = task_name or task_reference_name self.description = description self.optional = optional self.input_parameters = input_parameters + self._cache_key = cache_key + self._cache_ttl_second = cache_ttl_second @property def task_reference_name(self) -> str: @@ -61,6 +65,10 @@ def name(self, name: str) -> None: raise Exception('invalid type') self._name = name + def cache(self, cache_key: str, cache_ttl_second: int): + self._cache_key = cache_key + self._cache_ttl_second = cache_ttl_second + @property def description(self) -> str: return self._description @@ -104,6 +112,9 @@ def input_parameter(self, key: str, value: Any) -> Self: return self def to_workflow_task(self) -> WorkflowTask: + cache_config = None + if self._cache_ttl_second > 0 and self._cache_key is not None: + cache_config = CacheConfig(key=self._cache_key, ttl_in_second=self._cache_ttl_second) return WorkflowTask( name=self._name, task_reference_name=self._task_reference_name, @@ -111,13 +122,17 @@ def to_workflow_task(self) -> WorkflowTask: description=self._description, input_parameters=self._input_parameters, optional=self._optional, + cache_config=cache_config ) def output(self, json_path: str = None) -> str: if json_path is None: return '${' + f'{self.task_reference_name}.output' + '}' else: - return '${' + f'{self.task_reference_name}.output.{json_path}' + '}' + if json_path.startswith('.'): + return '${' + f'{self.task_reference_name}.output{json_path}' + '}' + else: + return '${' + f'{self.task_reference_name}.output.{json_path}' + '}' def input(self, json_path: str = None, key : str = None, value : Any = None) -> Union[str, Self]: if key is not None and value is not None: diff --git a/src/conductor/client/workflow/task/task_type.py b/src/conductor/client/workflow/task/task_type.py index 7c68878c..60a36a92 100644 --- a/src/conductor/client/workflow/task/task_type.py +++ b/src/conductor/client/workflow/task/task_type.py @@ -18,6 +18,7 @@ class TaskType(str, Enum): HUMAN = 'HUMAN' USER_DEFINED = 'USER_DEFINED' HTTP = 'HTTP' + HTTP_POLL = 'HTTP_POLL' LAMBDA = 'LAMBDA' INLINE = 'INLINE' EXCLUSIVE_JOIN = 'EXCLUSIVE_JOIN'