Skip to content

Commit

Permalink
Merge pull request #154 from orkes-io/workflow_state_update_api
Browse files Browse the repository at this point in the history
Workflow state update api
  • Loading branch information
v1r3n authored Jan 29, 2024
2 parents 253581c + 80ca9b3 commit ad5b2d4
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 43 deletions.
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ext {
versions = [
awaitility : '4.2.0',
commonsLang : '3.12.0',
conductor : '3.9.21-orkes',
conductor : '3.9.24-orkes',
jackson : '2.11.4!!',
junit : '5.9.0',
slf4j : '1.7.36',
Expand All @@ -47,7 +47,9 @@ dependencies {

api ("io.orkes.conductor:conductor-common:${versions.conductor}")
api ("io.orkes.conductor:conductor-grpc:${versions.conductor}")
api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}")
api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}") {
exclude group: 'com.netflix.conductor'
}
implementation 'javax.annotation:javax.annotation-api:1.3.2'

implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:${versions.jackson}"
Expand Down Expand Up @@ -98,7 +100,6 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}!!"
implementation "org.apache.logging.log4j:log4j-jul:${versions.log4j}!!"
implementation "org.apache.logging.log4j:log4j-web:${versions.log4j}!!"
//implementation "org.apache.logging.log4j:log4j-to-slf4j:${versions.log4j}!!"

//spring
implementation "org.springframework:spring-context:5.3.24"
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/orkes/conductor/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.orkes.conductor.client.http.ApiException;
import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest;
import io.orkes.conductor.client.model.WorkflowStateUpdate;
import io.orkes.conductor.client.model.WorkflowStatus;
import io.orkes.conductor.common.model.WorkflowRun;

Expand Down Expand Up @@ -96,4 +97,18 @@ public abstract Map<String, List<Workflow>> getWorkflowsByNamesAndCorrelationIds
public abstract void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest);

public abstract void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest body);

/**
*
* Update a runningw workflow by updating its variables or one of the scheduled task identified by task reference name
* @param workflowId Id of the workflow to be updated
* @param waitUntilTaskRefNames List of task reference names to wait for. The api call will wait for ANY of these tasks to be availble in workflow.
* @param waitForSeconds Maximum time to wait for. If the workflow does not complete or reach one of the tasks listed in waitUntilTaskRefNames by this time,
* the call will return with the current status of the workflow
* @param updateRequest Payload for updating state of workflow.
*
* @return
*/
public abstract WorkflowRun updateWorkflow(String workflowId, List<String> waitUntilTaskRefNames, Integer waitForSeconds,
WorkflowStateUpdate updateRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.orkes.conductor.client.http.api.WorkflowResourceApi;
import io.orkes.conductor.client.model.CorrelationIdsSearchRequest;
import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest;
import io.orkes.conductor.client.model.WorkflowStateUpdate;
import io.orkes.conductor.client.model.WorkflowStatus;
import io.orkes.conductor.common.model.WorkflowRun;

Expand Down Expand Up @@ -352,6 +353,15 @@ public void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest upg
httpClient.upgradeRunningWorkflow(upgradeWorkflowRequest, workflowId);
}

@Override
public WorkflowRun updateWorkflow(String workflowId, List<String> waitUntilTaskRefNames, Integer waitForSeconds, WorkflowStateUpdate updateRequest) {
String joinedReferenceNames = "";
if (waitUntilTaskRefNames != null) {
joinedReferenceNames = String.join(",", waitUntilTaskRefNames);
}
return httpClient.updateWorkflowState(updateRequest, UUID.randomUUID().toString(), workflowId, joinedReferenceNames, waitForSeconds);
}

@Override
public void close() {
shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.orkes.conductor.common.model.WorkflowRun;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.reflect.TypeToken;
import com.squareup.okhttp.Call;

public class WorkflowResourceApi {
Expand Down Expand Up @@ -4012,4 +4013,106 @@ public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Ch
return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener);
}



