Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for script with JOIN tasks #265

Merged
merged 1 commit into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions examples/orkes/fork_join_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import json

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest, RerunWorkflowRequest, TaskResult, WorkflowRun, \
WorkflowDef
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.workflow_def import to_workflow_def
from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.fork_task import ForkTask
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.join_task import JoinTask
from conductor.client.workflow_client import WorkflowClient


def main():
api_config = Configuration()
clients = OrkesClients(configuration=api_config)
workflow_client = clients.get_workflow_client()
executor = clients.get_workflow_executor()

workflow = ConductorWorkflow(name='fork_join_example', version=1, executor=executor)
fork_size = 10
tasks = []
join_on = []
for i in range(fork_size):
http = HttpTask(task_ref_name=f"http_{i}",
http_input={"uri": "https://orkes-api-tester.orkesconductor.com/api2"})
http.optional = True
tasks.append([http])
join_on.append(f"http_{i}")

# HTTP tasks are marked as optional and the URL gives 404 error
# the script below checks if the tasks are completed or completed with errors and completes the join task
script = """
(function(){
let results = {};
let pendingJoinsFound = false;
if($.joinOn){
$.joinOn.forEach((element)=>{
if($[element] && $[element].status !== 'COMPLETED' && $[element] && $[element].status !== 'COMPLETED_WITH_ERRORS'){
results[element] = $[element].status;
pendingJoinsFound = true;
}
});
if(pendingJoinsFound){
return {
"status":"IN_PROGRESS",
"reasonForIncompletion":"Pending",
"outputData":{
"scriptResults": results
}
};
}
// To complete the Join - return true OR an object with status = 'COMPLETED' like above.
return true;
}
})();
"""
join = JoinTask(task_ref_name='join', join_on_script=script, join_on=join_on)
fork = ForkTask(task_ref_name="fork", forked_tasks=tasks)
workflow >> fork >> join
workflow_id = workflow.start_workflow_with_input()
print(f'started workflow with id {workflow_id}')


if __name__ == '__main__':
main()
19 changes: 19 additions & 0 deletions src/conductor/client/workflow/conductor_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str:
start_workflow_request.version = self.version
return self._executor.start_workflow(start_workflow_request)

def start_workflow_with_input(self, workflow_input: dict = {}, correlation_id=None, task_to_domain=None,
priority=None, idempotency_key: str = None, idempotency_strategy: IdempotencyStrategy = IdempotencyStrategy.FAIL) -> str:
"""
Starts the workflow with given inputs and parameters and returns the id of the started workflow
"""

start_workflow_request = StartWorkflowRequest()
start_workflow_request.workflow_def = self.to_workflow_def()
start_workflow_request.name = self.name
start_workflow_request.version = self.version
start_workflow_request.input = workflow_input
start_workflow_request.correlation_id = correlation_id
start_workflow_request.idempotency_key = idempotency_key
start_workflow_request.idempotency_strategy = idempotency_strategy
start_workflow_request.priority = priority
start_workflow_request.task_to_domain =task_to_domain

return self._executor.start_workflow(start_workflow_request)

def execute(self, workflow_input: Any = {}, wait_until_task_ref: str = '', wait_for_seconds: int = 10,
request_id: str = None,
idempotency_key: str = None, idempotency_strategy : IdempotencyStrategy = IdempotencyStrategy.FAIL, task_to_domain: dict[str, str] = None) -> WorkflowRun:
Expand Down
5 changes: 4 additions & 1 deletion src/conductor/client/workflow/task/join_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@


class JoinTask(TaskInterface):
def __init__(self, task_ref_name: str, join_on: List[str] = None) -> Self:
def __init__(self, task_ref_name: str, join_on: List[str] = None, join_on_script: str = None) -> Self:
super().__init__(
task_reference_name=task_ref_name,
task_type=TaskType.JOIN
)
self._join_on = deepcopy(join_on)
if join_on_script is not None:
self.evaluator_type = 'js'
self.expression = join_on_script

def to_workflow_task(self) -> WorkflowTask:
workflow = super().to_workflow_task()
Expand Down
22 changes: 21 additions & 1 deletion src/conductor/client/workflow/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def __init__(self,
self.input_parameters = input_parameters
self._cache_key = cache_key
self._cache_ttl_second = cache_ttl_second
self._expression = None
self._evaluator_type = None

@property
def task_reference_name(self) -> str:
Expand Down Expand Up @@ -65,6 +67,22 @@ def name(self, name: str) -> None:
raise Exception('invalid type')
self._name = name

@property
def expression(self) -> str:
return self._expression

@expression.setter
def expression(self, expression: str) -> None:
self._expression = expression

@property
def evaluator_type(self) -> str:
return self._evaluator_type

@evaluator_type.setter
def evaluator_type(self, evaluator_type: str) -> None:
self._evaluator_type = evaluator_type

def cache(self, cache_key: str, cache_ttl_second: int):
self._cache_key = cache_key
self._cache_ttl_second = cache_ttl_second
Expand Down Expand Up @@ -122,7 +140,9 @@ def to_workflow_task(self) -> WorkflowTask:
description=self._description,
input_parameters=self._input_parameters,
optional=self._optional,
cache_config=cache_config
cache_config=cache_config,
expression=self._expression,
evaluator_type=self._evaluator_type
)

def output(self, json_path: str = None) -> str:
Expand Down
Loading