Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Adds reprovision API to support updating search pipelines, ingest pipelines index settings #824

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))
- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763))
- Adds reprovision API to support updating search pipelines, ingest pipelines index settings ([#804](https://github.com/opensearch-project/flow-framework/pull/804))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

### Bug Fixes
- Handle Not Found exceptions as successful deletions for agents and models ([#805](https://github.com/opensearch-project/flow-framework/pull/805))
- Wrap CreateIndexRequest mappings in _doc key as required ([#809](https://github.com/opensearch-project/flow-framework/pull/809))
- Have FlowFrameworkException status recognized by ExceptionsHelper ([#811](https://github.com/opensearch-project/flow-framework/pull/811))

### Infrastructure
### Documentation
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ dependencies {
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.3"
implementation "com.google.code.gson:gson:2.10.1"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
Expand Down Expand Up @@ -170,7 +172,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class),
new ActionHandler<>(ReprovisionWorkflowAction.INSTANCE, ReprovisionWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private CommonValue() {}
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";

/*
* Constants associated with plugin configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.opensearch.flowframework.workflow.RegisterRemoteModelStep;
import org.opensearch.flowframework.workflow.ReindexStep;
import org.opensearch.flowframework.workflow.UndeployModelStep;
import org.opensearch.flowframework.workflow.UpdateIndexStep;
import org.opensearch.flowframework.workflow.UpdateIngestPipelineStep;
import org.opensearch.flowframework.workflow.UpdateSearchPipelineStep;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -43,29 +46,39 @@
public enum WorkflowResources {

/** Workflow steps for creating/deleting a connector and associated created resource */
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME),
CREATE_CONNECTOR(CreateConnectorStep.NAME, null, DeleteConnectorStep.NAME, WorkflowResources.CONNECTOR_ID),
/** Workflow steps for registering/deleting a remote model and associated created resource */
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local model and associated created resource */
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local sparse encoding model and associated created resource */
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local OpenSearch provided pretrained model and associated created resource */
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a model group and associated created resource */
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME),
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, null, NoOpStep.NAME, WorkflowResources.MODEL_GROUP_ID),
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
DEPLOY_MODEL(DeployModelStep.NAME, null, UndeployModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME),
CREATE_INGEST_PIPELINE(
CreateIngestPipelineStep.NAME,
UpdateIngestPipelineStep.NAME,
DeleteIngestPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME),
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
UpdateSearchPipelineStep.NAME,
DeleteSearchPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, UpdateIndexStep.NAME, DeleteIndexStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
REINDEX(ReindexStep.NAME, null, NoOpStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);
REGISTER_AGENT(RegisterAgentStep.NAME, null, DeleteAgentStep.NAME, WorkflowResources.AGENT_ID);

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
Expand All @@ -80,34 +93,37 @@
/** Agent Id */
public static final String AGENT_ID = "agent_id";

private final String workflowStep;
private final String resourceCreated;
private final String createStep;
private final String updateStep;
private final String deprovisionStep;
private final String resourceCreated;

private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
WorkflowResources(String createStep, String updateStep, String deprovisionStep, String resourceCreated) {
this.createStep = createStep;
this.updateStep = updateStep;
this.deprovisionStep = deprovisionStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
* Returns the create step for the given enum Constant
* @return the create step of this data.
*/
public String getWorkflowStep() {
return workflowStep;
public String getCreateStep() {
return createStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
*/
public String getResourceCreated() {
return resourceCreated;
public String getUpdateStep() {
return updateStep;
}

/**
Expand All @@ -118,6 +134,14 @@
return deprovisionStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
Expand All @@ -127,7 +151,9 @@
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
if (workflowStep.equals(mapping.getCreateStep())
|| workflowStep.equals(mapping.getDeprovisionStep())
|| workflowStep.equals(mapping.getUpdateStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -145,7 +171,7 @@
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
Expand All @@ -154,6 +180,24 @@
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the update step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to update
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getUpdateStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getUpdateStep();
}
}
}
logger.error("Unable to find update step for step: {}", workflowStep);
throw new FlowFrameworkException("Unable to find update step for step: " + workflowStep, RestStatus.BAD_REQUEST);

Check warning on line 198 in src/main/java/org/opensearch/flowframework/common/WorkflowResources.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/common/WorkflowResources.java#L197-L198

Added lines #L197 - L198 were not covered by tests
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ public void updateResourceInStateIndex(
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
"ctx._source.resources_created.add(params.newResource);",
Collections.singletonMap("newResource", newResource.resourceMap())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.REPROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
Expand Down Expand Up @@ -74,7 +75,7 @@
return List.of(
// Create new workflow
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s", WORKFLOW_URI)),
// Update use case template
// Update use case template/ reprovision existing workflow
new Route(RestRequest.Method.PUT, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID))
);
}
Expand All @@ -84,8 +85,10 @@
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
? request.params()
Expand All @@ -108,28 +111,32 @@
);
}
if (!provision && !params.isEmpty()) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
return processError(ffe, params, request);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
return processError(ffe, params, request);
}
if (reprovision && workflowId == null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && useCase != null) {
FlowFrameworkException ffe = new FlowFrameworkException(

Check warning on line 135 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L135

Added line #L135 was not covered by tests
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);

Check warning on line 139 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L139

Added line #L139 was not covered by tests
}
try {
Template template;
Expand Down Expand Up @@ -213,7 +220,8 @@
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
useCaseDefaultsMap,
reprovision
);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
Expand Down Expand Up @@ -249,4 +257,13 @@
);
}
}

private RestChannelConsumer processError(FlowFrameworkException ffe, Map<String, String> params, RestRequest request) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
Loading
Loading