/**
* Update workflow and task status
* Updates the workflow variables, tasks and triggers evaluation.
* @param updateRequest (required)
* @param requestId (required)
* @param workflowId (required)
* @param waitUntilTaskRef (optional)
* @param waitForSeconds (optional, default to 10)
* @return WorkflowRun
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
*/
public WorkflowRun updateWorkflowState(WorkflowStateUpdate updateRequest, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException {
ApiResponse<WorkflowRun> resp = updateWorkflowAndTaskStateWithHttpInfo(updateRequest, requestId, workflowId, waitUntilTaskRef, waitForSeconds);
return resp.getData();
}

/**
* Update workflow and task status
* Updates the workflow variables, tasks and triggers evaluation.
* @param body (required)
* @param requestId (required)
* @param workflowId (required)
* @param waitUntilTaskRef (optional)
* @param waitForSeconds (optional, default to 10)
* @return ApiResponse&lt;WorkflowRun&gt;
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
*/
public ApiResponse<WorkflowRun> updateWorkflowAndTaskStateWithHttpInfo(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException {
com.squareup.okhttp.Call call = updateWorkflowAndTaskStateValidateBeforeCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, null, null);
Type localVarReturnType = new TypeToken<WorkflowRun>(){}.getType();
return apiClient.execute(call, localVarReturnType);
}


public com.squareup.okhttp.Call updateWorkflowAndTaskStateCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
Object localVarPostBody = body;

// create path and map variables
String localVarPath = "/workflow/{workflowId}/state"
.replaceAll("\\{" + "workflowId" + "\\}", apiClient.escapeString(workflowId.toString()));

List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
if (requestId != null)
localVarQueryParams.addAll(apiClient.parameterToPair("requestId", requestId));
if (waitUntilTaskRef != null)
localVarQueryParams.addAll(apiClient.parameterToPair("waitUntilTaskRef", waitUntilTaskRef));
if (waitForSeconds != null)
localVarQueryParams.addAll(apiClient.parameterToPair("waitForSeconds", waitForSeconds));

Map<String, String> localVarHeaderParams = new HashMap<String, String>();

Map<String, Object> localVarFormParams = new HashMap<String, Object>();

final String[] localVarAccepts = {
"*/*"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept);

final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
localVarHeaderParams.put("Content-Type", localVarContentType);

if(progressListener != null) {
apiClient.getHttpClient().networkInterceptors().add(new com.squareup.okhttp.Interceptor() {
@Override
public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Chain chain) throws IOException {
com.squareup.okhttp.Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new ProgressResponseBody(originalResponse.body(), progressListener))
.build();
}
});
}

String[] localVarAuthNames = new String[] { "api_key" };
return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call updateWorkflowAndTaskStateValidateBeforeCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
// verify the required parameter 'body' is set
if (body == null) {
throw new ApiException("Missing the required parameter 'body' when calling updateWorkflowAndTaskState(Async)");
}
// verify the required parameter 'requestId' is set
if (requestId == null) {
throw new ApiException("Missing the required parameter 'requestId' when calling updateWorkflowAndTaskState(Async)");
}
// verify the required parameter 'workflowId' is set
if (workflowId == null) {
throw new ApiException("Missing the required parameter 'workflowId' when calling updateWorkflowAndTaskState(Async)");
}

com.squareup.okhttp.Call call = updateWorkflowAndTaskStateCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, progressListener, progressRequestListener);
return call;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 Orkes, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.client.model;

import java.util.Map;

import com.netflix.conductor.common.metadata.tasks.TaskResult;

import lombok.Data;

@Data
public class WorkflowStateUpdate {
private String taskReferenceName;
private Map<String, Object> variables;
private TaskResult taskResult;
}
40 changes: 0 additions & 40 deletions src/test/java/io/orkes/conductor/client/api/EventClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
package io.orkes.conductor.client.api;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;

import com.netflix.conductor.common.metadata.events.EventHandler;
Expand All @@ -25,17 +22,10 @@

import io.orkes.conductor.client.EventClient;
import io.orkes.conductor.client.http.ApiException;
import io.orkes.conductor.client.model.event.QueueConfiguration;
import io.orkes.conductor.client.model.event.QueueWorkerConfiguration;
import io.orkes.conductor.client.model.event.kafka.KafkaConfiguration;
import io.orkes.conductor.client.model.event.kafka.KafkaConsumer;
import io.orkes.conductor.client.model.event.kafka.KafkaProducer;
import io.orkes.conductor.client.util.Commons;

