Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Dec 3, 2023
1 parent a6fcab0 commit bc7dd82
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/conductor/client/orkes/orkes_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def rerun_workflow(self, workflow_id: str, rerun_workflow_request: RerunWorkflow
self.workflowResourceApi.rerun(rerun_workflow_request, workflow_id)

def retry_workflow(self, workflow_id: str, resume_subworkflow_tasks: Optional[bool] = False):
self.workflowResourceApi.retry1(workflow_id, resume_subworkflow_tasks=resume_subworkflow_tasks)
self.workflowResourceApi.retry(workflow_id, resume_subworkflow_tasks=resume_subworkflow_tasks)

def terminate_workflow(self, workflow_id: str, reason: Optional[str] = None):
kwargs = {"reason": reason} if reason else {}
Expand Down
7 changes: 5 additions & 2 deletions tests/integration/metadata/test_workflow_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_kitchensink_workflow_registration(workflow_executor: WorkflowExecutor)
)
if type(workflow_id) != str or workflow_id == '':
raise Exception(f'failed to start workflow, name: {WORKFLOW_NAME}')

workflow_executor.terminate(workflow_id=workflow_id, reason="End test")


Expand Down Expand Up @@ -116,13 +116,15 @@ def generate_do_while_task() -> LoopTask:
tasks=generate_switch_task(),
)


def generate_do_while_task_multiple() -> LoopTask:
return LoopTask(
task_ref_name="loop_until_success_multiple",
iterations=1,
tasks=[generate_simple_task(i) for i in range(13, 14)],
)


def generate_fork_task(workflow_executor: WorkflowExecutor) -> ForkTask:
return ForkTask(
task_ref_name='forked',
Expand All @@ -136,6 +138,7 @@ def generate_fork_task(workflow_executor: WorkflowExecutor) -> ForkTask:
]
)


