Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
feat: add api gateway metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Aug 20, 2024
1 parent ca593ee commit d6108b8
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicConnectionsRuntime;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.metrics.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.metrics.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import ai.langstream.apigateway.api.ProduceRequest;
import ai.langstream.apigateway.api.ProduceResponse;
import ai.langstream.apigateway.gateways.*;
import ai.langstream.apigateway.metrics.ApiGatewayMetrics;
import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GatewayResource {
private final TopicConnectionsRuntimeCache topicConnectionsRuntimeCache;
private final ApplicationStore applicationStore;
private final GatewayRequestHandler gatewayRequestHandler;
private final ApiGatewayMetrics apiGatewayMetrics;
private final ExecutorService httpClientThreadPool =
Executors.newCachedThreadPool(
new BasicThreadFactory.Builder().namingPattern("http-client-%d").build());
Expand Down Expand Up @@ -195,7 +197,15 @@ CompletableFuture<ResponseEntity> service(
@NotBlank @PathVariable("application") String application,
@NotBlank @PathVariable("gateway") String gateway)
throws Exception {
return handleServiceCall(request, servletRequest, tenant, application, gateway);
return handleServiceCall(request, servletRequest, tenant, application, gateway)
.thenApply(response -> {
apiGatewayMetrics.addHttpGatewayRequest(tenant,
application,
gateway,
servletRequest.getMethod(),
response.getStatusCode().value());
return response;
});
}

@GetMapping(value = GATEWAY_SERVICE_PATH)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ai.langstream.apigateway.metrics;

import ai.langstream.api.model.Gateway;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;

public class ApiGatewayMetrics implements AutoCloseable {

private final static String TAG_TENANT = "tenant";
private final static String TAG_APPLICATION_ID = "application";
private final static String TAG_GATEWAY_ID = "gateway";

public void addHttpGatewayRequest(
String tenant,
String applicationId,
String gatewayId,
String httpMethod,
int responseStatusCode) {
Counter.builder("langstream.gateways.http.requests")
.description("HTTP requests to gateways")
.tag(TAG_TENANT, tenant)
.tag(TAG_APPLICATION_ID, applicationId)
.tag(TAG_GATEWAY_ID, gatewayId)
.tag("http_method", httpMethod)
.tag("response_status_code", responseStatusCode + "")
.register(Metrics.globalRegistry)
.increment();
}

@Override
public void close() throws Exception {
Metrics.globalRegistry.close();
Metrics.globalRegistry.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ai.langstream.apigateway.metrics;

import ai.langstream.api.model.Gateway;
import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.apigateway.config.TopicProperties;
import ai.langstream.apigateway.gateways.LRUTopicProducerCache;
import ai.langstream.apigateway.gateways.TopicProducerCache;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Supplier;

@Configuration
public class ApiGatewayMetricsProvider {


@Bean(destroyMethod = "close")
public ApiGatewayMetrics apiGatewayMetrics() {
return new ApiGatewayMetrics();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.apigateway;
package ai.langstream.apigateway.metrics;

public class MetricsNames {
public static final String TOPIC_PRODUCER_CACHE = "topic_producer_cache";
public static final String TOPIC_CONNECTIONS_RUNTIME_CACHE = "topic_connections_runtime_cache";
public static final String TOPIC_PRODUCER_CACHE = "langstream_topic_producer_cache";
public static final String TOPIC_CONNECTIONS_RUNTIME_CACHE = "langstream_topic_connections_runtime_cache";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ai.langstream.apigateway;

import lombok.SneakyThrows;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ApiGatewayTestUtil {

@SneakyThrows
public static String getPrometheusMetrics(final int port) {
final String url =
"http://localhost:%d/management/prometheus"
.formatted(port);
final HttpRequest request =
HttpRequest.newBuilder(URI.create(url))
.GET()
.build();
final HttpResponse<String> response =
HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
assertEquals(200, response.statusCode());
return response.body();
}

public record ParsedMetric(String name, String value, Map<String, String> labels) {
}


public static List<ParsedMetric> findMetric(String metricName, String metrics) {
return metrics.lines()
.filter(m -> {
if (m.startsWith("#")) {
return false;
}
return m.startsWith(metricName + "{");
})
.map(line -> {
final String[] parts = line.split(" ");
String name = parts[0];
final String value = parts[1];
final String labels = name.substring(name.indexOf("{") + 1, name.indexOf("}"));
name = name.substring(0, name.indexOf("{"));
final Map<String, String> labelMap = new HashMap<>();
for (String label : labels.split(",")) {
final String[] labelParts = label.split("=");
labelMap.put(labelParts[0], labelParts[1].substring(1, labelParts[1].length() - 1));
}
return new ParsedMetric(name, value, labelMap);
})
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package ai.langstream.apigateway.http;

import static ai.langstream.apigateway.ApiGatewayTestUtil.findMetric;
import static ai.langstream.apigateway.ApiGatewayTestUtil.getPrometheusMetrics;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -35,6 +37,7 @@
import ai.langstream.api.runtime.DeployContext;
import ai.langstream.api.runtime.PluginsRegistry;
import ai.langstream.api.storage.ApplicationStore;
import ai.langstream.apigateway.ApiGatewayTestUtil;
import ai.langstream.apigateway.api.ConsumePushMessage;
import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
Expand Down Expand Up @@ -793,6 +796,25 @@ void testService() throws Exception {
assertMessageContent(
new MsgRecord(null, "{\"key\":\"my-key\",\"value\":\"my-value\"}", Map.of()),
produceJsonAndGetBody(valueUrl, "{\"key\": \"my-key\", \"value\": \"my-value\"}"));


String metrics = getPrometheusMetrics(port);

List<ApiGatewayTestUtil.ParsedMetric> metricsList = findMetric("langstream_gateways_http_requests_total", metrics);
assertEquals(2, metricsList.size());
for (ApiGatewayTestUtil.ParsedMetric parsedMetric : metricsList) {
assertEquals("langstream_gateways_http_requests_total", parsedMetric.name());
assertEquals("tenant1", parsedMetric.labels().get("tenant"));
assertEquals("application1", parsedMetric.labels().get("application"));
assertEquals("POST", parsedMetric.labels().get("http_method"));
assertEquals("200", parsedMetric.labels().get("response_status_code"));
if (parsedMetric.labels().get("gateway").equals("svc")) {
assertEquals("5.0", parsedMetric.value());
} else {
assertEquals("svc-value", parsedMetric.labels().get("gateway"));
assertEquals("1.0", parsedMetric.value());
}
}
}

@Test
Expand Down Expand Up @@ -842,6 +864,19 @@ void testServiceWithError() throws Exception {
assertEquals(
"{\"type\":\"about:blank\",\"title\":\"Internal Server Error\",\"status\":500,\"detail\":\"the agent failed!\",\"instance\":\"/api/gateways/service/tenant1/application1/svc\"}",
response.body());

String metrics = getPrometheusMetrics(port);

List<ApiGatewayTestUtil.ParsedMetric> metricsList = findMetric("langstream_gateways_http_requests_total", metrics);
assertEquals(1, metricsList.size());
ApiGatewayTestUtil.ParsedMetric parsedMetric = metricsList.get(0);
assertEquals("langstream_gateways_http_requests_total", parsedMetric.name());
assertEquals("tenant1", parsedMetric.labels().get("tenant"));
assertEquals("application1", parsedMetric.labels().get("application"));
assertEquals("POST", parsedMetric.labels().get("http_method"));
assertEquals("500", parsedMetric.labels().get("response_status_code"));
assertEquals("svc", parsedMetric.labels().get("gateway"));
assertEquals("1.0", parsedMetric.value());
}

private void startTopicExchange(
Expand Down

0 comments on commit d6108b8

Please sign in to comment.