Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
feat: add api gateway metrics (#124)
Browse files Browse the repository at this point in the history
added timer metric for http gateway calls: 

```
langstream_gateways_http_requests_seconds_count{application="application1",gateway="svc",response_status_code="500",tenant="tenant1",} 1.0
langstream_gateways_http_requests_seconds{application="application1",gateway="svc",response_status_code="500",tenant="tenant1",quantile="0.99",} 1.140850688
```

The prometheus endpoint is /management/prometheus
  • Loading branch information
nicoloboschi authored Aug 20, 2024
1 parent ca593ee commit 9e3b7a0
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@
import com.couchbase.client.java.manager.bucket.BucketType;
import com.couchbase.client.java.manager.collection.CollectionSpec;
import com.couchbase.client.java.manager.collection.ScopeSpec;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CouchbaseAssetsManagerProvider implements AssetManagerProvider {

@Override
Expand Down Expand Up @@ -123,8 +124,7 @@ public boolean assetExists() throws Exception {
}
return false;
} catch (Exception e) {
// Handle exceptions if any issues occur
e.printStackTrace();
log.warn("Failed to check if collection exists", e);
return false;
}
}
Expand All @@ -133,7 +133,7 @@ public boolean assetExists() throws Exception {
public void deployAsset() throws Exception {
// Check and create bucket if it doesn't exist
if (!bucketExists()) {
System.out.println("Creating bucket " + bucketName);
log.info("Creating bucket {}", bucketName);
cluster.buckets()
.createBucket(
BucketSettings.create(bucketName)
Expand All @@ -145,25 +145,24 @@ public void deployAsset() throws Exception {

// Check and create scope if it doesn't exist
if (!scopeExists()) {
System.out.println("Creating scope " + scopeName);
log.info("Creating scope {}", scopeName);
bucket.collections().createScope(scopeName);
}

// Check and create collection if it doesn't exist
if (!assetExists()) {
System.out.println("Creating collection " + collectionName);
log.info("Creating collection {}", collectionName);
bucket.collections().createCollection(scopeName, collectionName);
}
// check if search index exists
if (!searchIndexExists()) {

int vectorDimension = getVectorDimension();

System.out.println(
"Creating vector search index for collection "
+ collectionName
+ "connection string"
+ connectionString);
log.info(
"Creating vector search index for collection {}, connection string {}",
collectionName,
connectionString);
createVectorSearchIndex(
scopeName,
collectionName,
Expand Down Expand Up @@ -199,8 +198,7 @@ private boolean searchIndexExists() {

// Check if there are exactly 2 or 3 scopes
if (scopes.size() != 2 && scopes.size() != 3) {
System.out.println(
"There must be exactly 2 or 3 scopes, found: " + scopes.size());
log.warn("There must be exactly 2 or 3 scopes, found: {}", scopes.size());
return true;
}

Expand All @@ -217,30 +215,32 @@ private boolean searchIndexExists() {
} else if (scope.name().equals(scopeName)) {
hasCustomScope = true;
} else {
System.out.println("Unexpected scope found: " + scope.name());
log.error("Unexpected scope found: {}", scope.name());
return true;
}
}

// Validate scope names based on the number of scopes
if (scopes.size() == 2) {
if (!hasDefaultScope || !hasSystemScope) {
System.out.println(
"When there are 2 scopes, they must be _default and _system.");
log.warn(
"When there are 2 scopes, they must be _default and _system, found {}",
scopes);
return true;
}
} else if (scopes.size() == 3) {
if (!hasDefaultScope || !hasSystemScope || !hasCustomScope) {
System.out.println(
"When there are 3 scopes, they must be _default, _system, and "
+ scopeName);
log.warn(
"When there are 3 scopes, they must be _default, _system, and {}, found {}",
scopeName,
scopes);
return true;
}
}

return false;
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to check if search index exists, assuming exists", e);
return true;
}
}
Expand Down Expand Up @@ -298,19 +298,19 @@ private void createVectorSearchIndex(
+ " }\n"
+ " }\n"
+ "}";

System.out.println(
"Creating vector search index " + indexName + " on host " + connectionString);

log.info(
"Creating vector search index: {}, connection string {}",
indexName,
connectionString);
String host =
connectionString
.replace("couchbases://", "")
.replace("couchbase://", "")
.split(":")[0];
System.out.println("Extracted host: " + host);
log.info("Extracted host: {}", host);

String urlStr =
"https://" // for testing use http
"https://"
+ host
+ ":"
+ port
Expand All @@ -321,30 +321,24 @@ private void createVectorSearchIndex(
+ "/index/"
+ indexLabel;

System.out.println("Constructed URL: " + urlStr);
URL url = new URL(urlStr);
HttpURLConnection httpConn = (HttpURLConnection) url.openConnection();
httpConn.setDoOutput(true);
httpConn.setRequestMethod("PUT");
httpConn.setRequestProperty("Content-Type", "application/json");
httpConn.setRequestProperty(
"Authorization",
"Basic "
+ Base64.getEncoder()
.encodeToString(
(username + ":" + password)
.getBytes(StandardCharsets.UTF_8)));
httpConn.getOutputStream().write(indexDefinition.getBytes(StandardCharsets.UTF_8));
httpConn.getOutputStream().flush();
httpConn.getOutputStream().close();

int responseCode = httpConn.getResponseCode();
log.info("Using URL: {}", urlStr);
HttpClient httpClient = HttpClient.newHttpClient();
String basicAuth =
Base64.getEncoder()
.encodeToString(
(username + ":" + password).getBytes(StandardCharsets.UTF_8));
HttpResponse<String> response =
httpClient.send(
HttpRequest.newBuilder()
.uri(URI.create(urlStr))
.header("Content-Type", "application/json")
.header("Authorization", "Basic " + basicAuth)
.PUT(HttpRequest.BodyPublishers.ofString(indexDefinition))
.build(),
HttpResponse.BodyHandlers.ofString());
int responseCode = response.statusCode();
if (responseCode != 200) {
InputStream errorStream = httpConn.getErrorStream();
String errorMessage =
new BufferedReader(new InputStreamReader(errorStream))
.lines()
.collect(Collectors.joining("\n"));
String errorMessage = response.body();
throw new IOException(
"Failed to create index: HTTP response code "
+ responseCode
Expand All @@ -357,23 +351,20 @@ private void createVectorSearchIndex(
public boolean deleteAssetIfExists() throws Exception {
try {
if (assetExists()) {
System.out.println(
"Deleting collection " + collectionName + " in scope " + scopeName);
log.info("Deleting collection {} in scope {}", collectionName, scopeName);
bucket.collections().dropCollection(scopeName, collectionName);
return true;
}
return false;
} catch (Exception e) {
// Handle exceptions if any issues occur
e.printStackTrace();
log.error("Failed to delete collection", e);
return false;
}
}

@Override
public void close() {
if (cluster != null) {
System.out.println("Closing Couchbase cluster connection");
cluster.disconnect();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicConnectionsRuntime;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.metrics.MetricsNames;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
Expand All @@ -29,15 +29,15 @@ public class TopicConnectionsRuntimeCacheFactory {

@Bean(destroyMethod = "close")
public TopicConnectionsRuntimeCache topicConnectionsRuntimeCache(
TopicProperties topicProperties) {
TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) {
if (topicProperties.isConnectionsRuntimeCacheEnabled()) {
final LRUTopicConnectionsRuntimeCache cache =
new LRUTopicConnectionsRuntimeCache(
topicProperties.getConnectionsRuntimeCacheSize());
GuavaCacheMetrics.monitor(
Metrics.globalRegistry,
apiGatewayMetrics.getMeterRegistry(),
cache.getCache(),
MetricsNames.TOPIC_CONNECTIONS_RUNTIME_CACHE);
MetricsNames.GUAVA_CACHE_TOPIC_CONNECTIONS_RUNTIME_CACHE);
return cache;
} else {
return new TopicConnectionsRuntimeCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.metrics.MetricsNames;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
Expand All @@ -28,12 +28,15 @@
public class TopicProducerCacheFactory {

@Bean(destroyMethod = "close")
public TopicProducerCache topicProducerCache(TopicProperties topicProperties) {
public TopicProducerCache topicProducerCache(
TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) {
if (topicProperties.isProducersCacheEnabled()) {
final LRUTopicProducerCache cache =
new LRUTopicProducerCache(topicProperties.getProducersCacheSize());
GuavaCacheMetrics.monitor(
Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE);
apiGatewayMetrics.getMeterRegistry(),
cache.getCache(),
MetricsNames.GUAVA_CACHE_TOPIC_PRODUCER);
return cache;
} else {
return new TopicProducerCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import ai.langstream.apigateway.api.ProduceRequest;
import ai.langstream.apigateway.api.ProduceResponse;
import ai.langstream.apigateway.gateways.*;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GatewayResource {
private final TopicConnectionsRuntimeCache topicConnectionsRuntimeCache;
private final ApplicationStore applicationStore;
private final GatewayRequestHandler gatewayRequestHandler;
private final ApiGatewayMetrics apiGatewayMetrics;
private final ExecutorService httpClientThreadPool =
Executors.newCachedThreadPool(
new BasicThreadFactory.Builder().namingPattern("http-client-%d").build());
Expand All @@ -105,6 +107,7 @@ ProduceResponse produce(
@NotBlank @PathVariable("gateway") String gateway,
@RequestBody String payload)
throws ProduceGateway.ProduceException {
io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer();

final Map<String, String> queryString = computeQueryString(request);
final Map<String, String> headers = computeHeaders(request);
Expand Down Expand Up @@ -143,6 +146,8 @@ ProduceResponse produce(
final ProducePayload producePayload =
parseProducePayload(request, payload, producePayloadSchema);
produceGateway.produceMessage(producePayload.toProduceRequest());
apiGatewayMetrics.recordHttpGatewayRequest(
sample, tenant, application, gateway, HttpStatus.OK.value());
return ProduceResponse.OK;
}
}
Expand Down Expand Up @@ -238,6 +243,7 @@ private CompletableFuture<ResponseEntity> handleServiceCall(
String application,
String gateway)
throws IOException, ProduceGateway.ProduceException {
io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer();
final Map<String, String> queryString = computeQueryString(request);
final Map<String, String> headers = computeHeaders(request);
final GatewayRequestContext context =
Expand Down Expand Up @@ -269,7 +275,17 @@ public void validateOptions(Map<String, String> options) {}
context.tenant(),
context.applicationId(),
context.gateway().getServiceOptions().getAgentId());
return forwardTo(uri, servletRequest.getMethod(), servletRequest);
return forwardTo(uri, servletRequest.getMethod(), servletRequest)
.thenApply(
response -> {
apiGatewayMetrics.recordHttpGatewayRequest(
sample,
tenant,
application,
gateway,
response.getStatusCode().value());
return response;
});
} else {
if (!servletRequest.getMethod().equalsIgnoreCase("post")) {
throw new ResponseStatusException(
Expand All @@ -285,7 +301,17 @@ public void validateOptions(Map<String, String> options) {}
: context.gateway().getServiceOptions().getPayloadSchema();
final ProducePayload producePayload =
parseProducePayload(request, payload, producePayloadSchema);
return handleServiceWithTopics(producePayload, authContext);
return handleServiceWithTopics(producePayload, authContext)
.thenApply(
response -> {
apiGatewayMetrics.recordHttpGatewayRequest(
sample,
tenant,
application,
gateway,
response.getStatusCode().value());
return response;
});
}
}

Expand Down
Loading

0 comments on commit 9e3b7a0

Please sign in to comment.