Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
Couchbase-hybrid-search (#52)
Browse files Browse the repository at this point in the history
writer working, fetch not

---------

Co-authored-by: benfrank241 <[email protected]>
Co-authored-by: Chris Bartholomew <[email protected]>
  • Loading branch information
3 people authored Jun 5, 2024
1 parent 098b0b9 commit 3414cfb
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 93 deletions.
13 changes: 12 additions & 1 deletion examples/applications/query-couchbase/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ Do the same but with the scope you just created to create a collection.
6. **Verify the Index Creation:**
- Once created, the index should appear in the list of indexes. You can manage or modify the index from this interface as needed.

## Deploy the LangStream application
Allow your IP to connect to the capella server

Settings > Networking > Allowed IP Addresses

Add Allowed IP

Add Current IP Address

Add Allowed IP


## Deploy the LangStream application

Expand All @@ -82,7 +93,7 @@ Using the docker image:
## Send a message using the gateway to upload a document

```
bin/langstream gateway produce test write-topic -v "{\"id\":\"myid\",\"document\":\"Kafkaesque: extremely unpleasant, frightening, and confusing, and similar to situations described in the novels of Franz Kafka.\"}" -p sessionId=$(uuidgen)
bin/langstream gateway produce test write-topic -v "{\"id\":\"animal2\",\"document\":\"Crocodile.\",\"filename\":\"private/us-east-1:abcefgh123\",\"vecPlanId\":\"ijklmn-opq-4567\",\"chunkId\":\"10\"}" -p sessionId=$(uuidgen)
```
You can view the uploaded document in the example scope and default collection of the bucket you selected.
Expand Down
3 changes: 0 additions & 3 deletions examples/applications/query-couchbase/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,3 @@ configuration:
connection-string: "${secrets.couchbase.connection-string}"
username: "${secrets.couchbase.username}"
password: "${secrets.couchbase.password}"
bucket-name: "${secrets.couchbase.bucket-name}"
scope-name: "example"
collection-name: "default"
4 changes: 4 additions & 0 deletions examples/applications/query-couchbase/query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ pipeline:
type: "query-vector-db"
configuration:
datasource: "CouchbaseDatasource"
bucket-name: "vectorize"
scope-name: "example"
collection-name: "default"
query: |
{
"vector": ?,
"topK": 5,
"vecPlanId": "ijklmn-opq-4567",
"filter":
{"genre": {"$eq": "comedy"}}
}
Expand Down
5 changes: 1 addition & 4 deletions examples/applications/query-couchbase/write.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ pipeline:
type: "vector-db-sink"
configuration:
datasource: "CouchbaseDatasource"
bucket-name: "${secrets.couchbase.bucket-name}"
username: "${secrets.couchbase.username}"
password: "${secrets.couchbase.password}"
connection-string: "${secrets.couchbase.connection-string}"
bucket-name: "vectorize"
scope-name: "example"
collection-name: "default"
vector.id: "value.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,13 @@ public class QueryConfig extends StepConfig {

@JsonProperty("generated-keys")
private List<String> generatedKeys;

@JsonProperty("bucket-name")
private String bucketName;

@JsonProperty("scope-name")
private String scopeName;

@JsonProperty("collection-name")
private String collectionName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,18 @@ components:
type:
- string
description: The record fields to pass as query parameters.
bucket-name:
type:
- string
description: The bucket name.
scope-name:
type:
- string
description: The scope name.
collection-name:
type:
- string
description: The collection name.
- "$ref": "#/components/schemas/Part"

type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@
import ai.langstream.ai.agents.commons.jstl.JstlFunctions;
import ai.langstream.ai.agents.datasource.DataSourceProvider;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.search.SearchQuery;
import com.couchbase.client.java.search.SearchRequest;
import com.couchbase.client.java.search.result.SearchResult;
import com.couchbase.client.java.search.result.SearchRow;
import com.couchbase.client.java.search.vector.VectorQuery;
import com.couchbase.client.java.search.vector.VectorSearch;
import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.Getter;
Expand All @@ -51,20 +57,11 @@ public static final class CouchbaseConfig {
@JsonProperty(value = "connection-string", required = true)
private String connectionString;

@JsonProperty(value = "bucket-name", required = true)
private String bucketName;

@JsonProperty(value = "username", required = true)
private String username;

@JsonProperty(value = "password", required = true)
private String password;

@JsonProperty(value = "scope-name", required = true)
private String scopeName;

@JsonProperty(value = "collection-name", required = true)
private String collectionName;
}

@Override
Expand Down Expand Up @@ -92,7 +89,7 @@ public void initialize(Map<String, Object> config) {
clientConfig.connectionString,
clientConfig.username,
clientConfig.password);
log.info("Connected to Couchbase Bucket: {}", clientConfig.bucketName);
log.info("Connected to Couchbase: {}", clientConfig.connectionString);
}

