Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
Improve error handling and logging in voyage embedding service
Browse files Browse the repository at this point in the history
  • Loading branch information
cdbartholomew committed Apr 13, 2024
1 parent c194ba1 commit 5231146
Showing 1 changed file with 81 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -88,18 +90,14 @@ public VoyageEmbeddingService(VoyageApiConfig conf) throws MalformedURLException
this.token = conf.accessKey;
this.modelUrl = new URL(conf.vgUrl);

this.httpClient =
HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1) // Force HTTP/1.1
.build();
this.httpClient = HttpClient.newBuilder().build();
}

@Override
public CompletableFuture<List<List<Double>>> computeEmbeddings(List<String> texts) {
VoyagePojo.VoyagePojoBuilder pojoBuilder =
VoyagePojo.builder().input(texts).model(this.model);

// Conditionally add properties if they are not null
if (this.conf.input_type != null) {
pojoBuilder.inputType(this.conf.input_type);
}
Expand All @@ -111,75 +109,93 @@ public CompletableFuture<List<List<Double>>> computeEmbeddings(List<String> text
}

VoyagePojo pojo = pojoBuilder.build();
String jsonContent;
try {
String jsonContent = om.writeValueAsString(pojo);

CompletableFuture<String> bodyHandle = query(jsonContent);
return bodyHandle.thenApply(
body -> {
log.info("Got a query response from model {}", model);
try {
JsonNode rootNode = om.readTree(body);
JsonNode dataNode = rootNode.path("data");

List<List<Double>> embeddings = new ArrayList<>();
if (dataNode.isArray()) {
for (JsonNode dataItem : dataNode) {
JsonNode embeddingNode = dataItem.path("embedding");
if (embeddingNode.isArray()) {
List<Double> embedding = new ArrayList<>();
for (JsonNode value : embeddingNode) {
embedding.add(value.asDouble());
jsonContent = om.writeValueAsString(pojo);
} catch (Exception e) {
log.error("Failed to serialize request", e);
return CompletableFuture.failedFuture(e);
}

CompletableFuture<String> bodyHandle = query(jsonContent);
return bodyHandle
.thenApply(
body -> {
log.info("Got a query response from model {}", model);
try {
JsonNode rootNode = om.readTree(body);
JsonNode dataNode = rootNode.path("data");

List<List<Double>> embeddings = new ArrayList<>();
if (dataNode.isArray()) {
for (JsonNode dataItem : dataNode) {
JsonNode embeddingNode = dataItem.path("embedding");
if (embeddingNode.isArray()) {
List<Double> embedding = new ArrayList<>();
for (JsonNode value : embeddingNode) {
embedding.add(value.asDouble());
}
embeddings.add(embedding);
}
embeddings.add(embedding);
}
}
return embeddings;
} catch (Exception e) {
log.error("Error processing JSON", e);
throw new RuntimeException("Error processing JSON", e);
}
return embeddings;
} catch (Exception e) {
log.error("Error processing JSON", e);
throw new RuntimeException(e);
}
});
} catch (Exception e) {
log.error("Failed to send or serialize request", e);
})
.exceptionally(
ex -> {
log.error("Failed to process embeddings", ex);
throw new CompletionException(ex);
});
}

private CompletableFuture<String> query(String jsonPayload) {
HttpRequest request;
try {
request =
HttpRequest.newBuilder()
.uri(modelUrl.toURI())
.header("Authorization", "Bearer " + token)
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
.build();
} catch (URISyntaxException e) {
log.error("Invalid URI: {}", modelUrl, e);
return CompletableFuture.failedFuture(e);
}
}

private CompletableFuture<String> query(String jsonPayload) throws Exception {
HttpRequest request =
HttpRequest.newBuilder()
.uri(modelUrl.toURI())
.header("Authorization", "Bearer " + token)
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
.build();
CompletableFuture<HttpResponse<String>> responseHandle =
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
return responseHandle.thenApply(
response -> {
if (log.isDebugEnabled()) {
log.debug(
"Model {} query response is {} {}",
model,
response,
response.body());
}

if (response.statusCode() != 200) {
log.warn(
"Model {} query failed with {} {}",
model,
response,
response.body());
throw new RuntimeException(
"Model "
+ model
+ " query failed with status "
+ response.statusCode());
}

return response.body();
});

return responseHandle
.thenApply(
response -> {
if (response.statusCode() != 200) {
log.error(
"Model {} query failed with status {}: {}",
model,
response.statusCode(),
response.body());
throw new RuntimeException(
"Model query failed with status " + response.statusCode());
}
return response.body();
})
.exceptionally(
ex -> {
log.error("Failed to process the model query", ex);
log.error("Request URI: {}", request.uri());
log.error("Payload: {}", jsonPayload);
if (ex instanceof CompletionException && ex.getCause() != null) {
Throwable cause = ex.getCause();
log.error(
"Underlying exception: {} {}",
cause.getClass(),
cause.getMessage());
}
throw new RuntimeException("Failed to process the model query", ex);
});
}
}

0 comments on commit 5231146

Please sign in to comment.