Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
[milvus] Add support for Milvus.io - part 2 (LangStream#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Sep 25, 2023
1 parent 084636f commit 1f99ec2
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 194 deletions.
6 changes: 3 additions & 3 deletions examples/applications/query-milvus/chatbot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ pipeline:
text: "{{% value.question }}"
flush-interval: 0
- name: "lookup-related-documents-in-llm"
type: "query"
type: "query-vector-db"
configuration:
datasource: "MilvusDatasource"
query: |
{
"collection-name": "documents",
"collection-name": "docs",
"vectors": ?,
"top-k": 1
"top-k": 10,
"output-fields": ["text"]
}
fields:
Expand Down
60 changes: 58 additions & 2 deletions examples/applications/query-milvus/crawler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,45 @@ name: "Crawl a website"
topics:
- name: "chunks-topic"
creation-mode: create-if-not-exists
assets:
- name: "documents-table"
asset-type: "milvus-collection"
creation-mode: create-if-not-exists
deletion-mode: delete
config:
collection-name: "docs"
database-name: "default"
datasource: "MilvusDatasource"
create-statements:
- |
{
"command": "create-collection",
"collection-name": "docs",
"database-name": "default",
"field-types": [
{
"name": "filename_and_chunkid",
"primary-key": true,
"data-type": "Varchar",
"max-length": 1024
},
{
"name": "text",
"data-type": "Varchar",
"max-length": 65535
},
{
"name": "language",
"data-type": "Varchar",
"max-length": 3
},
{
"name": "vector",
"data-type": "FloatVector",
"dimension": 1536
}
]
}
resources:
size: 2
pipeline:
Expand Down Expand Up @@ -88,10 +127,27 @@ pipeline:
- name: "compute-embeddings"
id: "step1"
type: "compute-ai-embeddings"
output: "chunks-topic"
output: chunks-topic
configuration:
model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model
embeddings-field: "value.embeddings_vector"
text: "{{% value.text }}"
batch-size: 10
flush-interval: 500
flush-interval: 500
- name: "Write to Milvus"
type: "vector-db-sink"
input: chunks-topic
configuration:
datasource: "MilvusDatasource"
collection-name: "docs"
fields:
- name: "filename_and_chunkid"
expression: "fn:concat(value.filename, value.chunk_id)"
- name: "vector"
expression: "fn:toListOfFloat(value.embeddings_vector)"
- name: "language"
expression: "value.language"
- name: "text"
expression: "value.text"
- name: "num_tokens"
expression: "value.chunk_num_tokens"
93 changes: 0 additions & 93 deletions examples/applications/query-milvus/write-to-database.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion examples/secrets/secrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,4 @@ secrets:
username: "${MILVUS_USERNAME:-}"
password: "${MILVUS_PASSWORD:-}"
host: "${MILVUS_HOST:-}"
port: "${MILVUS_PORT:19530}"
port: "${MILVUS_PORT:-19530}"
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public static TransformContext recordToTransformContext(
context.setKeyNativeSchema(((GenericRecord) record.key()).getSchema());
}
context.setValueObject(record.value());
context.setValueSchemaType(getSchemaType(record.value().getClass()));
context.setValueSchemaType(
record.value() == null ? null : getSchemaType(record.value().getClass()));
// TODO: temporary hack. We should be able to get the schema from the record
if (record.value() instanceof GenericRecord) {
context.setKeyNativeSchema(((GenericRecord) record.value()).getSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@
import ai.langstream.api.runner.assets.AssetManager;
import ai.langstream.api.runner.assets.AssetManagerProvider;
import ai.langstream.api.util.ConfigurationUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeCollectionResponse;
import io.milvus.param.Constant;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.collection.DescribeCollectionParam;
import io.milvus.param.collection.DropCollectionParam;
import io.milvus.param.collection.HasCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
import io.milvus.param.index.CreateIndexParam;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -80,9 +87,7 @@ public boolean assetExists() throws Exception {
.withDatabaseName(databaseName)
.build());

if (hasCollection.getException() != null) {
throw hasCollection.getException();
}
MilvusModel.handleException(hasCollection);

if (hasCollection.getData() != null && hasCollection.getData()) {
log.info("Table {} exists", collectionName);
Expand Down Expand Up @@ -114,31 +119,112 @@ public void deployAsset() throws Exception {
List<String> statements =
ConfigurationUtils.getList("create-statements", assetDefinition.getConfig());
execStatements(statements);

MilvusServiceClient milvusClient = datasource.getMilvusClient();
R<DescribeCollectionResponse> describeCollectionResponse =
milvusClient.describeCollection(
DescribeCollectionParam.newBuilder()
.withCollectionName(getCollectionName())
.withDatabaseName(getDatabaseName())
.build());

MilvusModel.handleException(describeCollectionResponse);

log.info(
"Describe collection {} in database {}",
getCollectionName(),
getDatabaseName());
log.info("Result: {}", describeCollectionResponse.getData());
}

private void execStatements(List<String> statements) {
private void execStatements(List<String> statements) throws Exception {
MilvusServiceClient milvusClient = datasource.getMilvusClient();
for (String statement : statements) {
log.info("Executing: {}", statement);

if (statement.contains("fieldTypes")) {
CreateCollectionParam parsedQuery =
buildObjectFromJson(
statement,
CreateCollectionParam.Builder.class,
List.of(),
MilvusModel.getMapper())
.build();
milvusClient.createCollection(parsedQuery);
} else {
CreateSimpleCollectionParam parsedQuery =
buildObjectFromJson(
statement,
CreateSimpleCollectionParam.Builder.class,
List.of(),
MilvusModel.getMapper())
.build();
milvusClient.createCollection(parsedQuery);
Map<String, Object> statementMap =
MilvusModel.getMapper()
.readValue(statement, new TypeReference<Map<String, Object>>() {});
String command = ConfigurationUtils.getString("command", "", statementMap);

if (command.isEmpty()) {
throw new IllegalArgumentException("Command is empty");
}

switch (command) {
case "create-collection":
{
CreateCollectionParam parsedQuery =
buildObjectFromJson(
statement,
CreateCollectionParam.Builder.class,
List.of(),
MilvusModel.getMapper())
.build();
log.info("Command: {}", parsedQuery);
R<RpcStatus> resultCreate = milvusClient.createCollection(parsedQuery);

MilvusModel.handleException(resultCreate);

R<RpcStatus> indexResult =
milvusClient.createIndex(
CreateIndexParam.newBuilder()
.withCollectionName(
parsedQuery.getCollectionName())
.withDatabaseName(parsedQuery.getDatabaseName())
.withIndexType(IndexType.AUTOINDEX)
.withIndexName(
Constant.VECTOR_INDEX_NAME_DEFAULT)
.withFieldName(
Constant.VECTOR_FIELD_NAME_DEFAULT)
.withMetricType(MetricType.L2)
.withSyncMode(true)
.build());
MilvusModel.handleException(indexResult);

R<RpcStatus> result =
milvusClient.loadCollection(
LoadCollectionParam.newBuilder()
.withCollectionName(
parsedQuery.getCollectionName())
.withDatabaseName(parsedQuery.getDatabaseName())
.withSyncLoad(true)
.withSyncLoadWaitingInterval(2000L)
.build());

MilvusModel.handleException(result);
break;
}
case "load-collection":
{
R<RpcStatus> result =
milvusClient.loadCollection(
LoadCollectionParam.newBuilder()
.withCollectionName(getCollectionName())
.withDatabaseName(getDatabaseName())
.withSyncLoad(true)
.withSyncLoadWaitingInterval(2000L)
.build());

MilvusModel.handleException(result);
break;
}
case "create-simple-collection":
{
CreateSimpleCollectionParam parsedQuery =
buildObjectFromJson(
statement,
CreateSimpleCollectionParam.Builder.class,
List.of(),
MilvusModel.getMapper())
.build();
log.info("Command: {}", parsedQuery);
R<RpcStatus> createResult = milvusClient.createCollection(parsedQuery);
MilvusModel.handleException(createResult);
break;
}
default:
throw new IllegalStateException("Unexpected command: " + command);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ public List<Map<String, String>> fetchData(String query, List<Object> params) {
params,
MilvusModel.getMapper())
.build();
if (log.isDebugEnabled()) {
log.debug("Command {}", searchParam);
}
R<SearchResponse> respSearch = milvusClient.search(searchParam);

if (respSearch.getException() != null) {
throw new RuntimeException(respSearch.getException());
if (log.isDebugEnabled()) {
log.debug("Response {}", respSearch);
}
MilvusModel.handleException(respSearch);

SearchResponse data = respSearch.getData();

Expand Down
Loading

0 comments on commit 1f99ec2

Please sign in to comment.