From 56937dbed98091533ce9a70d220f214850857098 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 9 Nov 2023 11:21:26 +0100 Subject: [PATCH] [examples] Improve the S3 Source example (#706) --- dev/s3_upload.sh | 8 +- .../applications/docker-chatbot/crawler.yaml | 11 +- .../{text-processing => s3-source}/.gitignore | 0 examples/applications/s3-source/README.md | 40 +++++++ examples/applications/s3-source/chatbot.yaml | 101 ++++++++++++++++++ .../applications/s3-source/configuration.yaml | 38 +++++++ .../extract-text.yaml | 51 ++++++++- .../gateways.yaml | 21 +++- .../{text-processing => s3-source}/simple.pdf | Bin .../applications/text-processing/README.md | 83 -------------- .../text-processing/configuration.yaml | 34 ------ .../text-processing/write-to-astra.yaml | 51 --------- 12 files changed, 250 insertions(+), 188 deletions(-) rename examples/applications/{text-processing => s3-source}/.gitignore (100%) create mode 100644 examples/applications/s3-source/README.md create mode 100644 examples/applications/s3-source/chatbot.yaml create mode 100644 examples/applications/s3-source/configuration.yaml rename examples/applications/{text-processing => s3-source}/extract-text.yaml (63%) rename examples/applications/{text-processing => s3-source}/gateways.yaml (57%) rename examples/applications/{text-processing => s3-source}/simple.pdf (100%) delete mode 100644 examples/applications/text-processing/README.md delete mode 100644 examples/applications/text-processing/configuration.yaml delete mode 100644 examples/applications/text-processing/write-to-astra.yaml diff --git a/dev/s3_upload.sh b/dev/s3_upload.sh index 764a130b5..65412fc54 100755 --- a/dev/s3_upload.sh +++ b/dev/s3_upload.sh @@ -18,11 +18,11 @@ # Usage: ./minio-upload my-bucket my-file.zip -bucket=$1 -file=$2 +host=$1 +url=$2 +bucket=$3 +file=$4 -host=localhost -url=http://localhost:9000 s3_key=minioadmin s3_secret=minioadmin diff --git a/examples/applications/docker-chatbot/crawler.yaml b/examples/applications/docker-chatbot/crawler.yaml index a046d608b..a3374a463 100644 --- a/examples/applications/docker-chatbot/crawler.yaml +++ b/examples/applications/docker-chatbot/crawler.yaml @@ -42,21 +42,12 @@ pipeline: allowed-domains: ["https://docs.langstream.ai"] forbidden-paths: [] min-time-between-requests: 500 - reindex-interval-seconds: 3600 max-error-count: 5 max-urls: 1000 max-depth: 50 handle-robots-file: true - user-agent: "" # this is computed automatically, but you can override it scan-html-documents: true - http-timeout: 10000 - handle-cookies: true - max-unflushed-pages: 100 - bucketName: "${secrets.s3.bucket-name}" - endpoint: "${secrets.s3.endpoint}" - access-key: "${secrets.s3.access-key}" - secret-key: "${secrets.s3.secret}" - region: "${secrets.s3.region}" + state-storage: disk - name: "Extract text" type: "text-extractor" - name: "Normalise text" diff --git a/examples/applications/text-processing/.gitignore b/examples/applications/s3-source/.gitignore similarity index 100% rename from examples/applications/text-processing/.gitignore rename to examples/applications/s3-source/.gitignore diff --git a/examples/applications/s3-source/README.md b/examples/applications/s3-source/README.md new file mode 100644 index 000000000..a891c2e49 --- /dev/null +++ b/examples/applications/s3-source/README.md @@ -0,0 +1,40 @@ +# Preprocessing Text + +This sample application shows how to use some common NLP techniques to preprocess text data and then write the results to a Vector Database. + +We have two pipelines: + +The extract-text.yaml file defines a pipeline that will: + +- Extract text from document files (PDF, Word...) +- Detect the language and filter out non-English documents +- Normalize the text +- Split the text into chunks +- Write the chunks to a Vector Database, in this case DataStax Astra DB + +## Prerequisites + +Prepare some PDF files and upload them to a bucket in S3. + +## Deploy the LangStream application + +``` +./bin/langstream docker run test -app examples/applications/s3-source -s examples/secrets/secrets.yaml --docker-args="-p9900:9000" +``` + +Please note that here we are adding --docker-args="-p9900:9000" to expose the S3 API on port 9900. + + +## Write some documents in the S3 bucket + +``` +# Upload a document to the S3 bucket +dev/s3_upload.sh localhost http://localhost:9900 documents README.md +dev/s3_upload.sh localhost http://localhost:9900 documents examples/applications/s3-source/simple.pdf +``` + +## Interact with the Chatbot + +Now you can use the developer UI to ask questions to the chatbot about your documents. + +If you have uploaded the README file then you should be able to ask "what is LangStream ?" \ No newline at end of file diff --git a/examples/applications/s3-source/chatbot.yaml b/examples/applications/s3-source/chatbot.yaml new file mode 100644 index 000000000..0c31e2a66 --- /dev/null +++ b/examples/applications/s3-source/chatbot.yaml @@ -0,0 +1,101 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + - name: "log-topic" + creation-mode: create-if-not-exists +errors: + on-failure: "skip" +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "questions-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.question_embeddings" + text: "{{ value.question }}" + flush-interval: 0 + - name: "lookup-related-documents" + type: "query-vector-db" + configuration: + datasource: "JdbcDatasource" + query: "SELECT text,embeddings_vector FROM documents ORDER BY cosine_similarity(embeddings_vector, CAST(? as FLOAT ARRAY)) DESC LIMIT 20" + fields: + - "value.question_embeddings" + output-field: "value.related_documents" + - name: "re-rank documents with MMR" + type: "re-rank" + configuration: + max: 5 # keep only the top 5 documents, because we have an hard limit on the prompt size + field: "value.related_documents" + query-text: "value.question" + query-embeddings: "value.question_embeddings" + output-field: "value.related_documents" + text-field: "record.text" + embeddings-field: "record.embeddings_vector" + algorithm: "MMR" + lambda: 0.5 + k1: 1.2 + b: 0.75 + - name: "ai-chat-completions" + type: "ai-chat-completions" + + configuration: + model: "${secrets.open-ai.chat-completions-model}" # This needs to be set to the model deployment name, not the base name + # on the log-topic we add a field with the answer + completion-field: "value.answer" + # we are also logging the prompt we sent to the LLM + log-field: "value.prompt" + # here we configure the streaming behavior + # as soon as the LLM answers with a chunk we send it to the answers-topic + stream-to-topic: "answers-topic" + # on the streaming answer we send the answer as whole message + # the 'value' syntax is used to refer to the whole value of the message + stream-response-completion-field: "value" + # we want to stream the answer as soon as we have 20 chunks + # in order to reduce latency for the first message the agent sends the first message + # with 1 chunk, then with 2 chunks....up to the min-chunks-per-message value + # eventually we want to send bigger messages to reduce the overhead of each message on the topic + min-chunks-per-message: 20 + messages: + - role: system + content: | + An user is going to perform a questions, The documents below may help you in answering to their questions. + Please try to leverage them in your answer as much as possible. + If you provide code or YAML snippets, please explicitly state that they are examples. + Do not provide information that is not related to the documents provided below. + + Documents: + {{# value.related_documents}} + {{ text}} + {{/ value.related_documents}} + - role: user + content: "{{ value.question}}" + - name: "cleanup-response" + type: "drop-fields" + output: "log-topic" + configuration: + fields: + - "question_embeddings" + - "related_documents" \ No newline at end of file diff --git a/examples/applications/s3-source/configuration.yaml b/examples/applications/s3-source/configuration.yaml new file mode 100644 index 000000000..15675fce2 --- /dev/null +++ b/examples/applications/s3-source/configuration.yaml @@ -0,0 +1,38 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + resources: + - type: "datasource" + name: "JdbcDatasource" + configuration: + service: "jdbc" + driverClass: "herddb.jdbc.Driver" + url: "${secrets.herddb.url}" + user: "${secrets.herddb.user}" + password: "${secrets.herddb.password}" + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "${secrets.open-ai.url}" + access-key: "${secrets.open-ai.access-key}" + provider: "${secrets.open-ai.provider}" + dependencies: + - name: "HerdDB.org JDBC Driver" + url: "https://repo1.maven.org/maven2/org/herddb/herddb-jdbc/0.28.0/herddb-jdbc-0.28.0-thin.jar" + sha512sum: "d8ea8fbb12eada8f860ed660cbc63d66659ab3506bc165c85c420889aa8a1dac53dab7906ef61c4415a038c5a034f0d75900543dd0013bdae50feafd46f51c8e" + type: "java-library" \ No newline at end of file diff --git a/examples/applications/text-processing/extract-text.yaml b/examples/applications/s3-source/extract-text.yaml similarity index 63% rename from examples/applications/text-processing/extract-text.yaml rename to examples/applications/s3-source/extract-text.yaml index e5a7ef8df..28d43eb8c 100644 --- a/examples/applications/text-processing/extract-text.yaml +++ b/examples/applications/s3-source/extract-text.yaml @@ -15,9 +15,23 @@ # name: "Extract and manipulate text" -topics: - - name: "chunks-topic" +assets: + - name: "documents-table" + asset-type: "jdbc-table" creation-mode: create-if-not-exists + config: + table-name: "documents" + datasource: "JdbcDatasource" + create-statements: + - | + CREATE TABLE documents ( + filename TEXT, + chunk_id int, + num_tokens int, + lang TEXT, + text TEXT, + embeddings_vector FLOATA, + PRIMARY KEY (filename, chunk_id)); pipeline: - name: "Read from S3" type: "s3-source" @@ -27,7 +41,6 @@ pipeline: access-key: "${secrets.s3.access-key}" secret-key: "${secrets.s3.secret}" region: "${secrets.s3.region}" - idle-time: 5 - name: "Extract text" type: "text-extractor" - name: "Normalise text" @@ -73,10 +86,40 @@ pipeline: - name: "compute-embeddings" id: "step1" type: "compute-ai-embeddings" - output: "chunks-topic" configuration: model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model embeddings-field: "value.embeddings_vector" text: "{{ value.text }}" batch-size: 10 flush-interval: 500 + - name: "Delete stale chunks" + type: "query" + configuration: + datasource: "JdbcDatasource" + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + mode: "execute" + query: "DELETE FROM documents WHERE filename = ? AND chunk_id > ?" + output-field: "value.delete-results" + fields: + - "value.filename" + - "fn:toInt(value.chunk_id)" + - name: "Write" + type: "vector-db-sink" + configuration: + datasource: "JdbcDatasource" + table-name: "documents" + fields: + - name: "filename" + expression: "value.filename" + primary-key: true + - name: "chunk_id" + expression: "value.chunk_id" + primary-key: true + - name: "embeddings_vector" + expression: "fn:toListOfFloat(value.embeddings_vector)" + - name: "lang" + expression: "value.language" + - name: "text" + expression: "value.text" + - name: "num_tokens" + expression: "value.chunk_num_tokens" \ No newline at end of file diff --git a/examples/applications/text-processing/gateways.yaml b/examples/applications/s3-source/gateways.yaml similarity index 57% rename from examples/applications/text-processing/gateways.yaml rename to examples/applications/s3-source/gateways.yaml index e5b0eb43d..eb49bdd4e 100644 --- a/examples/applications/text-processing/gateways.yaml +++ b/examples/applications/s3-source/gateways.yaml @@ -16,6 +16,23 @@ # gateways: - - id: "consume-chunks" + - id: "user-input" + type: produce + topic: "questions-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "bot-output" type: consume - topic: "chunks-topic" + topic: "answers-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId diff --git a/examples/applications/text-processing/simple.pdf b/examples/applications/s3-source/simple.pdf similarity index 100% rename from examples/applications/text-processing/simple.pdf rename to examples/applications/s3-source/simple.pdf diff --git a/examples/applications/text-processing/README.md b/examples/applications/text-processing/README.md deleted file mode 100644 index 04bf05ef5..000000000 --- a/examples/applications/text-processing/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Preprocessing Text - -This sample application shows how to use some common NLP techniques to preprocess text data and then write the results to a Vector Database. - -We have two pipelines: - -The extract-text.yaml file defines a pipeline that will: - -- Extract text from document files (PDF, Word...) -- Detect the language and filter out non-English documents -- Normalize the text -- Split the text into chunks - -The write-to-db.yaml file defines a pipeline that will: -- Write the chunks to a Vector Database, in this case DataStax Astra DB - -You could write a single pipeline file, but in this example we are keeping them as separate files -for demonstration purposes. - -When you deploy the application all the files are deployed to the cluster as a single unit. - -## Prerequisites - -Prepare some PDF files and upload them to a bucket in S3. - -Create a table in Astra DB with the following schema. -This example assumes that you have a KEYSPACE named `documents` and a TABLE named `documents`. - -``` -USE documents; -CREATE TABLE IF NOT EXISTS documents ( - filename TEXT, - chunk_id int, - num_tokens int, - language TEXT, - text TEXT, - embeddings_vector VECTOR, - PRIMARY KEY (filename, chunk_id) -); -CREATE CUSTOM INDEX IF NOT EXISTS ann_index - ON documents(embeddings_vector) USING 'StorageAttachedIndex'; -``` - -## Configure access to the Vector Database - -Export some ENV variables in order to configure access to the database: - -``` -export ASTRA_TOKEN=... -export ASTRA_CLIENT_ID=... -export ASTRA_SECRET=... -export ASTRA_DATABASE=... -``` - -You can find the credentials in the Astra DB console when you create a Token. - -The examples/secrets/secrets.yaml resolves those environment variables for you. -When you go in production you are supposed to create a dedicated secrets.yaml file for each environment. - - -## Deploy the LangStream application - -``` -./bin/langstream apps deploy text-extractor -app examples/applications/text-processing -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml -``` - -## Write a document in the S3 bucket - -``` -# Activate port forwarding of the MinIO service in order to be able to upload files to the S3 bucket -kubectl -n minio-dev port-forward minio 9000:9000 & - -# Upload a document to the S3 bucket -dev/s3_upload.sh documents examples/applications/text-processing/simple.pdf -``` - -## Start a Consumer - -Use the gateway to start a consumer that will read the output of the application. - -``` -./bin/langstream gateway consume text-extractor consume-chunks -``` \ No newline at end of file diff --git a/examples/applications/text-processing/configuration.yaml b/examples/applications/text-processing/configuration.yaml deleted file mode 100644 index cc91554dd..000000000 --- a/examples/applications/text-processing/configuration.yaml +++ /dev/null @@ -1,34 +0,0 @@ -# -# -# Copyright DataStax, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -configuration: - resources: - - type: "open-ai-configuration" - name: "OpenAI Azure configuration" - configuration: - url: "${secrets.open-ai.url}" - access-key: "${secrets.open-ai.access-key}" - provider: "${secrets.open-ai.provider}" - - type: "datasource" - name: "AstraDatasource" - configuration: - service: "astra" - clientId: "${secrets.astra.clientId}" - secret: "${secrets.astra.secret}" - secureBundle: "${secrets.astra.secureBundle}" - database: "${secrets.astra.database}" - token: "${secrets.astra.token}" \ No newline at end of file diff --git a/examples/applications/text-processing/write-to-astra.yaml b/examples/applications/text-processing/write-to-astra.yaml deleted file mode 100644 index 801b353cf..000000000 --- a/examples/applications/text-processing/write-to-astra.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# -# Copyright DataStax, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name: "Write to AstraDB" -topics: - - name: "chunks-topic" - creation-mode: create-if-not-exists -assets: - - name: "documents-table" - asset-type: "cassandra-table" - creation-mode: create-if-not-exists - config: - table-name: "documents" - keyspace: "documents" - datasource: "AstraDatasource" - create-statements: - - | - CREATE TABLE IF NOT EXISTS documents.documents ( - filename TEXT, - chunk_id int, - num_tokens int, - language TEXT, - text TEXT, - embeddings_vector VECTOR, - PRIMARY KEY (filename, chunk_id)); - - | - CREATE CUSTOM INDEX IF NOT EXISTS documents_ann_index ON documents.documents(embeddings_vector) USING 'StorageAttachedIndex'; -pipeline: - - name: "Write to Astra" - type: "vector-db-sink" - input: "chunks-topic" - resources: - size: 2 - configuration: - datasource: "AstraDatasource" - table-name: "documents" - keyspace: "documents" - mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens" \ No newline at end of file