-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
112 lines (91 loc) · 3.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import sys
sys.path.insert(1, '../')
import uuid
import time
import logging
from examples.workflow.workflow_input import WorkflowInput
from examples.workflow.workflow_input import NotificationPreference
from examples.worker import worker_util
from examples.api import api_util
from conductor.client.workflow.task.task import TaskInterface
from conductor.client.workflow.task.switch_task import SwitchTask
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
# Silence log records at DEBUG severity and below for the whole process so
# the example's printed output is not drowned out by debug logging.
logging.disable(level=logging.DEBUG)
def decision_task() -> TaskInterface:
    """Build the ``emailorsms`` switch task used to pick a notification channel.

    The switch is constructed with the expression
    ``${workflow.input.notificationPref}`` and gets one case per
    ``NotificationPreference`` value handled here:

    * ``EMAIL`` -> a ``send_email`` task whose ``email`` input is
      ``${get_user_info.output.email}``
    * ``SMS`` -> a ``send_sms`` task whose ``phoneNumber`` input is
      ``${get_user_info.output.phoneNumber}``

    Returns:
        The configured switch task.
    """
    switch = SwitchTask('emailorsms', '${workflow.input.notificationPref}')

    # (case value, task name / reference name, input key, input expression)
    branches = (
        (NotificationPreference.EMAIL, 'send_email',
         'email', '${get_user_info.output.email}'),
        (NotificationPreference.SMS, 'send_sms',
         'phoneNumber', '${get_user_info.output.phoneNumber}'),
    )
    for preference, task_name, input_key, input_expr in branches:
        branch_task = SimpleTask(task_name, task_name).input(
            input_key, input_expr)
        switch.switch_case(preference, branch_task)

    return switch
def main():
    """Define, register and run the ``user_notification`` example workflow.

    Steps:
      1. Start the example workers.
      2. Build version 1 of the ``user_notification`` workflow: a
         ``get_user_info`` task followed by the switch task returned by
         ``decision_task()``.
      3. Register the workflow (overwriting any existing definition).
      4. Run it once through ``start_workflow_sync`` and once through
         ``start_workflow_async`` with the input ``WorkflowInput('userA')``.
      5. Stop the worker processes.

    The workers are stopped in a ``finally`` clause so that a failure while
    building, registering or executing the workflow does not leave the
    worker processes running.
    """
    task_handler = worker_util.start_workers()
    try:
        workflow_executor = WorkflowExecutor(api_util.get_configuration())
        workflow = ConductorWorkflow(
            executor=workflow_executor,
            name='user_notification',
            version=1,
        )
        workflow.input_parameters = ['userId', 'notificationPref']

        simple_task = SimpleTask('get_user_info', 'get_user_info').input(
            'userId', '${workflow.input.userId}')
        workflow.add(simple_task)
        workflow >> decision_task()  # you can also use >> operator

        # Register the workflow with the server.
        workflow.register(overwrite=True)

        workflow_input = WorkflowInput('userA')

        # Execute workflow synchronously, the call will wait until the
        # workflow completes.
        start_workflow_sync(workflow_executor, workflow, workflow_input)

        # Start async execution, returns the id of the workflow.
        start_workflow_async(workflow, workflow_input)
    finally:
        # Always shut the workers down, even if one of the steps above raised.
        task_handler.stop_processes()
def start_workflow_sync(workflow_executor: WorkflowExecutor, workflow: ConductorWorkflow, workflow_input) -> None:
    """Run the workflow via ``execute_workflow`` and print a summary of the run.

    Args:
        workflow_executor: Executor whose ``workflow_client`` issues the
            ``execute_workflow`` call.
        workflow: Workflow to run; its ``name`` and ``version`` identify the
            definition to execute.
        workflow_input: Object exposing ``user_id`` and ``notification_pref``;
            these are sent as the ``userId`` and ``notificationPref``
            workflow inputs.
    """
    workflow_run = workflow_executor.workflow_client.execute_workflow(
        body=StartWorkflowRequest(
            name=workflow.name,
            version=workflow.version,
            # Forward the caller's input. Without it the run starts with no
            # input at all, although the workflow's tasks reference
            # ${workflow.input.userId} and ${workflow.input.notificationPref}.
            input={
                'userId': workflow_input.user_id,
                'notificationPref': workflow_input.notification_pref
            }
        ),
        # A fresh request id per call.
        request_id=str(uuid.uuid4()),
        version=workflow.version,
        name=workflow.name,
        wait_until_task_ref='',
        _request_timeout=60
    )
    print()
    print('=======================================================================================')
    print('Workflow Execution Completed')
    print(f'Workflow Id: {workflow_run.workflow_id}')
    print(f'Workflow Status: {workflow_run.status}')
    print(f'Workflow Output: {str(workflow_run.output)}')
    print(
        f'Workflow Execution Flow UI: {api_util.get_workflow_execution_url(workflow_run.workflow_id)}')
    print('=======================================================================================')
def start_workflow_async(workflow: ConductorWorkflow, workflow_input) -> None:
    """Start the workflow without waiting on the call and print its id and UI link.

    Args:
        workflow: Workflow to start; its ``name`` and ``version`` go into the
            start request.
        workflow_input: Object exposing ``user_id`` and ``notification_pref``,
            sent as the ``userId`` and ``notificationPref`` workflow inputs.
    """
    request = StartWorkflowRequest(
        name=workflow.name,
        version=workflow.version,
        input={
            'userId': workflow_input.user_id,
            'notificationPref': workflow_input.notification_pref
        }
    )
    workflow_id = workflow.start_workflow(request)
    execution_url = api_util.get_workflow_execution_url(workflow_id)

    # Fixed pause before reporting; presumably gives the workers time to
    # process the run before the caller shuts them down -- TODO confirm.
    time.sleep(4)

    # NOTE(review): the banner says "Completed", but this function never
    # checks the workflow's status; it only waits a fixed four seconds.
    separator = '======================================================================================='
    report_lines = (
        '',
        separator,
        'Workflow Execution Completed',
        f'Workflow Id: {workflow_id}',
        f'Workflow Execution Flow UI: {execution_url}',
        separator,
    )
    for line in report_lines:
        print(line)
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()