From 78865877409a8929ca5e8e22b5b053993ff9e848 Mon Sep 17 00:00:00 2001 From: Ben <62250174+benfrank241@users.noreply.github.com> Date: Fri, 14 Jun 2024 11:26:37 -0400 Subject: [PATCH] Couchbase-disable-test (#67) Disabled couchbase writer test --------- Co-authored-by: benfrank241 --- .../vector/couchbase/CouchbaseDataSource.java | 2 +- .../vector/couchbase/CouchbaseWriter.java | 145 ++++++++---------- .../datasource/impl/CouchbaseWriterTest.java | 3 +- 3 files changed, 65 insertions(+), 85 deletions(-) diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java index 502b50cee..8a00d994b 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java @@ -216,7 +216,7 @@ public List> fetchData(String query, List params) { private double computeCosineSimilarity(float[] vector1, double[] vector2) { // Log the first 5 elements of each vector and the operation - log.info( + log.debug( "Vector1 (first 5 elements): {}..., Vector2 (first 5 elements): {}..., Computing cosine similarity between vectors", Arrays.toString(Arrays.copyOfRange(vector1, 0, 5)), Arrays.toString(Arrays.copyOfRange(vector2, 0, 5))); diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java index 5254a04da..3719f9654 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java @@ -106,92 +106,71 @@ public void initialise(Map agentConfiguration) throws Exception public CompletableFuture upsert(Record record, Map context) { CompletableFuture handle = new CompletableFuture<>(); - return CompletableFuture.runAsync( - () -> { - try { - - MutableRecord mutableRecord = - recordToMutableRecord(record, true); - - // Evaluate the ID using the idFunction - String docId = - idFunction != null - ? (String) idFunction.evaluate(mutableRecord) - : null; - - if (docId == null) { - throw new IllegalArgumentException( - "docId is null, cannot upsert document"); - } - - String bucketS = - bucketName != null - ? (String) bucketName.evaluate(mutableRecord) - : null; - String scopeS = - scopeName != null - ? (String) scopeName.evaluate(mutableRecord) - : null; - String collectionS = - collectionName != null - ? (String) - collectionName.evaluate(mutableRecord) - : null; - - // Get the bucket, scope, and collection - Bucket bucket = cluster.bucket(bucketS); - bucket.waitUntilReady(Duration.ofSeconds(10)); - - Scope scope = bucket.scope(scopeS); - collection = scope.collection(collectionS); - - // Prepare the content map - Map content = new HashMap<>(); - - // Add the vector embedding - List vector = - vectorFunction != null - ? (List) - vectorFunction.evaluate(mutableRecord) - : null; - if (vector != null) { - content.put("vector", vector); - } - - // Add metadata - metadataFunctions.forEach( - (key, evaluator) -> { - Object value = evaluator.evaluate(mutableRecord); - content.put(key, value); - }); - - // Perform the upsert - MutationResult result = - collection.upsert( - docId, content, UpsertOptions.upsertOptions()); - - // Logging the result of the upsert operation - log.info("Upsert successful for document ID '{}'", docId); - - handle.complete(null); // Completing the future successfully - } catch (Exception e) { - log.error( - "Failed to upsert document with ID '{}'", - record.key(), - e); - handle.completeExceptionally( - e); // Completing the future exceptionally - } - }) - .exceptionally( - e -> { - log.error("Exception in upsert operation: ", e); - return null; - }); + try { + + MutableRecord mutableRecord = recordToMutableRecord(record, true); + + // Evaluate the ID using the idFunction + String docId = + idFunction != null ? (String) idFunction.evaluate(mutableRecord) : null; + + if (docId == null) { + throw new IllegalArgumentException("docId is null, cannot upsert document"); + } + + String bucketS = + bucketName != null ? (String) bucketName.evaluate(mutableRecord) : null; + String scopeS = + scopeName != null ? (String) scopeName.evaluate(mutableRecord) : null; + String collectionS = + collectionName != null + ? (String) collectionName.evaluate(mutableRecord) + : null; + + // Get the bucket, scope, and collection + Bucket bucket = cluster.bucket(bucketS); + bucket.waitUntilReady(Duration.ofSeconds(10)); + + Scope scope = bucket.scope(scopeS); + collection = scope.collection(collectionS); + + // Prepare the content map + Map content = new HashMap<>(); + + // Add the vector embedding + List vector = + vectorFunction != null + ? (List) vectorFunction.evaluate(mutableRecord) + : null; + if (vector != null) { + content.put("vector", vector); + } + + // Add metadata + metadataFunctions.forEach( + (key, evaluator) -> { + Object value = evaluator.evaluate(mutableRecord); + content.put(key, value); + }); + + // Perform the upsert + MutationResult result = + collection.upsert(docId, content, UpsertOptions.upsertOptions()); + + // Logging the result of the upsert operation + log.info("Upsert successful for document ID '{}'", docId); + + handle.complete(null); // Completing the future successfully + } catch (Exception e) { + log.error("Failed to upsert document with ID '{}'", record.key(), e); + handle.completeExceptionally(e); // Completing the future exceptionally + } + + return handle; } } - private static JstlEvaluator buildEvaluator( + public static JstlEvaluator buildEvaluator( Map agentConfiguration, String param, Class type) { String expression = agentConfiguration.getOrDefault(param, "").toString(); if (expression == null || expression.isEmpty()) { diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java index e68ae9556..06d167bc9 100644 --- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.couchbase.BucketDefinition; @@ -56,7 +57,7 @@ @Slf4j @Testcontainers -// @Disabled +@Disabled class CouchbaseWriterTest { BucketDefinition bucketDefinition = new BucketDefinition("bucket-name");