This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

feat: add api gateway metrics #124

Merged: 5 commits, Aug 20, 2024
CouchbaseAssetsManagerProvider.java
@@ -28,18 +28,19 @@
import com.couchbase.client.java.manager.bucket.BucketType;
import com.couchbase.client.java.manager.collection.CollectionSpec;
import com.couchbase.client.java.manager.collection.ScopeSpec;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CouchbaseAssetsManagerProvider implements AssetManagerProvider {

@Override
@@ -123,8 +124,7 @@ public boolean assetExists() throws Exception {
}
return false;
} catch (Exception e) {
// Handle exceptions if any issues occur
e.printStackTrace();
log.warn("Failed to check if collection exists", e);
return false;
}
}
@@ -133,7 +133,7 @@ public boolean assetExists() throws Exception {
public void deployAsset() throws Exception {
// Check and create bucket if it doesn't exist
if (!bucketExists()) {
System.out.println("Creating bucket " + bucketName);
log.info("Creating bucket {}", bucketName);
cluster.buckets()
.createBucket(
BucketSettings.create(bucketName)
@@ -145,25 +145,24 @@ public void deployAsset() throws Exception {

// Check and create scope if it doesn't exist
if (!scopeExists()) {
System.out.println("Creating scope " + scopeName);
log.info("Creating scope {}", scopeName);
bucket.collections().createScope(scopeName);
}

// Check and create collection if it doesn't exist
if (!assetExists()) {
System.out.println("Creating collection " + collectionName);
log.info("Creating collection {}", collectionName);
bucket.collections().createCollection(scopeName, collectionName);
}
// check if search index exists
if (!searchIndexExists()) {

int vectorDimension = getVectorDimension();

System.out.println(
"Creating vector search index for collection "
+ collectionName
+ "connection string"
+ connectionString);
log.info(
"Creating vector search index for collection {}, connection string {}",
collectionName,
connectionString);
createVectorSearchIndex(
scopeName,
collectionName,
@@ -199,8 +198,7 @@ private boolean searchIndexExists() {

// Check if there are exactly 2 or 3 scopes
if (scopes.size() != 2 && scopes.size() != 3) {
System.out.println(
"There must be exactly 2 or 3 scopes, found: " + scopes.size());
log.warn("There must be exactly 2 or 3 scopes, found: {}", scopes.size());
return true;
}

@@ -217,30 +215,32 @@ private boolean searchIndexExists() {
} else if (scope.name().equals(scopeName)) {
hasCustomScope = true;
} else {
System.out.println("Unexpected scope found: " + scope.name());
log.error("Unexpected scope found: {}", scope.name());
return true;
}
}

// Validate scope names based on the number of scopes
if (scopes.size() == 2) {
if (!hasDefaultScope || !hasSystemScope) {
System.out.println(
"When there are 2 scopes, they must be _default and _system.");
log.warn(
"When there are 2 scopes, they must be _default and _system, found {}",
scopes);
return true;
}
} else if (scopes.size() == 3) {
if (!hasDefaultScope || !hasSystemScope || !hasCustomScope) {
System.out.println(
"When there are 3 scopes, they must be _default, _system, and "
+ scopeName);
log.warn(
"When there are 3 scopes, they must be _default, _system, and {}, found {}",
scopeName,
scopes);
return true;
}
}

return false;
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to check if search index exists, assuming exists", e);
return true;
}
}
@@ -298,19 +298,19 @@ private void createVectorSearchIndex(
+ " }\n"
+ " }\n"
+ "}";

System.out.println(
"Creating vector search index " + indexName + " on host " + connectionString);

log.info(
"Creating vector search index: {}, connection string {}",
indexName,
connectionString);
String host =
connectionString
.replace("couchbases://", "")
.replace("couchbase://", "")
.split(":")[0];
System.out.println("Extracted host: " + host);
log.info("Extracted host: {}", host);

String urlStr =
"https://" // for testing use http
"https://"
+ host
+ ":"
+ port
@@ -321,30 +321,24 @@ private void createVectorSearchIndex(
+ "/index/"
+ indexLabel;

System.out.println("Constructed URL: " + urlStr);
URL url = new URL(urlStr);
HttpURLConnection httpConn = (HttpURLConnection) url.openConnection();
httpConn.setDoOutput(true);
httpConn.setRequestMethod("PUT");
httpConn.setRequestProperty("Content-Type", "application/json");
httpConn.setRequestProperty(
"Authorization",
"Basic "
+ Base64.getEncoder()
.encodeToString(
(username + ":" + password)
.getBytes(StandardCharsets.UTF_8)));
httpConn.getOutputStream().write(indexDefinition.getBytes(StandardCharsets.UTF_8));
httpConn.getOutputStream().flush();
httpConn.getOutputStream().close();

int responseCode = httpConn.getResponseCode();
log.info("Using URL: {}", urlStr);
HttpClient httpClient = HttpClient.newHttpClient();
String basicAuth =
Base64.getEncoder()
.encodeToString(
(username + ":" + password).getBytes(StandardCharsets.UTF_8));
HttpResponse<String> response =
httpClient.send(
HttpRequest.newBuilder()
.uri(URI.create(urlStr))
.header("Content-Type", "application/json")
.header("Authorization", "Basic " + basicAuth)
.PUT(HttpRequest.BodyPublishers.ofString(indexDefinition))
.build(),
HttpResponse.BodyHandlers.ofString());
int responseCode = response.statusCode();
if (responseCode != 200) {
InputStream errorStream = httpConn.getErrorStream();
String errorMessage =
new BufferedReader(new InputStreamReader(errorStream))
.lines()
.collect(Collectors.joining("\n"));
String errorMessage = response.body();
throw new IOException(
"Failed to create index: HTTP response code "
+ responseCode
@@ -357,23 +351,20 @@
public boolean deleteAssetIfExists() throws Exception {
try {
if (assetExists()) {
System.out.println(
"Deleting collection " + collectionName + " in scope " + scopeName);
log.info("Deleting collection {} in scope {}", collectionName, scopeName);
bucket.collections().dropCollection(scopeName, collectionName);
return true;
}
return false;
} catch (Exception e) {
// Handle exceptions if any issues occur
e.printStackTrace();
log.error("Failed to delete collection", e);
return false;
}
}

@Override
public void close() {
if (cluster != null) {
System.out.println("Closing Couchbase cluster connection");
cluster.disconnect();
}
}
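A note on the file above: createVectorSearchIndex now issues its PUT through the JDK's java.net.http.HttpClient instead of HttpURLConnection, so the manual OutputStream writes and BufferedReader error handling go away and the error body is read directly from the response. Below is a minimal, standalone sketch of that request pattern; the endpoint, credentials, and JSON payload are placeholders, not values from this PR.

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthJsonPutSketch {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Placeholder endpoint, credentials, and payload, for illustration only.
        String urlStr = "https://search.example.com:18094/api/index/my-index";
        String username = "admin";
        String password = "secret";
        String indexDefinition = "{\"type\": \"fulltext-index\"}";

        // Encode "user:password" for the Basic auth header, as the new code does.
        String basicAuth =
                Base64.getEncoder()
                        .encodeToString(
                                (username + ":" + password).getBytes(StandardCharsets.UTF_8));

        // Build and send the PUT request in one call.
        HttpClient httpClient = HttpClient.newHttpClient();
        HttpRequest request =
                HttpRequest.newBuilder()
                        .uri(URI.create(urlStr))
                        .header("Content-Type", "application/json")
                        .header("Authorization", "Basic " + basicAuth)
                        .PUT(HttpRequest.BodyPublishers.ofString(indexDefinition))
                        .build();
        HttpResponse<String> response =
                httpClient.send(request, HttpResponse.BodyHandlers.ofString());

        if (response.statusCode() != 200) {
            // The body is available for error responses too; no getErrorStream() needed.
            throw new IOException(
                    "Failed to create index: HTTP " + response.statusCode() + " - " + response.body());
        }
    }
}

One send() call covers both the success and failure paths, which is what lets the PR drop the InputStream, InputStreamReader, BufferedReader, and Collectors imports.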
TopicConnectionsRuntimeCacheFactory.java
@@ -16,9 +16,9 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicConnectionsRuntime;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.metrics.MetricsNames;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
@@ -29,15 +29,15 @@ public class TopicConnectionsRuntimeCacheFactory {

@Bean(destroyMethod = "close")
public TopicConnectionsRuntimeCache topicConnectionsRuntimeCache(
TopicProperties topicProperties) {
TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) {
if (topicProperties.isConnectionsRuntimeCacheEnabled()) {
final LRUTopicConnectionsRuntimeCache cache =
new LRUTopicConnectionsRuntimeCache(
topicProperties.getConnectionsRuntimeCacheSize());
GuavaCacheMetrics.monitor(
Metrics.globalRegistry,
apiGatewayMetrics.getMeterRegistry(),
cache.getCache(),
MetricsNames.TOPIC_CONNECTIONS_RUNTIME_CACHE);
MetricsNames.GUAVA_CACHE_TOPIC_CONNECTIONS_RUNTIME_CACHE);
return cache;
} else {
return new TopicConnectionsRuntimeCache() {
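The functional change in this factory is that the Guava cache metrics are now bound to the registry owned by the new ApiGatewayMetrics bean rather than to Metrics.globalRegistry. A small self-contained sketch of that binding, with SimpleMeterRegistry standing in for apiGatewayMetrics.getMeterRegistry(); the cache name and sizes are illustrative.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class GuavaCacheMetricsSketch {

    public static void main(String[] args) {
        // Dedicated registry instead of the global one.
        MeterRegistry registry = new SimpleMeterRegistry();

        // recordStats() is what makes Guava expose hit/miss/eviction statistics.
        Cache<String, String> cache =
                CacheBuilder.newBuilder().maximumSize(100).recordStats().build();

        // Registers meters such as cache.size and cache.gets, tagged with the cache name.
        GuavaCacheMetrics.monitor(registry, cache, "topic-connections-runtime-cache");

        cache.put("key", "value");
        cache.getIfPresent("key");

        // The size gauge now reports one entry.
        System.out.println(registry.get("cache.size").gauge().value());
    }
}

Whether the LRU caches in this module build their underlying Guava cache with recordStats() is not visible in this diff; without it, the hit/miss counters stay at zero.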
TopicProducerCacheFactory.java
@@ -16,9 +16,9 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.metrics.MetricsNames;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
@@ -28,12 +28,15 @@
public class TopicProducerCacheFactory {

@Bean(destroyMethod = "close")
public TopicProducerCache topicProducerCache(TopicProperties topicProperties) {
public TopicProducerCache topicProducerCache(
TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) {
if (topicProperties.isProducersCacheEnabled()) {
final LRUTopicProducerCache cache =
new LRUTopicProducerCache(topicProperties.getProducersCacheSize());
GuavaCacheMetrics.monitor(
Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE);
apiGatewayMetrics.getMeterRegistry(),
cache.getCache(),
MetricsNames.GUAVA_CACHE_TOPIC_PRODUCER);
return cache;
} else {
return new TopicProducerCache() {
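The ApiGatewayMetrics bean injected into both cache factories is not part of the hunks shown in this view, so the following is only a guessed reconstruction inferred from how it is used: getMeterRegistry() above, and the startTimer()/recordHttpGatewayRequest(...) calls in GatewayResource below. The class shape, metric name, and tag names are assumptions, not code from this PR.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

// Hypothetical sketch of what the ApiGatewayMetrics component might look like.
public class ApiGatewayMetricsSketch {

    private final MeterRegistry meterRegistry;

    public ApiGatewayMetricsSketch(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    // Used by the cache factories to bind GuavaCacheMetrics to a non-global registry.
    public MeterRegistry getMeterRegistry() {
        return meterRegistry;
    }

    // Starts a timing sample tied to this registry's clock.
    public Timer.Sample startTimer() {
        return Timer.start(meterRegistry);
    }

    // Stops the sample against a timer tagged with the request's identity and outcome.
    public void recordHttpGatewayRequest(
            Timer.Sample sample, String tenant, String application, String gateway, int status) {
        sample.stop(
                Timer.builder("http.gateway.requests") // metric name is an assumption
                        .tag("tenant", tenant)
                        .tag("application", application)
                        .tag("gateway", gateway)
                        .tag("response-code", String.valueOf(status))
                        .register(meterRegistry));
    }
}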
GatewayResource.java
@@ -28,6 +28,7 @@
import ai.langstream.apigateway.api.ProduceRequest;
import ai.langstream.apigateway.api.ProduceResponse;
import ai.langstream.apigateway.gateways.*;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -88,6 +89,7 @@ public class GatewayResource {
private final TopicConnectionsRuntimeCache topicConnectionsRuntimeCache;
private final ApplicationStore applicationStore;
private final GatewayRequestHandler gatewayRequestHandler;
private final ApiGatewayMetrics apiGatewayMetrics;
private final ExecutorService httpClientThreadPool =
Executors.newCachedThreadPool(
new BasicThreadFactory.Builder().namingPattern("http-client-%d").build());
@@ -105,6 +107,7 @@ ProduceResponse produce(
@NotBlank @PathVariable("gateway") String gateway,
@RequestBody String payload)
throws ProduceGateway.ProduceException {
io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer();

final Map<String, String> queryString = computeQueryString(request);
final Map<String, String> headers = computeHeaders(request);
@@ -143,6 +146,8 @@ ProduceResponse produce(
final ProducePayload producePayload =
parseProducePayload(request, payload, producePayloadSchema);
produceGateway.produceMessage(producePayload.toProduceRequest());
apiGatewayMetrics.recordHttpGatewayRequest(
sample, tenant, application, gateway, HttpStatus.OK.value());
return ProduceResponse.OK;
}
}
@@ -238,6 +243,7 @@ private CompletableFuture<ResponseEntity> handleServiceCall(
String application,
String gateway)
throws IOException, ProduceGateway.ProduceException {
io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer();
final Map<String, String> queryString = computeQueryString(request);
final Map<String, String> headers = computeHeaders(request);
final GatewayRequestContext context =
@@ -269,7 +275,17 @@ public void validateOptions(Map<String, String> options) {}
context.tenant(),
context.applicationId(),
context.gateway().getServiceOptions().getAgentId());
return forwardTo(uri, servletRequest.getMethod(), servletRequest);
return forwardTo(uri, servletRequest.getMethod(), servletRequest)
.thenApply(
response -> {
apiGatewayMetrics.recordHttpGatewayRequest(
sample,
tenant,
application,
gateway,
response.getStatusCode().value());
return response;
});
} else {
if (!servletRequest.getMethod().equalsIgnoreCase("post")) {
throw new ResponseStatusException(
@@ -285,7 +301,17 @@ public void validateOptions(Map<String, String> options) {}
: context.gateway().getServiceOptions().getPayloadSchema();
final ProducePayload producePayload =
parseProducePayload(request, payload, producePayloadSchema);
return handleServiceWithTopics(producePayload, authContext);
return handleServiceWithTopics(producePayload, authContext)
.thenApply(
response -> {
apiGatewayMetrics.recordHttpGatewayRequest(
sample,
tenant,
application,
gateway,
response.getStatusCode().value());
return response;
});
}
}

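The pattern added to GatewayResource above: a Timer.Sample is started when the request arrives and stopped once the status code is known, which on the service paths only happens inside the thenApply callback of the forwarded call. A self-contained sketch of that async timing pattern; the registry, metric name, and tag values are placeholders.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.CompletableFuture;

public class AsyncGatewayTimerSketch {

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // Start the sample before any work happens, once per request.
        Timer.Sample sample = Timer.start(registry);

        // Stand-in for forwardTo(...) / handleServiceWithTopics(...): resolves with a status code.
        CompletableFuture<Integer> responseStatus = CompletableFuture.supplyAsync(() -> 200);

        responseStatus
                .thenApply(
                        status -> {
                            // Stop the sample only when the response is available, so the
                            // recorded latency includes the downstream call.
                            sample.stop(
                                    Timer.builder("http.gateway.requests") // placeholder name
                                            .tag("tenant", "my-tenant")
                                            .tag("application", "my-app")
                                            .tag("gateway", "my-gateway")
                                            .tag("response-code", String.valueOf(status))
                                            .register(registry));
                            return status;
                        })
                .join();

        // Exactly one timing was recorded for this request.
        System.out.println(registry.get("http.gateway.requests").timer().count());
    }
}

Recording inside the callback rather than before returning the future is what keeps the async service paths consistent with the plain produce path, which records synchronously after produceMessage.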