@Override
Expand All @@ -103,37 +100,110 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
if (queryMap.isEmpty()) {
throw new UnsupportedOperationException("Query is empty");
}
log.info("QueryMap: {}", queryMap);
log.info("Params: {}", params);

// todo get bucketname scopename collection name to be populated - it doesn't appear
// to be in the query or params ATM

float[] vector = JstlFunctions.toArrayOfFloat(queryMap.remove("vector"));
Integer topK = (Integer) queryMap.remove("topK");
// scope namen comes from querymap
String vecPlanId = (String) queryMap.remove("vecPlanId");
String bucketName = (String) params.get(0);
String scopeName = (String) params.get(1);
String collectionName = (String) params.get(2);
log.info("vecPlanId: {}", vecPlanId);
log.info("bucketName: {}", bucketName);
log.info("scopeName: {}", scopeName);
log.info("collectionName: {}", collectionName);
// log.info("username: {}", vector);
// Perform the term search for vecPlanId first
SearchRequest termSearchRequest =
SearchRequest.create(SearchQuery.match(vecPlanId).field("vecPlanId"));
log.info("Term SearchRequest created: {}", termSearchRequest);

SearchResult termSearchResult =
cluster.search(
bucketName + "." + scopeName + ".semantic", termSearchRequest);

List<SearchRow> termSearchRows = termSearchResult.rows();
Set<String> validIds =
termSearchRows.stream().map(SearchRow::id).collect(Collectors.toSet());
log.info("Term Search Result IDs: {}", validIds);

SearchRequest request =
SearchRequest.create(
VectorSearch.create(
VectorQuery.create("embeddings", vector)
.numCandidates(topK)));
log.debug("SearchRequest created: {}", request);
if (validIds.isEmpty()) {
return Collections.emptyList();
}

Map<String, Double> termSearchScores =
termSearchRows.stream()
.collect(Collectors.toMap(SearchRow::id, SearchRow::score));

log.info("Term Search Scores: {}", termSearchScores);

SearchResult result =
// Perform the vector search on the filtered documents
SearchRequest vectorSearchRequest =
SearchRequest.create(SearchQuery.match(vecPlanId).field("vecPlanId"))
.vectorSearch(
VectorSearch.create(
VectorQuery.create("embeddings", vector)
.numCandidates(topK)));
log.info("Vector SearchRequest created: {}", vectorSearchRequest);

SearchResult vectorSearchResult =
cluster.search(
""
+ clientConfig.bucketName
+ "."
+ clientConfig.scopeName
+ ".vector-search",
request);

return result.rows().stream()
.map(
hit -> {
Map<String, Object> r = new HashMap<>();
r.put("id", hit.id());
r.put("similarity", hit.score()); // Adds the similarity score

return r;
})
.collect(Collectors.toList());
bucketName + "." + scopeName + ".vector-search",
vectorSearchRequest);

for (SearchRow row : vectorSearchResult.rows()) {
log.info("ID: {}", row.id());
log.info("Score: {}", row.score());
// Log the full row content if available
// log.info("Row: {}", row.fieldsAs(JsonObject.class));
}

// Process and collect results
List<Map<String, Object>> results =
vectorSearchResult.rows().stream()
.filter(hit -> validIds.contains(hit.id()))
.limit(topK)
.map(
hit -> {
Map<String, Object> result = new HashMap<>();
double adjustedScore =
hit.score() - termSearchScores.get(hit.id());
result.put("similarity", adjustedScore);
result.put("id", hit.id());

// Fetch and add the document content using collection
// API
try {
String documentId = hit.id();
GetResult getResult =
cluster.bucket(bucketName)
.scope(scopeName)
.collection(collectionName)
.get(documentId);
if (getResult != null) {
JsonObject content =
getResult.contentAsObject();
content.removeKey("embeddings");
result.putAll(content.toMap());
}
} catch (Exception e) {
log.error(
"Error retrieving document content for ID: {}",
hit.id(),
e);
}

return result;
})
.collect(Collectors.toList());

