Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
new params to query - search running (#54)
Browse files Browse the repository at this point in the history
added bucket, scope and collection, vecPlanId, vector search index and
semantic search index to the query.

added bucket, scope and collection to the writer as well.

---------

Co-authored-by: benfrank241 <[email protected]>
  • Loading branch information
benfrank241 and benfrank241 authored Jun 10, 2024
1 parent ebff9d4 commit 7ca5805
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 77 deletions.
30 changes: 23 additions & 7 deletions examples/applications/query-couchbase/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ You also have to set your OpenAI API keys in the secrets.yaml file.
Export some ENV variables in order to configure access to the database:

```
export COUCHBASE_BUCKET_NAME=...
export COUCHBASE_USERNAME=...
export COUCHBASE_PASSWORD=...
export COUCHBASE_CONNECTION_STRING=...
Expand All @@ -33,7 +32,7 @@ For the username and password you will need to create a new 'Database Access' us
You can find the connection string from the Couchbase web interface in the Connect tab.

```
couchbases://cb.shnnjztaidekg6i.cloud.couchbase.com
couchbases://cb.lans3la99acks.cloud.couchbase.com
```

The above is an example of a connection string.
Expand Down Expand Up @@ -93,22 +92,39 @@ Using the docker image:
## Send a message using the gateway to upload a document

```
bin/langstream gateway produce test write-topic -v "{\"id\":\"animal2\",\"document\":\"Crocodile.\",\"filename\":\"private/us-east-1:abcefgh123\",\"vecPlanId\":\"ijklmn-opq-4567\",\"chunkId\":\"10\"}" -p sessionId=$(uuidgen)
bin/langstream gateway produce test write-topic -v "{\"id\":\"animal3\",\"document\":\"Monkey.\",\"filename\":\"private/us-east-1:abcefgh123\",\"vecPlanId\":\"ijklmn-opq-4567\",\"chunkId\":\"10\",\"bucket\":\"vectorize\",\"scope\":\"example\",\"collection\":\"default\"}" -p sessionId=$(uuidgen)
```
You can view the uploaded document in the example scope and default collection of the bucket you selected.


## Query using langstream UI

Select write-topic in the UI.

Send a JSON object with the query parameters:

```
{
"question": "What's the definition of Kafkaesque?",
"vecPlanId": "ijklmn-opq-4567",
"bucket": "vectorize",
"scope": "example",
"collection": "default",
"vectorSearch":"vector-search",
"semanticSearch":"semantic"
}
```

## Start a chat using the gateway to query the document

```
bin/langstream gateway chat test -pg produce-input -cg consume-output -p sessionId=$(uuidgen)
```

Send a JSON string with at matching question:
Send a JSON string with a question:

```
{"question": "What's the definition of Kafkaesque?"}
```
"{\"question\":\"Who is the President?\",\"vecPlanId\":\"ijklmn-opq-4567\",\"bucket\":\"vectorize\",\"scope\":\"example\",\"collection\":\"default\",\"vectorSearch\":\"vector-search\",\"semanticSearch\":\"semantic\"}"



18 changes: 12 additions & 6 deletions examples/applications/query-couchbase/query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,25 @@ pipeline:
type: "query-vector-db"
configuration:
datasource: "CouchbaseDatasource"
bucket-name: "vectorize"
scope-name: "example"
collection-name: "default"
query: |
{
"vector": ?,
"topK": 5,
"vecPlanId": "ijklmn-opq-4567",
"filter":
{"genre": {"$eq": "comedy"}}
"vecPlanId": ?,
"bucket-name": ?,
"scope-name": ?,
"collection-name": ?,
"vector-name": ?,
"semantic-name": ?
}
fields:
- "value.embeddings"
- "value.vecPlanId"
- "value.bucket"
- "value.scope"
- "value.collection"
- "value.vectorSearch"
- "value.semanticSearch"
output-field: "value.query-result"
- name: "Remove embeddings from the output"
type: "drop-fields"
Expand Down
8 changes: 4 additions & 4 deletions examples/applications/query-couchbase/write.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ pipeline:
type: "vector-db-sink"
configuration:
datasource: "CouchbaseDatasource"
bucket-name: "vectorize"
scope-name: "example"
collection-name: "default"
vector.id: "value.id"
bucket-name: "value.bucket"
scope-name: "value.scope"
collection-name: "value.collection"
vector.id: "value.id"
vector.vector: "value.embeddings"

Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public static class CouchbaseQueryStepDataSource implements QueryStepDataSource
@Getter private final CouchbaseConfig clientConfig;
private Cluster cluster;

// private Collection collection;

/**
 * Creates a query-step data source that keeps the supplied client configuration.
 *
 * <p>The constructor only stores the configuration in {@code clientConfig}; it does not
 * assign the {@code cluster} field.
 *
 * @param clientConfig the Couchbase client configuration retained by this data source
 */
public CouchbaseQueryStepDataSource(CouchbaseConfig clientConfig) {
this.clientConfig = clientConfig;
}
Expand All @@ -103,28 +101,23 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
log.info("QueryMap: {}", queryMap);
log.info("Params: {}", params);

// todo get bucketname scopename collection name to be populated - it doesn't appear
// to be in the query or params ATM

float[] vector = JstlFunctions.toArrayOfFloat(queryMap.remove("vector"));
Integer topK = (Integer) queryMap.remove("topK");
String vecPlanId = (String) queryMap.remove("vecPlanId");
String bucketName = (String) params.get(0);
String scopeName = (String) params.get(1);
String collectionName = (String) params.get(2);
log.info("vecPlanId: {}", vecPlanId);
log.info("bucketName: {}", bucketName);
log.info("scopeName: {}", scopeName);
log.info("collectionName: {}", collectionName);
// log.info("username: {}", vector);
String bucketName = (String) queryMap.remove("bucket-name");
String scopeName = (String) queryMap.remove("scope-name");
String collectionName = (String) queryMap.remove("collection-name");
String vectorIndexName = (String) queryMap.remove("vector-name");
String semanticIndexName = (String) queryMap.remove("semantic-name");

// Perform the term search for vecPlanId first
SearchRequest termSearchRequest =
SearchRequest.create(SearchQuery.match(vecPlanId).field("vecPlanId"));
log.info("Term SearchRequest created: {}", termSearchRequest);

SearchResult termSearchResult =
cluster.search(
bucketName + "." + scopeName + ".semantic", termSearchRequest);
bucketName + "." + scopeName + "." + semanticIndexName,
termSearchRequest);

List<SearchRow> termSearchRows = termSearchResult.rows();
Set<String> validIds =
Expand All @@ -148,20 +141,12 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
VectorSearch.create(
VectorQuery.create("embeddings", vector)
.numCandidates(topK)));
log.info("Vector SearchRequest created: {}", vectorSearchRequest);

SearchResult vectorSearchResult =
cluster.search(
bucketName + "." + scopeName + ".vector-search",
bucketName + "." + scopeName + "." + vectorIndexName,
vectorSearchRequest);

for (SearchRow row : vectorSearchResult.rows()) {
log.info("ID: {}", row.id());
log.info("Score: {}", row.score());
// Log the full row content if available
// log.info("Row: {}", row.fieldsAs(JsonObject.class));
}

// Process and collect results
List<Map<String, Object>> results =
vectorSearchResult.rows().stream()
Expand All @@ -177,6 +162,7 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {

// Fetch and add the document content using collection
// API

try {
String documentId = hit.id();
GetResult getResult =
Expand All @@ -201,8 +187,6 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
})
.collect(Collectors.toList());

log.info("Final Intersected Results: {}", results);

return results;

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public static class CouchbaseDatabaseWriter implements VectorDatabaseWriter, Aut
private JstlEvaluator vectorFunction;
private JstlEvaluator fileName;
private JstlEvaluator vecPlanId;
private String scopeName;
private String bucketName;
private String collectionName;
private JstlEvaluator scopeName;
private JstlEvaluator bucketName;
private JstlEvaluator collectionName;

public CouchbaseDatabaseWriter(Map<String, Object> datasourceConfig) {
String username = (String) datasourceConfig.get("username");
Expand Down Expand Up @@ -90,28 +90,20 @@ public void close() throws Exception {

@Override
public void initialise(Map<String, Object> agentConfiguration) throws Exception {
// log.info(
// "Initializing CouchbaseDatabaseWriter with configuration: {}",
// agentConfiguration);

this.idFunction = buildEvaluator(agentConfiguration, "vector.id", String.class);
this.vectorFunction = buildEvaluator(agentConfiguration, "vector.vector", List.class);
this.fileName = buildEvaluator(agentConfiguration, "vector.filename", String.class);
this.vecPlanId = buildEvaluator(agentConfiguration, "vector.planId", String.class);
this.bucketName = (String) agentConfiguration.get("bucket-name");
this.scopeName = (String) agentConfiguration.get("scope-name");
this.collectionName = (String) agentConfiguration.get("collection-name");

// Get the bucket, scope, and collection
Bucket bucket = cluster.bucket(bucketName);
bucket.waitUntilReady(Duration.ofSeconds(10));

Scope scope = bucket.scope(scopeName);
collection = scope.collection(collectionName);
// make more generic
this.bucketName = buildEvaluator(agentConfiguration, "bucket-name", String.class);
this.scopeName = buildEvaluator(agentConfiguration, "scope-name", String.class);
this.collectionName =
buildEvaluator(agentConfiguration, "collection-name", String.class);
}

@Override
public CompletableFuture<Void> upsert(Record record, Map<String, Object> context) {

CompletableFuture<Void> handle = new CompletableFuture<>();
return CompletableFuture.runAsync(
() -> {
Expand All @@ -131,6 +123,27 @@ public CompletableFuture<Void> upsert(Record record, Map<String, Object> context
"docId is null, cannot upsert document");
}

String bucketS =
bucketName != null
? (String) bucketName.evaluate(mutableRecord)
: null;
String scopeS =
scopeName != null
? (String) scopeName.evaluate(mutableRecord)
: null;
String collectionS =
collectionName != null
? (String)
collectionName.evaluate(mutableRecord)
: null;

// Get the bucket, scope, and collection
Bucket bucket = cluster.bucket(bucketS);
bucket.waitUntilReady(Duration.ofSeconds(10));

Scope scope = bucket.scope(scopeS);
collection = scope.collection(collectionS);

Object value = record.value();
Map<String, Object> content;

Expand Down
Loading

0 comments on commit 7ca5805

Please sign in to comment.