diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java
index bcd578564..d3e41252c 100644
--- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java
+++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java
@@ -34,7 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -176,15 +176,15 @@ public void initialise(Map<String, Object> agentConfiguration) {
             processor.start(configuration);
         }
 
-        private final AtomicReference<CompletableFuture<?>> currentRecordStatus =
-                new AtomicReference<>();
+        private final Map<Record, CompletableFuture<?>> currentRecordStatus =
+                new ConcurrentHashMap<>();
 
         @Override
         public CompletableFuture<?> upsert(Record record, Map<String, Object> context) {
             // we must handle one record at a time
             // so we block until the record is processed
             CompletableFuture<?> handle = new CompletableFuture<>();
-            currentRecordStatus.set(handle);
+            currentRecordStatus.put(record, handle);
             processor.put(List.of(new LangStreamSinkRecordAdapter(record)));
             return handle;
         }
@@ -208,7 +208,8 @@ public String applicationName() {
         @Override
         protected void handleSuccess(AbstractSinkRecord abstractRecord) {
             Record record = ((LangStreamSinkRecordAdapter) abstractRecord).getRecord();
-            currentRecordStatus.get().complete(null);
+            CompletableFuture<?> remove = currentRecordStatus.remove(record);
+            remove.complete(null);
         }
 
         @Override
@@ -242,12 +243,14 @@ protected void handleFailure(
                 log.warn("Error decoding/mapping Kafka record {}: {}", record, e.getMessage());
             }
 
+            CompletableFuture<?> remove = currentRecordStatus.remove(record);
+
             if (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.NONE
                     || (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.DRIVER
                             && !driverFailure)) {
-                currentRecordStatus.get().completeExceptionally(e);
+                remove.completeExceptionally(e);
             } else {
-                currentRecordStatus.get().complete(null);
+                remove.complete(null);
             }
 
             failCounter.run();
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
index ca0241c0f..ee8956e0c 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
@@ -33,6 +33,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.CassandraContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -228,7 +229,7 @@ public void testCassandraCassioSchema() throws Exception {
                                        keyspace: "cassio"
                                        datasource: "CassandraDatasource"
                                        create-statements:
-                                          - "CREATE KEYSPACE cassio WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
+                                          - "CREATE KEYSPACE IF NOT EXISTS cassio WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
                                        delete-statements:
                                           - "DROP KEYSPACE IF EXISTS cassio;"
                                   - name: "documents-table"
@@ -240,32 +241,75 @@ public void testCassandraCassioSchema() throws Exception {
                                        datasource: "CassandraDatasource"
                                        create-statements:
                                           - |
-                                            CREATE TABLE cassio.documents (
+                                            CREATE TABLE IF NOT EXISTS cassio.documents (
                                                 row_id text PRIMARY KEY,
                                                 attributes_blob text,
                                                 body_blob text,
                                                 metadata_s map<text, text>,
                                                 vector vector<float, 5>
                                             );
+                                          - |
+                                            CREATE INDEX IF NOT EXISTS documents_metadata ON cassio.documents (ENTRIES(metadata_s));
                                topics:
                                  - name: "input-topic-cassio"
                                    creation-mode: create-if-not-exists
                                pipeline:
                                  - id: step1
-                                   type: compute
-                                   name: "Compute metadata"
+                                   name: "Split into chunks"
+                                   type: "text-splitter"
                                    input: "input-topic-cassio"
+                                   configuration:
+                                     chunk_size: 10
+                                     chunk_overlap: 0
+                                     keep_separator: true
+                                     length_function: "length"
+                                 - name: "Convert to structured data"
+                                   type: "document-to-json"
+                                   configuration:
+                                     text-field: text
+                                     copy-properties: true
+                                 - name: "Find old chunks"
+                                   type: "query"
+                                   configuration:
+                                     datasource: "CassandraDatasource"
+                                     when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)"
+                                     mode: "query"
+                                     query: "SELECT row_id, metadata_s['chunk_id'] as chunk_id FROM cassio.documents WHERE metadata_s['filename'] = ?"
+                                     output-field: "value.stale_chunks"
+                                     fields:
+                                       - "properties.filename"
+                                 - name: "Delete stale chunks"
+                                   type: "query"
+                                   configuration:
+                                     datasource: "CassandraDatasource"
+                                     when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)"
+                                     loop-over: "value.stale_chunks"
+                                     mode: "execute"
+                                     query: "DELETE FROM cassio.documents WHERE row_id = ?"
+                                     output-field: "value.delete-results"
+                                     fields:
+                                       - "record.row_id"
+                                 - type: compute
+                                   name: "Compute metadata"
                                    configuration:
                                      fields:
                                        - name: "value.metadata_s"
-                                         expression: "fn:mapOf('filename', value.filename, 'chunk_id', value.chunk_id)"
+                                         expression: "fn:mapOf('filename', properties.filename, 'chunk_id', properties.chunk_id)"
+                                       - name: "value.row_id"
+                                         expression: "fn:concat(properties.filename, '-', properties.chunk_id)"
+                                       - name: "value.vector"
+                                         expression: "fn:listOf(0,1,2,3,4)"
+                                       - name: "value.attributes_blob"
+                                         expression: "fn:str('')"
+                                 - type: "log-event"
+                                   name: "Log event"
                                  - name: "Write a new record to Cassandra"
                                    type: "vector-db-sink"
                                    configuration:
                                      datasource: "CassandraDatasource"
                                      table-name: "documents"
                                      keyspace: "cassio"
-                                    mapping: "row_id=value.row_id,attributes_blob=value.attributes_blob,body_blob=value.body_blob,metadata_s=value.metadata_s,vector=value.vector"
+                                    mapping: "row_id=value.row_id,attributes_blob=value.attributes_blob,body_blob=value.text,metadata_s=value.metadata_s,vector=value.vector"
                                 """);
 
         try (ApplicationRuntime applicationRuntime =
@@ -273,18 +317,13 @@ public void testCassandraCassioSchema() throws Exception {
                         tenant, "app", application, buildInstanceYaml(), expectedAgents)) {
             try (KafkaProducer<String, String> producer = createProducer(); ) {
 
+                String filename = "doc.txt";
                 sendMessage(
                         "input-topic-cassio",
+                        filename,
                         """
-                        {
-                            "row_id": "some id",
-                            "attributes_blob": "some attributes",
-                            "body_blob": "some text",
-                            "filename": "doc.pdf",
-                            "chunk_id": 1,
-                            "vector": [1.0, 2.0, 3.0, 4.0, 5.0]
-                        }
-                        """,
+                        This is some very long long long long long long long long long long long long text""",
+                        List.of(new RecordHeader("filename", filename.getBytes())),
                         producer);
 
                 executeAgentRunners(applicationRuntime);
@@ -307,7 +346,35 @@ public void testCassandraCassioSchema() throws Exception {
                                                 row.getObject(i));
                                     }
                                 });
-                    assertEquals(1, all.size());
+                    assertEquals(9, all.size());
+                }
+
+                sendMessage(
+                        "input-topic-cassio",
+                        filename,
+                        """
+                        Now the text is shorter""",
+                        List.of(new RecordHeader("filename", filename.getBytes())),
+                        producer);
+
+                executeAgentRunners(applicationRuntime);
+
+                try (CqlSession cqlSession = builder.build(); ) {
+                    ResultSet execute = cqlSession.execute("SELECT * FROM cassio.documents");
+                    List<Row> all = execute.all();
+                    log.info("final records {}", all);
+                    all.forEach(
+                            row -> {
+                                log.info("row id {}", row.get("row_id", String.class));
+                                ColumnDefinitions columnDefinitions = row.getColumnDefinitions();
+                                for (int i = 0; i < columnDefinitions.size(); i++) {
+                                    log.info(
+                                            "column {} value {}",
+                                            columnDefinitions.get(i).getName(),
+                                            row.getObject(i));
+                                }
+                            });
+                    assertEquals(3, all.size());
                 }
 
                 applicationDeployer.cleanup(tenant, applicationRuntime.implementation());
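
Note on the CassandraWriter change above: the single AtomicReference could track only one in-flight record, so the success or failure callback for one record could complete (or drop) the future belonging to another. Keying the pending futures by Record in a ConcurrentHashMap gives each upsert its own completion handle, which the integration test exercises by pushing a document through the splitter so several chunk records are in flight against the same sink. Below is a minimal, self-contained sketch of that remove-then-complete pattern; FakeRecord, submit, onSuccess, and onFailure are illustrative names, not APIs from this PR or from LangStream.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the per-record completion pattern introduced above: each
    // submitted record gets its own future, keyed by the record itself, so
    // callbacks for different records can no longer clobber each other.
    public class PerRecordStatusSketch {

        // Hypothetical stand-in for LangStream's Record type.
        record FakeRecord(String id) {}

        private final Map<FakeRecord, CompletableFuture<?>> currentRecordStatus =
                new ConcurrentHashMap<>();

        // Mirrors upsert(): register the future before handing the record off.
        public CompletableFuture<?> submit(FakeRecord record) {
            CompletableFuture<?> handle = new CompletableFuture<>();
            currentRecordStatus.put(record, handle);
            // ... hand the record to the sink processor here ...
            return handle;
        }

        // Mirrors handleSuccess(): remove-then-complete, so the map never
        // leaks entries for records that have already finished.
        public void onSuccess(FakeRecord record) {
            CompletableFuture<?> handle = currentRecordStatus.remove(record);
            if (handle != null) { // defensive check; the PR completes directly
                handle.complete(null);
            }
        }

        // Mirrors handleFailure(): the same remove-then-complete shape,
        // completing exceptionally unless the error policy says to ignore.
        public void onFailure(FakeRecord record, Throwable error, boolean ignored) {
            CompletableFuture<?> handle = currentRecordStatus.remove(record);
            if (handle == null) {
                return;
            }
            if (ignored) {
                handle.complete(null);
            } else {
                handle.completeExceptionally(error);
            }
        }
    }

The remove-then-complete ordering also doubles as cleanup: once a callback fires, the map entry is gone, whereas the old AtomicReference relied on the next upsert overwriting the slot.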