From 23f9b6d3554359bd8780fb324c995f0539606713 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 23 Nov 2023 15:30:14 +0100 Subject: [PATCH] [agents] Add support for AstraDB Collections (Astra Vector DB, using Stargate) (#731) --- .../chatbot-rag-memory/crawler.yaml | 1 - examples/applications/flare/crawler.yaml | 1 - .../applications/langchain-chat/crawler.yaml | 1 - .../applications/query-milvus/crawler.yaml | 1 - examples/applications/query-solr/crawler.yaml | 1 - .../webcrawler-astra-vector-db/.gitignore | 1 + .../webcrawler-astra-vector-db/README.md | 45 + .../webcrawler-astra-vector-db/chatbot.yaml | 106 +++ .../configuration.yaml | 34 + .../webcrawler-astra-vector-db/crawler.yaml | 93 ++ .../webcrawler-astra-vector-db/gateways.yaml | 43 + .../write-to-astra.yaml | 94 +++ .../webcrawler-source/crawler.yaml | 1 - .../webcrawler-source/write-to-astra.yaml | 2 +- examples/secrets/secrets.yaml | 4 + .../ai/agents/commons/jstl/JstlFunctions.java | 82 +- .../langstream-agents-flow-control/pom.xml | 4 + .../agents/flow/LogEventProcessor.java | 24 +- .../agents/flow/FlowControlAgentsTest.java | 32 + .../langstream-ai-agents/pom.xml | 27 +- ....java => CassandraDataSourceProvider.java} | 2 +- .../oss/streaming/ai/ComputeStep.java | 54 +- .../datastax/oss/streaming/ai/QueryStep.java | 8 + .../ai/datasource/CassandraDataSource.java | 21 +- ...am.ai.agents.datasource.DataSourceProvider | 2 +- .../streaming/ai/jstl/JstlFunctionsTest.java | 45 + .../agents/vector/QueryVectorDBAgent.java | 5 +- .../AstraVectorDBAssetsManagerProvider.java | 118 +++ .../vector/astra/AstraVectorDBDataSource.java | 296 +++++++ .../AstraVectorDBDataSourceProvider.java | 37 + .../vector/astra/AstraVectorDBWriter.java | 191 +++++ .../CassandraAssetsManagerProvider.java | 8 +- .../vector/cassandra/CassandraWriter.java | 4 +- .../META-INF/ai.langstream.assets.index | 3 +- ...am.ai.agents.datasource.DataSourceProvider | 3 +- ....api.database.VectorDatabaseWriterProvider | 3 +- ...eam.api.runner.assets.AssetManagerProvider | 3 +- .../datasource/impl/AstraVectorDBTest.java | 417 +++++++++ .../api/database/VectorDatabaseWriter.java | 2 +- .../assets/AstraVectorDBAssetsProvider.java | 98 +++ .../impl/assets/CassandraAssetsProvider.java | 13 +- .../resources/DataSourceResourceProvider.java | 6 +- .../VectorDatabaseResourceProvider.java | 2 + .../datasource/AstraDatasourceConfig.java | 3 +- .../AstraVectorDBDatasourceConfig.java | 62 ++ ...i.langstream.api.runtime.AssetNodeProvider | 3 +- .../k8s/agents/FlowControlAgentsProvider.java | 7 + .../agents/QueryVectorDBAgentProvider.java | 4 +- ...traVectorDBVectorDatabaseWriterConfig.java | 70 ++ .../QueryVectorDBAgentProviderTest.java | 793 +++++++++--------- .../kafka/AstraVectorDBAssetQueryWriteIT.java | 134 +++ 51 files changed, 2563 insertions(+), 451 deletions(-) create mode 100644 examples/applications/webcrawler-astra-vector-db/.gitignore create mode 100644 examples/applications/webcrawler-astra-vector-db/README.md create mode 100644 examples/applications/webcrawler-astra-vector-db/chatbot.yaml create mode 100644 examples/applications/webcrawler-astra-vector-db/configuration.yaml create mode 100644 examples/applications/webcrawler-astra-vector-db/crawler.yaml create mode 100644 examples/applications/webcrawler-astra-vector-db/gateways.yaml create mode 100644 examples/applications/webcrawler-astra-vector-db/write-to-astra.yaml rename langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/{AstraDataSource.java => 
CassandraDataSourceProvider.java} (94%) create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBAssetsManagerProvider.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSource.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSourceProvider.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBWriter.java create mode 100644 langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/AstraVectorDBTest.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/assets/AstraVectorDBAssetsProvider.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraVectorDBDatasourceConfig.java create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/AstraVectorDBVectorDatabaseWriterConfig.java create mode 100644 langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraVectorDBAssetQueryWriteIT.java diff --git a/examples/applications/chatbot-rag-memory/crawler.yaml b/examples/applications/chatbot-rag-memory/crawler.yaml index 8e1641ca1..492508586 100644 --- a/examples/applications/chatbot-rag-memory/crawler.yaml +++ b/examples/applications/chatbot-rag-memory/crawler.yaml @@ -46,7 +46,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/examples/applications/flare/crawler.yaml b/examples/applications/flare/crawler.yaml index 158671b59..6fa1b9c4c 100644 --- a/examples/applications/flare/crawler.yaml +++ b/examples/applications/flare/crawler.yaml @@ -70,7 +70,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/examples/applications/langchain-chat/crawler.yaml b/examples/applications/langchain-chat/crawler.yaml index d2685fbae..c9bc82de1 100644 --- a/examples/applications/langchain-chat/crawler.yaml +++ b/examples/applications/langchain-chat/crawler.yaml @@ -50,7 +50,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/examples/applications/query-milvus/crawler.yaml b/examples/applications/query-milvus/crawler.yaml index 1729f3b74..decf5206b 100644 --- a/examples/applications/query-milvus/crawler.yaml +++ b/examples/applications/query-milvus/crawler.yaml @@ -106,7 +106,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/examples/applications/query-solr/crawler.yaml b/examples/applications/query-solr/crawler.yaml index 687436bc0..94c5cee20 100644 --- a/examples/applications/query-solr/crawler.yaml +++ b/examples/applications/query-solr/crawler.yaml @@ -102,7 +102,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff 
--git a/examples/applications/webcrawler-astra-vector-db/.gitignore b/examples/applications/webcrawler-astra-vector-db/.gitignore new file mode 100644 index 000000000..55dea2dd3 --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/.gitignore @@ -0,0 +1 @@ +java/lib/* \ No newline at end of file diff --git a/examples/applications/webcrawler-astra-vector-db/README.md b/examples/applications/webcrawler-astra-vector-db/README.md new file mode 100644 index 000000000..8c8836f35 --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/README.md @@ -0,0 +1,45 @@ +# Indexing a Website + +This sample application shows how to use the WebCrawler Source Connector and Astra Vector DB. + +## Collections + +This application creates a collection named "documents" in your DB. +You can change the name of the collection in the file `configuration.yaml`. + +## Configure access to the Vector Database + +Export the following environment variables to configure access to the database: + +``` +export ASTRA_VECTOR_DB_TOKEN=AstraCS:... +export ASTRA_VECTOR_DB_ENDPOINT=https://....astra.datastax.com ``` + +You can find the credentials in the Astra DB console. + +The `examples/secrets/secrets.yaml` file resolves those environment variables for you. +When you go to production, you should create a dedicated secrets.yaml file for each environment. + +## Configure the pipeline + +You can edit the file `crawler.yaml` to configure the list of allowed web domains; this is required to keep the crawler from escaping outside your sites. +Configure the list of seed URLs, for instance, your home page. + +The default configuration in this example will crawl the LangStream website. + +## Deploy the LangStream application + +``` +./bin/langstream docker run test -app examples/applications/webcrawler-astra-vector-db -s examples/secrets/secrets.yaml ``` + +## Talk with the Chat bot + +If the application launches successfully, you can talk with the chat bot using the UI. + +You can also use the CLI: + +``` +./bin/langstream gateway chat test -cg bot-output -pg user-input -p sessionId=$(uuidgen) ``` diff --git a/examples/applications/webcrawler-astra-vector-db/chatbot.yaml b/examples/applications/webcrawler-astra-vector-db/chatbot.yaml new file mode 100644 index 000000000..96921c0fc --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/chatbot.yaml @@ -0,0 +1,106 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + - name: "log-topic" + creation-mode: create-if-not-exists +errors: + on-failure: "skip" +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "questions-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.question_embeddings" + text: "{{ value.question }}" + flush-interval: 0 + - name: "lookup-related-documents" + type: "query-vector-db" + configuration: + datasource: "AstraDatasource" + query: | + { + "collection-name": "${globals.collection-name}", + "limit": 20, + "vector": ? + } + fields: + - "value.question_embeddings" + output-field: "value.related_documents" + - name: "re-rank documents with MMR" + type: "re-rank" + configuration: + max: 5 # keep only the top 5 documents, because we have a hard limit on the prompt size + field: "value.related_documents" + query-text: "value.question" + query-embeddings: "value.question_embeddings" + output-field: "value.related_documents" + text-field: "record.text" + embeddings-field: "record.vector" + algorithm: "MMR" + lambda: 0.5 + k1: 1.2 + b: 0.75 + - name: "ai-chat-completions" + type: "ai-chat-completions" + configuration: + model: "${secrets.open-ai.chat-completions-model}" # This needs to be set to the model deployment name, not the base name + # on the log-topic we add a field with the answer + completion-field: "value.answer" + # we are also logging the prompt we sent to the LLM + log-field: "value.prompt" + # here we configure the streaming behavior + # as soon as the LLM answers with a chunk, we send it to the answers-topic + stream-to-topic: "answers-topic" + # on the streaming answer we send the answer as a whole message + # the 'value' syntax is used to refer to the whole value of the message + stream-response-completion-field: "value" + # we want to stream the answer as soon as we have 20 chunks + # in order to reduce latency, the agent sends the first message + # with 1 chunk, then with 2 chunks... up to the min-chunks-per-message value + # eventually we send bigger messages to reduce the overhead of each message on the topic + min-chunks-per-message: 20 + messages: + - role: system + content: | + A user is going to ask questions. The documents below may help you in answering them. + Please try to leverage them in your answer as much as possible. + Take into consideration that the user is always asking questions about the LangStream project. + If you provide code or YAML snippets, please explicitly state that they are examples. + Do not provide information that is not related to the LangStream project. 
+ + Documents: + {{# value.related_documents}} + {{ text}} + {{/ value.related_documents}} + - role: user + content: "{{ value.question}}" + - name: "cleanup-response" + type: "drop-fields" + output: "log-topic" + configuration: + fields: + - "question_embeddings" + - "related_documents" \ No newline at end of file diff --git a/examples/applications/webcrawler-astra-vector-db/configuration.yaml b/examples/applications/webcrawler-astra-vector-db/configuration.yaml new file mode 100644 index 000000000..c01e34c8e --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/configuration.yaml @@ -0,0 +1,34 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + defaults: + globals: + collection-name: "documents" + resources: + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "${secrets.open-ai.url}" + access-key: "${secrets.open-ai.access-key}" + provider: "${secrets.open-ai.provider}" + - type: "datasource" + name: "AstraDatasource" + configuration: + service: "astra-vector-db" + token: "${secrets.astra-vector-db.token}" + endpoint: "${secrets.astra-vector-db.endpoint}" \ No newline at end of file diff --git a/examples/applications/webcrawler-astra-vector-db/crawler.yaml b/examples/applications/webcrawler-astra-vector-db/crawler.yaml new file mode 100644 index 000000000..4b8a49560 --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/crawler.yaml @@ -0,0 +1,93 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name: "Crawl a website" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +resources: + size: 2 +pipeline: + - name: "Crawl the WebSite" + type: "webcrawler-source" + configuration: + seed-urls: ["https://docs.langstream.ai/"] + allowed-domains: ["https://docs.langstream.ai"] + forbidden-paths: [] + min-time-between-requests: 500 + reindex-interval-seconds: 3600 + max-error-count: 5 + max-urls: 1000 + max-depth: 50 + handle-robots-file: true + user-agent: "" # this is computed automatically, but you can override it + scan-html-documents: true + http-timeout: 10000 + handle-cookies: true + max-unflushed-pages: 100 + # store data directly on the agent disk, no need for external S3 storage + state-storage: disk + - name: "Extract text" + type: "text-extractor" + - name: "Normalise text" + type: "text-normaliser" + configuration: + make-lowercase: true + trim-spaces: true + - name: "Detect language" + type: "language-detector" + configuration: + property: "language" + - name: "Split into chunks" + type: "text-splitter" + configuration: + splitter_type: "RecursiveCharacterTextSplitter" + chunk_size: 400 + separators: ["\n\n", "\n", " ", ""] + keep_separator: false + chunk_overlap: 100 + length_function: "cl100k_base" + - name: "Convert to structured data" + type: "document-to-json" + configuration: + text-field: text + copy-properties: true + - name: "prepare-structure" + type: "compute" + configuration: + fields: + - name: "value.filename" + expression: "properties.url" + type: STRING + - name: "value.chunk_id" + expression: "properties.chunk_id" + type: STRING + - name: "value.language" + expression: "properties.language" + type: STRING + - name: "value.chunk_num_tokens" + expression: "properties.chunk_num_tokens" + type: STRING + - name: "compute-embeddings" + id: "step1" + type: "compute-ai-embeddings" + output: "chunks-topic" + configuration: + model: "${secrets.open-ai.embeddings-model}" + embeddings-field: "value.embeddings_vector" + text: "{{ value.text }}" + batch-size: 10 + flush-interval: 500 \ No newline at end of file diff --git a/examples/applications/webcrawler-astra-vector-db/gateways.yaml b/examples/applications/webcrawler-astra-vector-db/gateways.yaml new file mode 100644 index 000000000..132788270 --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/gateways.yaml @@ -0,0 +1,43 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +gateways: + - id: "user-input" + type: produce + topic: "questions-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "bot-output" + type: consume + topic: "answers-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + + - id: "llm-debug" + type: consume + topic: "log-topic" \ No newline at end of file diff --git a/examples/applications/webcrawler-astra-vector-db/write-to-astra.yaml b/examples/applications/webcrawler-astra-vector-db/write-to-astra.yaml new file mode 100644 index 000000000..22af7c28c --- /dev/null +++ b/examples/applications/webcrawler-astra-vector-db/write-to-astra.yaml @@ -0,0 +1,94 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Write to AstraDB" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +assets: + - name: "documents-collection" + asset-type: "astra-collection" + creation-mode: create-if-not-exists + config: + collection-name: "${globals.collection-name}" + datasource: "AstraDatasource" + vector-dimension: 1536 +errors: + on-failure: "skip" +pipeline: + - name: "Find existing chunks" + input: "chunks-topic" + type: "query-vector-db" + configuration: + datasource: "AstraDatasource" + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + mode: "query" + query: | + { + "collection-name": "${globals.collection-name}", + "filter": { + "filename": ? + }, + "select": ["chunk_id"] + } + output-field: "value.all_chunks" + fields: + - "value.filename" + - name: "Detect stale chunks" + type: "compute" + configuration: + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + fields: + - name: "value.stale_chunks" + expression: "fn:filter(value.all_chunks, 'record.chunk_id >= fn:toInt(properties.text_num_chunks)')" + - name: "Delete stale chunks" + type: "query-vector-db" + configuration: + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + datasource: "AstraDatasource" + loop-over: "value.stale_chunks" + mode: "execute" + query: | + { + "action": "deleteMany", + "collection-name": "${globals.collection-name}", + "filter": { + "_id": ? 
+ } + } + output-field: "value.delete_results" + fields: + - "record.id" + - type: "log-event" + configuration: + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + message: "Deleted stale chunks: {{{value.delete_results}}}" + - name: "Write to Astra" + type: "vector-db-sink" + configuration: + datasource: "AstraDatasource" + collection-name: "${globals.collection-name}" + fields: + - name: "id" + expression: "fn:concat(value.filename, '-', value.chunk_id)" + - name: "vector" + expression: "value.embeddings_vector" + - name: "text" + expression: "value.text" + - name: "filename" + expression: "value.filename" + - name: "chunk_id" + expression: "value.chunk_id" \ No newline at end of file diff --git a/examples/applications/webcrawler-source/crawler.yaml b/examples/applications/webcrawler-source/crawler.yaml index 020a7ba10..4b8a49560 100644 --- a/examples/applications/webcrawler-source/crawler.yaml +++ b/examples/applications/webcrawler-source/crawler.yaml @@ -50,7 +50,6 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/examples/applications/webcrawler-source/write-to-astra.yaml b/examples/applications/webcrawler-source/write-to-astra.yaml index dcba707db..9a26c5c18 100644 --- a/examples/applications/webcrawler-source/write-to-astra.yaml +++ b/examples/applications/webcrawler-source/write-to-astra.yaml @@ -47,7 +47,7 @@ pipeline: when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" mode: "execute" query: "DELETE FROM documents.documents WHERE filename = ? AND chunk_id > ?" - output-field: "value.delete-results" + output-field: "value.delete_results" fields: - "value.filename" - "fn:toInt(value.chunk_id)" diff --git a/examples/secrets/secrets.yaml b/examples/secrets/secrets.yaml index 930b05317..6cd59ed29 100644 --- a/examples/secrets/secrets.yaml +++ b/examples/secrets/secrets.yaml @@ -57,6 +57,10 @@ secrets: # secureBundle: "" secureBundle: ${ASTRA_SECURE_BUNDLE:-} environment: ${ASTRA_ENVIRONMENT:-PROD} + - id: astra-vector-db + data: + token: ${ASTRA_VECTOR_DB_TOKEN:-} + endpoint: ${ASTRA_VECTOR_DB_ENDPOINT:-} - id: astra-langchain data: token: "${ASTRA_LANGCHAIN_TOKEN:-}" diff --git a/langstream-agents/langstream-agents-commons/src/main/java/ai/langstream/ai/agents/commons/jstl/JstlFunctions.java b/langstream-agents/langstream-agents-commons/src/main/java/ai/langstream/ai/agents/commons/jstl/JstlFunctions.java index cedf455d7..9d4055937 100644 --- a/langstream-agents/langstream-agents-commons/src/main/java/ai/langstream/ai/agents/commons/jstl/JstlFunctions.java +++ b/langstream-agents/langstream-agents-commons/src/main/java/ai/langstream/ai/agents/commons/jstl/JstlFunctions.java @@ -100,6 +100,28 @@ public static List toListOfFloat(Object input) { result.add(JstlTypeConverter.INSTANCE.coerceToFloat(o)); } return result; + } else if (input instanceof float[] a) { + List result = new ArrayList<>(a.length); + for (Object o : a) { + result.add(JstlTypeConverter.INSTANCE.coerceToFloat(o)); + } + return result; + } else { + throw new IllegalArgumentException("Cannot convert " + input + " to list of float"); + } + } + + public static float[] toArrayOfFloat(Object input) { + if (input == null) { + return null; + } + if (input instanceof Collection collection) { + float[] result = new float[collection.size()]; + int i = 0; + for (Object o : collection) { + result[i++] = 
JstlTypeConverter.INSTANCE.coerceToFloat(o); + } + return result; } else { throw new IllegalArgumentException("Cannot convert " + input + " to list of float"); } @@ -278,6 +300,41 @@ public static List addAll(Object one, Object two) { return results; } + private static class FilterContext { + private final MutableRecord currentMessage; + + FilterContext(MutableRecord currentMessage) { + this.currentMessage = currentMessage; + } + } + + public static class FilterContextHandle implements AutoCloseable { + + private static final ThreadLocal threadLocal = new ThreadLocal<>(); + + public static FilterContextHandle start(MutableRecord currentMessage) { + return new FilterContextHandle(currentMessage); + } + + private static FilterContext getCurrentFilterContext() { + return threadLocal.get(); + } + + FilterContextHandle(MutableRecord currentMessage) { + FilterContext filterContext = threadLocal.get(); + if (filterContext != null) { + throw new IllegalStateException( + "FilterContextHandle already exists for this thread"); + } + threadLocal.set(new FilterContext(currentMessage)); + } + + @Override + public void close() { + threadLocal.remove(); + } + } + public static List filter(Object input, String expression) { if (input == null) { return null; @@ -296,6 +353,8 @@ public static List filter(Object input, String expression) { throw new IllegalArgumentException( "fn:filter cannot filter object of type " + input.getClass().getName()); } + + FilterContext currentContext = FilterContextHandle.getCurrentFilterContext(); List result = new ArrayList<>(); JstlPredicate predicate = new JstlPredicate(expression); for (Object o : source) { @@ -304,14 +363,23 @@ public static List filter(Object input, String expression) { } // nulls are always filtered out if (o != null) { - MutableRecord context = new MutableRecord(); - context.setRecordObject(o); - boolean evaluate = predicate.test(context); - if (log.isDebugEnabled()) { - log.debug("Result of evaluation is {}", evaluate); + MutableRecord context; + if (currentContext != null) { + context = currentContext.currentMessage; + } else { + context = new MutableRecord(); } - if (evaluate) { - result.add(o); + context.setRecordObject(o); + try { + boolean evaluate = predicate.test(context); + if (log.isDebugEnabled()) { + log.debug("Result of evaluation is {}", evaluate); + } + if (evaluate) { + result.add(o); + } + } finally { + context.setRecordObject(null); } } } diff --git a/langstream-agents/langstream-agents-flow-control/pom.xml b/langstream-agents/langstream-agents-flow-control/pom.xml index 63196458f..99dd5618a 100644 --- a/langstream-agents/langstream-agents-flow-control/pom.xml +++ b/langstream-agents/langstream-agents-flow-control/pom.xml @@ -42,6 +42,10 @@ langstream-agents-commons ${project.version} + + com.samskivert + jmustache + org.projectlombok lombok diff --git a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/LogEventProcessor.java b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/LogEventProcessor.java index 777535d8b..62a93d855 100644 --- a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/LogEventProcessor.java +++ b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/LogEventProcessor.java @@ -15,6 +15,7 @@ */ package ai.langstream.agents.flow; +import ai.langstream.ai.agents.commons.JsonRecord; import ai.langstream.ai.agents.commons.MutableRecord; import 
ai.langstream.ai.agents.commons.jstl.JstlEvaluator; import ai.langstream.ai.agents.commons.jstl.predicate.JstlPredicate; @@ -24,6 +25,8 @@ import ai.langstream.api.runner.code.RecordSink; import ai.langstream.api.runtime.ComponentType; import ai.langstream.api.util.ConfigurationUtils; +import com.samskivert.mustache.Mustache; +import com.samskivert.mustache.Template; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -36,13 +39,22 @@ public class LogEventProcessor extends AbstractAgentCode implements AgentProcess record FieldDefinition(String name, JstlEvaluator expressionEvaluator) {} private final List fields = new ArrayList<>(); + private Template messageTemplate; private JstlPredicate predicate; @SuppressWarnings("unchecked") @Override public void init(Map configuration) { String when = ConfigurationUtils.getString("when", "true", configuration); + String message = ConfigurationUtils.getString("message", "", configuration); predicate = new JstlPredicate(when); + + if (!message.isEmpty()) { + messageTemplate = Mustache.compiler().compile(message); + } else { + messageTemplate = null; + } + List> fields = (List>) configuration.getOrDefault("fields", List.of()); fields.forEach( @@ -66,9 +78,12 @@ private void logRecord(Record originalRecord) { if (!predicate.test(mutableRecord)) { return; } - if (fields.isEmpty()) { + if (fields.isEmpty() && messageTemplate == null) { log.info("{}", originalRecord); - } else { + return; + } + + if (!fields.isEmpty()) { // using LinkedHashMap in order to keep the order Map values = new LinkedHashMap<>(); for (FieldDefinition field : fields) { @@ -76,6 +91,11 @@ private void logRecord(Record originalRecord) { } log.info("{}", values); } + + if (messageTemplate != null) { + JsonRecord jsonRecord = mutableRecord.toJsonRecord(); + log.info("{}", messageTemplate.execute(jsonRecord)); + } } @Override diff --git a/langstream-agents/langstream-agents-flow-control/src/test/java/ai/langstream/agents/flow/FlowControlAgentsTest.java b/langstream-agents/langstream-agents-flow-control/src/test/java/ai/langstream/agents/flow/FlowControlAgentsTest.java index 30c5b9cd4..a80b322ca 100644 --- a/langstream-agents/langstream-agents-flow-control/src/test/java/ai/langstream/agents/flow/FlowControlAgentsTest.java +++ b/langstream-agents/langstream-agents-flow-control/src/test/java/ai/langstream/agents/flow/FlowControlAgentsTest.java @@ -355,6 +355,8 @@ public void testLogEvent() throws Exception { try (LogEventProcessor processor = new LogEventProcessor(); ) { processor.init( Map.of( + "message", + "Original is {{{value.original}}}", "fields", List.of( Map.of( @@ -387,4 +389,34 @@ public void testLogEvent() throws Exception { } } } + + @Test + public void testLogEventNoFieldsNoMessage() throws Exception { + try (LogEventProcessor processor = new LogEventProcessor(); ) { + processor.init(Map.of()); + processor.start(); + + for (int i = 0; i < 10; i++) { + SimpleRecord someRecord = + SimpleRecord.builder() + .value( + """ + {"original": "Hello Folks %s", "activator": %s} + """ + .formatted(i, i)) + .build(); + + List read = new ArrayList<>(); + processor.process( + List.of(someRecord), + (sourceRecordAndResult) -> + read.addAll(sourceRecordAndResult.resultRecords())); + assertEquals(1, read.size()); + Record emittedToDownstream = read.get(0); + // the processor must pass downstream the original record + assertSame(emittedToDownstream, someRecord); + log.info("Received record {}", emittedToDownstream); + } + } + } } diff --git 
a/langstream-agents/langstream-ai-agents/pom.xml b/langstream-agents/langstream-ai-agents/pom.xml index b8d9474ba..94ddbf6bf 100644 --- a/langstream-agents/langstream-ai-agents/pom.xml +++ b/langstream-agents/langstream-ai-agents/pom.xml @@ -66,7 +66,32 @@ com.datastax.astra astra-sdk-devops - 0.6.9 + 1.0 + + + com.datastax.astra + astra-db-client + 1.0 + + + com.datastax.astra + astra-sdk + 1.0 + + + com.datastax.stargate + stargate-sdk + 2.1.4 + + + com.datastax.stargate + stargate-sdk-grpc + + + com.datastax.stargate + stargate-sdk-graphql + + io.netty.incubator diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java similarity index 94% rename from langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java rename to langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java index 70fcfb0b7..99cbce9ab 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java @@ -20,7 +20,7 @@ import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; import java.util.Map; -public class AstraDataSource implements DataSourceProvider { +public class CassandraDataSourceProvider implements DataSourceProvider { @Override public boolean supports(Map dataSourceConfig) { diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ComputeStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ComputeStep.java index bc02a8d52..f97c4d404 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ComputeStep.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ComputeStep.java @@ -18,6 +18,7 @@ import ai.langstream.ai.agents.commons.AvroUtil; import ai.langstream.ai.agents.commons.MutableRecord; import ai.langstream.ai.agents.commons.TransformSchemaType; +import ai.langstream.ai.agents.commons.jstl.JstlFunctions; import ai.langstream.ai.agents.commons.jstl.JstlTypeConverter; import com.datastax.oss.streaming.ai.model.ComputeField; import com.datastax.oss.streaming.ai.model.ComputeFieldType; @@ -57,31 +58,34 @@ public class ComputeStep implements TransformStep { @Override public void process(MutableRecord mutableRecord) { - computePrimitiveField( - fields.stream() - .filter(f -> "primitive".equals(f.getScope())) - .collect(Collectors.toList()), - mutableRecord); - computeKeyFields( - fields.stream() - .filter(f -> "key".equals(f.getScope())) - .collect(Collectors.toList()), - mutableRecord); - computeValueFields( - fields.stream() - .filter(f -> "value".equals(f.getScope())) - .collect(Collectors.toList()), - mutableRecord); - computeHeaderFields( - fields.stream() - .filter(f -> "header".equals(f.getScope())) - .collect(Collectors.toList()), - mutableRecord); - computeHeaderPropertiesFields( - fields.stream() - .filter(f -> "header.properties".equals(f.getScope())) - .collect(Collectors.toList()), - mutableRecord); + try (JstlFunctions.FilterContextHandle handle = + JstlFunctions.FilterContextHandle.start(mutableRecord)) { + 
computePrimitiveField( + fields.stream() + .filter(f -> "primitive".equals(f.getScope())) + .collect(Collectors.toList()), + mutableRecord); + computeKeyFields( + fields.stream() + .filter(f -> "key".equals(f.getScope())) + .collect(Collectors.toList()), + mutableRecord); + computeValueFields( + fields.stream() + .filter(f -> "value".equals(f.getScope())) + .collect(Collectors.toList()), + mutableRecord); + computeHeaderFields( + fields.stream() + .filter(f -> "header".equals(f.getScope())) + .collect(Collectors.toList()), + mutableRecord); + computeHeaderPropertiesFields( + fields.stream() + .filter(f -> "header.properties".equals(f.getScope())) + .collect(Collectors.toList()), + mutableRecord); + } } public void computeValueFields(List fields, MutableRecord context) { diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/QueryStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/QueryStep.java index ce4d0b631..3be1875e4 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/QueryStep.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/QueryStep.java @@ -130,6 +130,10 @@ private List> processQuery(MutableRecord mutableRecord) { // loop over a list // for each item we name if "record" and we perform the query List nestedRecords = loopOverAccessor.evaluate(mutableRecord); + if (nestedRecords == null) { + log.info("Property {} not found in record {}", loopOver, mutableRecord); + nestedRecords = List.of(); + } results = new ArrayList<>(); for (Object document : nestedRecords) { MutableRecord nestedRecordContext = new MutableRecord(); @@ -150,6 +154,10 @@ private Object processExecute(MutableRecord mutableRecord) { // loop over a list // for each item we name if "record" and we perform the query List nestedRecords = loopOverAccessor.evaluate(mutableRecord); + if (nestedRecords == null) { + log.info("Property {} not found in record {}", loopOver, mutableRecord); + nestedRecords = List.of(); + } List> results = new ArrayList<>(); for (Object document : nestedRecords) { MutableRecord nestedRecordContext = new MutableRecord(); diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java index a3d2808e2..61e667765 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java @@ -31,9 +31,9 @@ import com.datastax.oss.driver.api.core.type.reflect.GenericType; import com.datastax.oss.driver.internal.core.type.codec.CqlVectorCodec; import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry; -import com.dtsx.astra.sdk.db.AstraDbClient; -import com.dtsx.astra.sdk.db.DatabaseClient; -import com.dtsx.astra.sdk.utils.ApiLocator; +import com.dtsx.astra.sdk.db.AstraDBOpsClient; +import com.dtsx.astra.sdk.db.DbOpsClient; +import com.dtsx.astra.sdk.utils.AstraEnvironment; import edu.umd.cs.findbugs.annotations.Nullable; import java.io.ByteArrayInputStream; import java.net.InetSocketAddress; @@ -241,13 +241,13 @@ private CqlSession buildCqlSession(Map dataSourceConfig) { log.info( "Automatically downloading the secure bundle for database name 
{} from AstraDB", astraDatabase); - DatabaseClient databaseClient = this.buildAstraClient(); + DbOpsClient databaseClient = this.buildAstraClient(); secureBundleDecoded = downloadSecureBundle(databaseClient); } else if (!astraDatabaseId.isEmpty() && !astraToken.isEmpty()) { log.info( "Automatically downloading the secure bundle for database id {} from AstraDB", astraDatabaseId); - DatabaseClient databaseClient = this.buildAstraClient(); + DbOpsClient databaseClient = this.buildAstraClient(); secureBundleDecoded = downloadSecureBundle(databaseClient); } else { log.info("No secure bundle provided, using the default CQL driver for Cassandra"); @@ -281,11 +281,11 @@ public CqlSession getSession() { return session; } - public DatabaseClient buildAstraClient() { + public DbOpsClient buildAstraClient() { return buildAstraClient(astraToken, astraDatabase, astraDatabaseId, astraEnvironment); } - public static DatabaseClient buildAstraClient( + public static DbOpsClient buildAstraClient( String astraToken, String astraDatabase, String astraDatabaseId, @@ -293,9 +293,8 @@ public static DatabaseClient buildAstraClient( if (astraToken.isEmpty()) { throw new IllegalArgumentException("You must configure the AstraDB token"); } - AstraDbClient astraDbClient = - new AstraDbClient( - astraToken, ApiLocator.AstraEnvironment.valueOf(astraEnvironment)); + AstraDBOpsClient astraDbClient = + new AstraDBOpsClient(astraToken, AstraEnvironment.valueOf(astraEnvironment)); if (!astraDatabase.isEmpty()) { return astraDbClient.databaseByName(astraDatabase); } else if (!astraDatabaseId.isEmpty()) { @@ -306,7 +305,7 @@ public static DatabaseClient buildAstraClient( } } - public static byte[] downloadSecureBundle(DatabaseClient databaseClient) { + public static byte[] downloadSecureBundle(DbOpsClient databaseClient) { long start = System.currentTimeMillis(); byte[] secureBundleDecoded = databaseClient.downloadDefaultSecureConnectBundle(); long delta = System.currentTimeMillis() - start; diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index 3c023fedb..233a8ee8a 100644 --- a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1,2 +1,2 @@ -ai.langstream.ai.agents.datasource.impl.AstraDataSource +ai.langstream.ai.agents.datasource.impl.CassandraDataSourceProvider ai.langstream.ai.agents.datasource.impl.JdbcDataSourceProvider \ No newline at end of file diff --git a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java index dcebe11cc..a9503c958 100644 --- a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java +++ b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import ai.langstream.ai.agents.commons.MutableRecord; import 
ai.langstream.ai.agents.commons.jstl.JstlFunctions; import java.math.BigDecimal; import java.math.BigInteger; @@ -525,4 +526,48 @@ void testFilterQueryResults() { assertEquals(2, filter.size()); } } + + @Test + void testFilterQueryResultsWithContext() { + + MutableRecord record = new MutableRecord(); + record.setValueObject(Map.of("threshold", "0.5")); + + try (JstlFunctions.FilterContextHandle context = + JstlFunctions.FilterContextHandle.start(record)) { + + // the query step always produces a list> result + // this test verifies that it is possible to filter the result using the JSTL filter + // function + List> queryResult = new ArrayList<>(); + queryResult.add(Map.of("name", "product1", "price", "1.2", "similarity", "0.9")); + queryResult.add(Map.of("name", "product2", "price", "1.7", "similarity", "0.1")); + + { + List filter = + JstlFunctions.filter( + queryResult, "fn:toDouble(record.similarity) >= value.threshold"); + assertEquals(1, filter.size()); + assertEquals("product1", ((Map) filter.get(0)).get("name")); + } + + { + List filter = + JstlFunctions.filter( + queryResult, "fn:toDouble(record.similarity) < value.threshold"); + assertEquals(1, filter.size()); + assertEquals("product2", ((Map) filter.get(0)).get("name")); + } + + { + List filter = JstlFunctions.filter(queryResult, "false"); + assertEquals(0, filter.size()); + } + + { + List filter = JstlFunctions.filter(queryResult, "true"); + assertEquals(2, filter.size()); + } + } + } } diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java index 76d3a0ed5..5bcfc726d 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java @@ -19,6 +19,7 @@ import static ai.langstream.ai.agents.commons.MutableRecord.recordToMutableRecord; import ai.langstream.ai.agents.commons.MutableRecord; +import ai.langstream.ai.agents.commons.jstl.predicate.JstlPredicate; import ai.langstream.ai.agents.datasource.DataSourceProviderRegistry; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.code.SingleRecordAgentProcessor; @@ -57,7 +58,9 @@ public void init(Map configuration) throws Exception { configuration.put("type", "query"); QueryConfig queryConfig = MAPPER.convertValue(configuration, QueryConfig.class); queryExecutor = (QueryStep) TransformFunctionUtil.newQuery(queryConfig, dataSource); - steps = List.of(new StepPredicatePair(queryExecutor, it -> true)); + JstlPredicate when = + queryConfig.getWhen() == null ? null : new JstlPredicate(queryConfig.getWhen()); + steps = List.of(new StepPredicatePair(queryExecutor, when)); } @Override diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBAssetsManagerProvider.java new file mode 100644 index 000000000..1b1a69870 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBAssetsManagerProvider.java @@ -0,0 +1,118 @@ +/* + * Copyright DataStax, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.assets.AssetManagerProvider; +import ai.langstream.api.util.ConfigurationUtils; +import io.stargate.sdk.doc.exception.CollectionNotFoundException; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraVectorDBAssetsManagerProvider implements AssetManagerProvider { + + @Override + public boolean supports(String assetType) { + return "astra-collection".equals(assetType); + } + + @Override + public AssetManager createInstance(String assetType) { + + switch (assetType) { + case "astra-collection": + return new AstraDBCollectionAssetManager(); + default: + throw new IllegalArgumentException(); + } + } + + private abstract static class BaseAstraAssetManager implements AssetManager { + + AstraVectorDBDataSource datasource; + AssetDefinition assetDefinition; + + @Override + public void initialize(AssetDefinition assetDefinition) { + this.datasource = buildDataSource(assetDefinition); + this.assetDefinition = assetDefinition; + } + + @Override + public void close() throws Exception { + if (datasource != null) { + datasource.close(); + } + } + } + + private static class AstraDBCollectionAssetManager extends BaseAstraAssetManager { + + @Override + public boolean assetExists() throws Exception { + String collection = getCollection(); + log.info("Checking if collection {} exists", collection); + return datasource.getAstraDB().isCollectionExists(collection); + } + + @Override + public void deployAsset() throws Exception { + int vectorDimension = getVectorDimension(); + + String collection = getCollection(); + log.info("Create collection {} with vector dimension {}", collection, vectorDimension); + datasource.getAstraDB().createCollection(collection, vectorDimension); + } + + private String getCollection() { + return ConfigurationUtils.getString( + "collection-name", null, assetDefinition.getConfig()); + } + + private int getVectorDimension() { + return ConfigurationUtils.getInt("vector-dimension", 1536, assetDefinition.getConfig()); + } + + @Override + public boolean deleteAssetIfExists() throws Exception { + String collection = getCollection(); + + log.info("Deleting collection {}", collection); + + try { + datasource.getAstraDB().deleteCollection(collection); + return true; + } catch (CollectionNotFoundException e) { + log.info( + "collection does not exist, maybe it was deleted by another agent ({})", + e.toString()); + return false; + } + } + } + + private static AstraVectorDBDataSource buildDataSource(AssetDefinition assetDefinition) { + AstraVectorDBDataSource dataSource = new AstraVectorDBDataSource(); + Map datasourceDefinition = + ConfigurationUtils.getMap("datasource", Map.of(), assetDefinition.getConfig()); + Map configuration = + ConfigurationUtils.getMap("configuration", Map.of(), datasourceDefinition); + 
dataSource.initialize(configuration); + return dataSource; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSource.java new file mode 100644 index 000000000..7b669d5ab --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSource.java @@ -0,0 +1,296 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.agents.vector.InterpolationUtils; +import ai.langstream.ai.agents.commons.jstl.JstlFunctions; +import ai.langstream.api.util.ConfigurationUtils; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import com.dtsx.astra.sdk.AstraDB; +import io.stargate.sdk.json.CollectionClient; +import io.stargate.sdk.json.domain.DeleteQuery; +import io.stargate.sdk.json.domain.DeleteQueryBuilder; +import io.stargate.sdk.json.domain.JsonDocument; +import io.stargate.sdk.json.domain.JsonResult; +import io.stargate.sdk.json.domain.JsonResultUpdate; +import io.stargate.sdk.json.domain.SelectQuery; +import io.stargate.sdk.json.domain.SelectQueryBuilder; +import io.stargate.sdk.json.domain.UpdateQuery; +import io.stargate.sdk.json.domain.UpdateQueryBuilder; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraVectorDBDataSource implements QueryStepDataSource { + + static final Field update; + + static { + try { + update = UpdateQueryBuilder.class.getDeclaredField("update"); + update.setAccessible(true); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + AstraDB astraDB; + + @Override + public void initialize(Map dataSourceConfig) { + log.info( + "Initializing AstraVectorDBDataSource with config {}", + ConfigurationUtils.redactSecrets(dataSourceConfig)); + String astraToken = ConfigurationUtils.getString("token", "", dataSourceConfig); + String astraEndpoint = ConfigurationUtils.getString("endpoint", "", dataSourceConfig); + this.astraDB = new AstraDB(astraToken, astraEndpoint); + } + + @Override + public void close() {} + + @Override + public List> fetchData(String query, List params) { + if (log.isDebugEnabled()) { + log.debug( + "Executing query {} with params {} ({})", + query, + params, + params.stream() + .map(v -> v == null ? 
"null" : v.getClass().toString()) + .collect(Collectors.joining(","))); + } + try { + Map queryMap = + InterpolationUtils.buildObjectFromJson(query, Map.class, params); + if (queryMap.isEmpty()) { + throw new UnsupportedOperationException("Query is empty"); + } + String collectionName = (String) queryMap.remove("collection-name"); + if (collectionName == null) { + throw new UnsupportedOperationException("collection-name is not defined"); + } + CollectionClient collection = this.getAstraDB().collection(collectionName); + List result; + + float[] vector = JstlFunctions.toArrayOfFloat(queryMap.remove("vector")); + Integer limit = (Integer) queryMap.remove("limit"); + + boolean includeSimilarity = vector != null; + Object includeSimilarityParam = queryMap.remove("include-similarity"); + if (includeSimilarityParam != null) { + includeSimilarity = Boolean.parseBoolean(includeSimilarityParam.toString()); + } + Map filterMap = (Map) queryMap.remove("filter"); + SelectQueryBuilder selectQueryBuilder = SelectQuery.builder(); + Object selectClause = queryMap.remove("select"); + if (selectClause != null) { + if (selectClause instanceof List list) { + String[] arrayOfStrings = ((List) list).toArray(new String[0]); + selectQueryBuilder.select(arrayOfStrings); + } else { + throw new IllegalArgumentException( + "select clause must be a list of strings, but found: " + selectClause); + } + } + if (includeSimilarity) { + selectQueryBuilder.includeSimilarity(); + } + if (vector != null) { + selectQueryBuilder.orderByAnn(vector); + } + if (filterMap != null) { + selectQueryBuilder.withJsonFilter(JstlFunctions.toJson(filterMap)); + } + if (limit != null) { + selectQueryBuilder.limit(limit); + } + + SelectQuery selectQuery = selectQueryBuilder.build(); + if (log.isDebugEnabled()) { + log.debug("doing query {}", JstlFunctions.toJson(selectQuery)); + } + + result = collection.query(selectQuery).toList(); + + return result.stream() + .map( + m -> { + Map r = new HashMap<>(); + if (m.getData() != null) { + r.putAll(m.getData()); + } + r.put("id", m.getId()); + if (m.getSimilarity() != null) { + r.put("similarity", m.getSimilarity()); + } + if (m.getVector() != null) { + r.put("vector", JstlFunctions.toListOfFloat(m.getVector())); + } + return r; + }) + .collect(Collectors.toList()); + } catch (Exception err) { + throw new RuntimeException(err); + } + } + + @Override + public Map executeStatement( + String query, List generatedKeys, List params) { + if (log.isDebugEnabled()) { + log.debug( + "Executing statement {} with params {} ({})", + query, + params, + params.stream() + .map(v -> v == null ? 
"null" : v.getClass().toString()) + .collect(Collectors.joining(","))); + } + try { + Map queryMap = + InterpolationUtils.buildObjectFromJson(query, Map.class, params); + if (queryMap.isEmpty()) { + throw new UnsupportedOperationException("Query is empty"); + } + String collectionName = (String) queryMap.remove("collection-name"); + if (collectionName == null) { + throw new UnsupportedOperationException("collection-name is not defined"); + } + CollectionClient collection = this.getAstraDB().collection(collectionName); + + String action = (String) queryMap.remove("action"); + if (action == null) { + action = "findOneAndUpdate"; + } + + switch (action) { + case "findOneAndUpdate": + { + Map filterMap = + (Map) queryMap.remove("filter"); + UpdateQueryBuilder builder = UpdateQuery.builder(); + if (filterMap != null) { + builder.withJsonFilter(JstlFunctions.toJson(filterMap)); + } + String returnDocument = (String) queryMap.remove("return-document"); + if (returnDocument != null) { + builder.withReturnDocument( + UpdateQueryBuilder.ReturnDocument.valueOf(returnDocument)); + } + Map updateMap = + (Map) queryMap.remove("update"); + if (updateMap == null || updateMap.isEmpty()) { + throw new IllegalArgumentException("update map cannot be empty"); + } + update.set(builder, updateMap); + + UpdateQuery updateQuery = builder.build(); + if (log.isDebugEnabled()) { + log.debug( + "doing findOneAndUpdate with UpdateQuery {}", + JstlFunctions.toJson(updateQuery)); + } + JsonResultUpdate oneAndUpdate = collection.findOneAndUpdate(updateQuery); + return Map.of("count", oneAndUpdate.getUpdateStatus().getModifiedCount()); + } + case "deleteOne": + { + Map filterMap = + (Map) queryMap.remove("filter"); + DeleteQueryBuilder builder = DeleteQuery.builder(); + if (filterMap != null) { + builder.withJsonFilter(JstlFunctions.toJson(filterMap)); + } + DeleteQuery delete = builder.build(); + if (log.isDebugEnabled()) { + log.debug( + "doing deleteOne with DeleteQuery {}", + JstlFunctions.toJson(delete)); + } + int count = collection.deleteOne(delete); + return Map.of("count", count); + } + case "deleteMany": + { + Map filterMap = + (Map) queryMap.remove("filter"); + DeleteQueryBuilder builder = DeleteQuery.builder(); + if (filterMap != null) { + builder.withJsonFilter(JstlFunctions.toJson(filterMap)); + } + DeleteQuery delete = builder.build(); + if (log.isDebugEnabled()) { + log.debug( + "doing deleteMany with DeleteQuery {}", + JstlFunctions.toJson(delete)); + } + int count = collection.deleteMany(delete); + return Map.of("count", count); + } + case "insertOne": + { + Map documentData = + (Map) queryMap.remove("document"); + JsonDocument document = new JsonDocument(); + for (Map.Entry entry : documentData.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + switch (key) { + case "id": + document.id(value.toString()); + break; + case "vector": + document.vector(JstlFunctions.toArrayOfFloat(value)); + break; + case "data": + document.data(value); + break; + default: + document.put(key, value); + break; + } + } + if (document.getId() == null) { + document.setId(UUID.randomUUID().toString()); + } + + if (log.isDebugEnabled()) { + log.debug( + "doing insertOne with JsonDocument {}", + JstlFunctions.toJson(document)); + } + String id = collection.insertOne(document); + return Map.of("id", id); + } + default: + throw new UnsupportedOperationException("Unsupported action: " + action); + } + + } catch (Exception err) { + throw new RuntimeException(err); + } + } + + public AstraDB getAstraDB() { 
+
+    public AstraDB getAstraDB() {
+        return astraDB;
+    }
+}
diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSourceProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSourceProvider.java
new file mode 100644
index 000000000..3360ce61b
--- /dev/null
+++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBDataSourceProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.agents.vector.astra;
+
+import ai.langstream.ai.agents.datasource.DataSourceProvider;
+import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class AstraVectorDBDataSourceProvider implements DataSourceProvider {
+
+    @Override
+    public boolean supports(Map<String, Object> dataSourceConfig) {
+        String service = (String) dataSourceConfig.get("service");
+        return "astra-vector-db".equals(service);
+    }
+
+    @Override
+    public QueryStepDataSource createDataSourceImplementation(
+            Map<String, Object> dataSourceConfig) {
+        return new AstraVectorDBDataSource();
+    }
+}
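The new provider is registered through META-INF/services (see further below) and picked
up via the JDK ServiceLoader, keyed on the datasource's "service" field. A minimal
sketch of how such a lookup works (editor's illustration; the actual wiring lives in
the LangStream runtime, and dataSourceConfig stands in for the configured datasource
map):

    // Scan registered providers and use the first one that supports this configuration.
    ServiceLoader<DataSourceProvider> providers = ServiceLoader.load(DataSourceProvider.class);
    for (DataSourceProvider provider : providers) {
        if (provider.supports(dataSourceConfig)) {
            return provider.createDataSourceImplementation(dataSourceConfig);
        }
    }
    throw new IllegalArgumentException(
            "No DataSourceProvider for service " + dataSourceConfig.get("service"));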
diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBWriter.java
new file mode 100644
index 000000000..e927ea318
--- /dev/null
+++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraVectorDBWriter.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.agents.vector.astra;
+
+import static ai.langstream.ai.agents.commons.MutableRecord.recordToMutableRecord;
+
+import ai.langstream.ai.agents.commons.MutableRecord;
+import ai.langstream.ai.agents.commons.jstl.JstlEvaluator;
+import ai.langstream.ai.agents.commons.jstl.JstlFunctions;
+import ai.langstream.api.database.VectorDatabaseWriter;
+import ai.langstream.api.database.VectorDatabaseWriterProvider;
+import ai.langstream.api.runner.code.Record;
+import ai.langstream.api.util.ConfigurationUtils;
+import io.stargate.sdk.json.CollectionClient;
+import io.stargate.sdk.json.domain.JsonDocument;
+import io.stargate.sdk.json.domain.UpdateQuery;
+import io.stargate.sdk.json.exception.ApiException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class AstraVectorDBWriter implements VectorDatabaseWriterProvider {
+
+    @Override
+    public boolean supports(Map<String, Object> dataSourceConfig) {
+        return "astra-vector-db".equals(dataSourceConfig.get("service"));
+    }
+
+    @Override
+    public VectorDatabaseWriter createImplementation(Map<String, Object> datasourceConfig) {
+        return new AstraCollectionsDatabaseWriter(datasourceConfig);
+    }
+
+    private static class AstraCollectionsDatabaseWriter implements VectorDatabaseWriter {
+
+        AstraVectorDBDataSource dataSource;
+        private final Map<String, Object> datasourceConfig;
+        private String collectionName;
+        private CollectionClient collection;
+
+        private final LinkedHashMap<String, JstlEvaluator> fields = new LinkedHashMap<>();
+
+        public AstraCollectionsDatabaseWriter(Map<String, Object> datasourceConfig) {
+            this.datasourceConfig = datasourceConfig;
+            this.dataSource = new AstraVectorDBDataSource();
+        }
+
+        @Override
+        public void initialise(Map<String, Object> agentConfiguration) {
+            collectionName =
+                    ConfigurationUtils.getString("collection-name", "", agentConfiguration);
+            dataSource.initialize(datasourceConfig);
+            collection = dataSource.getAstraDB().collection(collectionName);
+
+            List<Map<String, Object>> fields =
+                    (List<Map<String, Object>>)
+                            agentConfiguration.getOrDefault("fields", List.of());
+            fields.forEach(
+                    field -> {
+                        this.fields.put(
+                                field.get("name").toString(),
+                                buildEvaluator(field, "expression", Object.class));
+                    });
+        }
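+
+        // Editor's note (illustrative, not part of the original change): a typical
+        // agent configuration maps record expressions to collection fields, e.g.
+        //
+        //   fields:
+        //     - name: "id"
+        //       expression: "fn:concat(key.name,'-',key.chunk_id)"
+        //     - name: "vector"
+        //       expression: "fn:toListOfFloat(value.vector)"
+        //     - name: "text"
+        //       expression: "value.text"
+        //
+        // "id", "vector" and "data" receive special handling in upsert() below; any
+        // other name becomes a plain property of the JsonDocument.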
log.debug("Inserted document with id {}", id); + } + return CompletableFuture.completedFuture(id); + } catch (ApiException e) { + if ("DOCUMENT_ALREADY_EXISTS".equals(e.getErrorCode())) { + collection. // Already Exist + findOneAndReplace( + UpdateQuery.builder() + .where("_id") + .isEqualsTo(document.getId()) + .replaceBy(document) + .build()); + return CompletableFuture.completedFuture(document.getId()); + } else { + return CompletableFuture.failedFuture(e); + } + } + } + + } catch (Throwable e) { + log.error("Error while inserting document {}", document, e); + return CompletableFuture.failedFuture(e); + } + } + + @Override + public void close() {} + + private void computeFields( + MutableRecord mutableRecord, + Map fields, + BiConsumer acceptor) { + fields.forEach( + (name, evaluator) -> { + Object value = evaluator.evaluate(mutableRecord); + if (log.isDebugEnabled()) { + log.debug( + "setting value {} ({}) for field {}", + value, + value.getClass(), + name); + } + acceptor.accept(name, value); + }); + } + + private static JstlEvaluator buildEvaluator( + Map agentConfiguration, String param, Class type) { + String expression = agentConfiguration.getOrDefault(param, "").toString(); + if (expression == null || expression.isEmpty()) { + return null; + } + return new JstlEvaluator("${" + expression + "}", type); + } + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java index a7d2651fe..116502749 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java @@ -25,7 +25,7 @@ import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.servererrors.AlreadyExistsException; import com.datastax.oss.streaming.ai.datasource.CassandraDataSource; -import com.dtsx.astra.sdk.db.DatabaseClient; +import com.dtsx.astra.sdk.db.DbOpsClient; import java.util.List; import java.util.Map; import java.util.Optional; @@ -211,7 +211,7 @@ private static class AstraDBKeyspaceAssetManager extends BaseCassandraAssetManag public boolean assetExists() throws Exception { String keySpace = getKeyspace(); log.info("Checking if keyspace {} exists", keySpace); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); boolean exist = astraDbClient.keyspaces().exist(keySpace); log.info("Result: {}", exist); return exist; @@ -220,7 +220,7 @@ public boolean assetExists() throws Exception { @Override public void deployAsset() throws Exception { String keySpace = getKeyspace(); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); try { astraDbClient.keyspaces().create(keySpace); } catch (com.dtsx.astra.sdk.db.exception.KeyspaceAlreadyExistException e) { @@ -250,7 +250,7 @@ public boolean deleteAssetIfExists() throws Exception { String keySpace = getKeyspace(); log.info("Deleting keyspace {}", keySpace); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); try { astraDbClient.keyspaces().delete(keySpace); return true; diff --git 
a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java index b7508d45d..89e21afce 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java @@ -27,7 +27,7 @@ import com.datastax.oss.common.sink.config.CassandraSinkConfig; import com.datastax.oss.common.sink.util.SinkUtil; import com.datastax.oss.streaming.ai.datasource.CassandraDataSource; -import com.dtsx.astra.sdk.db.DatabaseClient; +import com.dtsx.astra.sdk.db.DbOpsClient; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashMap; @@ -121,7 +121,7 @@ public void initialise(Map agentConfiguration) { "environment", "PROD", datasource); if (!token.isEmpty() && (!database.isEmpty() || !databaseId.isEmpty())) { - DatabaseClient databaseClient = + DbOpsClient databaseClient = CassandraDataSource.buildAstraClient( token, database, diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index index 935c45a6a..5b3d25b29 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/ai.langstream.assets.index @@ -4,4 +4,5 @@ astra-keyspace milvus-collection jdbc-table solr-collection -opensearch-index \ No newline at end of file +opensearch-index +astra-collection \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index 81ead541b..f269c5d74 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1,4 +1,5 @@ ai.langstream.agents.vector.pinecone.PineconeDataSource ai.langstream.agents.vector.milvus.MilvusDataSource ai.langstream.agents.vector.solr.SolrDataSource -ai.langstream.agents.vector.opensearch.OpenSearchDataSource \ No newline at end of file +ai.langstream.agents.vector.opensearch.OpenSearchDataSource +ai.langstream.agents.vector.astra.AstraVectorDBDataSourceProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider index f6c27bb80..0f0e73890 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider @@ -3,4 +3,5 @@ ai.langstream.agents.vector.cassandra.CassandraWriter 
ai.langstream.agents.vector.milvus.MilvusWriter ai.langstream.agents.vector.jdbc.JdbcWriter ai.langstream.agents.vector.solr.SolrWriter -ai.langstream.agents.vector.opensearch.OpenSearchWriter \ No newline at end of file +ai.langstream.agents.vector.opensearch.OpenSearchWriter +ai.langstream.agents.vector.astra.AstraVectorDBWriter \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider index 45925c4fa..98573dc73 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider @@ -2,4 +2,5 @@ ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider ai.langstream.agents.vector.milvus.MilvusAssetsManagerProvider ai.langstream.agents.vector.jdbc.JdbcAssetsManagerProvider ai.langstream.agents.vector.solr.SolrAssetsManagerProvider -ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider \ No newline at end of file +ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider +ai.langstream.agents.vector.astra.AstraVectorDBAssetsManagerProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/AstraVectorDBTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/AstraVectorDBTest.java new file mode 100644 index 000000000..4b9e2b8cf --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/AstraVectorDBTest.java @@ -0,0 +1,417 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.agents.vector.datasource.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ai.langstream.agents.vector.astra.AstraVectorDBAssetsManagerProvider; +import ai.langstream.agents.vector.astra.AstraVectorDBDataSourceProvider; +import ai.langstream.agents.vector.astra.AstraVectorDBWriter; +import ai.langstream.api.database.VectorDatabaseWriter; +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.assets.AssetManagerProvider; +import ai.langstream.api.runner.code.SimpleRecord; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Disabled +@Slf4j +public class AstraVectorDBTest { + + private static final String TOKEN = + "AstraCS:HQKZyFwTNcNQFPhsLHPHlyYq:0fd08e29b7e7c590e947ac8fa9a4d6d785a4661a8eb1b3c011e2a0d19c2ecd7c"; + private static final String ENDPOINT = + "https://18bdf302-901f-4245-af09-061ebdb480d2-us-east1.apps.astra.datastax.com"; + + @Test + void testWriteAndRead() throws Exception { + AstraVectorDBDataSourceProvider dataSourceProvider = new AstraVectorDBDataSourceProvider(); + Map config = Map.of("token", TOKEN, "endpoint", ENDPOINT); + + String collectionName = "documents"; + int dimension = 32; + List vector = new ArrayList<>(); + List vector2 = new ArrayList<>(); + for (int i = 0; i < dimension; i++) { + vector.add(i * 1f / dimension); + vector2.add((i + 1) * 1f / dimension); + } + String vectorAsString = vector.toString(); + String vector2AsString = vector2.toString(); + + try (QueryStepDataSource datasource = + dataSourceProvider.createDataSourceImplementation(config); + VectorDatabaseWriter writer = + new AstraVectorDBWriter().createImplementation(config)) { + datasource.initialize(config); + + AssetManagerProvider assetsManagerProvider = new AstraVectorDBAssetsManagerProvider(); + try (AssetManager tableManager = + assetsManagerProvider.createInstance("astra-collection"); ) { + AssetDefinition assetDefinition = new AssetDefinition(); + assetDefinition.setAssetType("astra-collection"); + assetDefinition.setConfig( + Map.of( + "collection-name", + collectionName, + "datasource", + Map.of("configuration", config), + "vector-dimension", + vector.size())); + tableManager.initialize(assetDefinition); + tableManager.deleteAssetIfExists(); + + assertFalse(tableManager.assetExists()); + tableManager.deployAsset(); + + List> fields = + List.of( + Map.of( + "name", + "id", + "expression", + "fn:concat(key.name,'-',key.chunk_id)"), + Map.of("name", "name", "expression", "key.name"), + Map.of("name", "chunk_id", "expression", "key.chunk_id"), + Map.of( + "name", + "vector", + "expression", + "fn:toListOfFloat(value.vector)"), + Map.of("name", "text", "expression", "value.text")); + + writer.initialise(Map.of("collection-name", collectionName, "fields", fields)); + + // INSERT + + String name = "do'c1"; + + // the PK contains a single quote in order to test escaping values in deletion + SimpleRecord record = + SimpleRecord.of( + "{\"name\": \"%s\", \"chunk_id\": 
1}".formatted(name), + """ + { + "vector": %s, + "text": "Lorem ipsum..." + } + """ + .formatted(vectorAsString)); + writer.upsert(record, Map.of()).get(); + + String similarityQuery = + """ + { + "collection-name": "%s", + "vector": ?, + "max": 10 + } + """ + .formatted(collectionName); + + // QUERY, SIMILARITY SEARCH + assertContents( + datasource, + similarityQuery, + List.of(vector), + results -> { + assertEquals(1, results.size()); + assertEquals(name, results.get(0).get("name")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + }); + + // QUERY, SIMILARITY SEARCH WITH METADATA FILTERING + String similarityQueryWithFilterOnName = + """ + { + "collection-name": "%s", + "vector": ?, + "max": 10, + "filter": { + "name": ? + } + } + """ + .formatted(collectionName); + assertContents( + datasource, + similarityQueryWithFilterOnName, + List.of(vector, name), + results -> { + assertEquals(1, results.size()); + assertEquals(name, results.get(0).get("name")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + }); + + assertContents( + datasource, + """ + { + "collection-name": "%s", + "vector": ?, + "max": 10, + "filter": { + "name": ? + }, + "select": ["text"], + "include-similarity": true + } + """ + .formatted(collectionName), + List.of(vector, name), + results -> { + assertEquals(1, results.size()); + assertNull(results.get(0).get("name")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + assertNotNull(results.get(0).get("similarity")); + assertNull(results.get(0).get("vector")); + }); + + assertContents( + datasource, + """ + { + "collection-name": "%s", + "vector": ?, + "max": 10, + "filter": { + "name": ? + }, + "select": ["text"], + "include-similarity": false + } + """ + .formatted(collectionName), + List.of(vector, name), + results -> { + assertEquals(1, results.size()); + assertNull(results.get(0).get("name")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + assertNull(results.get(0).get("similarity")); + assertNull(results.get(0).get("vector")); + }); + + assertContents( + datasource, + """ + { + "collection-name": "%s", + "vector": ?, + "max": 10, + "filter": { + "name": ? + }, + "select": ["text", "$vector"], + "include-similarity": false + } + """ + .formatted(collectionName), + List.of(vector, name), + results -> { + assertEquals(1, results.size()); + assertNull(results.get(0).get("name")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + assertNull(results.get(0).get("similarity")); + assertNotNull(results.get(0).get("vector")); + }); + + String queryWithFilterOnName = + """ + { + "collection-name": "%s", + "max": 10, + "filter": { + "name": ? + } + } + """ + .formatted(collectionName); + + // QUERY, WITH METADATA FILTERING + assertContents( + datasource, + queryWithFilterOnName, + List.of(name), + results -> { + assertEquals(1, results.size()); + }); + + // QUERY, WITH METADATA FILTERING, NO RESULTS + assertContents( + datasource, + queryWithFilterOnName, + List.of("bad-name"), + results -> { + assertEquals(0, results.size()); + }); + + // UPDATE + SimpleRecord recordUpdated = + SimpleRecord.of( + "{\"name\": \"%s\", \"chunk_id\": 1}".formatted(name), + """ + { + "vector": %s, + "text": "Lorem ipsum changed..." 
+ } + """ + .formatted(vector2AsString)); + writer.upsert(recordUpdated, Map.of()).get(); + + assertContents( + datasource, + similarityQuery, + List.of(vector2), + results -> { + log.info("Results: {}", results); + assertEquals(1, results.size()); + assertEquals(name, results.get(0).get("name")); + assertEquals("Lorem ipsum changed...", results.get(0).get("text")); + }); + + // DELETE + SimpleRecord recordDelete = + SimpleRecord.of( + "{\"name\": \"%s\", \"chunk_id\": 1}".formatted(name), null); + writer.upsert(recordDelete, Map.of()).get(); + + assertContents( + datasource, + similarityQuery, + List.of(vector2), + result -> { + assertEquals(0, result.size()); + }); + + Map executeInsertRes = + executeStatement( + datasource, + """ + { + "action": "insertOne", + "collection-name": "%s", + "document": { + "id": "some-id", + "name": ?, + "vector": ?, + "text": "Some text" + } + } + """ + .formatted(collectionName), + List.of("some", vector)); + assertEquals("some-id", executeInsertRes.get("id")); + + assertContents( + datasource, + queryWithFilterOnName, + List.of("some"), + result -> { + assertEquals(1, result.size()); + assertEquals("Some text", result.get(0).get("text")); + }); + + executeStatement( + datasource, + """ + { + "action": "findOneAndUpdate", + "collection-name": "%s", + "filter": { + "_id": ? + }, + "update": { + "$set": { + "text": ? + } + } + } + """ + .formatted(collectionName), + List.of("some-id", "new value")); + + assertContents( + datasource, + queryWithFilterOnName, + List.of("some"), + result -> { + assertEquals(1, result.size()); + assertEquals("new value", result.get(0).get("text")); + }); + + executeStatement( + datasource, + """ + { + "action": "deleteOne", + "collection-name": "%s", + "filter": { + "_id": ? + } + } + """ + .formatted(collectionName), + List.of("some-id")); + + assertContents( + datasource, + queryWithFilterOnName, + List.of("some"), + result -> { + assertEquals(0, result.size()); + }); + + // CLEANUP + assertTrue(tableManager.assetExists()); + tableManager.deleteAssetIfExists(); + } + } + } + + private static List> assertContents( + QueryStepDataSource datasource, + String query, + List params, + Consumer>> assertion) { + log.info("Query: {}", query); + log.info("Params: {}", params); + List> results = datasource.fetchData(query, params); + log.info("Result: {}", results); + assertion.accept(results); + ; + return results; + } + + private static Map executeStatement( + QueryStepDataSource datasource, String query, List params) { + log.info("Query: {}", query); + log.info("Params: {}", params); + Map results = datasource.executeStatement(query, null, params); + log.info("Result: {}", results); + return results; + } +} diff --git a/langstream-api/src/main/java/ai/langstream/api/database/VectorDatabaseWriter.java b/langstream-api/src/main/java/ai/langstream/api/database/VectorDatabaseWriter.java index f890fa5a4..fb9715af2 100644 --- a/langstream-api/src/main/java/ai/langstream/api/database/VectorDatabaseWriter.java +++ b/langstream-api/src/main/java/ai/langstream/api/database/VectorDatabaseWriter.java @@ -23,7 +23,7 @@ * This is the interface for writing to a vector database. this interface is really simple by * intention. For advanced usages users should use Kafka Connect connectors. 
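 * <p>Editor's sketch (illustrative, not part of the original change): because the
 * interface now extends AutoCloseable, implementations can be managed with
 * try-with-resources, as the AstraVectorDBTest above already does:
 * <pre>{@code
 * try (VectorDatabaseWriter writer = provider.createImplementation(datasourceConfig)) {
 *     writer.initialise(agentConfiguration);
 *     writer.upsert(record, Map.of()).get();
 * }
 * }</pre>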
*/ -public interface VectorDatabaseWriter { +public interface VectorDatabaseWriter extends AutoCloseable { default void initialise(Map agentConfiguration) throws Exception {} diff --git a/langstream-core/src/main/java/ai/langstream/impl/assets/AstraVectorDBAssetsProvider.java b/langstream-core/src/main/java/ai/langstream/impl/assets/AstraVectorDBAssetsProvider.java new file mode 100644 index 000000000..881c6bec1 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/assets/AstraVectorDBAssetsProvider.java @@ -0,0 +1,98 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.assets; + +import ai.langstream.api.doc.AssetConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.util.ConfigurationUtils; +import ai.langstream.impl.common.AbstractAssetProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Set; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraVectorDBAssetsProvider extends AbstractAssetProvider { + + protected static final String ASTRA_COLLECTION = "astra-collection"; + + public AstraVectorDBAssetsProvider() { + super(Set.of(ASTRA_COLLECTION)); + } + + @Override + protected Class getAssetConfigModelClass(String type) { + return switch (type) { + case ASTRA_COLLECTION -> AstraCollectionConfig.class; + default -> throw new IllegalArgumentException("Unknown asset type " + type); + }; + } + + @Override + protected void validateAsset(AssetDefinition assetDefinition, Map asset) { + Map configuration = ConfigurationUtils.getMap("config", null, asset); + final Map datasource = + ConfigurationUtils.getMap("datasource", Map.of(), configuration); + final Map datasourceConfiguration = + ConfigurationUtils.getMap("configuration", Map.of(), datasource); + switch (assetDefinition.getAssetType()) { + default -> {} + } + } + + @Override + protected boolean lookupResource(String fieldName) { + return "datasource".equals(fieldName); + } + + @AssetConfig( + name = "Astra Collection", + description = + """ + Manage a DataStax Astra Collection. + """) + @Data + public static class AstraCollectionConfig { + + @ConfigProperty( + description = + """ + Reference to a datasource id configured in the application. + """, + required = true) + private String datasource; + + @ConfigProperty( + description = + """ + Name of the collection to create. + """, + required = true) + @JsonProperty("collection-name") + private String collectionName; + + @ConfigProperty( + description = + """ + Size of the vector. 
+ """, + required = true) + @JsonProperty("vector-dimension") + private int vectorDimension; + } +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/assets/CassandraAssetsProvider.java b/langstream-core/src/main/java/ai/langstream/impl/assets/CassandraAssetsProvider.java index de34e8195..0681e2e1c 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/assets/CassandraAssetsProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/assets/CassandraAssetsProvider.java @@ -71,15 +71,22 @@ protected void validateAsset(AssetDefinition assetDefinition, Map { if (!datasourceConfiguration.containsKey("secureBundle") - && !datasourceConfiguration.containsKey("database")) { + && !datasourceConfiguration.containsKey("database") + && !datasourceConfiguration.containsKey("database-id")) { throw new IllegalArgumentException( "Use cassandra-keyspace for a standard Cassandra service (not AstraDB)"); } // are we are using the AstraDB SDK we need also the AstraCS token and // the name of the database requiredNonEmptyField(datasourceConfiguration, "token", describe(assetDefinition)); - requiredNonEmptyField( - datasourceConfiguration, "database", describe(assetDefinition)); + if (!datasourceConfiguration.containsKey("database")) { + requiredNonEmptyField( + datasourceConfiguration, "database-id", describe(assetDefinition)); + } + if (!datasourceConfiguration.containsKey("database-id")) { + requiredNonEmptyField( + datasourceConfiguration, "database", describe(assetDefinition)); + } } default -> {} } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java index 20d9aa633..7091cdfbf 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java @@ -16,6 +16,7 @@ package ai.langstream.impl.resources; import ai.langstream.impl.resources.datasource.AstraDatasourceConfig; +import ai.langstream.impl.resources.datasource.AstraVectorDBDatasourceConfig; import ai.langstream.impl.resources.datasource.CassandraDatasourceConfig; import ai.langstream.impl.resources.datasource.JDBCDatasourceConfig; import ai.langstream.impl.resources.datasource.OpenSearchDatasourceConfig; @@ -24,6 +25,8 @@ public class DataSourceResourceProvider extends BaseDataSourceResourceProvider { protected static final String SERVICE_ASTRA = "astra"; + + protected static final String SERVICE_ASTRA_VECTOR_DB = "astra-vector-db"; protected static final String SERVICE_CASSANDRA = "cassandra"; protected static final String SERVICE_JDBC = "jdbc"; protected static final String SERVICE_OPENSEARCH = "opensearch"; @@ -35,6 +38,7 @@ public DataSourceResourceProvider() { SERVICE_ASTRA, AstraDatasourceConfig.CONFIG, SERVICE_CASSANDRA, CassandraDatasourceConfig.CONFIG, SERVICE_JDBC, JDBCDatasourceConfig.CONFIG, - SERVICE_OPENSEARCH, OpenSearchDatasourceConfig.CONFIG)); + SERVICE_OPENSEARCH, OpenSearchDatasourceConfig.CONFIG, + SERVICE_ASTRA_VECTOR_DB, AstraVectorDBDatasourceConfig.CONFIG)); } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java index 77b58d508..a977ff53f 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java +++ 
b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java @@ -16,6 +16,7 @@ package ai.langstream.impl.resources; import ai.langstream.impl.resources.datasource.AstraDatasourceConfig; +import ai.langstream.impl.resources.datasource.AstraVectorDBDatasourceConfig; import ai.langstream.impl.resources.datasource.CassandraDatasourceConfig; import ai.langstream.impl.resources.datasource.MilvusDatasourceConfig; import ai.langstream.impl.resources.datasource.OpenSearchDatasourceConfig; @@ -34,6 +35,7 @@ public VectorDatabaseResourceProvider() { "pinecone", PineconeDatasourceConfig.CONFIG, "milvus", MilvusDatasourceConfig.CONFIG, "solr", SolrDatasourceConfig.CONFIG, + "astra-vector-db", AstraVectorDBDatasourceConfig.CONFIG, "opensearch", OpenSearchDatasourceConfig.CONFIG)); } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraDatasourceConfig.java b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraDatasourceConfig.java index 93b2bf318..f96e258de 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraDatasourceConfig.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraDatasourceConfig.java @@ -48,7 +48,8 @@ public void validate(Resource resource) { ConfigurationUtils.getString("secureBundle", "", configuration); if (secureBundle.isEmpty()) { if (configuration.get("token") == null - || configuration.get("database") == null) { + || (configuration.get("database") == null + && configuration.get("database-id") == null)) { throw new IllegalArgumentException( ClassConfigValidator.formatErrString( new ClassConfigValidator.ResourceEntityRef(resource), diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraVectorDBDatasourceConfig.java b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraVectorDBDatasourceConfig.java new file mode 100644 index 000000000..7c1089fca --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/datasource/AstraVectorDBDatasourceConfig.java @@ -0,0 +1,62 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.impl.resources.datasource; + +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.doc.ResourceConfig; +import ai.langstream.api.model.Resource; +import ai.langstream.impl.resources.BaseDataSourceResourceProvider; +import ai.langstream.impl.uti.ClassConfigValidator; +import lombok.Data; + +@Data +@ResourceConfig( + name = "Astra Vector DB", + description = "Connect to DataStax Astra Vector DB service.") +public class AstraVectorDBDatasourceConfig extends BaseDatasourceConfig { + + public static final BaseDataSourceResourceProvider.DatasourceConfig CONFIG = + new BaseDataSourceResourceProvider.DatasourceConfig() { + + @Override + public Class getResourceConfigModelClass() { + return AstraVectorDBDatasourceConfig.class; + } + + @Override + public void validate(Resource resource) { + ClassConfigValidator.validateResourceModelFromClass( + resource, + AstraVectorDBDatasourceConfig.class, + resource.configuration(), + false); + } + }; + + @ConfigProperty( + description = + """ + API Endpoint. + """) + private String endpoint; + + @ConfigProperty( + description = + """ + Astra Token (AstraCS:xxx) for connecting to the database. + """) + private String token; +} diff --git a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider index f47198f79..3a5d6610a 100644 --- a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider +++ b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider @@ -2,4 +2,5 @@ ai.langstream.impl.assets.CassandraAssetsProvider ai.langstream.impl.assets.MilvusAssetsProvider ai.langstream.impl.assets.JdbcAssetsProvider ai.langstream.impl.assets.SolrAssetsProvider -ai.langstream.impl.assets.OpenSearchAssetsProvider \ No newline at end of file +ai.langstream.impl.assets.OpenSearchAssetsProvider +ai.langstream.impl.assets.AstraVectorDBAssetsProvider \ No newline at end of file diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/FlowControlAgentsProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/FlowControlAgentsProvider.java index caa35feca..38e47fce8 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/FlowControlAgentsProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/FlowControlAgentsProvider.java @@ -174,6 +174,13 @@ public static class LogEventProcessorConfig { """) List fields; + @ConfigProperty( + description = + """ + Template for a log message to print (Mustache). 
+ """) + String message; + @ConfigProperty( description = """ diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java index 219d30e22..122a6785f 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java @@ -31,6 +31,7 @@ import ai.langstream.impl.agents.ai.steps.QueryConfiguration; import ai.langstream.impl.uti.ClassConfigValidator; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import ai.langstream.runtime.impl.k8s.agents.vectors.AstraVectorDBVectorDatabaseWriterConfig; import ai.langstream.runtime.impl.k8s.agents.vectors.CassandraVectorDatabaseWriterConfig; import ai.langstream.runtime.impl.k8s.agents.vectors.JDBCVectorDatabaseWriterConfig; import ai.langstream.runtime.impl.k8s.agents.vectors.MilvusVectorDatabaseWriterConfig; @@ -80,7 +81,8 @@ public abstract static class VectorDatabaseWriterConfig { "pinecone", PineconeVectorDatabaseWriterConfig.INSTANCE, "opensearch", OpenSearchVectorDatabaseWriterConfig.INSTANCE, "solr", SolrVectorDatabaseWriterConfig.INSTANCE, - "milvus", MilvusVectorDatabaseWriterConfig.INSTANCE); + "milvus", MilvusVectorDatabaseWriterConfig.INSTANCE, + "astra-vector-db", AstraVectorDBVectorDatabaseWriterConfig.INSTANCE); public QueryVectorDBAgentProvider() { super( diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/AstraVectorDBVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/AstraVectorDBVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..ecb1792cc --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/AstraVectorDBVectorDatabaseWriterConfig.java @@ -0,0 +1,70 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import lombok.Data; + +@Data +@AgentConfig( + name = "Astra Vector DB", + description = + """ +Writes data to Apache Cassandra. 
+All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html +""") +public class AstraVectorDBVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final QueryVectorDBAgentProvider.VectorDatabaseWriterConfig INSTANCE = + new AstraVectorDBVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return AstraVectorDBVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return true; + } + + @ConfigProperty( + description = + "The name of the collection to write to. The collection must already exist.", + required = true) + @JsonProperty("collection-name") + String collectionName; + + @Data + public static class CollectionField { + + @ConfigProperty(description = "Field name", required = true) + String name; + + @ConfigProperty( + description = "JSTL Expression for computing the field value.", + required = true) + String expression; + } + + @ConfigProperty(description = "Fields of the collection to write to.", required = true) + List fields; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java index 360e52df3..f3b26de78 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java @@ -141,383 +141,422 @@ public void testDocumentation() { Assertions.assertEquals( """ - { - "query-vector-db" : { - "name" : "Query a vector database", - "description" : "Query a vector database using Vector Search capabilities.", - "properties" : { - "composable" : { - "description" : "Whether this step can be composed with other steps.", - "required" : false, - "type" : "boolean", - "defaultValue" : "true" - }, - "datasource" : { - "description" : "Reference to a datasource id configured in the application.", - "required" : true, - "type" : "string" - }, - "fields" : { - "description" : "Fields of the record to use as input parameters for the query.", - "required" : false, - "type" : "array", - "items" : { - "description" : "Fields of the record to use as input parameters for the query.", - "required" : false, - "type" : "string", - "extendedValidationType" : "EL_EXPRESSION" - }, - "extendedValidationType" : "EL_EXPRESSION" - }, - "generated-keys" : { - "description" : "List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.", - "required" : false, - "type" : "array", - "items" : { - "description" : "List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.", - "required" : false, - "type" : "string" - } - }, - "loop-over" : { - "description" : "Loop over a list of items taken from the record. For instance value.documents.\\nIt must refer to a list of maps. In this case the output-field parameter\\nbut be like \\"record.fieldname\\" in order to replace or set a field in each record\\nwith the results of the query. 
In the query parameters you can refer to the\\nrecord fields using \\"record.field\\".", - "required" : false, - "type" : "string", - "extendedValidationType" : "EL_EXPRESSION" - }, - "mode" : { - "description" : "Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database).", - "required" : false, - "type" : "string", - "defaultValue" : "query" - }, - "only-first" : { - "description" : "If true, only the first result of the query is stored in the output field.", - "required" : false, - "type" : "boolean", - "defaultValue" : "false" - }, - "output-field" : { - "description" : "The name of the field to use to store the query result.", - "required" : true, - "type" : "string" - }, - "query" : { - "description" : "The query to use to extract the data.", - "required" : true, - "type" : "string" - }, - "when" : { - "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", - "required" : false, - "type" : "string" - } - } - }, - "vector-db-sink_astra" : { - "type" : "vector-db-sink", - "name" : "Astra", - "description" : "Writes data to DataStax Astra service.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", - "properties" : { - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'.", - "required" : true, - "type" : "string" - }, - "keyspace" : { - "description" : "The keyspace of the table to write to.", - "required" : false, - "type" : "string" - }, - "mapping" : { - "description" : "Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.", - "required" : true, - "type" : "string" - }, - "table-name" : { - "description" : "The name of the table to write to. The table must already exist.", - "required" : true, - "type" : "string" - } - } - }, - "vector-db-sink_cassandra" : { - "type" : "vector-db-sink", - "name" : "Cassandra", - "description" : "Writes data to Apache Cassandra.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", - "properties" : { - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'.", - "required" : true, - "type" : "string" - }, - "keyspace" : { - "description" : "The keyspace of the table to write to.", - "required" : false, - "type" : "string" - }, - "mapping" : { - "description" : "Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.", - "required" : true, - "type" : "string" - }, - "table-name" : { - "description" : "The name of the table to write to. The table must already exist.", - "required" : true, - "type" : "string" - } - } - }, - "vector-db-sink_jdbc" : { - "type" : "vector-db-sink", - "name" : "JDBC", - "description" : "Writes data to any JDBC compatible database.", - "properties" : { - "datasource" : { - "description" : "Resource id. 
The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'.", - "required" : true, - "type" : "string" - }, - "fields" : { - "description" : "Fields of the table to write to.", - "required" : true, - "type" : "array", - "items" : { - "description" : "Fields of the table to write to.", - "required" : true, - "type" : "object", - "properties" : { - "expression" : { - "description" : "JSTL Expression for computing the field value.", - "required" : true, - "type" : "string" - }, - "name" : { - "description" : "Field name", - "required" : true, - "type" : "string" - }, - "primary-key" : { - "description" : "Is this field part of the primary key?", - "required" : false, - "type" : "boolean", - "defaultValue" : "false" - } - } - } - }, - "table-name" : { - "description" : "The name of the table to write to. The table must already exist.", - "required" : true, - "type" : "string" - } - } - }, - "vector-db-sink_milvus" : { - "type" : "vector-db-sink", - "name" : "Milvus", - "description" : "Writes data to Milvus/Zillis service.", - "properties" : { - "collection-name" : { - "description" : "Collection name", - "required" : false, - "type" : "string" - }, - "database-name" : { - "description" : "Collection name", - "required" : false, - "type" : "string" - }, - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'.", - "required" : true, - "type" : "string" - }, - "fields" : { - "description" : "Fields definition.", - "required" : true, - "type" : "array", - "items" : { - "description" : "Fields definition.", - "required" : true, - "type" : "object", - "properties" : { - "expression" : { - "description" : "JSTL Expression for computing the field value.", - "required" : true, - "type" : "string" - }, - "name" : { - "description" : "Field name", - "required" : true, - "type" : "string" - } - } - } - } - } - }, - "vector-db-sink_opensearch" : { - "type" : "vector-db-sink", - "name" : "OpenSearch", - "description" : "Writes data to OpenSearch or AWS OpenSearch serverless.", - "properties" : { - "batch-size" : { - "description" : "Batch size for bulk operations. Hitting the batch size will trigger a flush.", - "required" : false, - "type" : "integer", - "defaultValue" : "10" - }, - "bulk-parameters" : { - "description" : "OpenSearch bulk URL parameters.", - "required" : false, - "type" : "object", - "properties" : { - "pipeline" : { - "description" : "The pipeline ID for preprocessing documents.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "string" - }, - "refresh" : { - "description" : "Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. 
Requests take longer to return, but cluster performance doesn’t suffer.\\nNote that AWS OpenSearch supports only false.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "string" - }, - "require_alias" : { - "description" : "Set to true to require that all actions target an index alias rather than an index.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "boolean" - }, - "routing" : { - "description" : "Routes the request to the specified shard.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "string" - }, - "timeout" : { - "description" : "How long to wait for the request to return.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "string" - }, - "wait_for_active_shards" : { - "description" : "Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed.\\nRefer to the OpenSearch documentation for more details.", - "required" : false, - "type" : "string" - } - } - }, - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'.", - "required" : true, - "type" : "string" - }, - "fields" : { - "description" : "Index fields definition.", - "required" : true, - "type" : "array", - "items" : { - "description" : "Index fields definition.", - "required" : true, - "type" : "object", - "properties" : { - "expression" : { - "description" : "JSTL Expression for computing the field value.", - "required" : true, - "type" : "string" - }, - "name" : { - "description" : "Field name", - "required" : true, - "type" : "string" - } - } - } - }, - "flush-interval" : { - "description" : "Flush interval in milliseconds", - "required" : false, - "type" : "integer", - "defaultValue" : "1000" - }, - "id" : { - "description" : "JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.", - "required" : false, - "type" : "string" - } - } - }, - "vector-db-sink_pinecone" : { - "type" : "vector-db-sink", - "name" : "Pinecone", - "description" : "Writes data to Pinecone service.\\n To add metadata fields you can add vector.metadata.my-field: \\"value.my-field\\". The value is a JSTL Expression to compute the actual value.", - "properties" : { - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'.", - "required" : true, - "type" : "string" - }, - "vector.id" : { - "description" : "JSTL Expression to compute the id.", - "required" : false, - "type" : "string" - }, - "vector.metadata" : { - "description" : "Metadata to append. 
The key is the metadata name and the value the JSTL Expression to compute the actual value.", - "required" : false, - "type" : "object" - }, - "vector.namespace" : { - "description" : "JSTL Expression to compute the namespace.", - "required" : false, - "type" : "string" - }, - "vector.vector" : { - "description" : "JSTL Expression to compute the vector.", - "required" : false, - "type" : "string" - } - } - }, - "vector-db-sink_solr" : { - "type" : "vector-db-sink", - "name" : "Apache Solr", - "description" : "Writes data to Apache Solr service.\\n The collection-name is configured at datasource level.", - "properties" : { - "commit-within" : { - "description" : "Commit within option", - "required" : false, - "type" : "integer", - "defaultValue" : "1000" - }, - "datasource" : { - "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'.", - "required" : true, - "type" : "string" - }, - "fields" : { - "description" : "Fields definition.", - "required" : true, - "type" : "array", - "items" : { - "description" : "Fields definition.", - "required" : true, - "type" : "object", - "properties" : { - "expression" : { - "description" : "JSTL Expression for computing the field value.", - "required" : true, - "type" : "string" - }, - "name" : { - "description" : "Field name", - "required" : true, - "type" : "string" - } - } - } - } - } - } - }""", + { + "query-vector-db" : { + "name" : "Query a vector database", + "description" : "Query a vector database using Vector Search capabilities.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "datasource" : { + "description" : "Reference to a datasource id configured in the application.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "string", + "extendedValidationType" : "EL_EXPRESSION" + }, + "extendedValidationType" : "EL_EXPRESSION" + }, + "generated-keys" : { + "description" : "List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.", + "required" : false, + "type" : "array", + "items" : { + "description" : "List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.", + "required" : false, + "type" : "string" + } + }, + "loop-over" : { + "description" : "Loop over a list of items taken from the record. For instance value.documents.\\nIt must refer to a list of maps. In this case the output-field parameter\\nbut be like \\"record.fieldname\\" in order to replace or set a field in each record\\nwith the results of the query. In the query parameters you can refer to the\\nrecord fields using \\"record.field\\".", + "required" : false, + "type" : "string", + "extendedValidationType" : "EL_EXPRESSION" + }, + "mode" : { + "description" : "Execution mode: query or execute. In query mode, the query is executed and the results are returned. 
In execute mode, the query is executed and the result is the number of rows affected (depending on the database).", + "required" : false, + "type" : "string", + "defaultValue" : "query" + }, + "only-first" : { + "description" : "If true, only the first result of the query is stored in the output field.", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + }, + "output-field" : { + "description" : "The name of the field to use to store the query result.", + "required" : true, + "type" : "string" + }, + "query" : { + "description" : "The query to use to extract the data.", + "required" : true, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "vector-db-sink_astra" : { + "type" : "vector-db-sink", + "name" : "Astra", + "description" : "Writes data to DataStax Astra service.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", + "properties" : { + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'.", + "required" : true, + "type" : "string" + }, + "keyspace" : { + "description" : "The keyspace of the table to write to.", + "required" : false, + "type" : "string" + }, + "mapping" : { + "description" : "Comma separated list of mappings between the table column and the record field. e.g. my_column_id=key, my_column_name=value.name.", + "required" : true, + "type" : "string" + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_astra-vector-db" : { + "type" : "vector-db-sink", + "name" : "Astra Vector DB", + "description" : "Writes data to DataStax Astra Vector DB collections.", + "properties" : { + "collection-name" : { + "description" : "The name of the collection to write to. The collection must already exist.", + "required" : true, + "type" : "string" + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra-vector-db'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the collection to write to.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields of the collection to write to.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + } + } + }, + "vector-db-sink_cassandra" : { + "type" : "vector-db-sink", + "name" : "Cassandra", + "description" : "Writes data to Apache Cassandra.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", + "properties" : { + "datasource" : { + "description" : "Resource id.
The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'.", + "required" : true, + "type" : "string" + }, + "keyspace" : { + "description" : "The keyspace of the table to write to.", + "required" : false, + "type" : "string" + }, + "mapping" : { + "description" : "Comma separated list of mappings between the table column and the record field. e.g. my_column_id=key, my_column_name=value.name.", + "required" : true, + "type" : "string" + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_jdbc" : { + "type" : "vector-db-sink", + "name" : "JDBC", + "description" : "Writes data to any JDBC compatible database.", + "properties" : { + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the table to write to.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields of the table to write to.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + }, + "primary-key" : { + "description" : "Is this field part of the primary key?", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + } + } + } + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_milvus" : { + "type" : "vector-db-sink", + "name" : "Milvus", + "description" : "Writes data to Milvus/Zilliz service.", + "properties" : { + "collection-name" : { + "description" : "Collection name", + "required" : false, + "type" : "string" + }, + "database-name" : { + "description" : "Database name", + "required" : false, + "type" : "string" + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + } + } + }, + "vector-db-sink_opensearch" : { + "type" : "vector-db-sink", + "name" : "OpenSearch", + "description" : "Writes data to OpenSearch or AWS OpenSearch serverless.", + "properties" : { + "batch-size" : { + "description" : "Batch size for bulk operations.
Hitting the batch size will trigger a flush.", + "required" : false, + "type" : "integer", + "defaultValue" : "10" + }, + "bulk-parameters" : { + "description" : "OpenSearch bulk URL parameters.", + "required" : false, + "type" : "object", + "properties" : { + "pipeline" : { + "description" : "The pipeline ID for preprocessing documents.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "refresh" : { + "description" : "Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn’t suffer.\\nNote that AWS OpenSearch supports only false.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "require_alias" : { + "description" : "Set to true to require that all actions target an index alias rather than an index.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "boolean" + }, + "routing" : { + "description" : "Routes the request to the specified shard.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "timeout" : { + "description" : "How long to wait for the request to return.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "wait_for_active_shards" : { + "description" : "Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + } + } + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Index fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Index fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + }, + "flush-interval" : { + "description" : "Flush interval in milliseconds", + "required" : false, + "type" : "integer", + "defaultValue" : "1000" + }, + "id" : { + "description" : "JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.", + "required" : false, + "type" : "string" + } + } + }, + "vector-db-sink_pinecone" : { + "type" : "vector-db-sink", + "name" : "Pinecone", + "description" : "Writes data to Pinecone service.\\n To add metadata fields you can add vector.metadata.my-field: \\"value.my-field\\". The value is a JSTL Expression to compute the actual value.", + "properties" : { + "datasource" : { + "description" : "Resource id. 
The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'.", + "required" : true, + "type" : "string" + }, + "vector.id" : { + "description" : "JSTL Expression to compute the id.", + "required" : false, + "type" : "string" + }, + "vector.metadata" : { + "description" : "Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value.", + "required" : false, + "type" : "object" + }, + "vector.namespace" : { + "description" : "JSTL Expression to compute the namespace.", + "required" : false, + "type" : "string" + }, + "vector.vector" : { + "description" : "JSTL Expression to compute the vector.", + "required" : false, + "type" : "string" + } + } + }, + "vector-db-sink_solr" : { + "type" : "vector-db-sink", + "name" : "Apache Solr", + "description" : "Writes data to Apache Solr service.\\n The collection-name is configured at datasource level.", + "properties" : { + "commit-within" : { + "description" : "Commit within option", + "required" : false, + "type" : "integer", + "defaultValue" : "1000" + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + } + } + } + }""", SerializationUtil.prettyPrintJson(model)); } } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraVectorDBAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraVectorDBAssetQueryWriteIT.java new file mode 100644 index 000000000..6e57ab1a5 --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraVectorDBAssetQueryWriteIT.java @@ -0,0 +1,134 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.kafka; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Slf4j +@Disabled +class AstraVectorDBAssetQueryWriteIT extends AbstractKafkaApplicationRunner { + + /* Points at a local secrets file with Astra credentials; set it before removing @Disabled. */ + static final String SECRETS_PATH = ""; + + @Test + public void testAstra() throws Exception { + String tenant = "tenant"; + String[] expectedAgents = {"app-step1"}; + + String secrets = Files.readString(Paths.get(SECRETS_PATH)); + + Map<String, String> application = + Map.of( + "configuration.yaml", + """ + configuration: + resources: + - type: "vector-database" + name: "AstraDBDatasource" + configuration: + service: "astra-vector-db" + token: "${ secrets.astra.token }" + endpoint: "${ secrets.astra.endpoint }" + """, + "pipeline.yaml", + """ + assets: + - name: "documents-collection" + asset-type: "astra-collection" + creation-mode: create-if-not-exists + deletion-mode: delete + config: + collection-name: "documents" + datasource: "AstraDBDatasource" + vector-dimension: 3 + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + pipeline: + - id: step1 + name: "Write a document using a query" + type: "query-vector-db" + input: "input-topic" + configuration: + mode: execute + datasource: "AstraDBDatasource" + query: | + { + "action": "insertOne", + "collection-name": "documents", + "document": { + "id": "the-id", + "name": "A name", + "text": "A text", + "vector": [1,2,3] + } + } + output-field: "value.insertresult" + fields: + - "value.documentId" + - name: "Read the document using a query" + type: "query-vector-db" + configuration: + datasource: "AstraDBDatasource" + query: | + { + "collection-name": "documents", + "filter": { + "id": ? + }, + "vector": [1,2,3] + } + only-first: true + output-field: "value.queryresult" + fields: + - "value.insertresult.id" + - id: step2 + name: "Write a new record to Astra" + type: "vector-db-sink" + configuration: + datasource: "AstraDBDatasource" + collection-name: "documents" + fields: + - name: "id" + expression: "fn:toString('new-id')" + - name: "vector" + expression: "value.queryresult.vector" + - name: "text" + expression: "value.queryresult.text" + - name: "name" + expression: "value.queryresult.name" + """); + + try (ApplicationRuntime applicationRuntime = + deployApplicationWithSecrets( + tenant, "app", application, buildInstanceYaml(), secrets, expectedAgents)) { + try (KafkaProducer producer = createProducer()) { + + sendMessage("input-topic", "{\"documentId\":1}", producer); + + executeAgentRunners(applicationRuntime); + } + + applicationDeployer.cleanup(tenant, applicationRuntime.implementation()); + } + } +}
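The integration test above is @Disabled and SECRETS_PATH is left empty on purpose; it has to point at a local secrets file before the test can run. A minimal sketch of such a file, assuming the usual LangStream secrets.yaml layout — the id "astra" and the token/endpoint keys follow from the "${ secrets.astra.token }" and "${ secrets.astra.endpoint }" references in the configuration above, and both values shown are placeholders to be replaced with real Astra Vector DB credentials:

    secrets:
      - id: astra
        data:
          # Application token generated in the Astra console (placeholder value)
          token: "AstraCS:..."
          # Data API endpoint of the target database (placeholder value)
          endpoint: "https://<database-id>-<region>.apps.astra.datastax.com"

With such a file in place, the test exercises the full round trip: the "documents" collection is created as an asset, a document is inserted through query-vector-db in execute mode, read back with a filter query, and a second record is appended through the astra-vector-db vector-db-sink.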