Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Nov 22, 2023
1 parent 9d18f7b commit 899c3a8
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public Map<String, Object> executeStatement(
CollectionClient collection = this.getAstraDB().collection(collectionName);

String action = (String) queryMap.remove("action");
if (action == null) {
action = "findOneAndUpdate";
}

switch (action) {
case "findOneAndUpdate":
Expand All @@ -192,9 +195,10 @@ public Map<String, Object> executeStatement(
}
Map<String, Object> updateMap =
(Map<String, Object>) queryMap.remove("update");
if (updateMap != null) {
update.set(builder, updateMap);
if (updateMap == null || updateMap.isEmpty()) {
throw new IllegalArgumentException("update map cannot be empty");
}
update.set(builder, updateMap);

UpdateQuery updateQuery = builder.build();
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ ai.langstream.agents.vector.cassandra.CassandraWriter
ai.langstream.agents.vector.milvus.MilvusWriter
ai.langstream.agents.vector.jdbc.JdbcWriter
ai.langstream.agents.vector.solr.SolrWriter
ai.langstream.agents.vector.opensearch.OpenSearchWriter
ai.langstream.agents.vector.opensearch.OpenSearchWriter
ai.langstream.agents.vector.astra.AstraVectorDBWriter
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,14 @@ public static class AstraCollectionConfig {
required = true)
@JsonProperty("collection-name")
private String collectionName;

@ConfigProperty(
description =
"""
Size of the vector.
""",
required = true)
@JsonProperty("vector-dimension")
private int vectorDimension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.impl.resources;

import ai.langstream.impl.resources.datasource.AstraDatasourceConfig;
import ai.langstream.impl.resources.datasource.AstraVectorDBDatasourceConfig;
import ai.langstream.impl.resources.datasource.CassandraDatasourceConfig;
import ai.langstream.impl.resources.datasource.MilvusDatasourceConfig;
import ai.langstream.impl.resources.datasource.OpenSearchDatasourceConfig;
Expand All @@ -34,6 +35,7 @@ public VectorDatabaseResourceProvider() {
"pinecone", PineconeDatasourceConfig.CONFIG,
"milvus", MilvusDatasourceConfig.CONFIG,
"solr", SolrDatasourceConfig.CONFIG,
"astra-vector-db", AstraVectorDBDatasourceConfig.CONFIG,
"opensearch", OpenSearchDatasourceConfig.CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import ai.langstream.impl.agents.ai.steps.QueryConfiguration;
import ai.langstream.impl.uti.ClassConfigValidator;
import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
import ai.langstream.runtime.impl.k8s.agents.vectors.AstraVectorDBVectorDatabaseWriterConfig;
import ai.langstream.runtime.impl.k8s.agents.vectors.CassandraVectorDatabaseWriterConfig;
import ai.langstream.runtime.impl.k8s.agents.vectors.JDBCVectorDatabaseWriterConfig;
import ai.langstream.runtime.impl.k8s.agents.vectors.MilvusVectorDatabaseWriterConfig;
Expand Down Expand Up @@ -80,7 +81,8 @@ public abstract static class VectorDatabaseWriterConfig {
"pinecone", PineconeVectorDatabaseWriterConfig.INSTANCE,
"opensearch", OpenSearchVectorDatabaseWriterConfig.INSTANCE,
"solr", SolrVectorDatabaseWriterConfig.INSTANCE,
"milvus", MilvusVectorDatabaseWriterConfig.INSTANCE);
"milvus", MilvusVectorDatabaseWriterConfig.INSTANCE,
"astra-vector-db", AstraVectorDBVectorDatabaseWriterConfig.INSTANCE);

public QueryVectorDBAgentProvider() {
super(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.runtime.impl.k8s.agents.vectors;

import ai.langstream.api.doc.AgentConfig;
import ai.langstream.api.doc.ConfigProperty;
import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import lombok.Data;

@Data
@AgentConfig(
name = "Astra Vector DB",
description =
"""
Writes data to Apache Cassandra.
All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html
""")
public class AstraVectorDBVectorDatabaseWriterConfig
extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig {

public static final QueryVectorDBAgentProvider.VectorDatabaseWriterConfig INSTANCE =
new AstraVectorDBVectorDatabaseWriterConfig();

@Override
public Class getAgentConfigModelClass() {
return AstraVectorDBVectorDatabaseWriterConfig.class;
}

@Override
public boolean isAgentConfigModelAllowUnknownProperties() {
return true;
}

@ConfigProperty(
description =
"The name of the collection to write to. The collection must already exist.",
required = true)
@JsonProperty("collection-name")
String collectionName;

@Data
public static class CollectionField {

@ConfigProperty(description = "Field name", required = true)
String name;

@ConfigProperty(
description = "JSTL Expression for computing the field value.",
required = true)
String expression;
}

@ConfigProperty(description = "Fields of the collection to write to.", required = true)
List<CollectionField> fields;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.kafka;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

@Slf4j
@Disabled
class AstraVectorDBAssetQueryWriteIT extends AbstractKafkaApplicationRunner {

static final String SECRETS_PATH = "";

@Test
public void testAstra() throws Exception {
String tenant = "tenant";
String[] expectedAgents = {"app-step1"};

String secrets = Files.readString(Paths.get(SECRETS_PATH));

Map<String, String> application =
Map.of(
"configuration.yaml",
"""
configuration:
resources:
- type: "vector-database"
name: "AstraDBDatasource"
configuration:
service: "astra-vector-db"
token: "${ secrets.astra.token }"
endpoint: "${ secrets.astra.endpoint }"
""",
"pipeline.yaml",
"""
assets:
- name: "documents-collection"
asset-type: "astra-collection"
creation-mode: create-if-not-exists
deletion-mode: delete
config:
collection-name: "documents"
datasource: "AstraDBDatasource"
vector-dimension: 3
topics:
- name: "input-topic"
creation-mode: create-if-not-exists
pipeline:
- id: step1
name: "Write a document using a query"
type: "query-vector-db"
input: "input-topic"
configuration:
mode: execute
datasource: "AstraDBDatasource"
query: |
{
"action": "insertOne",
"collection-name": "documents",
"document": {
"id": "the-id",
"name": "A name",
"text": "A text",
"vector": [1,2,3]
}
}
output-field: "value.insertresult"
fields:
- "value.documentId"
- name: "Read the document using a query"
type: "query-vector-db"
configuration:
datasource: "AstraDBDatasource"
query: |
{
"collection-name": "documents",
"filter": {
"id": ?
},
"vector": [1,2,3]
}
only-first: true
output-field: "value.queryresult"
fields:
- "value.insertresult.id"
- id: step2
name: "Write a new record to Astra"
type: "vector-db-sink"
configuration:
datasource: "AstraDBDatasource"
collection-name: "documents"
fields:
- name: "id"
expression: "fn:toString('new-id')"
- name: "vector"
expression: "value.queryresult.vector"
- name: "text"
expression: "value.queryresult.text"
- name: "name"
expression: "value.queryresult.name"
""");

try (ApplicationRuntime applicationRuntime =
deployApplicationWithSecrets(
tenant, "app", application, buildInstanceYaml(), secrets, expectedAgents)) {
try (KafkaProducer<String, String> producer = createProducer(); ) {

sendMessage("input-topic", "{\"documentId\":1}", producer);

executeAgentRunners(applicationRuntime);
}

applicationDeployer.cleanup(tenant, applicationRuntime.implementation());
}
}
}

0 comments on commit 899c3a8

Please sign in to comment.