Skip to content

Commit

Permalink
[Backport 2.x] Add optional delay parameter to no-op step (#677)
Browse files Browse the repository at this point in the history
Add optional delay parameter to no-op step (#674)


(cherry picked from commit bae3aa2)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent b9c7354 commit 8130608
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
- Add guardrails to default use case params ([#658](https://github.com/opensearch-project/flow-framework/pull/658))
- Allow strings for boolean workflow step parameters ([#671](https://github.com/opensearch-project/flow-framework/pull/671))
- Add optional delay parameter to no-op step ([#674](https://github.com/opensearch-project/flow-framework/pull/674))

### Bug Fixes
- Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ private CommonValue() {}
public static final String CONFIGURATIONS = "configurations";
/** Guardrails field */
public static final String GUARDRAILS_FIELD = "guardrails";
/** Delay field */
public static final String DELAY_FIELD = "delay";

/*
* Constants associated with resource provisioning / state
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@
package org.opensearch.flowframework.workflow;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DELAY_FIELD;

/**
* A workflow step that does nothing. May be used for synchronizing other actions.
Expand All @@ -32,6 +41,29 @@ public PlainActionFuture<WorkflowData> execute(
Map<String, String> params
) {
PlainActionFuture<WorkflowData> future = PlainActionFuture.newFuture();

Set<String> requiredKeys = Collections.emptySet();
Set<String> optionalKeys = Set.of(DELAY_FIELD);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);
if (inputs.containsKey(DELAY_FIELD)) {
long delay = TimeValue.parseTimeValue(inputs.get(DELAY_FIELD).toString(), DELAY_FIELD).millis();
Thread.sleep(delay);
}
} catch (IllegalArgumentException iae) {
throw new WorkflowStepException(iae.getMessage(), RestStatus.BAD_REQUEST);
} catch (InterruptedException e) {
FutureUtils.cancel(future);
}

future.onResponse(WorkflowData.EMPTY);
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
package org.opensearch.flowframework.workflow;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import static org.opensearch.flowframework.common.CommonValue.DELAY_FIELD;

public class NoOpStepTests extends OpenSearchTestCase {

Expand All @@ -28,4 +32,32 @@ public void testNoOpStep() throws IOException {
);
assertTrue(future.isDone());
}

public void testNoOpStepDelay() throws IOException, InterruptedException {
NoOpStep noopStep = new NoOpStep();
WorkflowData delayData = new WorkflowData(Map.of(DELAY_FIELD, "1s"), null, null);

long start = System.nanoTime();
PlainActionFuture<WorkflowData> future = noopStep.execute(
"nodeId",
delayData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertTrue(future.isDone());
// Sleep isn't exactly accurate so leave 100ms of roundoff
assertTrue(System.nanoTime() - start > 900_000_000L);
}

public void testNoOpStepParse() throws IOException {
NoOpStep noopStep = new NoOpStep();
WorkflowData delayData = new WorkflowData(Map.of(DELAY_FIELD, "foo"), null, null);

Exception ex = assertThrows(
WorkflowStepException.class,
() -> noopStep.execute("nodeId", delayData, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())
);
assertEquals("failed to parse setting [delay] with value [foo] as a time value: unit is missing or unrecognized", ex.getMessage());
}
}

0 comments on commit 8130608

Please sign in to comment.