log.info("Final Intersected Results: {}", results);

return results;

} catch (Exception e) {
log.error("Error executing query: {}", e.getMessage(), e);
Expand All @@ -145,7 +215,7 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
public void close() {
if (cluster != null) {
cluster.disconnect();
log.info("Disconnected from Couchbase Bucket: {}", clientConfig.bucketName);
log.info("Disconnected from Couchbase Bucket: {}", clientConfig.connectionString);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,24 @@ public CouchbaseDatabaseWriter createImplementation(Map<String, Object> datasour
public static class CouchbaseDatabaseWriter implements VectorDatabaseWriter, AutoCloseable {

private final Cluster cluster;
public final Collection collection;
public Collection collection;
private JstlEvaluator idFunction;
private JstlEvaluator vectorFunction;
private JstlEvaluator fileName;
private JstlEvaluator vecPlanId;
private String scopeName;
private String bucketName;
private String collectionName;

public CouchbaseDatabaseWriter(Map<String, Object> datasourceConfig) {
String username = (String) datasourceConfig.get("username");
String password = (String) datasourceConfig.get("password");
String bucketName = (String) datasourceConfig.get("bucket-name");
String scopeName = (String) datasourceConfig.getOrDefault("scope-name", "_default");
// String bucketName = (String) datasourceConfig.getOrDefault("bucket-name",
// "vectorize");
// String scopeName = (String) datasourceConfig.getOrDefault("scope-name", "_default");
String connectionString = (String) datasourceConfig.get("connection-string");
String collectionName =
(String) datasourceConfig.getOrDefault("collection-name", "_default");
// String collectionName =
// (String) datasourceConfig.getOrDefault("collection-name", "_default");

// Create a cluster with the WAN profile
ClusterOptions clusterOptions =
Expand All @@ -75,13 +81,6 @@ public CouchbaseDatabaseWriter(Map<String, Object> datasourceConfig) {
});

cluster = Cluster.connect(connectionString, clusterOptions);

// Get the bucket, scope, and collection
Bucket bucket = cluster.bucket(bucketName);
bucket.waitUntilReady(Duration.ofSeconds(10));

Scope scope = bucket.scope(scopeName);
collection = scope.collection(collectionName);
}

@Override
Expand All @@ -96,6 +95,19 @@ public void initialise(Map<String, Object> agentConfiguration) throws Exception
// agentConfiguration);
this.idFunction = buildEvaluator(agentConfiguration, "vector.id", String.class);
this.vectorFunction = buildEvaluator(agentConfiguration, "vector.vector", List.class);
this.fileName = buildEvaluator(agentConfiguration, "vector.filename", String.class);
this.vecPlanId = buildEvaluator(agentConfiguration, "vector.planId", String.class);
this.bucketName = (String) agentConfiguration.get("bucket-name");
this.scopeName = (String) agentConfiguration.get("scope-name");
this.collectionName = (String) agentConfiguration.get("collection-name");

// Get the bucket, scope, and collection
Bucket bucket = cluster.bucket(bucketName);
bucket.waitUntilReady(Duration.ofSeconds(10));

Scope scope = bucket.scope(scopeName);
collection = scope.collection(collectionName);
// make more generic
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void testCouchbaseWrite() throws Exception {
Map.of(
"service", "couchbase",
"connection-string", "couchbases://",
"bucket-name", "",
"bucket-name", "", // scope namen comes from querymap
"username", "",
"password", "");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ record fields using "record.field".
@JsonProperty("generated-keys")
private List<String> generatedKeys;

@ConfigProperty(
description =
"""
The name of the bucket to use in the database.
""")
@JsonProperty("bucket-name")
private String bucketName;

@ConfigProperty(
description =
"""
The name of the scope to use in the database.
""")
@JsonProperty("scope-name")
private String scopeName;

@ConfigProperty(
description =
"""
The name of the collection to use in the database.
""")
@JsonProperty("collection-name")
private String collectionName;

enum Mode {
query,
execute
Expand Down
Loading

0 comments on commit 3414cfb

Please sign in to comment.