Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
During upsert, don't give an NPE if the record doesn't contain value.… (
Browse files Browse the repository at this point in the history
#92)

… Ignore like other sinks.
  • Loading branch information
cdbartholomew authored Jul 3, 2024
1 parent af4b4c6 commit d074bb8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,26 @@ public static Object valueToObject(Value value) {

private Struct buildFilter(Map<String, Object> filter) {
Struct.Builder builder = Struct.newBuilder();
filter.forEach((key, value) -> builder.putFields(key, convertToValue(value)));
filter.forEach(
(key, value) -> {
if (log.isDebugEnabled()) {
log.debug("Key: {}, value: {}", key, value);
}
if (value != null && !isNullNested(value)) {
builder.putFields(key, convertToValue(value));
}
});
return builder.build();
}

private boolean isNullNested(Object value) {
if (value instanceof Map) {
Map<String, Object> map = (Map<String, Object>) value;
return map.values().stream().anyMatch(v -> v == null);
}
return false;
}

@Override
public void close() {
if (connection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,22 @@ public CompletableFuture<?> upsert(Record record, Map<String, Object> context) {
vectorFunction != null
? (List<Object>) vectorFunction.evaluate(mutableRecord)
: null;

Map<String, Object> metadata =
metadataFunctions.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().evaluate(mutableRecord)));
.filter(e -> e.getValue() != null) // Ensure the evaluator itself is
// not null
.map(
e -> {
Object value = e.getValue().evaluate(mutableRecord);
return value != null
? Map.entry(e.getKey(), value)
: null;
})
.filter(entry -> entry != null) // Filter out null entries after
// evaluation
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Struct metadataStruct =
Struct.newBuilder()
.putAllFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,6 +68,8 @@ void testPineconeWrite() throws Exception {
configuration.put("vector.vector", "value.vector");
configuration.put("vector.namespace", "");
configuration.put("vector.metadata.genre", "value.genre");
configuration.put("vector.metadata.artist", "value.artist");
configuration.put("vector.metadata.title", "value.title");

AgentContext agentContext = mock(AgentContext.class);
when(agentContext.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
Expand All @@ -75,11 +78,13 @@ void testPineconeWrite() throws Exception {
agent.setContext(agentContext);
List<Record> committed = new CopyOnWriteArrayList<>();
String genre = "random" + UUID.randomUUID();
String title = "title" + UUID.randomUUID();
List<Float> vector = new ArrayList<>();
for (int i = 1; i <= 1536; i++) {
vector.add(1f / i);
}
Map<String, Object> value = Map.of("id", "2", "vector", vector, "genre", genre);
Map<String, Object> value =
Map.of("id", "2", "vector", vector, "genre", genre, "title", title);
SimpleRecord record = SimpleRecord.of(null, new ObjectMapper().writeValueAsString(value));
agent.write(record).thenRun(() -> committed.add(record)).get();

Expand All @@ -89,7 +94,7 @@ void testPineconeWrite() throws Exception {
// work with a 5s sleep.
// Sleep for a while to allow the data to be indexed
log.info("Sleeping for 5 seconds to allow the data to be indexed");
Thread.sleep(5000);
Thread.sleep(10000);

PineconeDataSource dataSource = new PineconeDataSource();
QueryStepDataSource implementation =
Expand All @@ -101,14 +106,57 @@ void testPineconeWrite() throws Exception {
{
"vector": ?,
"topK": 5,
"filter":
{"genre": {"$eq": ?}}
"filter": {
"genre": {"$eq": ?}
}
}
""";
List<Object> params = List.of(vector, genre);
List<Object> params = new ArrayList<>(Arrays.asList(vector, genre));
List<Map<String, Object>> results = implementation.fetchData(query, params);
log.info("Results: {}", results);

assertEquals(1, results.size());

// Query with genre resolved to null (which removes the filter)
params = new ArrayList<>(Arrays.asList(vector, null));
results = implementation.fetchData(query, params);
log.info("Results: {}", results);

assertEquals(3, results.size());

// Query with genre and title
query =
"""
{
"vector": ?,
"topK": 5,
"filter": {
"genre": {"$eq": ?},
"title": {"$eq": ?}
}
}
""";

params = new ArrayList<>(Arrays.asList(vector, genre, title));
results = implementation.fetchData(query, params);
log.info("Results: {}", results);

assertEquals(1, results.size());

// Query with genre resolved to null (which removes the filter)

params = new ArrayList<>(Arrays.asList(vector, null, title));
results = implementation.fetchData(query, params);
log.info("Results: {}", results);

assertEquals(1, results.size());

// Query with genre and title resolved to null (which removes the filters)

params = new ArrayList<>(Arrays.asList(vector, null, null));
results = implementation.fetchData(query, params);
log.info("Results: {}", results);

assertEquals(3, results.size());
}
}

0 comments on commit d074bb8

Please sign in to comment.