import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class EventClientTests extends ClientTest {
private static final String EVENT_NAME = "test_sdk_java_event_name";
Expand Down Expand Up @@ -73,39 +63,9 @@ void testEventHandler() {
assertIterableEquals(List.of(), eventClient.getEventHandlers(EVENT, false));
}

@Test
void testKafkaQueueConfiguration() throws Exception {
QueueConfiguration queueConfiguration = getQueueConfiguration();
eventClient.deleteQueueConfig(queueConfiguration);
assertThrows(
ApiException.class,
() -> {
eventClient.getQueueConfig(queueConfiguration);
});
eventClient.putQueueConfig(queueConfiguration);
Map<String, Object> configurationResponse = eventClient.getQueueConfig(queueConfiguration);
assertTrue(configurationResponse.containsKey("consumer"));
assertTrue(configurationResponse.containsKey("producer"));
eventClient.deleteQueueConfig(queueConfiguration);
}

QueueConfiguration getQueueConfiguration() throws Exception {
return new KafkaConfiguration(KAFKA_QUEUE_TOPIC_NAME)
.withConsumer(getKafkaConsumer())
.withProducer(getKafkaProducer());
}

QueueWorkerConfiguration getKafkaConsumer() throws Exception {
return new KafkaConsumer(KAFKA_BOOTSTRAP_SERVERS_CONFIG)
// 1 second, instead of default 2 seconds
.withConfiguration(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
}

QueueWorkerConfiguration getKafkaProducer() throws Exception {
return new KafkaProducer(KAFKA_BOOTSTRAP_SERVERS_CONFIG)
// send messages in chunks of 1024 bytes, instead of default every new data
.withConfiguration(ProducerConfig.BATCH_SIZE_CONFIG, "1024");
}

EventHandler getEventHandler() {
EventHandler eventHandler = new EventHandler();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Orkes, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.client.api;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.run.Workflow;

import io.orkes.conductor.client.WorkflowClient;
import io.orkes.conductor.client.model.WorkflowStateUpdate;
import io.orkes.conductor.common.model.WorkflowRun;

import lombok.SneakyThrows;

import static org.junit.Assert.assertEquals;

public class WorkflowStateUpdateTests extends ClientTest {

private static WorkflowClient workflowClient;

@BeforeAll
public static void init() {
workflowClient = orkesClients.getWorkflowClient();
}

@SneakyThrows
public String startWorkflow() {
StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest();
startWorkflowRequest.setName("sync_task_variable_updates");
startWorkflowRequest.setVersion(1);
var run = workflowClient.executeWorkflow(startWorkflowRequest, "wait_task_ref");
return run.get(10, TimeUnit.SECONDS)
.getWorkflowId();
}

@Test
public void test() {
String workflowId = startWorkflow();
System.out.println(workflowId);

TaskResult taskResult = new TaskResult();
taskResult.setOutputData(Map.of("a", "b"));

WorkflowStateUpdate request = new WorkflowStateUpdate();
request.setTaskReferenceName("wait_task_ref");
request.setTaskResult(taskResult);

request.setVariables(Map.of("case", "case1"));

WorkflowRun workflowRun = workflowClient.updateWorkflow(workflowId, List.of("wait_task_ref_1", "wait_task_ref_2"), 10, request);

System.out.println(workflowRun);
System.out.println(workflowRun.getStatus());
System.out.println(workflowRun.getTasks()
.stream()
.map(task -> task.getReferenceTaskName() + ":" + task.getStatus())
.collect(Collectors.toList()));

request = new WorkflowStateUpdate();
request.setTaskReferenceName("wait_task_ref_2");
request.setTaskResult(taskResult);
workflowRun = workflowClient.updateWorkflow(workflowId, List.of(), 10, request);

assertEquals(Workflow.WorkflowStatus.COMPLETED, workflowRun.getStatus());
Set<Task.Status> allTaskStatus = workflowRun.getTasks()
.stream()
.map(t -> t.getStatus())
.collect(Collectors.toSet());
assertEquals(1, allTaskStatus.size());
assertEquals(Task.Status.COMPLETED, allTaskStatus.iterator().next());

System.out.println(workflowRun.getStatus());
System.out.println(workflowRun.getTasks()
.stream()
.map(task -> task.getReferenceTaskName() + ":" + task.getStatus())
.collect(Collectors.toList()));

}
}

0 comments on commit ad5b2d4

Please sign in to comment.