diff --git a/examples/orkes/fork_join_script.py b/examples/orkes/fork_join_script.py new file mode 100644 index 00000000..8d7ac206 --- /dev/null +++ b/examples/orkes/fork_join_script.py @@ -0,0 +1,69 @@ +import json + +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import StartWorkflowRequest, RerunWorkflowRequest, TaskResult, WorkflowRun, \ + WorkflowDef +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.http.models.workflow_def import to_workflow_def +from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate +from conductor.client.orkes_clients import OrkesClients +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.fork_task import ForkTask +from conductor.client.workflow.task.http_task import HttpTask +from conductor.client.workflow.task.join_task import JoinTask +from conductor.client.workflow_client import WorkflowClient + + +def main(): + api_config = Configuration() + clients = OrkesClients(configuration=api_config) + workflow_client = clients.get_workflow_client() + executor = clients.get_workflow_executor() + + workflow = ConductorWorkflow(name='fork_join_example', version=1, executor=executor) + fork_size = 10 + tasks = [] + join_on = [] + for i in range(fork_size): + http = HttpTask(task_ref_name=f"http_{i}", + http_input={"uri": "https://orkes-api-tester.orkesconductor.com/api2"}) + http.optional = True + tasks.append([http]) + join_on.append(f"http_{i}") + + # HTTP tasks are marked as optional and the URL gives 404 error + # the script below checks if the tasks are completed or completed with errors and completes the join task + script = """ + (function(){ + let results = {}; + let pendingJoinsFound = false; + if($.joinOn){ + $.joinOn.forEach((element)=>{ + if($[element] && $[element].status !== 'COMPLETED' && $[element] && $[element].status !== 'COMPLETED_WITH_ERRORS'){ + results[element] = $[element].status; + pendingJoinsFound = true; + } + }); + if(pendingJoinsFound){ + return { + "status":"IN_PROGRESS", + "reasonForIncompletion":"Pending", + "outputData":{ + "scriptResults": results + } + }; + } + // To complete the Join - return true OR an object with status = 'COMPLETED' like above. + return true; + } + })(); + """ + join = JoinTask(task_ref_name='join', join_on_script=script, join_on=join_on) + fork = ForkTask(task_ref_name="fork", forked_tasks=tasks) + workflow >> fork >> join + workflow_id = workflow.start_workflow_with_input() + print(f'started workflow with id {workflow_id}') + + +if __name__ == '__main__': + main() diff --git a/src/conductor/client/workflow/conductor_workflow.py b/src/conductor/client/workflow/conductor_workflow.py index 9a973958..0bdbe4b1 100644 --- a/src/conductor/client/workflow/conductor_workflow.py +++ b/src/conductor/client/workflow/conductor_workflow.py @@ -192,6 +192,25 @@ def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str: start_workflow_request.version = self.version return self._executor.start_workflow(start_workflow_request) + def start_workflow_with_input(self, workflow_input: dict = {}, correlation_id=None, task_to_domain=None, + priority=None, idempotency_key: str = None, idempotency_strategy: IdempotencyStrategy = IdempotencyStrategy.FAIL) -> str: + """ + Starts the workflow with given inputs and parameters and returns the id of the started workflow + """ + + start_workflow_request = StartWorkflowRequest() + start_workflow_request.workflow_def = self.to_workflow_def() + start_workflow_request.name = self.name + start_workflow_request.version = self.version + start_workflow_request.input = workflow_input + start_workflow_request.correlation_id = correlation_id + start_workflow_request.idempotency_key = idempotency_key + start_workflow_request.idempotency_strategy = idempotency_strategy + start_workflow_request.priority = priority + start_workflow_request.task_to_domain =task_to_domain + + return self._executor.start_workflow(start_workflow_request) + def execute(self, workflow_input: Any = {}, wait_until_task_ref: str = '', wait_for_seconds: int = 10, request_id: str = None, idempotency_key: str = None, idempotency_strategy : IdempotencyStrategy = IdempotencyStrategy.FAIL, task_to_domain: dict[str, str] = None) -> WorkflowRun: diff --git a/src/conductor/client/workflow/task/join_task.py b/src/conductor/client/workflow/task/join_task.py index 5c735f34..1be017ba 100644 --- a/src/conductor/client/workflow/task/join_task.py +++ b/src/conductor/client/workflow/task/join_task.py @@ -9,12 +9,15 @@ class JoinTask(TaskInterface): - def __init__(self, task_ref_name: str, join_on: List[str] = None) -> Self: + def __init__(self, task_ref_name: str, join_on: List[str] = None, join_on_script: str = None) -> Self: super().__init__( task_reference_name=task_ref_name, task_type=TaskType.JOIN ) self._join_on = deepcopy(join_on) + if join_on_script is not None: + self.evaluator_type = 'js' + self.expression = join_on_script def to_workflow_task(self) -> WorkflowTask: workflow = super().to_workflow_task() diff --git a/src/conductor/client/workflow/task/task.py b/src/conductor/client/workflow/task/task.py index 8a55ff0a..48a6bf17 100644 --- a/src/conductor/client/workflow/task/task.py +++ b/src/conductor/client/workflow/task/task.py @@ -34,6 +34,8 @@ def __init__(self, self.input_parameters = input_parameters self._cache_key = cache_key self._cache_ttl_second = cache_ttl_second + self._expression = None + self._evaluator_type = None @property def task_reference_name(self) -> str: @@ -65,6 +67,22 @@ def name(self, name: str) -> None: raise Exception('invalid type') self._name = name + @property + def expression(self) -> str: + return self._expression + + @expression.setter + def expression(self, expression: str) -> None: + self._expression = expression + + @property + def evaluator_type(self) -> str: + return self._evaluator_type + + @evaluator_type.setter + def evaluator_type(self, evaluator_type: str) -> None: + self._evaluator_type = evaluator_type + def cache(self, cache_key: str, cache_ttl_second: int): self._cache_key = cache_key self._cache_ttl_second = cache_ttl_second @@ -122,7 +140,9 @@ def to_workflow_task(self) -> WorkflowTask: description=self._description, input_parameters=self._input_parameters, optional=self._optional, - cache_config=cache_config + cache_config=cache_config, + expression=self._expression, + evaluator_type=self._evaluator_type ) def output(self, json_path: str = None) -> str: