diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java
index c324bd23e..99f6c68ac 100644
--- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java
+++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java
@@ -273,10 +273,27 @@ public static Object valueToObject(Value value) {
 
     private Struct buildFilter(Map<String, Object> filter) {
         Struct.Builder builder = Struct.newBuilder();
-        filter.forEach((key, value) -> builder.putFields(key, convertToValue(value)));
+        filter.forEach(
+                (key, value) -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Key: {}, value: {}", key, value);
+                    }
+                    // Skip entries whose value resolved to null, directly or inside a nested condition
+                    if (value != null && !isNullNested(value)) {
+                        builder.putFields(key, convertToValue(value));
+                    }
+                });
         return builder.build();
     }
 
+    private boolean isNullNested(Object value) {
+        if (value instanceof Map) {
+            Map<?, ?> map = (Map<?, ?>) value;
+            return map.values().stream().anyMatch(v -> v == null);
+        }
+        return false;
+    }
+
     @Override
     public void close() {
         if (connection != null) {
diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeWriter.java
index d2e66fd86..ada8a7f7f 100644
--- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeWriter.java
+++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeWriter.java
@@ -113,12 +113,22 @@ public CompletableFuture<?> upsert(Record record, Map<String, Object> context) {
                     vectorFunction != null
                             ? (List<Float>) vectorFunction.evaluate(mutableRecord)
                             : null;
+
             Map<String, Object> metadata =
                     metadataFunctions.entrySet().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            Map.Entry::getKey,
-                                            e -> e.getValue().evaluate(mutableRecord)));
+                            // Ensure the evaluator itself is not null
+                            .filter(e -> e.getValue() != null)
+                            .map(
+                                    e -> {
+                                        Object value = e.getValue().evaluate(mutableRecord);
+                                        return value != null
+                                                ? Map.entry(e.getKey(), value)
+                                                : null;
+                                    })
+                            // Filter out entries whose value evaluated to null
+                            .filter(entry -> entry != null)
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
             Struct metadataStruct =
                     Struct.newBuilder()
                             .putAllFields(
diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/PineconeWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/PineconeWriterTest.java
index f5db91db3..59c154ed2 100644
--- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/PineconeWriterTest.java
+++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/PineconeWriterTest.java
@@ -29,6 +29,7 @@
 import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,8 @@ void testPineconeWrite() throws Exception {
         configuration.put("vector.vector", "value.vector");
         configuration.put("vector.namespace", "");
         configuration.put("vector.metadata.genre", "value.genre");
+        configuration.put("vector.metadata.artist", "value.artist");
+        configuration.put("vector.metadata.title", "value.title");
 
         AgentContext agentContext = mock(AgentContext.class);
         when(agentContext.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
@@ -75,11 +78,13 @@ void testPineconeWrite() throws Exception {
         agent.setContext(agentContext);
         List<Record> committed = new CopyOnWriteArrayList<>();
         String genre = "random" + UUID.randomUUID();
+        String title = "title" + UUID.randomUUID();
         List<Float> vector = new ArrayList<>();
         for (int i = 1; i <= 1536; i++) {
             vector.add(1f / i);
         }
-        Map<String, Object> value = Map.of("id", "2", "vector", vector, "genre", genre);
+        Map<String, Object> value =
+                Map.of("id", "2", "vector", vector, "genre", genre, "title", title);
         SimpleRecord record = SimpleRecord.of(null, new ObjectMapper().writeValueAsString(value));
 
         agent.write(record).thenRun(() -> committed.add(record)).get();
@@ -89,7 +94,7 @@ void testPineconeWrite() throws Exception {
         // work with a 5s sleep.
         // Sleep for a while to allow the data to be indexed
-        log.info("Sleeping for 5 seconds to allow the data to be indexed");
-        Thread.sleep(5000);
+        log.info("Sleeping for 10 seconds to allow the data to be indexed");
+        Thread.sleep(10000);
 
         PineconeDataSource dataSource = new PineconeDataSource();
         QueryStepDataSource implementation =
@@ -101,14 +106,57 @@ void testPineconeWrite() throws Exception {
                 {
                       "vector": ?,
                       "topK": 5,
-                      "filter":
-                        {"genre": {"$eq": ?}}
+                      "filter": {
+                        "genre": {"$eq": ?}
+                      }
                 }
                 """;
-        List<Object> params = List.of(vector, genre);
+        List<Object> params = new ArrayList<>(Arrays.asList(vector, genre));
         List<Map<String, Object>> results = implementation.fetchData(query, params);
         log.info("Results: {}", results);
 
         assertEquals(1, results.size());
+
+        // Query with genre resolved to null (which removes the filter)
+        params = new ArrayList<>(Arrays.asList(vector, null));
+        results = implementation.fetchData(query, params);
+        log.info("Results: {}", results);
+
+        assertEquals(3, results.size());
+
+        // Query with genre and title
+        query =
+                """
+                {
+                      "vector": ?,
+                      "topK": 5,
+                      "filter": {
+                        "genre": {"$eq": ?},
+                        "title": {"$eq": ?}
+                      }
+                }
+                """;
+
+        params = new ArrayList<>(Arrays.asList(vector, genre, title));
+        results = implementation.fetchData(query, params);
+        log.info("Results: {}", results);
+
+        assertEquals(1, results.size());
+
+        // Query with genre resolved to null (only the title filter still applies)
+
+        params = new ArrayList<>(Arrays.asList(vector, null, title));
+        results = implementation.fetchData(query, params);
+        log.info("Results: {}", results);
+
+        assertEquals(1, results.size());
+
+        // Query with both genre and title resolved to null (which removes both filters)
+
+        params = new ArrayList<>(Arrays.asList(vector, null, null));
+        results = implementation.fetchData(query, params);
+        log.info("Results: {}", results);
+
+        assertEquals(3, results.size());
     }
 }
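
Editorial note, not part of the patch: the sketch below mirrors, in plain JDK code with hypothetical names (NullFilterSketch, pruneFilter), the null-skipping rule that buildFilter and isNullNested implement above. A filter entry is dropped when its value is null or when its nested condition map contains a null value, which is what lets the test pass null parameters to effectively remove a filter clause.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; the names here are not part of the LangStream codebase.
public class NullFilterSketch {

    // Same rule as isNullNested in the patch: a nested condition map with at
    // least one null value marks the whole entry as "unresolved".
    static boolean isNullNested(Object value) {
        if (value instanceof Map) {
            return ((Map<?, ?>) value).values().stream().anyMatch(v -> v == null);
        }
        return false;
    }

    // Keeps only the filter entries that would actually be sent to the vector store.
    static Map<String, Object> pruneFilter(Map<String, Object> filter) {
        Map<String, Object> pruned = new LinkedHashMap<>();
        filter.forEach(
                (key, value) -> {
                    if (value != null && !isNullNested(value)) {
                        pruned.put(key, value);
                    }
                });
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Object> genreCondition = new HashMap<>();
        genreCondition.put("$eq", null); // query parameter resolved to null
        Map<String, Object> titleCondition = new HashMap<>();
        titleCondition.put("$eq", "some-title");

        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("genre", genreCondition);
        filter.put("title", titleCondition);

        // Prints {title={$eq=some-title}}: the genre clause is dropped.
        System.out.println(pruneFilter(filter));
    }
}
```

Running main prints {title={$eq=some-title}}: the genre clause vanishes while the title clause is preserved, matching the behavior exercised by the assertions in PineconeWriterTest.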