diff --git a/examples/applications/query-opensearch/.gitignore b/examples/applications/query-opensearch/.gitignore new file mode 100644 index 000000000..55dea2dd3 --- /dev/null +++ b/examples/applications/query-opensearch/.gitignore @@ -0,0 +1 @@ +java/lib/* \ No newline at end of file diff --git a/examples/applications/query-opensearch/README.md b/examples/applications/query-opensearch/README.md new file mode 100644 index 000000000..8b42225c4 --- /dev/null +++ b/examples/applications/query-opensearch/README.md @@ -0,0 +1,66 @@ +# Querying an OpenSearch Index + +This sample application shows how to perform queries against an OpenSearch index. + +## Prerequisites + +Run OpenSearch locally. +``` +docker network create n1 +docker run --rm -it --network n1 --name opensearch -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" -e "plugins.security.disabled=true" opensearchproject/opensearch:latest +``` + + +Or if you want to use Amazon AWS OpenSearch Serverless: +``` +export OPENSEARCH_USERNAME= +export OPENSEARCH_PASSWORD= +export OPENSEARCH_HOST=xxxx.<region>.aoss.amazonaws.com +export OPENSEARCH_REGION= +``` + +Note that you need to create an AWS OpenSearch collection. +This example uses both document IDs and vectors. + +This is not supported by the current Vector Search type collection so you either remove the document IDs or you use the new Vector Search type collection. + +## Configure access to the Vector Database + +Configure OpenAI access key to generate embeddings: + +``` +export OPEN_AI_ACCESS_KEY= ... 
+``` + + +## Deploy the LangStream application + +``` +langstream docker run test -app examples/applications/query-opensearch -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml --docker-args --network --docker-args n1 +``` + +or with AWS OpenSearch: + +``` +langstream docker run test -app examples/applications/query-opensearch -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml +``` + +## Fill the index + +Let's start a producer that sends messages to the vectors-topic: + +``` +langstream gateway produce test fill-index -v "My cat eats carrots" +langstream gateway produce test fill-index -v "My dog is called Jim" +``` + +## Search + +Search via k-NN (k-Nearest Neighbors): + +``` +langstream gateway chat test -g chat +$ > Food +My cat eats carrots +``` + diff --git a/examples/applications/query-opensearch/configuration.yaml b/examples/applications/query-opensearch/configuration.yaml new file mode 100644 index 000000000..44579a92f --- /dev/null +++ b/examples/applications/query-opensearch/configuration.yaml @@ -0,0 +1,35 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +configuration: + resources: + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "${secrets.open-ai.url}" + access-key: "${secrets.open-ai.access-key}" + provider: "${secrets.open-ai.provider}" + - type: "vector-database" + name: "opensearch-datasource" + configuration: + service: "opensearch" + username: "${secrets.opensearch.username}" + password: "${secrets.opensearch.password}" + host: "${secrets.opensearch.host}" + port: "${secrets.opensearch.port}" + https: "${secrets.opensearch.https}" + region: "${secrets.opensearch.region}" diff --git a/examples/applications/query-opensearch/gateways.yaml b/examples/applications/query-opensearch/gateways.yaml new file mode 100644 index 000000000..15a2e4571 --- /dev/null +++ b/examples/applications/query-opensearch/gateways.yaml @@ -0,0 +1,29 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: fill-index + type: produce + topic: vectors-topic + + - id: chat + type: chat + chat-options: + headers: + - value-from-parameters: sessionId + questions-topic: input-topic + answers-topic: output-topic diff --git a/examples/applications/query-opensearch/query.yaml b/examples/applications/query-opensearch/query.yaml new file mode 100644 index 000000000..aef01aee5 --- /dev/null +++ b/examples/applications/query-opensearch/query.yaml @@ -0,0 +1,64 @@ +# +# Copyright DataStax, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Index Products on Vector Database" +topics: + - name: "input-topic" + creation-mode: create-if-not-exists + - name: "output-topic" + creation-mode: create-if-not-exists +errors: + on-failure: skip +pipeline: + - name: "convert-to-json" + type: "document-to-json" + input: "input-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.embeddings" + text: "{{ value.question }}" + - name: "Execute Query" + type: "query-vector-db" + configuration: + datasource: "opensearch-datasource" + query: | + { + "index": "langstream-index", + "query": { + "knn": { + "embeddings": { + "vector": ?, + "k": 1 + } + } + } + } + fields: + - "value.embeddings" + output-field: "value.query_result" + only-first: true + - name: "Format response" + type: compute + output: "output-topic" + configuration: + fields: + - name: "value" + type: STRING + expression: "value.query_result.document.content" \ No newline at end of file diff --git a/examples/applications/query-opensearch/write.yaml b/examples/applications/query-opensearch/write.yaml new file mode 100644 index 000000000..f9ddd819c --- /dev/null +++ b/examples/applications/query-opensearch/write.yaml @@ -0,0 +1,74 @@ +# +# Copyright DataStax, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Index Products on Vector Database" +topics: + - name: "vectors-topic" + creation-mode: create-if-not-exists +assets: + - name: "os-index" + asset-type: "opensearch-index" + creation-mode: create-if-not-exists + config: + index-name: "langstream-index" + datasource: "opensearch-datasource" + settings: | + { + "index": { + "knn": true, + "knn.algo_param.ef_search": 100 + } + } + mappings: | + { + "properties": { + "content": { + "type": "text" + }, + "embeddings": { + "type": "knn_vector", + "dimension": 1536 + } + } + } +errors: + on-failure: skip +pipeline: + - name: "convert-to-json" + type: "document-to-json" + input: "vectors-topic" + configuration: + text-field: "document" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.embeddings" + text: "{{ value.document }}" + batch-size: 10 + flush-interval: 500 + - name: "Write to vector db" + type: "vector-db-sink" + configuration: + datasource: "opensearch-datasource" + index-name: langstream-index + bulk-parameters: + refresh: "wait_for" + fields: + - name: "embeddings" + expression: "value.embeddings" + - name: "content" + expression: "value.document" diff --git a/examples/secrets/secrets.yaml b/examples/secrets/secrets.yaml index 15e13a3c3..259a44d59 100644 --- 
a/examples/secrets/secrets.yaml +++ b/examples/secrets/secrets.yaml @@ -104,4 +104,13 @@ secrets: username: "${SOLR_USERNAME:-}" password: "${SOLR_PASSWORD:-}" host: "${SOLR_HOST:-localhost}" - port: "${SOLR_PORT:-8983}" \ No newline at end of file + port: "${SOLR_PORT:-8983}" + - name: opensearch + id: opensearch + data: + username: "${OPENSEARCH_USERNAME:-admin}" + password: "${OPENSEARCH_PASSWORD:-admin}" + host: "${OPENSEARCH_HOST:-localhost}" + port: "${OPENSEARCH_PORT:-9200}" + https: "${OPENSEARCH_HTTPS:-false}" + region: "${OPENSEARCH_REGION}" \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/pom.xml b/langstream-agents/langstream-vector-agents/pom.xml index 781890157..a7bf65719 100644 --- a/langstream-agents/langstream-vector-agents/pom.xml +++ b/langstream-agents/langstream-vector-agents/pom.xml @@ -30,6 +30,17 @@ 17 17 + + + + software.amazon.awssdk + bom + 2.20.162 + pom + import + + + ${project.groupId} @@ -76,6 +87,15 @@ solr-solrj 9.3.0 + + org.opensearch.client + opensearch-java + 2.6.0 + + + software.amazon.awssdk + opensearch + org.projectlombok lombok @@ -110,6 +130,12 @@ cassandra test + + org.opensearch + opensearch-testcontainers + 2.0.0 + test + diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchAssetsManagerProvider.java new file mode 100644 index 000000000..6008ccd5a --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchAssetsManagerProvider.java @@ -0,0 +1,178 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.opensearch; + +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.assets.AssetManagerProvider; +import ai.langstream.api.util.ConfigurationUtils; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.mapping.TypeMapping; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.indices.DeleteIndexRequest; +import org.opensearch.client.opensearch.indices.DeleteIndexResponse; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.IndexSettings; +import org.opensearch.client.util.MissingRequiredPropertyException; + +@Slf4j +public class OpenSearchAssetsManagerProvider implements AssetManagerProvider { + + @Override + public boolean supports(String assetType) { + return "opensearch-index".equals(assetType); + } + + @Override + public AssetManager createInstance(String assetType) { + + switch (assetType) { + case "opensearch-index": + return new OpenSearchIndexAssetManager(); + default: + throw new IllegalArgumentException(); + } + } + + private static class OpenSearchIndexAssetManager implements AssetManager { + + OpenSearchDataSource.OpenSearchQueryStepDataSource datasource; + + AssetDefinition assetDefinition; + + @Override + 
public void initialize(AssetDefinition assetDefinition) throws Exception { + this.datasource = buildDataSource(assetDefinition); + this.assetDefinition = assetDefinition; + } + + @Override + public boolean assetExists() throws Exception { + String indexName = getIndexName(); + return datasource + .getClient() + .indices() + .exists(new ExistsRequest.Builder().index(indexName).build()) + .value(); + } + + private String getIndexName() { + String tableName = + ConfigurationUtils.getString("index-name", null, assetDefinition.getConfig()); + return tableName; + } + + @Override + public void deployAsset() throws Exception { + final String index = getIndexName(); + + final CreateIndexRequest.Builder builder = + new CreateIndexRequest.Builder().index(index); + String settingsJson = + ConfigurationUtils.getString("settings", null, assetDefinition.getConfig()); + String mappingsJson = + ConfigurationUtils.getString("mappings", null, assetDefinition.getConfig()); + log.info( + "Creating index {} with settings {} and mappings {}", + index, + settingsJson, + mappingsJson); + if (settingsJson != null && !settingsJson.isBlank()) { + try { + final IndexSettings indexSettings = + OpenSearchDataSource.parseOpenSearchRequestBodyJson( + settingsJson, IndexSettings._DESERIALIZER); + builder.settings(indexSettings); + } catch (MissingRequiredPropertyException exception) { + throw new IllegalArgumentException( + "Invalid settings json value " + settingsJson, exception); + } + } + + if (mappingsJson != null && !mappingsJson.isBlank()) { + try { + final TypeMapping typeMapping = + OpenSearchDataSource.parseOpenSearchRequestBodyJson( + mappingsJson, TypeMapping._DESERIALIZER); + builder.mappings(typeMapping); + } catch (MissingRequiredPropertyException exception) { + throw new IllegalArgumentException( + "Invalid mappings json value :" + mappingsJson, exception); + } + } + final CreateIndexRequest request = builder.build(); + log.info("Creating index {} ", index); + + final 
CreateIndexResponse response = datasource.getClient().indices().create(request); + if (response.acknowledged() != null && response.acknowledged()) { + log.info("Created index {}", index); + } else { + log.error("Failed to create index {}", index); + throw new RuntimeException( + "Failed to create index " + index + " ack=" + response.acknowledged()); + } + } + + @Override + public boolean deleteAssetIfExists() throws Exception { + try { + final DeleteIndexResponse delete = + datasource + .getClient() + .indices() + .delete( + new DeleteIndexRequest.Builder() + .index(getIndexName()) + .build()); + if (delete.acknowledged()) { + log.info("Deleted index {}", getIndexName()); + return true; + } else { + log.error("Failed to delete index {}", getIndexName()); + return false; + } + } catch (OpenSearchException exception) { + if ("index_not_found_exception".equals(exception.error().type())) { + return false; + } + throw new RuntimeException(exception); + } + } + + @Override + public void close() throws Exception { + if (datasource != null) { + datasource.close(); + } + } + } + + private static OpenSearchDataSource.OpenSearchQueryStepDataSource buildDataSource( + AssetDefinition assetDefinition) { + OpenSearchDataSource dataSource = new OpenSearchDataSource(); + Map datasourceDefinition = + ConfigurationUtils.getMap("datasource", Map.of(), assetDefinition.getConfig()); + Map configuration = + ConfigurationUtils.getMap("configuration", Map.of(), datasourceDefinition); + OpenSearchDataSource.OpenSearchQueryStepDataSource result = + dataSource.createDataSourceImplementation(configuration); + result.initialize(null); + return result; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchDataSource.java new file mode 100644 index 000000000..74033abb8 --- /dev/null +++ 
b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchDataSource.java @@ -0,0 +1,222 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.opensearch; + +import static ai.langstream.agents.vector.InterpolationUtils.buildObjectFromJson; + +import ai.langstream.ai.agents.datasource.DataSourceProvider; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Data; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.core5.http.HttpHost; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import 
org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.aws.AwsSdk2Transport; +import org.opensearch.client.transport.aws.AwsSdk2TransportOptions; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.regions.Region; + +@Slf4j +public class OpenSearchDataSource implements DataSourceProvider { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Override + public boolean supports(Map dataSourceConfig) { + return "opensearch".equals(dataSourceConfig.get("service")); + } + + @Data + public static final class OpenSearchConfig { + + private boolean https = true; + private String host; + private int port = 9200; + private String region; + // BASIC AUTH + private String username; + private String password; + } + + @Override + public OpenSearchQueryStepDataSource createDataSourceImplementation( + Map dataSourceConfig) { + + OpenSearchConfig clientConfig = + MAPPER.convertValue(dataSourceConfig, OpenSearchConfig.class); + + return new OpenSearchQueryStepDataSource(clientConfig); + } + + public static class OpenSearchQueryStepDataSource implements QueryStepDataSource { + + @Getter private final OpenSearchConfig clientConfig; + @Getter private OpenSearchClient client; + + public OpenSearchQueryStepDataSource(OpenSearchConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + @SneakyThrows + public void 
initialize(Map config) { + SdkHttpClient httpClient = new DefaultSdkHttpClientBuilder().build(); + + final String host = clientConfig.getHost(); + if (host == null) { + throw new IllegalArgumentException("Missing host"); + } + boolean useAwsSdk = host.endsWith("amazonaws.com"); + final OpenSearchTransport transport; + + if (useAwsSdk) { + final AwsBasicCredentials credentials = + AwsBasicCredentials.create( + clientConfig.getUsername(), clientConfig.getPassword()); + transport = + new AwsSdk2Transport( + httpClient, + host.replace("https://", ""), + "aoss", + Region.of(clientConfig.getRegion()), + AwsSdk2TransportOptions.builder() + .setCredentials( + StaticCredentialsProvider.create(credentials)) + .build()); + } else { + final HttpHost httpHost = + new HttpHost( + clientConfig.isHttps() ? "https" : "http", + host, + clientConfig.getPort()); + final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(httpHost), + new UsernamePasswordCredentials( + clientConfig.getUsername(), + clientConfig.getPassword().toCharArray())); + transport = + ApacheHttpClient5TransportBuilder.builder(httpHost) + .setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider( + credentialsProvider)) + .build(); + } + + this.client = new OpenSearchClient(transport); + log.info("Connecting to OpenSearch at {}", host); + } + + @Override + public List> fetchData(String query, List params) { + try { + final Map asMap = buildObjectFromJson(query, Map.class, params, OBJECT_MAPPER); + final SearchRequest searchRequest = + OpenSearchDataSource.parseOpenSearchRequestBodyJson( + asMap, SearchRequest._DESERIALIZER); + + final SearchResponse result = client.search(searchRequest, Map.class); + return result.hits().hits().stream() + .map( + h -> { + Map object = new HashMap<>(); + object.put("id", h.id()); + object.put("document", h.source()); + object.put("score", h.score()); + 
object.put("index", h.index()); + return object; + }) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (OpenSearchException e) { + final String causes = + e.error().rootCause().stream() + .map( + cause -> + "type: " + + cause.type() + + " reason: " + + cause.reason()) + .collect(Collectors.joining("\n")); + String errMessage = + "Error executing OpenSearch query: " + + e.getMessage() + + "\nRoot causes:\n" + + causes + + "\nQuery: " + + query; + log.error(errMessage, e); + throw new RuntimeException(errMessage, e); + } + } + + @Override + public void close() { + if (client != null) { + try { + client._transport().close(); + } catch (Exception e) { + log.warn("Error closing OpenSearch client", e); + } + } + } + } + + protected static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper() + .configure(SerializationFeature.INDENT_OUTPUT, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + protected static final JacksonJsonpMapper JACKSON_JSONP_MAPPER = + new JacksonJsonpMapper(OBJECT_MAPPER); + + public static T parseOpenSearchRequestBodyJson( + String json, JsonpDeserializer deserializer) throws IOException { + return parseOpenSearchRequestBodyJson( + OBJECT_MAPPER.readValue(json, Map.class), deserializer); + } + + public static T parseOpenSearchRequestBodyJson( + Map asMap, JsonpDeserializer deserializer) { + return JsonData.of(asMap, JACKSON_JSONP_MAPPER).deserialize(deserializer); + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchWriter.java new file mode 100644 index 000000000..8a1e7e03a --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/opensearch/OpenSearchWriter.java @@ -0,0 +1,358 @@ +/* + * Copyright DataStax, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.opensearch; + +import static ai.langstream.ai.agents.commons.TransformContext.recordToTransformContext; + +import ai.langstream.ai.agents.commons.TransformContext; +import ai.langstream.ai.agents.commons.jstl.JstlEvaluator; +import ai.langstream.api.database.VectorDatabaseWriter; +import ai.langstream.api.database.VectorDatabaseWriterProvider; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.util.ConfigurationUtils; +import ai.langstream.api.util.OrderedAsyncBatchExecutor; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch._types.Time; +import org.opensearch.client.opensearch._types.WaitForActiveShardOptions; +import org.opensearch.client.opensearch._types.WaitForActiveShards; +import org.opensearch.client.opensearch.core.BulkRequest; 
+import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; + +@Slf4j +public class OpenSearchWriter implements VectorDatabaseWriterProvider { + + @Override + public boolean supports(Map dataSourceConfig) { + return "opensearch".equals(dataSourceConfig.get("service")); + } + + @Override + public OpenSearchVectorDatabaseWriter createImplementation( + Map datasourceConfig) { + return new OpenSearchVectorDatabaseWriter(datasourceConfig); + } + + public static class OpenSearchVectorDatabaseWriter + implements VectorDatabaseWriter, AutoCloseable { + + protected static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper() + .configure(SerializationFeature.INDENT_OUTPUT, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + + @Getter private final OpenSearchDataSource.OpenSearchQueryStepDataSource dataSource; + + private OrderedAsyncBatchExecutor batchExecutor; + private ScheduledExecutorService executorService; + + private String indexName; + private JstlEvaluator id; + private Map fields = new HashMap<>(); + private BulkParameters bulkParameters; + + public OpenSearchVectorDatabaseWriter(Map datasourceConfig) { + OpenSearchDataSource dataSourceProvider = new OpenSearchDataSource(); + dataSource = dataSourceProvider.createDataSourceImplementation(datasourceConfig); + } + + @Override + public void close() throws Exception { + dataSource.close(); + } + + @Data + public static class BulkParameters { + String pipeline; + + String refresh; + + @JsonProperty("require_alias") + Boolean requireAlias; + + String routing; + String timeout; + + @JsonProperty("wait_for_active_shards") + String waitForActiveShards; + } + + @Override + public void initialise(Map agentConfiguration) throws Exception { + 
dataSource.initialize(null); + indexName = (String) agentConfiguration.get("index-name"); + List> fields = + (List>) + agentConfiguration.getOrDefault("fields", List.of()); + fields.forEach( + field -> { + this.fields.put( + field.get("name").toString(), + buildEvaluator(field, "expression", Object.class)); + }); + id = buildEvaluator(agentConfiguration, "id", String.class); + bulkParameters = + OBJECT_MAPPER.convertValue( + ConfigurationUtils.getMap( + "bulk-parameters", Map.of(), agentConfiguration), + BulkParameters.class); + + final int flushInterval = + ConfigurationUtils.getInt("flush-interval", 1000, agentConfiguration); + final int batchSize = ConfigurationUtils.getInt("batch-size", 10, agentConfiguration); + this.executorService = + flushInterval > 0 ? Executors.newSingleThreadScheduledExecutor() : null; + + this.batchExecutor = + new OrderedAsyncBatchExecutor<>( + batchSize, + (records, completableFuture) -> { + try { + + List bulkOps = new ArrayList<>(); + + for (OpenSearchRecord record : records) { + boolean delete = record.document() == null; + final BulkOperation bulkOp; + if (!delete) { + log.info( + "indexing document {} with id {} on index {}", + record.document(), + record.id(), + indexName); + final IndexOperation request = + new IndexOperation.Builder<>() + .index(indexName) + .document(record.document()) + .id(record.id()) + .build(); + bulkOp = + new BulkOperation.Builder() + .index(request) + .build(); + } else { + log.info( + "deleting document with id {} on index {}", + record.id(), + indexName); + final DeleteOperation request = + new DeleteOperation.Builder() + .index(indexName) + .id(record.id()) + .build(); + bulkOp = + new BulkOperation.Builder() + .delete(request) + .build(); + } + bulkOps.add(bulkOp); + } + + final BulkRequest.Builder bulkBuilder = + new BulkRequest.Builder(); + bulkBuilder.pipeline(bulkParameters.getPipeline()); + bulkBuilder.refresh(getRefreshValue()); + bulkBuilder.requireAlias(bulkParameters.getRequireAlias()); + 
bulkBuilder.routing(bulkParameters.getRouting()); + if (bulkParameters.getTimeout() != null) { + bulkBuilder.timeout( + new Time.Builder() + .time(bulkParameters.getTimeout()) + .build()); + } + if (bulkParameters.getWaitForActiveShards() != null) { + final WaitForActiveShards value; + if (bulkParameters.getWaitForActiveShards().equals("all")) { + value = + new WaitForActiveShards.Builder() + .option(WaitForActiveShardOptions.All) + .build(); + } else { + value = + new WaitForActiveShards.Builder() + .count( + Integer.parseInt( + bulkParameters + .getWaitForActiveShards())) + .build(); + } + bulkBuilder.waitForActiveShards(value); + } + + final BulkRequest bulkRequest = + bulkBuilder + .index(indexName) + .operations(bulkOps) + .build(); + final BulkResponse response; + try { + response = dataSource.getClient().bulk(bulkRequest); + } catch (IOException e) { + log.error( + "Error indexing documents on index {}: {}", + indexName, + e.getMessage(), + e); + for (OpenSearchRecord record : records) { + record.completableFuture().completeExceptionally(e); + } + completableFuture.completeExceptionally(e); + return; + } + int itemIndex = 0; + boolean failures = false; + for (BulkResponseItem item : response.items()) { + if (item.error() != null) { + String errorString = + item.error().type() + + " - " + + item.error().reason(); + ; + log.error( + "Error indexing document {} on index {}: {}", + item.id(), + indexName, + errorString); + failures = true; + records.get(itemIndex++) + .completableFuture() + .completeExceptionally( + new RuntimeException( + "Error indexing document: " + + errorString)); + } else { + records.get(itemIndex++) + .completableFuture() + .complete(null); + } + } + if (!failures) { + log.info( + "Indexed {} documents on index {}", + records.size(), + indexName); + completableFuture.complete(null); + } else { + completableFuture.completeExceptionally( + new RuntimeException("Error indexing documents")); + } + } catch (Throwable e) { + log.error( + "Error 
indexing documents on index {}: {}", + indexName, + e.getMessage(), + e); + for (OpenSearchRecord record : records) { + record.completableFuture().completeExceptionally(e); + } + completableFuture.completeExceptionally(e); + } + }, + flushInterval, + 1, + (__) -> 0, + executorService); + batchExecutor.start(); + } + + private Refresh getRefreshValue() { + if (bulkParameters.getRefresh() == null) { + return null; + } + for (Refresh value : Refresh.values()) { + if (bulkParameters.getRefresh().equals(value.jsonValue())) { + return value; + } + } + throw new IllegalArgumentException( + "Invalid refresh value: " + bulkParameters.getRefresh()); + } + + @Override + public CompletableFuture upsert(Record record, Map context) { + + CompletableFuture handle = new CompletableFuture<>(); + try { + TransformContext transformContext = recordToTransformContext(record, true); + Map documentJson; + + if (record.value() != null) { + documentJson = new HashMap<>(); + fields.forEach( + (name, evaluator) -> { + Object value = evaluator.evaluate(transformContext); + if (log.isDebugEnabled()) { + log.debug( + "setting value {} ({}) for field {}", + value, + value.getClass(), + name); + } + documentJson.put(name, value); + }); + } else { + documentJson = null; + } + + final String documentId; + if (id == null) { + documentId = null; + } else { + final Object evaluate = id.evaluate(transformContext); + documentId = evaluate == null ? 
null : evaluate.toString(); + } + if (documentJson == null && documentId == null) { + log.info("skipping null document and id, was record: {}", record); + return CompletableFuture.completedFuture(null); + } + batchExecutor.add(new OpenSearchRecord(documentId, documentJson, handle)); + } catch (Exception e) { + handle.completeExceptionally(e); + } + return handle; + } + + record OpenSearchRecord( + String id, Map document, CompletableFuture completableFuture) {} + } + + private static JstlEvaluator buildEvaluator( + Map agentConfiguration, String param, Class type) { + String expression = agentConfiguration.getOrDefault(param, "").toString(); + if (expression == null || expression.isEmpty()) { + return null; + } + return new JstlEvaluator("${" + expression + "}", type); + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index index 6971522d1..935c45a6a 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index @@ -3,4 +3,5 @@ cassandra-keyspace astra-keyspace milvus-collection jdbc-table -solr-collection \ No newline at end of file +solr-collection +opensearch-index \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index cfd9291b9..81ead541b 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ 
b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1,3 +1,4 @@ ai.langstream.agents.vector.pinecone.PineconeDataSource ai.langstream.agents.vector.milvus.MilvusDataSource -ai.langstream.agents.vector.solr.SolrDataSource \ No newline at end of file +ai.langstream.agents.vector.solr.SolrDataSource +ai.langstream.agents.vector.opensearch.OpenSearchDataSource \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider index 0cc9347aa..f6c27bb80 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider @@ -2,4 +2,5 @@ ai.langstream.agents.vector.pinecone.PineconeWriter ai.langstream.agents.vector.cassandra.CassandraWriter ai.langstream.agents.vector.milvus.MilvusWriter ai.langstream.agents.vector.jdbc.JdbcWriter -ai.langstream.agents.vector.solr.SolrWriter \ No newline at end of file +ai.langstream.agents.vector.solr.SolrWriter +ai.langstream.agents.vector.opensearch.OpenSearchWriter \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider index 615c27571..45925c4fa 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider +++ 
b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider @@ -1,4 +1,5 @@ ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider ai.langstream.agents.vector.milvus.MilvusAssetsManagerProvider ai.langstream.agents.vector.jdbc.JdbcAssetsManagerProvider -ai.langstream.agents.vector.solr.SolrAssetsManagerProvider \ No newline at end of file +ai.langstream.agents.vector.solr.SolrAssetsManagerProvider +ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/OpenSearchDataSourceTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/OpenSearchDataSourceTest.java new file mode 100644 index 000000000..d3bdfcde7 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/OpenSearchDataSourceTest.java @@ -0,0 +1,302 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.agents.vector.datasource.impl; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.TestCase.assertTrue; + +import ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider; +import ai.langstream.agents.vector.opensearch.OpenSearchWriter; +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.code.SimpleRecord; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +@Testcontainers +class OpenSearchDataSourceTest { + + @Container + static OpensearchContainer OPENSEARCH = + new OpensearchContainer(DockerImageName.parse("opensearchproject/opensearch:2")) + .withEnv("discovery.type", "single-node"); + + private static Map getDatasourceConfig() { + return Map.of( + "https", + false, + "port", + OPENSEARCH.getMappedPort(9200), + "host", + "localhost", + "username", + "admin", + "password", + "admin"); + } + + @Test + void testAsset() throws Exception { + final Map assetConfig = + Map.of( + "index-name", + "test-index", + "settings", + """ + { + "index": { + "number_of_shards": 2, + "number_of_replicas": 1 + } + } + """, + "mappings", + """ + { + "properties": { + "age": { + "type": "integer" + } + } + } + """, + "datasource", + Map.of("configuration", getDatasourceConfig())); + final AssetManager instance = createAssetManager(assetConfig); + + instance.deployAsset(); + assertTrue(instance.assetExists()); + instance.deleteAssetIfExists(); + } + + private AssetManager createAssetManager(Map config) throws Exception { + final AssetManager instance = + new 
OpenSearchAssetsManagerProvider().createInstance("opensearch-index"); + final AssetDefinition def = new AssetDefinition(); + def.setConfig(config); + instance.initialize(def); + return instance; + } + + @Test + void testWriteBasicData() throws Exception { + final String indexName = "test-index-000"; + final Map assetConfig = + Map.of( + "index-name", + indexName, + "datasource", + Map.of("configuration", getDatasourceConfig())); + createAssetManager(assetConfig).deleteAssetIfExists(); + createAssetManager(assetConfig).deployAsset(); + try (final OpenSearchWriter.OpenSearchVectorDatabaseWriter writer = + new OpenSearchWriter().createImplementation(getDatasourceConfig()); ) { + + writer.initialise( + Map.of( + "index-name", + indexName, + "fields", + List.of( + Map.of("name", "text1", "expression", "value.text"), + Map.of("name", "chunk_id", "expression", "key.chunk_id")), + "id", + "key.name", + "bulk-parameters", + Map.of("refresh", "true"))); + SimpleRecord record = + SimpleRecord.of( + "{\"name\": \"{\\\"myid\\\":\\\"xx\\\"}\", \"chunk_id\": 1}", + """ + { + "text": "Lorem ipsum..." + } + """); + + writer.upsert(record, Map.of()).get(); + + List> result = + writer.getDataSource() + .fetchData( + """ + { + "index": "test-index-000", + "query": { + "match": { + "text1": "Lorem ipsum..." + } + + } + } + """, + List.of()); + log.info("result: {}", result); + + assertEquals(1, result.size()); + final String id = "{\"myid\":\"xx\"}"; + assertEquals(id, result.get(0).get("id")); + assertEquals(1, ((Map) result.get(0).get("document")).get("chunk_id")); + assertEquals("test-index-000", result.get(0).get("index")); + assertTrue(((Number) result.get(0).get("score")).floatValue() > 0.0f); + + result = + writer.getDataSource() + .fetchData( + """ + { + "index": "test-index-000", + "query": { + "ids": { + "values": [?] 
+ } + + } + } + """, + List.of(id)); + assertEquals(1, result.size()); + writer.upsert( + SimpleRecord.of( + "{\"name\": \"{\\\"myid\\\":\\\"xx\\\"}\", \"chunk_id\": 1}", + null), + Map.of()) + .get(); + + result = + writer.getDataSource() + .fetchData( + """ + { + "index": "test-index-000", + "query": { + "ids": { + "values": [?] + } + + } + } + """, + List.of(id)); + + assertEquals(0, result.size()); + } + } + + @Test + void testWriteVectors() throws Exception { + final String indexName = "test-index-000"; + final Map assetConfig = + Map.of( + "index-name", + indexName, + "settings", + """ + { + "index": { + "knn": true, + "knn.algo_param.ef_search": 100 + } + } + """, + "mappings", + """ + { + "properties": { + "my_vector1": { + "type": "knn_vector", + "dimension": 3, + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "lucene", + "parameters": { + "ef_construction": 128, + "m": 24 + } + } + } + } + } + + """, + "datasource", + Map.of("configuration", getDatasourceConfig())); + createAssetManager(assetConfig).deleteAssetIfExists(); + createAssetManager(assetConfig).deployAsset(); + try (final OpenSearchWriter.OpenSearchVectorDatabaseWriter writer = + new OpenSearchWriter().createImplementation(getDatasourceConfig()); ) { + + writer.initialise( + Map.of( + "index-name", + indexName, + "fields", + List.of( + Map.of("name", "my_vector1", "expression", "value.embeddings"), + Map.of("name", "original", "expression", "value.data")), + "bulk-parameters", + Map.of("refresh", "true"))); + SimpleRecord record = + SimpleRecord.of( + null, + """ + { + "data": "This is a question", + "embeddings": [1.0, 2.0, 3.0] + } + """); + + writer.upsert(record, Map.of()).get(); + + final List> result = + writer.getDataSource() + .fetchData( + """ + { + "size": 2, + "index": "test-index-000", + "query": { + "knn": { + "my_vector1": { + "vector": [26.2, -120.0, 99.1], + "k": 2 + } + } + } + } + """, + List.of()); + log.info("result: {}", result); + + assertEquals(1, 
result.size()); + assertNotNull(result.get(0).get("id")); + assertEquals( + "This is a question", ((Map) result.get(0).get("document")).get("original")); + assertEquals( + List.of(1.0d, 2.0d, 3.0d), + ((Map) result.get(0).get("document")).get("my_vector1")); + assertEquals("test-index-000", result.get(0).get("index")); + } + } +} diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentSink.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentSink.java index ee5caa824..96557a7f5 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentSink.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentSink.java @@ -22,7 +22,7 @@ public interface AgentSink extends AgentCode { /** - * The agent processes records and typically writes then to an external service. + * The agent processes records and typically writes to an external service. * * @param record the record to write * @throws Exception if the agent fails to process the records diff --git a/langstream-api/src/main/java/ai/langstream/api/util/OrderedAsyncBatchExecutor.java b/langstream-api/src/main/java/ai/langstream/api/util/OrderedAsyncBatchExecutor.java index ba476990c..d724e9ee8 100644 --- a/langstream-api/src/main/java/ai/langstream/api/util/OrderedAsyncBatchExecutor.java +++ b/langstream-api/src/main/java/ai/langstream/api/util/OrderedAsyncBatchExecutor.java @@ -92,6 +92,9 @@ private void flush() { } private Bucket bucket(int hash) { + if (numBuckets == 1) { + return buckets[0]; + } return buckets[Math.abs(hash % numBuckets)]; } diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index 12293658b..3d17a6155 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ 
b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -32,6 +32,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import lombok.SneakyThrows; import org.apache.commons.io.input.Tailer; @@ -341,13 +342,19 @@ private void executeOnDocker( .redirectErrorStream(true) .redirectOutput(outputLog.toFile()); Process process = processBuilder.start(); - Executors.newSingleThreadExecutor().execute(() -> tailLogSysOut(outputLog)); + CompletableFuture.runAsync( + () -> tailLogSysOut(outputLog), Executors.newSingleThreadExecutor()); if (startUI) { Executors.newSingleThreadExecutor() .execute(() -> startUI(tenant, applicationId, outputLog, process)); } - process.waitFor(); + final int exited = process.waitFor(); + if (exited != 0) { + // wait for the log to be printed + Thread.sleep(1000); + throw new RuntimeException("Process exited with code " + exited); + } } private void startUI(String tenant, String applicationId, Path outputLog, Process process) { diff --git a/langstream-core/src/main/java/ai/langstream/impl/assets/OpenSearchAssetsProvider.java b/langstream-core/src/main/java/ai/langstream/impl/assets/OpenSearchAssetsProvider.java new file mode 100644 index 000000000..e006692b5 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/assets/OpenSearchAssetsProvider.java @@ -0,0 +1,81 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.assets; + +import ai.langstream.api.doc.AssetConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.impl.common.AbstractAssetProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Set; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OpenSearchAssetsProvider extends AbstractAssetProvider { + + public OpenSearchAssetsProvider() { + super(Set.of("opensearch-index")); + } + + @Override + protected Class getAssetConfigModelClass(String type) { + return TableConfig.class; + } + + @Override + protected boolean lookupResource(String fieldName) { + return "datasource".equals(fieldName); + } + + @AssetConfig( + name = "OpenSearch index", + description = """ + Manage OpenSearch index. + """) + @Data + public static class TableConfig { + + @ConfigProperty( + description = + """ + Reference to a datasource id configured in the application. + """, + required = true) + private String datasource; + + @ConfigProperty( + description = """ + Index name. + """, + required = true) + @JsonProperty("index-name") + private String indexName; + + @ConfigProperty( + description = + """ + JSON containing index mappings configuration. + """) + private String mappings; + + @ConfigProperty( + description = + """ + JSON containing index settings configuration. 
+ """) + private String settings; + } +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java index e6e186e44..465804782 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java +++ b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java @@ -121,7 +121,7 @@ private void setupAsset(AssetDefinition asset, AssetManagerRegistry assetManager if (!exists) { log.info( - "Asset {} of type {} needs to be created", + "Asset {} of type {} needs to be created", asset.getId(), asset.getAssetType()); assetManagerImpl.deployAsset(); diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java index 3ff5636ad..20d9aa633 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java @@ -18,6 +18,7 @@ import ai.langstream.impl.resources.datasource.AstraDatasourceConfig; import ai.langstream.impl.resources.datasource.CassandraDatasourceConfig; import ai.langstream.impl.resources.datasource.JDBCDatasourceConfig; +import ai.langstream.impl.resources.datasource.OpenSearchDatasourceConfig; import java.util.Map; public class DataSourceResourceProvider extends BaseDataSourceResourceProvider { @@ -25,6 +26,7 @@ public class DataSourceResourceProvider extends BaseDataSourceResourceProvider { protected static final String SERVICE_ASTRA = "astra"; protected static final String SERVICE_CASSANDRA = "cassandra"; protected static final String SERVICE_JDBC = "jdbc"; + protected static final String SERVICE_OPENSEARCH = "opensearch"; public DataSourceResourceProvider() { super( @@ -32,6 +34,7 @@ public DataSourceResourceProvider() { Map.of( 
SERVICE_ASTRA, AstraDatasourceConfig.CONFIG, SERVICE_CASSANDRA, CassandraDatasourceConfig.CONFIG, - SERVICE_JDBC, JDBCDatasourceConfig.CONFIG)); + SERVICE_JDBC, JDBCDatasourceConfig.CONFIG, + SERVICE_OPENSEARCH, OpenSearchDatasourceConfig.CONFIG)); } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java index 135561387..77b58d508 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java @@ -18,6 +18,7 @@ import ai.langstream.impl.resources.datasource.AstraDatasourceConfig; import ai.langstream.impl.resources.datasource.CassandraDatasourceConfig; import ai.langstream.impl.resources.datasource.MilvusDatasourceConfig; +import ai.langstream.impl.resources.datasource.OpenSearchDatasourceConfig; import ai.langstream.impl.resources.datasource.PineconeDatasourceConfig; import ai.langstream.impl.resources.datasource.SolrDatasourceConfig; import java.util.Map; @@ -32,6 +33,7 @@ public VectorDatabaseResourceProvider() { "cassandra", CassandraDatasourceConfig.CONFIG, "pinecone", PineconeDatasourceConfig.CONFIG, "milvus", MilvusDatasourceConfig.CONFIG, - "solr", SolrDatasourceConfig.CONFIG)); + "solr", SolrDatasourceConfig.CONFIG, + "opensearch", OpenSearchDatasourceConfig.CONFIG)); } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/OpenSearchDatasourceConfig.java b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/OpenSearchDatasourceConfig.java new file mode 100644 index 000000000..35e6701a3 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/OpenSearchDatasourceConfig.java @@ -0,0 +1,96 @@ +/* + * Copyright DataStax, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.resources.datasource; + +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.doc.ResourceConfig; +import ai.langstream.api.model.Resource; +import ai.langstream.impl.resources.BaseDataSourceResourceProvider; +import ai.langstream.impl.uti.ClassConfigValidator; +import lombok.Data; + +@Data +@ResourceConfig( + name = "OpenSearch", + description = "Connect to OpenSearch service or AWS OpenSearch Service/Serverless.") +public class OpenSearchDatasourceConfig extends BaseDatasourceConfig { + + public static final BaseDataSourceResourceProvider.DatasourceConfig CONFIG = + new BaseDataSourceResourceProvider.DatasourceConfig() { + + @Override + public Class getResourceConfigModelClass() { + return OpenSearchDatasourceConfig.class; + } + + @Override + public void validate(Resource resource) { + ClassConfigValidator.validateResourceModelFromClass( + resource, + OpenSearchDatasourceConfig.class, + resource.configuration(), + false); + } + }; + + @ConfigProperty( + description = + """ + Whether to use https or not. + """, + defaultValue = "true") + private boolean https = true; + + @ConfigProperty( + description = + """ + Host parameter for connecting to OpenSearch. + Valid both for OpenSearch and AWS OpenSearch Service/Serverless. 
+ """, + required = true) + private String host; + + @ConfigProperty( + description = + """ + Port parameter for connecting to OpenSearch service. + """, + defaultValue = "9200") + private int port; + + @ConfigProperty( + description = + """ + Region parameter for connecting to AWS OpenSearch Service/Serveless. + """) + private String region; + + @ConfigProperty( + description = + """ + Basic authentication for connecting to OpenSearch. + In case of AWS OpenSearch Service/Serverless, this is the access key. + """) + private String username; + + @ConfigProperty( + description = + """ + Basic authentication for connecting to OpenSearch. + In case of AWS OpenSearch Service/Serverless, this is the secret key. + """) + private String password; +} diff --git a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider index b77b64d6f..f47198f79 100644 --- a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider +++ b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider @@ -1,4 +1,5 @@ ai.langstream.impl.assets.CassandraAssetsProvider ai.langstream.impl.assets.MilvusAssetsProvider ai.langstream.impl.assets.JdbcAssetsProvider -ai.langstream.impl.assets.SolrAssetsProvider \ No newline at end of file +ai.langstream.impl.assets.SolrAssetsProvider +ai.langstream.impl.assets.OpenSearchAssetsProvider \ No newline at end of file diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index e1b37c2c5..fa422a55d 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -203,6 +203,12 @@ pulsar test + + org.opensearch + opensearch-testcontainers + 2.0.0 + test + io.fabric8 kubernetes-server-mock diff --git 
a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java new file mode 100644 index 000000000..744ff9344 --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java @@ -0,0 +1,163 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.kafka; + +import ai.langstream.AbstractApplicationRunner; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.junit.jupiter.api.Test; +import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +@Testcontainers +class OpenSearchVectorIT extends AbstractApplicationRunner { + @Container + static OpensearchContainer OPENSEARCH = + new OpensearchContainer(DockerImageName.parse("opensearchproject/opensearch:2")) + .withEnv("discovery.type", "single-node"); + + @Test + public void test() throws Exception { + Map application = + Map.of( + "configuration.yaml", + """ + configuration: + resources: + - type: "vector-database" + name: "OSDatasource" + 
configuration: + service: "opensearch" + https: false + port: %d + host: "localhost" + username: "admin" + password: "admin" + """ + .formatted(OPENSEARCH.getMappedPort(9200)), + "pipeline-write.yaml", + """ + topics: + - name: "insert-topic" + creation-mode: create-if-not-exists + assets: + - name: "os-index" + asset-type: "opensearch-index" + creation-mode: create-if-not-exists + config: + index-name: "my-index-1" + datasource: "OSDatasource" + settings: | + { + "index": { + "knn": true, + "knn.algo_param.ef_search": 100 + } + } + mappings: | + { + "properties": { + "content": { + "type": "text" + }, + "embeddings": { + "type": "knn_vector", + "dimension": 3 + } + } + } + pipeline: + - id: write + name: "Write" + type: "vector-db-sink" + input: "insert-topic" + configuration: + datasource: "OSDatasource" + index-name: "my-index-1" + bulk-parameters: + refresh: "true" + id: "key" + fields: + - name: "content" + expression: "value.content" + - name: "embeddings" + expression: "value.embeddings" + """, + "pipeline-read.yaml", + """ + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + - name: "result-topic" + creation-mode: create-if-not-exists + pipeline: + - id: read + name: "read" + type: "query-vector-db" + input: "input-topic" + output: "result-topic" + configuration: + datasource: "OSDatasource" + query: | + { + "query": { + "knn": { + "embeddings": { + "vector": ?, + "k": 1 + } + } + } + } + fields: + - "value.embeddings" + output-field: "value.query-result" + """); + + String[] expectedAgents = new String[] {"app-write", "app-read"}; + try (ApplicationRuntime applicationRuntime = + deployApplication( + "tenant", "app", application, buildInstanceYaml(), expectedAgents)) { + try (KafkaProducer producer = createProducer(); + KafkaConsumer consumer = createConsumer("result-topic")) { + + for (int i = 0; i < 10; i++) { + sendMessage( + "insert-topic", + "key" + i, + "{\"content\": \"hello" + i + "\", \"embeddings\":[999,999," + i + "]}", + 
List.of(), + producer); + } + sendMessage("input-topic", "{\"embeddings\":[999,999,5]}", producer); + + executeAgentRunners(applicationRuntime); + waitForMessages( + consumer, + List.of( + "{\"embeddings\":[999,999,5],\"query-result\":[{\"score\":1.0," + + "\"document\":{\"embeddings\":[999,999,5],\"content\":\"hello5\"}," + + "\"index\":\"my-index-1\",\"id\":\"key5\"}]}")); + } + } + } +}