Update to latest Pinecone SDK that works with serverless indexes
cdbartholomew committed Mar 26, 2024
1 parent 9a85746 commit 1ada050
Showing 8 changed files with 183 additions and 87 deletions.
36 changes: 34 additions & 2 deletions examples/applications/query-pinecone/README.md
@@ -29,13 +29,23 @@ You also have to set your OpenAI API keys in the secrets.yaml file.
Export some ENV variables in order to configure access to the database:

```
export PINECONE_SERVICE=...
export PINECONE_ACCESS_KEY=...
export PINECONE_PROJECT_NAME=...
export PINECONE_ENVIRONMENT=...
export PINECONE_INDEX_NAME=...
```

The access key can be created from the Pinecone web interface. If you create a serverless
index, you can determine the project and environment from the URL listed for the index. For example:

```
https://example-index-lvkf6c1.svc.apw5-4e34-81fa.pinecone.io
```

The project name is the suffix that follows the index name in the first DNS label; in this
example, the project is `lvkf6c1`. The environment is the label that follows `svc`, in this
case `apw5-4e34-81fa`.
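
A shell sketch like the following can extract both values (this assumes the URL shape shown above; `INDEX_HOST` is just an illustrative variable name):

```
# Hypothetical helper: derive the project and environment from the index host name
INDEX_HOST="example-index-lvkf6c1.svc.apw5-4e34-81fa.pinecone.io"
# the first label is "<index-name>-<project>"; the project is the last "-"-separated part
export PINECONE_PROJECT_NAME=$(echo "$INDEX_HOST" | cut -d. -f1 | awk -F- '{print $NF}')
# the environment is the label right after "svc"
export PINECONE_ENVIRONMENT=$(echo "$INDEX_HOST" | cut -d. -f3)
echo "$PINECONE_PROJECT_NAME $PINECONE_ENVIRONMENT"  # lvkf6c1 apw5-4e34-81fa
```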

The examples/secrets/secrets.yaml file resolves those environment variables for you.
In production, you should create a dedicated secrets.yaml file for each environment.

@@ -45,6 +55,28 @@
```
./bin/langstream apps deploy test -app examples/applications/query-pinecone -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml
```
Using the docker image:

```
./bin/langstream docker run test -app examples/applications/query-pinecone -s examples/secrets/secrets.yaml
```

## Send a message using the gateway to index a document

```
bin/langstream gateway produce test write-topic -v "{\"id\":\"myid\",\"document\":\"Hello\",\"genre\":\"comedy\"}" -p sessionId=$(uuidgen)
```
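
For readability, the escaped `-v` value in the command above is this JSON document:

```
{"id": "myid", "document": "Hello", "genre": "comedy"}
```
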
## Start a chat using the gateway to query the index

```
bin/langstream gateway chat test -pg produce-input -cg consume-output -p sessionId=$(uuidgen)
```

Send a JSON string with a question that matches the indexed document:

```
{"question": "Hello"}
```

## Start a Producer to index a document

50 changes: 50 additions & 0 deletions examples/applications/query-pinecone/gateways.yaml
@@ -0,0 +1,50 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
  - id: write-topic
    type: produce
    topic: vectors-topic
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: client_session_id
          valueFromParameters: sessionId

  - id: produce-input
    type: produce
    topic: input-topic
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: client_session_id
          valueFromParameters: sessionId

  - id: consume-output
    type: consume
    topic: output-topic
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: client_session_id
            valueFromParameters: sessionId

2 changes: 1 addition & 1 deletion examples/applications/query-pinecone/query.yaml
@@ -40,7 +40,7 @@ pipeline:
"vector": ?,
"topK": 5,
"filter":
{"$or": [{"genre": "comedy"}, {"year":2019}]}
{"genre": {"$eq": "comedy"}}
}
fields:
- "value.embeddings"
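
The new filter spells out Pinecone's metadata-filter operator explicitly. For reference, a few other forms in the same MongoDB-style syntax (operator names per the Pinecone documentation; the values here are illustrative):

```
{"genre": {"$eq": "comedy"}}
{"year": {"$gte": 2019}}
{"$and": [{"genre": {"$eq": "comedy"}}, {"year": {"$gte": 2019}}]}
```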
2 changes: 1 addition & 1 deletion examples/secrets/secrets.yaml
@@ -105,7 +105,7 @@ secrets:
     id: pinecone
     data:
       service: "${PINECONE_SERVICE:-}"
-      access-key: "${PINECONE_ACCESS_KEY:-}"
+      api-key: "${PINECONE_ACCESS_KEY:-}"
       project-name: "${PINECONE_PROJECT_NAME:-}"
       environment: "${PINECONE_ENVIRONMENT:-asia-southeast1-gcp-free}"
       index-name: "${PINECONE_INDEX_NAME:-example-index}"
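
The values use shell-style `${VAR:-default}` substitution, where the text after `:-` applies only when the variable is unset or empty (assuming LangStream follows the usual shell semantics for this syntax). A quick check in a shell:

```
unset PINECONE_ENVIRONMENT
echo "${PINECONE_ENVIRONMENT:-asia-southeast1-gcp-free}"  # prints asia-southeast1-gcp-free
PINECONE_ENVIRONMENT=apw5-4e34-81fa
echo "${PINECONE_ENVIRONMENT:-asia-southeast1-gcp-free}"  # prints apw5-4e34-81fa
```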
2 changes: 1 addition & 1 deletion langstream-agents/langstream-vector-agents/pom.xml
@@ -75,7 +75,7 @@
<!-- Pinecone connector -->
<groupId>io.pinecone</groupId>
<artifactId>pinecone-client</artifactId>
<version>0.2.3</version>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>io.milvus</groupId>
@@ -33,8 +33,7 @@
 import io.pinecone.PineconeConnectionConfig;
 import io.pinecone.proto.QueryRequest;
 import io.pinecone.proto.QueryResponse;
-import io.pinecone.proto.QueryVector;
-import io.pinecone.proto.SparseValues;
+import io.pinecone.proto.ScoredVector;
 import java.io.IOException;
 import java.net.URI;
 import java.net.http.HttpClient;
@@ -125,6 +124,7 @@ public List<Map<String, Object>> fetchData(String query, List<Object> params) {
List<Map<String, Object>> results;

if (clientConfig.getEndpoint() == null) {
log.debug("Executing query using Pinecone client");
results = executeQueryUsingClien(batchQueryRequest, parsedQuery);
} else {
results = executeQueryWithMockHttpService(batchQueryRequest);
@@ -153,102 +153,104 @@ private List<Map<String, Object>> executeQueryWithMockHttpService(
private List<Map<String, Object>> executeQueryUsingClien(
QueryRequest batchQueryRequest, Query parsedQuery) {
List<Map<String, Object>> results;

if (log.isDebugEnabled()) {
log.debug("Query request: {}", batchQueryRequest);
}
QueryResponse queryResponse = connection.getBlockingStub().query(batchQueryRequest);

if (log.isDebugEnabled()) {
log.debug("Query response: {}", queryResponse);

List<ScoredVector> matchesList = queryResponse.getMatchesList();
// Loop over matchesList and log the contents
for (ScoredVector match : matchesList) {
log.debug("Match ID: {}", match.getId());
log.debug("Match Score: {}", match.getScore());
log.debug("Match Metadata: {}", match.getMetadata());
}
}
log.info("Query response: {}", queryResponse);

log.info("Num matches: {}", queryResponse.getMatchesList().size());

results = new ArrayList<>();
queryResponse
.getResultsList()
.getMatchesList()
.forEach(
res ->
res.getMatchesList()
.forEach(
match -> {
String id = match.getId();
Map<String, Object> row = new HashMap<>();

if (parsedQuery.includeMetadata) {
// put all the metadata
if (match.getMetadata() != null) {
match.getMetadata()
.getFieldsMap()
.forEach(
(key, value) -> {
if (log
.isDebugEnabled()) {
log.debug(
"Key: {}, value: {} {}",
key,
value,
value
!= null
? value
.getClass()
: null);
}
Object
converted =
valueToObject(
value);
row.put(
key,
converted
!= null
? converted
.toString()
: null);
});
match -> {
String id = match.getId();
Map<String, Object> row = new HashMap<>();

if (parsedQuery.includeMetadata) {
// put all the metadata
if (match.getMetadata() != null) {
match.getMetadata()
.getFieldsMap()
.forEach(
(key, value) -> {
if (log.isDebugEnabled()) {
log.debug(
"Key: {}, value: {} {}",
key,
value,
value != null
? value.getClass()
: null);
}
}
row.put("id", id);
results.add(row);
}));
Object converted = valueToObject(value);
row.put(
key,
converted != null
? converted.toString()
: null);
});
}
}
row.put("id", id);
results.add(row);
});
return results;
}

     @NotNull
     private QueryRequest mapQueryToQueryRequest(Query parsedQuery) {
-        QueryVector.Builder builder = QueryVector.newBuilder();
+        QueryRequest.Builder requestBuilder = QueryRequest.newBuilder();
+        log.info("Parsed query: {}", parsedQuery);
 
-        if (parsedQuery.vector != null) {
-            builder.addAllValues(parsedQuery.vector);
+        // Set namespace if available
+        if (parsedQuery.namespace != null) {
+            requestBuilder.setNamespace(parsedQuery.namespace);
         }
 
-        if (parsedQuery.sparseVector != null) {
-            builder.setSparseValues(
-                    SparseValues.newBuilder()
-                            .addAllValues(parsedQuery.sparseVector.getValues())
-                            .addAllIndices(parsedQuery.sparseVector.getIndices())
-                            .build());
+        // Add vector or sparse vector to the request
+        if (parsedQuery.vector != null) {
+            // Use addAllVector for dense vectors
+            Iterable<Float> iterableVector = parsedQuery.vector;
+            requestBuilder.addAllVector(iterableVector);
+        } else if (parsedQuery.sparseVector != null) {
+            // For sparse vectors, you would typically need to handle them differently
+            // This assumes your API has a way to add sparse vectors directly
+            // If not, you might need to convert them to a dense format or handle them as per
+            // your API's capability
+            // Example:
+            // requestBuilder.addAllVector(convertSparseToDense(parsedQuery.sparseVector));
+            // Where `convertSparseToDense` is a method you'd implement to convert sparse
+            // vectors to dense vectors if necessary
         }
 
+        // Set filter if available
         if (parsedQuery.filter != null && !parsedQuery.filter.isEmpty()) {
-            builder.setFilter(buildFilter(parsedQuery.filter));
+            log.info("Built filter: {}", buildFilter(parsedQuery.filter));
+            requestBuilder.setFilter(buildFilter(parsedQuery.filter));
         }
 
-        if (parsedQuery.namespace != null) {
-            builder.setNamespace(parsedQuery.namespace);
-        }
-
-        QueryVector queryVector = builder.build();
-        QueryRequest.Builder requestBuilder = QueryRequest.newBuilder();
-
-        if (parsedQuery.namespace != null) {
-            requestBuilder.setNamespace(parsedQuery.namespace);
-        }
+        // Other settings from the parsed query
+        requestBuilder
+                .setTopK(parsedQuery.topK)
+                .setIncludeMetadata(parsedQuery.includeMetadata)
+                .setIncludeValues(parsedQuery.includeValues);
 
-        QueryRequest batchQueryRequest =
-                requestBuilder
-                        .addQueries(queryVector)
-                        .setTopK(parsedQuery.topK)
-                        .setIncludeMetadata(parsedQuery.includeMetadata)
-                        .setIncludeValues(parsedQuery.includeValues)
-                        .build();
-        return batchQueryRequest;
+        return requestBuilder.build();
     }

public static Object valueToObject(Value value) {
@@ -33,8 +33,8 @@ void testPineconeQuery() throws Exception {
Map<String, Object> config =
Map.of(
"api-key", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"environment", "asia-southeast1-gcp-free",
"project-name", "032e3d0",
"environment", "apw5-4e34-81fa",
"project-name", "lvkf6c1",
"index-name", "example-index");
QueryStepDataSource implementation = dataSource.createDataSourceImplementation(config);
implementation.initialize(null);