Commit: More work
eolivelli committed Nov 4, 2023
1 parent 54c2c68 commit 4b32489
Showing 2 changed files with 93 additions and 23 deletions.
The first changed file is the Cassandra sink agent:

@@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -176,15 +176,15 @@ public void initialise(Map<String, Object> agentConfiguration) {
processor.start(configuration);
}

- private final AtomicReference<CompletableFuture<?>> currentRecordStatus =
- new AtomicReference<>();
+ private final Map<Record, CompletableFuture<?>> currentRecordStatus =
+ new ConcurrentHashMap<>();

@Override
public CompletableFuture<?> upsert(Record record, Map<String, Object> context) {
// we must handle one record at a time
// so we block until the record is processed
CompletableFuture<?> handle = new CompletableFuture();
- currentRecordStatus.set(handle);
+ currentRecordStatus.put(record, handle);
processor.put(List.of(new LangStreamSinkRecordAdapter(record)));
return handle;
}
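The change above replaces the single AtomicReference slot with a ConcurrentHashMap keyed by the record itself, so several records can be in flight at once and each caller gets back a future tied to its own record. A minimal sketch of the pattern, with an illustrative type parameter R standing in for the real Record class:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of per-record completion tracking; R stands in for the real Record type.
    class PerRecordTracker<R> {
        private final Map<R, CompletableFuture<?>> inFlight = new ConcurrentHashMap<>();

        // Write path: register a future before handing the record to the async processor.
        CompletableFuture<?> track(R record) {
            CompletableFuture<?> handle = new CompletableFuture<>();
            inFlight.put(record, handle);
            return handle;
        }

        // Success callback: remove first, so completed entries never leak from the map.
        void success(R record) {
            CompletableFuture<?> handle = inFlight.remove(record);
            if (handle != null) { // defensive; the commit assumes the entry is present
                handle.complete(null);
            }
        }

        // Failure callback: same lookup, but propagate the error to the caller.
        void failure(R record, Throwable error) {
            CompletableFuture<?> handle = inFlight.remove(record);
            if (handle != null) {
                handle.completeExceptionally(error);
            }
        }
    }

This depends on the record keeping a stable identity (or consistent equals/hashCode) between put and remove. Note that the comment above upsert still describes the old one-record-at-a-time behavior, and the commit's handleSuccess calls complete on the removed future without a null check.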
@@ -208,7 +208,8 @@ public String applicationName() {
@Override
protected void handleSuccess(AbstractSinkRecord abstractRecord) {
Record record = ((LangStreamSinkRecordAdapter) abstractRecord).getRecord();
- currentRecordStatus.get().complete(null);
+ CompletableFuture<?> remove = currentRecordStatus.remove(record);
+ remove.complete(null);
}

@Override
Expand Down Expand Up @@ -242,12 +243,14 @@ protected void handleFailure(
log.warn("Error decoding/mapping Kafka record {}: {}", record, e.getMessage());
}

+ CompletableFuture<?> remove = currentRecordStatus.remove(record);
+
if (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.NONE
|| (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.DRIVER
&& !driverFailure)) {
- currentRecordStatus.get().completeExceptionally(e);
+ remove.completeExceptionally(e);
} else {
- currentRecordStatus.get().complete(null);
+ remove.complete(null);
}

failCounter.run();
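On the failure path the same per-record future is resolved according to the ignore-errors policy: NONE propagates every error, while DRIVER forgives driver-side failures but still fails records that could not be decoded or mapped. A compact sketch of that decision, using a stand-in enum rather than the real CassandraSinkConfig.IgnoreErrorsPolicy:

    import java.util.concurrent.CompletableFuture;

    // Stand-in for CassandraSinkConfig.IgnoreErrorsPolicy, assumed to have these values.
    enum SimplePolicy { NONE, ALL, DRIVER }

    final class FailureResolver {
        // Resolve the record's future the way handleFailure does above.
        static void resolve(CompletableFuture<?> handle, SimplePolicy policy,
                            boolean driverFailure, Throwable error) {
            boolean propagate = policy == SimplePolicy.NONE
                    // DRIVER only forgives driver-side failures, so a
                    // decoding/mapping failure still fails the record
                    || (policy == SimplePolicy.DRIVER && !driverFailure);
            if (propagate) {
                handle.completeExceptionally(error);
            } else {
                handle.complete(null); // acknowledge despite the error
            }
        }
    }

Either way the failure counter still runs afterwards, so swallowed errors remain visible in metrics.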
The second changed file is the Cassandra integration test:
@@ -33,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
+ import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.junit.jupiter.Container;
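The new RecordHeader import supports a change further down in the test: the filename now travels as a Kafka message header rather than as a field inside a JSON payload. A minimal, self-contained sketch of producing a message with such a header (the bootstrap address is a placeholder):

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class HeaderProducerDemo {
        public static void main(String[] args) throws Exception {
            Map<String, Object> config = Map.of(
                    "bootstrap.servers", "localhost:9092",
                    "key.serializer", StringSerializer.class.getName(),
                    "value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
                String filename = "doc.txt";
                // Key and header both carry the filename, mirroring the test below.
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "input-topic-cassio",
                        null, // partition: let the partitioner decide
                        filename,
                        "some very long text to be split into chunks",
                        List.of(new RecordHeader("filename", filename.getBytes())));
                producer.send(record).get(); // block until acknowledged
            }
        }
    }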
@@ -228,7 +229,7 @@ public void testCassandraCassioSchema() throws Exception {
keyspace: "cassio"
datasource: "CassandraDatasource"
create-statements:
- "CREATE KEYSPACE cassio WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
- "CREATE KEYSPACE IF NOT EXISTS cassio WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
delete-statements:
- "DROP KEYSPACE IF EXISTS cassio;"
- name: "documents-table"
@@ -240,51 +241,89 @@
datasource: "CassandraDatasource"
create-statements:
- |
- CREATE TABLE cassio.documents (
+ CREATE TABLE IF NOT EXISTS cassio.documents (
row_id text PRIMARY KEY,
attributes_blob text,
body_blob text,
metadata_s map <text,text>,
vector vector <float,5>
);
+ - |
+ CREATE INDEX IF NOT EXISTS documents_metadata ON cassio.documents (ENTRIES(metadata_s));
topics:
- name: "input-topic-cassio"
creation-mode: create-if-not-exists
pipeline:
- - id: step1
- type: compute
- name: "Compute metadata"
name: "Split into chunks"
type: "text-splitter"
input: "input-topic-cassio"
configuration:
chunk_size: 10
chunk_overlap: 0
keep_separator: true
length_function: "length"
- name: "Convert to structured data"
type: "document-to-json"
configuration:
text-field: text
copy-properties: true
- name: "Find old chunks"
type: "query"
configuration:
datasource: "CassandraDatasource"
when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)"
mode: "query"
query: "SELECT row_id, metadata_s['chunk_id'] as chunk_id FROM cassio.documents WHERE metadata_s['filename'] = ?"
output-field: "value.stale_chunks"
fields:
- "properties.filename"
- name: "Delete stale chunks"
type: "query"
configuration:
datasource: "CassandraDatasource"
when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)"
loop-over: "value.stale_chunks"
mode: "execute"
query: "DELETE FROM cassio.documents WHERE row_id = ?"
output-field: "value.delete-results"
fields:
- "record.row_id"
- type: compute
name: "Compute metadata"
configuration:
fields:
- name: "value.metadata_s"
expression: "fn:mapOf('filename', value.filename, 'chunk_id', value.chunk_id)"
expression: "fn:mapOf('filename', properties.filename, 'chunk_id', properties.chunk_id)"
- name: "value.row_id"
expression: "fn:concat(properties.filename, '-', properties.chunk_id)"
- name: "value.vector"
expression: "fn:listOf(0,1,2,3,4)"
- name: "value.attributes_blob"
expression: "fn:str('')"
- type: "log-event"
name: "Log event"
- name: "Write a new record to Cassandra"
type: "vector-db-sink"
configuration:
datasource: "CassandraDatasource"
table-name: "documents"
keyspace: "cassio"
mapping: "row_id=value.row_id,attributes_blob=value.attributes_blob,body_blob=value.body_blob,metadata_s=value.metadata_s,vector=value.vector"
mapping: "row_id=value.row_id,attributes_blob=value.attributes_blob,body_blob=value.text,metadata_s=value.metadata_s,vector=value.vector"
""");

try (ApplicationRuntime applicationRuntime =
deployApplication(
tenant, "app", application, buildInstanceYaml(), expectedAgents)) {
try (KafkaProducer<String, String> producer = createProducer(); ) {

String filename = "doc.txt";
sendMessage(
"input-topic-cassio",
+ filename,
"""
- {
- "row_id": "some id",
- "attributes_blob": "some attributes",
- "body_blob": "some text",
- "filename": "doc.pdf",
- "chunk_id": 1,
- "vector": [1.0, 2.0, 3.0, 4.0, 5.0]
- }
- """,
+ This is some very long long long long long long long long long long long long text""",
+ List.of(new RecordHeader("filename", filename.getBytes())),
producer);

executeAgentRunners(applicationRuntime);
@@ -307,7 +346,35 @@
row.getObject(i));
}
});
- assertEquals(1, all.size());
+ assertEquals(9, all.size());
}

+ sendMessage(
+ "input-topic-cassio",
+ filename,
+ """
+ Now the text is shorter""",
+ List.of(new RecordHeader("filename", filename.getBytes())),
+ producer);
+
+ executeAgentRunners(applicationRuntime);
+
+ try (CqlSession cqlSession = builder.build(); ) {
+ ResultSet execute = cqlSession.execute("SELECT * FROM cassio.documents");
+ List<Row> all = execute.all();
+ log.info("final records {}", all);
+ all.forEach(
+ row -> {
+ log.info("row id {}", row.get("row_id", String.class));
+ ColumnDefinitions columnDefinitions = row.getColumnDefinitions();
+ for (int i = 0; i < columnDefinitions.size(); i++) {
+ log.info(
+ "column {} value {}",
+ columnDefinitions.get(i).getName(),
+ row.getObject(i));
+ }
+ });
+ assertEquals(3, all.size());
+ }

applicationDeployer.cleanup(tenant, applicationRuntime.implementation());
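The two assertions follow from the splitter settings and the stale-chunk cleanup. With chunk_size 10, chunk_overlap 0 and a character length function, the long first message packs into 9 word-sized chunks, hence 9 rows. Because row_id is fn:concat(filename, '-', chunk_id), the shorter second message overwrites chunks 0 through 2, and the "Find old chunks"/"Delete stale chunks" steps remove the six leftover rows, leaving exactly 3. A rough greedy word-packing sketch that reproduces those counts (an approximation for illustration, not the text-splitter's real algorithm):

    import java.util.ArrayList;
    import java.util.List;

    public class ChunkCountDemo {
        // Greedily pack whole words into chunks of at most maxLen characters.
        static List<String> split(String text, int maxLen) {
            List<String> chunks = new ArrayList<>();
            StringBuilder current = new StringBuilder();
            for (String word : text.split(" ")) {
                String candidate = current.length() == 0 ? word : current + " " + word;
                // A word longer than maxLen becomes its own chunk.
                if (candidate.length() <= maxLen || current.length() == 0) {
                    current = new StringBuilder(candidate);
                } else {
                    chunks.add(current.toString());
                    current = new StringBuilder(word);
                }
            }
            if (current.length() > 0) {
                chunks.add(current.toString());
            }
            return chunks;
        }

        public static void main(String[] args) {
            String longText = "This is some very long long long long long long "
                    + "long long long long long long text";
            String shortText = "Now the text is shorter";
            System.out.println(split(longText, 10).size());  // 9
            System.out.println(split(shortText, 10).size()); // 3
        }
    }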
