diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java index cefc3e04c..2f9bcd21e 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.URL; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -26,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -88,10 +90,7 @@ public VoyageEmbeddingService(VoyageApiConfig conf) throws MalformedURLException this.token = conf.accessKey; this.modelUrl = new URL(conf.vgUrl); - this.httpClient = - HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_1_1) // Force HTTP/1.1 - .build(); + this.httpClient = HttpClient.newBuilder().build(); } @Override @@ -99,7 +98,6 @@ public CompletableFuture>> computeEmbeddings(List text VoyagePojo.VoyagePojoBuilder pojoBuilder = VoyagePojo.builder().input(texts).model(this.model); - // Conditionally add properties if they are not null if (this.conf.input_type != null) { pojoBuilder.inputType(this.conf.input_type); } @@ -111,75 +109,93 @@ public CompletableFuture>> computeEmbeddings(List text } VoyagePojo pojo = pojoBuilder.build(); + String jsonContent; try { - String jsonContent = om.writeValueAsString(pojo); - - CompletableFuture bodyHandle = query(jsonContent); - return bodyHandle.thenApply( - body -> { - log.info("Got a query response from model {}", model); - try { - JsonNode rootNode = om.readTree(body); - JsonNode dataNode = rootNode.path("data"); - - List> embeddings = new ArrayList<>(); - if (dataNode.isArray()) { - for (JsonNode dataItem : dataNode) { - JsonNode embeddingNode = dataItem.path("embedding"); - if (embeddingNode.isArray()) { - List embedding = new ArrayList<>(); - for (JsonNode value : embeddingNode) { - embedding.add(value.asDouble()); + jsonContent = om.writeValueAsString(pojo); + } catch (Exception e) { + log.error("Failed to serialize request", e); + return CompletableFuture.failedFuture(e); + } + + CompletableFuture bodyHandle = query(jsonContent); + return bodyHandle + .thenApply( + body -> { + log.info("Got a query response from model {}", model); + try { + JsonNode rootNode = om.readTree(body); + JsonNode dataNode = rootNode.path("data"); + + List> embeddings = new ArrayList<>(); + if (dataNode.isArray()) { + for (JsonNode dataItem : dataNode) { + JsonNode embeddingNode = dataItem.path("embedding"); + if (embeddingNode.isArray()) { + List embedding = new ArrayList<>(); + for (JsonNode value : embeddingNode) { + embedding.add(value.asDouble()); + } + embeddings.add(embedding); } - embeddings.add(embedding); } } + return embeddings; + } catch (Exception e) { + log.error("Error processing JSON", e); + throw new RuntimeException("Error processing JSON", e); } - return embeddings; - } catch (Exception e) { - log.error("Error processing JSON", e); - throw new RuntimeException(e); - } - }); - } catch (Exception e) { - log.error("Failed to send or serialize request", e); + }) + .exceptionally( + ex -> { + log.error("Failed to process embeddings", ex); + throw new CompletionException(ex); + }); + } + + private CompletableFuture query(String jsonPayload) { + HttpRequest request; + try { + request = + HttpRequest.newBuilder() + .uri(modelUrl.toURI()) + .header("Authorization", "Bearer " + token) + .POST(HttpRequest.BodyPublishers.ofString(jsonPayload)) + .build(); + } catch (URISyntaxException e) { + log.error("Invalid URI: {}", modelUrl, e); return CompletableFuture.failedFuture(e); } - } - private CompletableFuture query(String jsonPayload) throws Exception { - HttpRequest request = - HttpRequest.newBuilder() - .uri(modelUrl.toURI()) - .header("Authorization", "Bearer " + token) - .POST(HttpRequest.BodyPublishers.ofString(jsonPayload)) - .build(); CompletableFuture> responseHandle = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); - return responseHandle.thenApply( - response -> { - if (log.isDebugEnabled()) { - log.debug( - "Model {} query response is {} {}", - model, - response, - response.body()); - } - - if (response.statusCode() != 200) { - log.warn( - "Model {} query failed with {} {}", - model, - response, - response.body()); - throw new RuntimeException( - "Model " - + model - + " query failed with status " - + response.statusCode()); - } - - return response.body(); - }); + + return responseHandle + .thenApply( + response -> { + if (response.statusCode() != 200) { + log.error( + "Model {} query failed with status {}: {}", + model, + response.statusCode(), + response.body()); + throw new RuntimeException( + "Model query failed with status " + response.statusCode()); + } + return response.body(); + }) + .exceptionally( + ex -> { + log.error("Failed to process the model query", ex); + log.error("Request URI: {}", request.uri()); + log.error("Payload: {}", jsonPayload); + if (ex instanceof CompletionException && ex.getCause() != null) { + Throwable cause = ex.getCause(); + log.error( + "Underlying exception: {} {}", + cause.getClass(), + cause.getMessage()); + } + throw new RuntimeException("Failed to process the model query", ex); + }); } }