From 928403310e957f93254c90386ae3531abc060a8e Mon Sep 17 00:00:00 2001 From: boneys Date: Mon, 25 Dec 2023 23:10:20 -0800 Subject: [PATCH 01/11] Add an API to fetch all event handlers --- src/main/java/io/orkes/conductor/client/EventClient.java | 5 +++++ .../io/orkes/conductor/client/http/OrkesEventClient.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/main/java/io/orkes/conductor/client/EventClient.java b/src/main/java/io/orkes/conductor/client/EventClient.java index a7772b52..11394b24 100644 --- a/src/main/java/io/orkes/conductor/client/EventClient.java +++ b/src/main/java/io/orkes/conductor/client/EventClient.java @@ -14,8 +14,10 @@ +import java.util.List; import java.util.Map; +import com.netflix.conductor.common.metadata.events.EventHandler; import io.orkes.conductor.client.model.event.QueueConfiguration; public abstract class EventClient extends com.netflix.conductor.client.http.EventClient { @@ -24,4 +26,7 @@ public abstract class EventClient extends com.netflix.conductor.client.http.Even public abstract void deleteQueueConfig(QueueConfiguration queueConfiguration); public abstract void putQueueConfig(QueueConfiguration queueConfiguration) throws Exception; + + public abstract List getEventHandlers(); + } diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java index 926792ca..8b6fc892 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java @@ -67,6 +67,11 @@ public List getEventHandlers(String event, boolean activeOnly) { return eventResourceApi.getEventHandlersForEvent(event, activeOnly); } + @Override + public List getEventHandlers() { + return eventResourceApi.getEventHandlers(); + } + @Override public void unregisterEventHandler(String name) { eventResourceApi.removeEventHandlerStatus(name); From b9be26b9cb53f58cee1a719ca7f1d64fbe5a4c78 Mon Sep 17 00:00:00 2001 From: boneys Date: Tue, 26 Dec 2023 19:35:36 -0800 Subject: [PATCH 02/11] Add APIs for moving event handlers to workers --- gradle.properties | 2 +- .../orkes/conductor/client/EventClient.java | 2 + .../client/http/OrkesEventClient.java | 5 + .../client/http/api/EventResourceApi.java | 214 ++++++++++++------ 4 files changed, 157 insertions(+), 66 deletions(-) diff --git a/gradle.properties b/gradle.properties index e997a9af..077fe085 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=2.0.0 \ No newline at end of file +version=2.0.9 \ No newline at end of file diff --git a/src/main/java/io/orkes/conductor/client/EventClient.java b/src/main/java/io/orkes/conductor/client/EventClient.java index 11394b24..a69fdf9a 100644 --- a/src/main/java/io/orkes/conductor/client/EventClient.java +++ b/src/main/java/io/orkes/conductor/client/EventClient.java @@ -29,4 +29,6 @@ public abstract class EventClient extends com.netflix.conductor.client.http.Even public abstract List getEventHandlers(); + public abstract void handleIncomingEvent(Map payload); + } diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java index 8b6fc892..dd187ecb 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java @@ -72,6 +72,11 @@ public List getEventHandlers() { return eventResourceApi.getEventHandlers(); } + @Override + public void handleIncomingEvent(Map payload) { + this.eventResourceApi.handleIncomingEvent(payload); + } + @Override public void unregisterEventHandler(String name) { eventResourceApi.removeEventHandlerStatus(name); diff --git a/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java index 1219e916..f65e7d07 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java @@ -12,6 +12,16 @@ */ package io.orkes.conductor.client.http.api; +import com.fasterxml.jackson.core.type.TypeReference; +import com.netflix.conductor.common.metadata.events.EventHandler; +import io.orkes.conductor.client.ApiClient; +import io.orkes.conductor.client.http.ApiException; +import io.orkes.conductor.client.http.ApiResponse; +import io.orkes.conductor.client.http.Configuration; +import io.orkes.conductor.client.http.Pair; +import io.orkes.conductor.client.http.ProgressRequestBody; +import io.orkes.conductor.client.http.ProgressResponseBody; + import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; @@ -19,13 +29,6 @@ import java.util.List; import java.util.Map; -import com.netflix.conductor.common.metadata.events.EventHandler; - -import io.orkes.conductor.client.ApiClient; -import io.orkes.conductor.client.http.*; - -import com.fasterxml.jackson.core.type.TypeReference; - public class EventResourceApi { private ApiClient apiClient; @@ -48,8 +51,8 @@ public void setApiClient(ApiClient apiClient) { /** * Build call for addEventHandler * - * @param eventHandler (required) - * @param progressListener Progress listener + * @param eventHandler (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -103,7 +106,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "POST", @@ -137,7 +140,7 @@ private com.squareup.okhttp.Call addEventHandlerValidateBeforeCall( * * @param eventHandler (required) * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public void addEventHandler(EventHandler eventHandler) throws ApiException { addEventHandlerWithHttpInfo(eventHandler); @@ -149,7 +152,7 @@ public void addEventHandler(EventHandler eventHandler) throws ApiException { * @param eventHandler (required) * @return ApiResponse<Void> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse addEventHandlerWithHttpInfo(EventHandler eventHandler) throws ApiException { @@ -160,9 +163,9 @@ private ApiResponse addEventHandlerWithHttpInfo(EventHandler eventHandler) /** * Build call for deleteQueueConfig * - * @param queueType (required) - * @param queueName (required) - * @param progressListener Progress listener + * @param queueType (required) + * @param queueName (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -224,7 +227,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "DELETE", @@ -267,7 +270,7 @@ private com.squareup.okhttp.Call deleteQueueConfigValidateBeforeCall( * @param queueName (required) * @return Object * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public Object deleteQueueConfig(String queueType, String queueName) throws ApiException { ApiResponse resp = deleteQueueConfigWithHttpInfo(queueType, queueName); @@ -281,20 +284,21 @@ public Object deleteQueueConfig(String queueType, String queueName) throws ApiEx * @param queueName (required) * @return ApiResponse<Object> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse deleteQueueConfigWithHttpInfo(String queueType, String queueName) throws ApiException { com.squareup.okhttp.Call call = deleteQueueConfigValidateBeforeCall(queueType, queueName, null, null); - Type localVarReturnType = new TypeReference() {}.getType(); + Type localVarReturnType = new TypeReference() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for getEventHandlers * - * @param progressListener Progress listener + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -347,7 +351,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "GET", @@ -375,7 +379,7 @@ private com.squareup.okhttp.Call getEventHandlersValidateBeforeCall( * * @return List<EventHandler> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public List getEventHandlers() throws ApiException { ApiResponse> resp = getEventHandlersWithHttpInfo(); @@ -387,20 +391,21 @@ public List getEventHandlers() throws ApiException { * * @return ApiResponse<List<EventHandler>> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse> getEventHandlersWithHttpInfo() throws ApiException { com.squareup.okhttp.Call call = getEventHandlersValidateBeforeCall(null, null); - Type localVarReturnType = new TypeReference>() {}.getType(); + Type localVarReturnType = new TypeReference>() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for getEventHandlersForEvent * - * @param event (required) - * @param activeOnly (optional, default to true) - * @param progressListener Progress listener + * @param event (required) + * @param activeOnly (optional, default to true) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -460,7 +465,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "GET", @@ -494,11 +499,11 @@ private com.squareup.okhttp.Call getEventHandlersForEventValidateBeforeCall( /** * Get event handlers for a given event * - * @param event (required) + * @param event (required) * @param activeOnly (optional, default to true) * @return List<EventHandler> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public List getEventHandlersForEvent(String event, Boolean activeOnly) throws ApiException { @@ -510,26 +515,27 @@ public List getEventHandlersForEvent(String event, Boolean activeO /** * Get event handlers for a given event * - * @param event (required) + * @param event (required) * @param activeOnly (optional, default to true) * @return ApiResponse<List<EventHandler>> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse> getEventHandlersForEventWithHttpInfo( String event, Boolean activeOnly) throws ApiException { com.squareup.okhttp.Call call = getEventHandlersForEventValidateBeforeCall(event, activeOnly, null, null); - Type localVarReturnType = new TypeReference>() {}.getType(); + Type localVarReturnType = new TypeReference>() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for getQueueConfig * - * @param queueType (required) - * @param queueName (required) - * @param progressListener Progress listener + * @param queueType (required) + * @param queueName (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -591,7 +597,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "GET", @@ -633,7 +639,7 @@ private com.squareup.okhttp.Call getQueueConfigValidateBeforeCall( * @param queueName (required) * @return Object * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public Map getQueueConfig(String queueType, String queueName) throws ApiException { ApiResponse> resp = getQueueConfigWithHttpInfo(queueType, queueName); @@ -647,20 +653,21 @@ public Map getQueueConfig(String queueType, String queueName) th * @param queueName (required) * @return ApiResponse<Object> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse> getQueueConfigWithHttpInfo(String queueType, String queueName) throws ApiException { com.squareup.okhttp.Call call = getQueueConfigValidateBeforeCall(queueType, queueName, null, null); - Type localVarReturnType = new TypeReference>() {}.getType(); + Type localVarReturnType = new TypeReference>() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for getQueueNames * - * @param progressListener Progress listener + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -713,7 +720,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "GET", @@ -741,7 +748,7 @@ private com.squareup.okhttp.Call getQueueNamesValidateBeforeCall( * * @return Object * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public Object getQueueNames() throws ApiException { ApiResponse resp = getQueueNamesWithHttpInfo(); @@ -753,21 +760,22 @@ public Object getQueueNames() throws ApiException { * * @return ApiResponse<Object> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse getQueueNamesWithHttpInfo() throws ApiException { com.squareup.okhttp.Call call = getQueueNamesValidateBeforeCall(null, null); - Type localVarReturnType = new TypeReference() {}.getType(); + Type localVarReturnType = new TypeReference() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for putQueueConfig * - * @param body (required) - * @param queueType (required) - * @param queueName (required) - * @param progressListener Progress listener + * @param body (required) + * @param queueType (required) + * @param queueName (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -829,7 +837,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "PUT", @@ -874,12 +882,12 @@ private com.squareup.okhttp.Call putQueueConfigValidateBeforeCall( /** * Create or update queue config by name * - * @param body (required) + * @param body (required) * @param queueType (required) * @param queueName (required) * @return Object * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public Object putQueueConfig(String body, String queueType, String queueName) throws ApiException { @@ -890,26 +898,27 @@ public Object putQueueConfig(String body, String queueType, String queueName) /** * Create or update queue config by name * - * @param body (required) + * @param body (required) * @param queueType (required) * @param queueName (required) * @return ApiResponse<Object> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse putQueueConfigWithHttpInfo( String body, String queueType, String queueName) throws ApiException { com.squareup.okhttp.Call call = putQueueConfigValidateBeforeCall(body, queueType, queueName, null, null); - Type localVarReturnType = new TypeReference() {}.getType(); + Type localVarReturnType = new TypeReference() { + }.getType(); return apiClient.execute(call, localVarReturnType); } /** * Build call for removeEventHandlerStatus * - * @param name (required) - * @param progressListener Progress listener + * @param name (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -967,7 +976,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "DELETE", @@ -1001,7 +1010,7 @@ private com.squareup.okhttp.Call removeEventHandlerStatusValidateBeforeCall( * * @param name (required) * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public void removeEventHandlerStatus(String name) throws ApiException { removeEventHandlerStatusWithHttpInfo(name); @@ -1013,7 +1022,7 @@ public void removeEventHandlerStatus(String name) throws ApiException { * @param name (required) * @return ApiResponse<Void> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse removeEventHandlerStatusWithHttpInfo(String name) throws ApiException { @@ -1025,8 +1034,8 @@ private ApiResponse removeEventHandlerStatusWithHttpInfo(String name) /** * Build call for updateEventHandler * - * @param eventHandler (required) - * @param progressListener Progress listener + * @param eventHandler (required) + * @param progressListener Progress listener * @param progressRequestListener Progress request listener * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -1080,7 +1089,7 @@ public com.squareup.okhttp.Response intercept( }); } - String[] localVarAuthNames = new String[] {"api_key"}; + String[] localVarAuthNames = new String[]{"api_key"}; return apiClient.buildCall( localVarPath, "PUT", @@ -1114,19 +1123,25 @@ private com.squareup.okhttp.Call updateEventHandlerValidateBeforeCall( * * @param eventHandler (required) * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ public void updateEventHandler(EventHandler eventHandler) throws ApiException { updateEventHandlerWithHttpInfo(eventHandler); } + + public void handleIncomingEvent(Map payload) { + handleIncomingEventWithHttpInfo(payload); + } + + /** * Update an existing event handler. * * @param eventHandler (required) * @return ApiResponse<Void> * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the - * response body + * response body */ private ApiResponse updateEventHandlerWithHttpInfo(EventHandler eventHandler) throws ApiException { @@ -1134,4 +1149,73 @@ private ApiResponse updateEventHandlerWithHttpInfo(EventHandler eventHandl updateEventHandlerValidateBeforeCall(eventHandler, null, null); return apiClient.execute(call); } + + private ApiResponse handleIncomingEventWithHttpInfo(Map payload) + throws ApiException { + com.squareup.okhttp.Call call = handleIncomingEventValidateBeforeCall(payload, null, null); + return apiClient.execute(call); + } + + private com.squareup.okhttp.Call handleIncomingEventValidateBeforeCall( + Map payload, + final ProgressResponseBody.ProgressListener progressListener, + final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { + Object localVarPostBody = payload; + + // create path and map variables + String localVarPath = "/event/handleIncomingEvent"; + + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + + Map localVarHeaderParams = new HashMap(); + + Map localVarFormParams = new HashMap(); + + final String[] localVarAccepts = {}; + + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept); + + final String[] localVarContentTypes = {"application/json"}; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + localVarHeaderParams.put("Content-Type", localVarContentType); + + if (progressListener != null) { + apiClient + .getHttpClient() + .networkInterceptors() + .add( + new com.squareup.okhttp.Interceptor() { + @Override + public com.squareup.okhttp.Response intercept( + com.squareup.okhttp.Interceptor.Chain chain) + throws IOException { + com.squareup.okhttp.Response originalResponse = + chain.proceed(chain.request()); + return originalResponse + .newBuilder() + .body( + new ProgressResponseBody( + originalResponse.body(), + progressListener)) + .build(); + } + }); + } + + String[] localVarAuthNames = new String[] {"api_key"}; + return apiClient.buildCall( + localVarPath, + "POST", + localVarQueryParams, + localVarCollectionQueryParams, + localVarPostBody, + localVarHeaderParams, + localVarFormParams, + localVarAuthNames, + progressRequestListener); + } + + } From 1644dad4364956efe5be6f98f1834f98f941de7e Mon Sep 17 00:00:00 2001 From: boneys Date: Thu, 4 Jan 2024 13:45:10 -0800 Subject: [PATCH 03/11] Spotless check fixes --- .../io/orkes/conductor/client/EventClient.java | 1 + .../client/http/OrkesWorkflowClient.java | 4 ++-- .../client/http/api/EventResourceApi.java | 16 +++++++++------- .../model/JumpWorkflowExecutionRequest.java | 12 ++++++++++++ .../client/api/SchedulerClientTests.java | 6 +++--- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/orkes/conductor/client/EventClient.java b/src/main/java/io/orkes/conductor/client/EventClient.java index a69fdf9a..02a12b5a 100644 --- a/src/main/java/io/orkes/conductor/client/EventClient.java +++ b/src/main/java/io/orkes/conductor/client/EventClient.java @@ -18,6 +18,7 @@ import java.util.Map; import com.netflix.conductor.common.metadata.events.EventHandler; + import io.orkes.conductor.client.model.event.QueueConfiguration; public abstract class EventClient extends com.netflix.conductor.client.http.EventClient { diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 5610ad18..a78fae5a 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -19,10 +19,9 @@ import java.util.UUID; import java.util.concurrent.*; -import com.netflix.conductor.common.metadata.workflow.*; -import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; import org.apache.commons.lang.StringUtils; +import com.netflix.conductor.common.metadata.workflow.*; import com.netflix.conductor.common.model.BulkResponse; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; @@ -35,6 +34,7 @@ import io.orkes.conductor.client.http.api.WorkflowBulkResourceApi; import io.orkes.conductor.client.http.api.WorkflowResourceApi; import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; +import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; diff --git a/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java index f65e7d07..57e387c4 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/EventResourceApi.java @@ -12,8 +12,15 @@ */ package io.orkes.conductor.client.http.api; -import com.fasterxml.jackson.core.type.TypeReference; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import com.netflix.conductor.common.metadata.events.EventHandler; + import io.orkes.conductor.client.ApiClient; import io.orkes.conductor.client.http.ApiException; import io.orkes.conductor.client.http.ApiResponse; @@ -22,12 +29,7 @@ import io.orkes.conductor.client.http.ProgressRequestBody; import io.orkes.conductor.client.http.ProgressResponseBody; -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; public class EventResourceApi { private ApiClient apiClient; diff --git a/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java b/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java index 2d7a30fa..78f7ff08 100644 --- a/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java +++ b/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java @@ -1,3 +1,15 @@ +/* + * Copyright 2024 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package io.orkes.conductor.client.model; import java.util.Map; diff --git a/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java b/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java index 07b82c8e..9edc8fd3 100644 --- a/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java @@ -12,16 +12,16 @@ */ package io.orkes.conductor.client.api; -import io.orkes.conductor.client.model.TagObject; +import java.util.List; + import org.junit.jupiter.api.Test; import io.orkes.conductor.client.SchedulerClient; import io.orkes.conductor.client.model.SaveScheduleRequest; +import io.orkes.conductor.client.model.TagObject; import io.orkes.conductor.client.model.WorkflowSchedule; import io.orkes.conductor.client.util.Commons; -import java.util.List; - import static org.junit.jupiter.api.Assertions.*; public class SchedulerClientTests extends ClientTest { From 7892c7f967141480f81fd752cbeb80c45f33ab1a Mon Sep 17 00:00:00 2001 From: boneys Date: Thu, 4 Jan 2024 14:45:42 -0800 Subject: [PATCH 04/11] Fix some of the failing tests --- .../conductor/client/api/AuthorizationClientTests.java | 3 +-- .../orkes/conductor/client/api/MetadataClientTests.java | 9 +++++---- .../orkes/conductor/client/api/SchedulerClientTests.java | 3 +-- .../io/orkes/conductor/client/api/SecretClientTests.java | 1 - .../io/orkes/conductor/client/api/TaskClientTests.java | 5 +++-- .../orkes/conductor/client/api/WorkflowClientTests.java | 7 +++---- .../java/io/orkes/conductor/client/util/Commons.java | 2 -- 7 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/test/java/io/orkes/conductor/client/api/AuthorizationClientTests.java b/src/test/java/io/orkes/conductor/client/api/AuthorizationClientTests.java index c929fc51..f120b010 100644 --- a/src/test/java/io/orkes/conductor/client/api/AuthorizationClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/AuthorizationClientTests.java @@ -255,7 +255,7 @@ void testMethods() { } } assertTrue(found); - authorizationClient.getPermissions("abc", Commons.GROUP_ID); + authorizationClient.getPermissions("APPLICATION", applicationId); assertEquals(authorizationClient.getApplication(applicationId).getId(), applicationId); assertTrue( authorizationClient @@ -325,7 +325,6 @@ AuthorizationRequest getAuthorizationRequest() { private List getTagObject() { TagObject tagObject = new TagObject(); - tagObject.setType(TagObject.TypeEnum.METADATA); tagObject.setKey("department"); tagObject.setValue("accounts"); return List.of(tagObject); diff --git a/src/test/java/io/orkes/conductor/client/api/MetadataClientTests.java b/src/test/java/io/orkes/conductor/client/api/MetadataClientTests.java index f58830af..529174dd 100644 --- a/src/test/java/io/orkes/conductor/client/api/MetadataClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/MetadataClientTests.java @@ -29,11 +29,12 @@ import static org.junit.jupiter.api.Assertions.*; +@SuppressWarnings("unchecked") public class MetadataClientTests extends ClientTest { private final MetadataClient metadataClient; public MetadataClientTests() { - metadataClient = super.orkesClients.getMetadataClient(); + metadataClient = orkesClients.getMetadataClient(); ((OrkesMetadataClient) metadataClient).withReadTimeout(45000); } @@ -50,7 +51,7 @@ void taskDefinition() { metadataClient.registerTaskDefs(List.of(taskDef)); metadataClient.updateTaskDef(taskDef); TaskDef receivedTaskDef = metadataClient.getTaskDef(Commons.TASK_NAME); - assertTrue(taskDef.getName().equals(receivedTaskDef.getName())); + assertEquals(taskDef.getName(), receivedTaskDef.getName()); } @Test @@ -72,7 +73,7 @@ void workflow() { .getWorkflowDefWithMetadata(Commons.WORKFLOW_NAME, Commons.WORKFLOW_VERSION); WorkflowDef receivedWorkflowDef = metadataClient.getWorkflowDef(Commons.WORKFLOW_NAME, Commons.WORKFLOW_VERSION); - assertTrue(receivedWorkflowDef.getName().equals(Commons.WORKFLOW_NAME)); + assertEquals(receivedWorkflowDef.getName(), Commons.WORKFLOW_NAME); assertEquals(receivedWorkflowDef.getVersion(), Commons.WORKFLOW_VERSION); } @@ -91,7 +92,7 @@ void tagTask() throws Exception { metadataClient.setTaskTags(List.of(tagObject), Commons.TASK_NAME); assertNotNull( TestUtil.retryMethodCall( - () -> metadataClient.getTags())); + metadataClient::getTags)); List tags = (List) TestUtil.retryMethodCall( () -> metadataClient.getTaskTags(Commons.TASK_NAME)); assertIterableEquals(List.of(tagObject), tags); diff --git a/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java b/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java index 9edc8fd3..14f832a8 100644 --- a/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/SchedulerClientTests.java @@ -31,7 +31,7 @@ public class SchedulerClientTests extends ClientTest { private final SchedulerClient schedulerClient; public SchedulerClientTests() { - schedulerClient = super.orkesClients.getSchedulerClient(); + schedulerClient = orkesClients.getSchedulerClient(); } @Test @@ -73,7 +73,6 @@ SaveScheduleRequest getSaveScheduleRequest() { private List getTagObject() { TagObject tagObject = new TagObject(); - tagObject.setType(TagObject.TypeEnum.METADATA); tagObject.setKey("department"); tagObject.setValue("accounts"); return List.of(tagObject); diff --git a/src/test/java/io/orkes/conductor/client/api/SecretClientTests.java b/src/test/java/io/orkes/conductor/client/api/SecretClientTests.java index ba23c490..9864745e 100644 --- a/src/test/java/io/orkes/conductor/client/api/SecretClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/SecretClientTests.java @@ -58,7 +58,6 @@ void testMethods() { private TagObject getTagObject() { TagObject tagObject = new TagObject(); - tagObject.setType(TagObject.TypeEnum.METADATA); tagObject.setKey("department"); tagObject.setValue("accounts"); return tagObject; diff --git a/src/test/java/io/orkes/conductor/client/api/TaskClientTests.java b/src/test/java/io/orkes/conductor/client/api/TaskClientTests.java index a1b91f98..7dbc09a7 100644 --- a/src/test/java/io/orkes/conductor/client/api/TaskClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/TaskClientTests.java @@ -85,6 +85,8 @@ public void testUpdateByRefName() { Workflow workflow = workflowClient.getWorkflow(workflowId, true); assertNotNull(workflow); + System.out.println("Running test for workflow: " + workflowId); + int maxLoop = 10; int count = 0; while (!workflow.getStatus().isTerminal() && count < maxLoop) { @@ -130,8 +132,7 @@ public void testUpdateByRefNameSync() { for (String referenceName : runningTasks) { System.out.println("Updating " + referenceName); try { - workflow = taskClient.updateTaskSync(workflowId, referenceName, TaskResult.Status.COMPLETED, - new TaskOutput()); + workflow = taskClient.updateTaskSync(workflowId, referenceName, TaskResult.Status.COMPLETED, new TaskOutput()); System.out.println("Workflow: " + workflow); } catch (ApiException apiException) { // 404 == task was updated already and there are no pending tasks diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java index 18c402e8..b793c5f5 100644 --- a/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java @@ -87,14 +87,13 @@ public void testSearchByCorrelationIds() { } } // Let's give couple of seconds for indexing to complete - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - Map> result = workflowClient.getWorkflowsByNamesAndCorrelationIds(correlationIds, - workflowNames.stream().collect(Collectors.toList()), true, false); + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + Map> result = workflowClient.getWorkflowsByNamesAndCorrelationIds(correlationIds, new ArrayList<>(workflowNames), true, false); assertNotNull(result); assertEquals(correlationIds.size(), result.size()); for (String correlationId : correlationIds) { assertEquals(5, result.get(correlationId).size()); - Set ids = result.get(correlationId).stream().map(wf -> wf.getWorkflowId()) + Set ids = result.get(correlationId).stream().map(Workflow::getWorkflowId) .collect(Collectors.toSet()); assertEquals(correlationIdToWorkflows.get(correlationId), ids); } diff --git a/src/test/java/io/orkes/conductor/client/util/Commons.java b/src/test/java/io/orkes/conductor/client/util/Commons.java index 91edcb14..f69e5626 100644 --- a/src/test/java/io/orkes/conductor/client/util/Commons.java +++ b/src/test/java/io/orkes/conductor/client/util/Commons.java @@ -32,7 +32,6 @@ public class Commons { public static TagObject getTagObject() { TagObject tagObject = new TagObject(); - tagObject.setType(TagObject.TypeEnum.METADATA); tagObject.setKey("a"); tagObject.setValue("b"); return tagObject; @@ -40,7 +39,6 @@ public static TagObject getTagObject() { public static TagString getTagString() { TagString tagString = new TagString(); - tagString.setType(TagString.TypeEnum.METADATA); tagString.setKey("a"); tagString.setValue("b"); return tagString; From 1a300e7d9ea7fec75b741eb64b608e85e7cecb29 Mon Sep 17 00:00:00 2001 From: boneys Date: Thu, 4 Jan 2024 14:55:18 -0800 Subject: [PATCH 05/11] Removing some unused methods --- .../orkes/conductor/client/util/ApiUtil.java | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/src/test/java/io/orkes/conductor/client/util/ApiUtil.java b/src/test/java/io/orkes/conductor/client/util/ApiUtil.java index 9148c215..91f2da93 100644 --- a/src/test/java/io/orkes/conductor/client/util/ApiUtil.java +++ b/src/test/java/io/orkes/conductor/client/util/ApiUtil.java @@ -22,14 +22,6 @@ public class ApiUtil { private static final String ENV_KEY_ID = "SDK_INTEGRATION_TESTS_SERVER_KEY_ID"; private static final String ENV_SECRET = "SDK_INTEGRATION_TESTS_SERVER_KEY_SECRET"; - public static final String USER1_APP_ID = "USER1_APPLICATION_ID"; - - public static final String USER2_APP_ID = "USER2_APPLICATION_ID"; - public static final String USER1_KEY_ID = "USER1_KEY_ID"; - private static final String USER1_SECRET = "USER1_SECRET"; - public static final String USER2_KEY_ID = "USER2_KEY_ID"; - private static final String USER2_SECRET = "USER2_SECRET"; - public static OrkesClients getOrkesClient() { final ApiClient apiClient = getApiClientWithCredentials(); apiClient.setReadTimeout(10_000); @@ -50,34 +42,6 @@ public static ApiClient getApiClientWithCredentials() { return apiClient; } - public static ApiClient getUser1Client() { - String basePath = getBasePath(); - assertNotNull(basePath, ENV_ROOT_URI + " env not set"); - String keyId = getEnv(USER1_KEY_ID); - assertNotNull(keyId, USER1_KEY_ID + " env not set"); - String keySecret = getEnv(USER1_SECRET); - assertNotNull(keySecret, USER1_SECRET + " env not set"); - ApiClient apiClient = new ApiClient(basePath, keyId, keySecret); - apiClient.setWriteTimeout(30_000); - apiClient.setReadTimeout(30_000); - apiClient.setConnectTimeout(30_000); - return apiClient; - } - - public static ApiClient getUser2Client() { - String basePath = getBasePath(); - assertNotNull(basePath, ENV_ROOT_URI + " env not set"); - String keyId = getEnv(USER2_KEY_ID); - assertNotNull(keyId, USER2_KEY_ID + " env not set"); - String keySecret = getEnv(USER2_SECRET); - assertNotNull(keySecret, USER2_SECRET + " env not set"); - ApiClient apiClient = new ApiClient(basePath, keyId, keySecret); - apiClient.setWriteTimeout(30_000); - apiClient.setReadTimeout(30_000); - apiClient.setConnectTimeout(30_000); - return apiClient; - } - public static String getBasePath() { return getEnv(ENV_ROOT_URI); } From bafd3b604a964fb7780f4ddb3b8f5c7ab56fb1cf Mon Sep 17 00:00:00 2001 From: boneys Date: Thu, 4 Jan 2024 15:04:39 -0800 Subject: [PATCH 06/11] Fix feedback from review --- .../java/io/orkes/conductor/client/http/OrkesEventClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java index dd187ecb..265987ba 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesEventClient.java @@ -74,7 +74,7 @@ public List getEventHandlers() { @Override public void handleIncomingEvent(Map payload) { - this.eventResourceApi.handleIncomingEvent(payload); + eventResourceApi.handleIncomingEvent(payload); } @Override From 285dd1a926636c2d562174a66cebf9f58b1d438b Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 17 Jan 2024 10:49:51 -0800 Subject: [PATCH 07/11] state update api --- .../conductor/client/WorkflowClient.java | 16 +++ .../client/http/OrkesWorkflowClient.java | 12 ++ .../client/http/api/WorkflowResourceApi.java | 103 ++++++++++++++++++ .../client/model/WorkflowStateUpdate.java | 14 +++ 4 files changed, 145 insertions(+) create mode 100644 src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java diff --git a/src/main/java/io/orkes/conductor/client/WorkflowClient.java b/src/main/java/io/orkes/conductor/client/WorkflowClient.java index 9b443c2b..d22c3a4b 100644 --- a/src/main/java/io/orkes/conductor/client/WorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/WorkflowClient.java @@ -27,6 +27,7 @@ import io.orkes.conductor.client.http.ApiException; import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; +import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -96,4 +97,19 @@ public abstract Map> getWorkflowsByNamesAndCorrelationIds public abstract void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest); public abstract void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest body); + + /** + * + * Update a runningw workflow by updating its variables or one of the scheduled task identified by task reference name + * @param workflowId Id of the workflow to be updated + * @param waitUntilTaskRefNames List of task reference names to wait for. The api call will wait for ANY of these tasks to be availble in workflow. + * @param waitForSeconds Maximum time to wait for. If the workflow does not complete or reach one of the tasks listed in waitUntilTaskRefNames by this time, + * the call will return with the current status of the workflow + * @param updateRequest Payload for updating state of workflow. + * + * @return + */ + public abstract WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, + WorkflowStateUpdate updateRequest); } + diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index a78fae5a..1d33a84f 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -14,12 +14,14 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import org.apache.commons.lang.StringUtils; +import org.checkerframework.checker.units.qual.A; import com.netflix.conductor.common.metadata.workflow.*; import com.netflix.conductor.common.model.BulkResponse; @@ -35,6 +37,7 @@ import io.orkes.conductor.client.http.api.WorkflowResourceApi; import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; +import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -352,6 +355,15 @@ public void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest upg httpClient.upgradeRunningWorkflow(upgradeWorkflowRequest, workflowId); } + @Override + public WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, WorkflowStateUpdate updateRequest) { + String joinedReferenceNames = ""; + if (waitUntilTaskRefNames != null) { + joinedReferenceNames = String.join(",", waitUntilTaskRefNames); + } + return httpClient.updateWorkflowState(updateRequest, UUID.randomUUID().toString(), workflowId, joinedReferenceNames, waitForSeconds); + } + @Override public void close() { shutdown(); diff --git a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java index 3b6f110f..23e20914 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; +import com.google.common.reflect.TypeToken; import com.netflix.conductor.common.metadata.workflow.*; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowTestRequest; @@ -4012,4 +4013,106 @@ public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Ch return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener); } + + + /** + * Update workflow and task status + * Updates the workflow variables, tasks and triggers evaluation. + * @param body (required) + * @param requestId (required) + * @param workflowId (required) + * @param waitUntilTaskRef (optional) + * @param waitForSeconds (optional, default to 10) + * @return WorkflowRun + * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body + */ + public WorkflowRun updateWorkflowState(WorkflowStateUpdate updateRequest, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException { + ApiResponse resp = updateWorkflowAndTaskStateWithHttpInfo(updateRequest, requestId, workflowId, waitUntilTaskRef, waitForSeconds); + return resp.getData(); + } + + /** + * Update workflow and task status + * Updates the workflow variables, tasks and triggers evaluation. + * @param body (required) + * @param requestId (required) + * @param workflowId (required) + * @param waitUntilTaskRef (optional) + * @param waitForSeconds (optional, default to 10) + * @return ApiResponse<WorkflowRun> + * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body + */ + public ApiResponse updateWorkflowAndTaskStateWithHttpInfo(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException { + com.squareup.okhttp.Call call = updateWorkflowAndTaskStateValidateBeforeCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, null, null); + Type localVarReturnType = new TypeToken(){}.getType(); + return apiClient.execute(call, localVarReturnType); + } + + + public com.squareup.okhttp.Call updateWorkflowAndTaskStateCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { + Object localVarPostBody = body; + + // create path and map variables + String localVarPath = "/workflow/{workflowId}/state" + .replaceAll("\\{" + "workflowId" + "\\}", apiClient.escapeString(workflowId.toString())); + + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + if (requestId != null) + localVarQueryParams.addAll(apiClient.parameterToPair("requestId", requestId)); + if (waitUntilTaskRef != null) + localVarQueryParams.addAll(apiClient.parameterToPair("waitUntilTaskRef", waitUntilTaskRef)); + if (waitForSeconds != null) + localVarQueryParams.addAll(apiClient.parameterToPair("waitForSeconds", waitForSeconds)); + + Map localVarHeaderParams = new HashMap(); + + Map localVarFormParams = new HashMap(); + + final String[] localVarAccepts = { + "*/*" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept); + + final String[] localVarContentTypes = { + "application/json" + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + localVarHeaderParams.put("Content-Type", localVarContentType); + + if(progressListener != null) { + apiClient.getHttpClient().networkInterceptors().add(new com.squareup.okhttp.Interceptor() { + @Override + public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Chain chain) throws IOException { + com.squareup.okhttp.Response originalResponse = chain.proceed(chain.request()); + return originalResponse.newBuilder() + .body(new ProgressResponseBody(originalResponse.body(), progressListener)) + .build(); + } + }); + } + + String[] localVarAuthNames = new String[] { "api_key" }; + return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener); + } + + @SuppressWarnings("rawtypes") + private com.squareup.okhttp.Call updateWorkflowAndTaskStateValidateBeforeCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { + // verify the required parameter 'body' is set + if (body == null) { + throw new ApiException("Missing the required parameter 'body' when calling updateWorkflowAndTaskState(Async)"); + } + // verify the required parameter 'requestId' is set + if (requestId == null) { + throw new ApiException("Missing the required parameter 'requestId' when calling updateWorkflowAndTaskState(Async)"); + } + // verify the required parameter 'workflowId' is set + if (workflowId == null) { + throw new ApiException("Missing the required parameter 'workflowId' when calling updateWorkflowAndTaskState(Async)"); + } + + com.squareup.okhttp.Call call = updateWorkflowAndTaskStateCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, progressListener, progressRequestListener); + return call; + } } diff --git a/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java new file mode 100644 index 00000000..75e25986 --- /dev/null +++ b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java @@ -0,0 +1,14 @@ +package io.orkes.conductor.client.model; + +import java.util.Map; + +import com.netflix.conductor.common.metadata.tasks.TaskResult; + +import lombok.Data; + +@Data +public class WorkflowStateUpdate { + private String taskReferenceName; + private Map variables; + private TaskResult taskResult; +} From 8abab6cb5ce53f653bd3da6e1c50968b05ac7a35 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 28 Jan 2024 15:54:06 -0800 Subject: [PATCH 08/11] fixes and spotless --- build.gradle | 7 ++++--- .../io/orkes/conductor/client/WorkflowClient.java | 1 - .../conductor/client/http/OrkesWorkflowClient.java | 2 -- .../client/http/api/WorkflowResourceApi.java | 4 ++-- .../conductor/client/model/WorkflowStateUpdate.java | 12 ++++++++++++ 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index b30bfed4..36ae3def 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ ext { versions = [ awaitility : '4.2.0', commonsLang : '3.12.0', - conductor : '3.9.21-orkes', + conductor : '3.9.24-orkes', jackson : '2.11.4!!', junit : '5.9.0', slf4j : '1.7.36', @@ -47,7 +47,9 @@ dependencies { api ("io.orkes.conductor:conductor-common:${versions.conductor}") api ("io.orkes.conductor:conductor-grpc:${versions.conductor}") - api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}") + api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}") { + exclude group: 'com.netflix.conductor' + } implementation 'javax.annotation:javax.annotation-api:1.3.2' implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:${versions.jackson}" @@ -98,7 +100,6 @@ dependencies { implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-jul:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-web:${versions.log4j}!!" - //implementation "org.apache.logging.log4j:log4j-to-slf4j:${versions.log4j}!!" //spring implementation "org.springframework:spring-context:5.3.24" diff --git a/src/main/java/io/orkes/conductor/client/WorkflowClient.java b/src/main/java/io/orkes/conductor/client/WorkflowClient.java index d22c3a4b..0a8655ad 100644 --- a/src/main/java/io/orkes/conductor/client/WorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/WorkflowClient.java @@ -112,4 +112,3 @@ public abstract Map> getWorkflowsByNamesAndCorrelationIds public abstract WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, WorkflowStateUpdate updateRequest); } - diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 1d33a84f..59d530d4 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -14,14 +14,12 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import org.apache.commons.lang.StringUtils; -import org.checkerframework.checker.units.qual.A; import com.netflix.conductor.common.metadata.workflow.*; import com.netflix.conductor.common.model.BulkResponse; diff --git a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java index 23e20914..c761592f 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Map; -import com.google.common.reflect.TypeToken; import com.netflix.conductor.common.metadata.workflow.*; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowTestRequest; @@ -30,6 +29,7 @@ import io.orkes.conductor.common.model.WorkflowRun; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.reflect.TypeToken; import com.squareup.okhttp.Call; public class WorkflowResourceApi { @@ -4018,7 +4018,7 @@ public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Ch /** * Update workflow and task status * Updates the workflow variables, tasks and triggers evaluation. - * @param body (required) + * @param updateRequest (required) * @param requestId (required) * @param workflowId (required) * @param waitUntilTaskRef (optional) diff --git a/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java index 75e25986..f374a35f 100644 --- a/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java +++ b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java @@ -1,3 +1,15 @@ +/* + * Copyright 2024 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package io.orkes.conductor.client.model; import java.util.Map; From b599fd0c1aeda3ea1129972aeb1d0dc3bfcc876c Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 28 Jan 2024 15:59:33 -0800 Subject: [PATCH 09/11] Update EventClientTests.java --- .../client/api/EventClientTests.java | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/src/test/java/io/orkes/conductor/client/api/EventClientTests.java b/src/test/java/io/orkes/conductor/client/api/EventClientTests.java index 897e9138..fb061c21 100644 --- a/src/test/java/io/orkes/conductor/client/api/EventClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/EventClientTests.java @@ -13,10 +13,7 @@ package io.orkes.conductor.client.api; import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import com.netflix.conductor.common.metadata.events.EventHandler; @@ -25,17 +22,10 @@ import io.orkes.conductor.client.EventClient; import io.orkes.conductor.client.http.ApiException; -import io.orkes.conductor.client.model.event.QueueConfiguration; -import io.orkes.conductor.client.model.event.QueueWorkerConfiguration; -import io.orkes.conductor.client.model.event.kafka.KafkaConfiguration; -import io.orkes.conductor.client.model.event.kafka.KafkaConsumer; -import io.orkes.conductor.client.model.event.kafka.KafkaProducer; import io.orkes.conductor.client.util.Commons; -import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; public class EventClientTests extends ClientTest { private static final String EVENT_NAME = "test_sdk_java_event_name"; @@ -73,39 +63,9 @@ void testEventHandler() { assertIterableEquals(List.of(), eventClient.getEventHandlers(EVENT, false)); } - @Test - void testKafkaQueueConfiguration() throws Exception { - QueueConfiguration queueConfiguration = getQueueConfiguration(); - eventClient.deleteQueueConfig(queueConfiguration); - assertThrows( - ApiException.class, - () -> { - eventClient.getQueueConfig(queueConfiguration); - }); - eventClient.putQueueConfig(queueConfiguration); - Map configurationResponse = eventClient.getQueueConfig(queueConfiguration); - assertTrue(configurationResponse.containsKey("consumer")); - assertTrue(configurationResponse.containsKey("producer")); - eventClient.deleteQueueConfig(queueConfiguration); - } - QueueConfiguration getQueueConfiguration() throws Exception { - return new KafkaConfiguration(KAFKA_QUEUE_TOPIC_NAME) - .withConsumer(getKafkaConsumer()) - .withProducer(getKafkaProducer()); - } - QueueWorkerConfiguration getKafkaConsumer() throws Exception { - return new KafkaConsumer(KAFKA_BOOTSTRAP_SERVERS_CONFIG) - // 1 second, instead of default 2 seconds - .withConfiguration(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000"); - } - QueueWorkerConfiguration getKafkaProducer() throws Exception { - return new KafkaProducer(KAFKA_BOOTSTRAP_SERVERS_CONFIG) - // send messages in chunks of 1024 bytes, instead of default every new data - .withConfiguration(ProducerConfig.BATCH_SIZE_CONFIG, "1024"); - } EventHandler getEventHandler() { EventHandler eventHandler = new EventHandler(); From 21926276a2dc6f1bbadc1d27a00e1472eb624a26 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 28 Jan 2024 16:25:47 -0800 Subject: [PATCH 10/11] tests --- .../client/api/WorkflowStateUpdateTests.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java new file mode 100644 index 00000000..2d28b562 --- /dev/null +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.api; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.run.Workflow; + +import io.orkes.conductor.client.WorkflowClient; +import io.orkes.conductor.client.model.WorkflowStateUpdate; +import io.orkes.conductor.common.model.WorkflowRun; +import lombok.SneakyThrows; + +public class WorkflowStateUpdateTests extends ClientTest { + + private static WorkflowClient workflowClient; + + @BeforeAll + public static void init() { + workflowClient = orkesClients.getWorkflowClient(); + } + + @SneakyThrows + public String startWorkflow() { + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); + startWorkflowRequest.setName("sync_task_variable_updates"); + startWorkflowRequest.setVersion(1); + var run = workflowClient.executeWorkflow(startWorkflowRequest, "wait_task_ref"); + return run.get(10, TimeUnit.SECONDS) + .getWorkflowId(); + } + + @Test + public void test() { + String workflowId = startWorkflow(); + System.out.println(workflowId); + + TaskResult taskResult = new TaskResult(); + taskResult.setOutputData(Map.of("a", "b")); + + WorkflowStateUpdate request = new WorkflowStateUpdate(); + request.setTaskReferenceName("wait_task_ref"); + request.setTaskResult(taskResult); + + request.setVariables(Map.of("case", "case1")); + + WorkflowRun workflowRun = workflowClient.updateWorkflow(workflowId, List.of("wait_task_ref_1", "wait_task_ref_2"), 10, request); + + System.out.println(workflowRun); + System.out.println(workflowRun.getStatus()); + System.out.println(workflowRun.getTasks() + .stream() + .map(task -> task.getReferenceTaskName() + ":" + task.getStatus()) + .collect(Collectors.toList())); + + request = new WorkflowStateUpdate(); + request.setTaskReferenceName("wait_task_ref_2"); + request.setTaskResult(taskResult); + workflowRun = workflowClient.updateWorkflow(workflowId, List.of(), 10, request); + + assertEquals(Workflow.WorkflowStatus.COMPLETED, workflowRun.getStatus()); + Set allTaskStatus = workflowRun.getTasks() + .stream() + .map(t -> t.getStatus()) + .collect(Collectors.toSet()); + assertEquals(1, allTaskStatus.size()); + assertEquals(Task.Status.COMPLETED, allTaskStatus.iterator().next()); + + System.out.println(workflowRun.getStatus()); + System.out.println(workflowRun.getTasks() + .stream() + .map(task -> task.getReferenceTaskName() + ":" + task.getStatus()) + .collect(Collectors.toList())); + + } +} From 80ca9b3deaa827337089ef7877a7ef4568501b8c Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 28 Jan 2024 16:26:04 -0800 Subject: [PATCH 11/11] formatting --- .../conductor/client/api/WorkflowStateUpdateTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java index 2d28b562..1505179f 100644 --- a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java @@ -12,9 +12,6 @@ */ package io.orkes.conductor.client.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.util.List; import java.util.Map; import java.util.Set; @@ -32,8 +29,11 @@ import io.orkes.conductor.client.WorkflowClient; import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.common.model.WorkflowRun; + import lombok.SneakyThrows; +import static org.junit.Assert.assertEquals; + public class WorkflowStateUpdateTests extends ClientTest { private static WorkflowClient workflowClient;