def generate_join_task(workflow_executor: WorkflowExecutor, fork_task: ForkTask) -> JoinTask:
return JoinTask(
task_ref_name='join_forked',
Expand Down Expand Up @@ -205,7 +208,7 @@ def generate_sub_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkf

def generate_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
fork_task = generate_fork_task(workflow_executor)

workflow = ConductorWorkflow(
executor=workflow_executor,
name='test-python-sdk-workflow-as-code',
Expand Down
54 changes: 29 additions & 25 deletions tests/unit/orkes/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json

from unittest.mock import Mock, patch, MagicMock

from conductor.client.http.models import SkipTaskRequest
from conductor.client.http.rest import ApiException
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
Expand All @@ -18,15 +20,16 @@
WORKFLOW_NAME = 'ut_wf'
WORKFLOW_UUID = 'ut_wf_uuid'
TASK_NAME = 'ut_task'
CORRELATION_ID= 'correlation_id'
CORRELATION_ID = 'correlation_id'


class TestOrkesWorkflowClient(unittest.TestCase):

@classmethod
def setUpClass(cls):
configuration = Configuration("http://localhost:8080/api")
cls.workflow_client = OrkesWorkflowClient(configuration)

def setUp(self):
self.input = {"a": "test"}
logging.disable(logging.CRITICAL)
Expand All @@ -44,7 +47,7 @@ def test_startWorkflowByName(self, mock):
wfId = self.workflow_client.startWorkflowByName(WORKFLOW_NAME, self.input)
mock.assert_called_with(self.input, WORKFLOW_NAME)
self.assertEqual(wfId, WORKFLOW_UUID)

@patch.object(WorkflowResourceApi, 'start_workflow1')
def test_startWorkflowByName_with_version(self, mock):
mock.return_value = WORKFLOW_UUID
Expand All @@ -58,22 +61,22 @@ def test_startWorkflowByName_with_correlation_id(self, mock):
wfId = self.workflow_client.startWorkflowByName(WORKFLOW_NAME, self.input, correlationId=CORRELATION_ID)
mock.assert_called_with(self.input, WORKFLOW_NAME, correlation_id=CORRELATION_ID)
self.assertEqual(wfId, WORKFLOW_UUID)

@patch.object(WorkflowResourceApi, 'start_workflow1')
def test_startWorkflowByName_with_version_and_priority(self, mock):
mock.return_value = WORKFLOW_UUID
wfId = self.workflow_client.startWorkflowByName(WORKFLOW_NAME, self.input, version=1, priority=1)
mock.assert_called_with(self.input, WORKFLOW_NAME, version=1, priority=1)
self.assertEqual(wfId, WORKFLOW_UUID)

@patch.object(WorkflowResourceApi, 'start_workflow')
def test_startWorkflow(self, mock):
mock.return_value = WORKFLOW_UUID
startWorkflowReq = StartWorkflowRequest()
wfId = self.workflow_client.start_workflow(startWorkflowReq)
mock.assert_called_with(startWorkflowReq)
self.assertEqual(wfId, WORKFLOW_UUID)

@patch.object(WorkflowResourceApi, 'execute_workflow')
def test_executeWorkflow(self, mock):
expectedWfRun = WorkflowRun()
Expand All @@ -82,25 +85,25 @@ def test_executeWorkflow(self, mock):
workflowRun = self.workflow_client.execute_workflow(
startWorkflowReq, "request_id", WORKFLOW_NAME, 1
)
mock.assert_called_with(startWorkflowReq,"request_id", WORKFLOW_NAME, 1)
mock.assert_called_with(startWorkflowReq, "request_id", WORKFLOW_NAME, 1)
self.assertEqual(workflowRun, expectedWfRun)

@patch.object(WorkflowResourceApi, 'pause_workflow1')
@patch.object(WorkflowResourceApi, 'pause_workflow')
def test_pauseWorkflow(self, mock):
self.workflow_client.pause_workflow(WORKFLOW_UUID)
mock.assert_called_with(WORKFLOW_UUID)
@patch.object(WorkflowResourceApi, 'resume_workflow1')

@patch.object(WorkflowResourceApi, 'resume_workflow')
def test_resumeWorkflow(self, mock):
self.workflow_client.resume_workflow(WORKFLOW_UUID)
mock.assert_called_with(WORKFLOW_UUID)
@patch.object(WorkflowResourceApi, 'restart1')

@patch.object(WorkflowResourceApi, 'restart')
def test_restartWorkflow(self, mock):
self.workflow_client.restart_workflow(WORKFLOW_UUID)
mock.assert_called_with(WORKFLOW_UUID, use_latest_definitions=False)
@patch.object(WorkflowResourceApi, 'restart1')

@patch.object(WorkflowResourceApi, 'restart')
def test_restartWorkflow_with_latest_wfDef(self, mock):
self.workflow_client.restart_workflow(WORKFLOW_UUID, True)
mock.assert_called_with(WORKFLOW_UUID, use_latest_definitions=True)
Expand All @@ -110,13 +113,13 @@ def test_rerunWorkflow(self, mock):
reRunReq = RerunWorkflowRequest()
self.workflow_client.rerun_workflow(WORKFLOW_UUID, reRunReq)
mock.assert_called_with(reRunReq, WORKFLOW_UUID)
@patch.object(WorkflowResourceApi, 'retry1')

@patch.object(WorkflowResourceApi, 'retry')
def test_retryWorkflow(self, mock):
self.workflow_client.retry_workflow(WORKFLOW_UUID)
mock.assert_called_with(WORKFLOW_UUID, resume_subworkflow_tasks=False)

@patch.object(WorkflowResourceApi, 'retry1')
@patch.object(WorkflowResourceApi, 'retry')
def test_retryWorkflow_with_resumeSubworkflowTasks(self, mock):
self.workflow_client.retry_workflow(WORKFLOW_UUID, True)
mock.assert_called_with(WORKFLOW_UUID, resume_subworkflow_tasks=True)
Expand All @@ -125,13 +128,13 @@ def test_retryWorkflow_with_resumeSubworkflowTasks(self, mock):
def test_terminateWorkflow(self, mock):
self.workflow_client.terminate_workflow(WORKFLOW_UUID)
mock.assert_called_with(WORKFLOW_UUID)

@patch.object(WorkflowResourceApi, 'terminate1')
def test_terminateWorkflow_with_reason(self, mock):
reason = "Unit test failed"
self.workflow_client.terminate_workflow(WORKFLOW_UUID, reason)
mock.assert_called_with(WORKFLOW_UUID, reason=reason)

@patch.object(WorkflowResourceApi, 'get_execution_status')
def test_getWorkflow(self, mock):
mock.return_value = Workflow(workflow_id=WORKFLOW_UUID)
Expand All @@ -148,7 +151,7 @@ def test_getWorkflow_without_tasks(self, mock):

@patch.object(WorkflowResourceApi, 'get_execution_status')
def test_getWorkflow_non_existent(self, mock):
error_body = { 'status': 404, 'message': 'Workflow not found' }
error_body = {'status': 404, 'message': 'Workflow not found'}
mock.side_effect = MagicMock(side_effect=ApiException(status=404, body=json.dumps(error_body)))
with self.assertRaises(APIError):
self.workflow_client.get_workflow(WORKFLOW_UUID, False)
Expand All @@ -167,9 +170,10 @@ def test_deleteWorkflow_without_archival(self, mock):
@patch.object(WorkflowResourceApi, 'skip_task_from_workflow')
def test_skipTaskFromWorkflow(self, mock):
taskRefName = TASK_NAME + "_ref"
workflow = self.workflow_client.skip_task_from_workflow(WORKFLOW_UUID, taskRefName)
mock.assert_called_with(WORKFLOW_UUID, taskRefName)

request = SkipTaskRequest()
workflow = self.workflow_client.skip_task_from_workflow(WORKFLOW_UUID, taskRefName, request)
mock.assert_called_with(WORKFLOW_UUID, taskRefName, request)

@patch.object(WorkflowResourceApi, 'test_workflow')
def test_testWorkflow(self, mock):
mock.return_value = Workflow(workflow_id=WORKFLOW_UUID)
Expand All @@ -179,4 +183,4 @@ def test_testWorkflow(self, mock):
)
workflow = self.workflow_client.test_workflow(testRequest)
mock.assert_called_with(testRequest)
self.assertEqual(workflow.workflow_id, WORKFLOW_UUID)
self.assertEqual(workflow.workflow_id, WORKFLOW_UUID)

0 comments on commit bc7dd82

Please sign in to comment.