diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseAssetsManagerProvider.java index 31d00c971..435880d24 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseAssetsManagerProvider.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseAssetsManagerProvider.java @@ -28,18 +28,19 @@ import com.couchbase.client.java.manager.bucket.BucketType; import com.couchbase.client.java.manager.collection.CollectionSpec; import com.couchbase.client.java.manager.collection.ScopeSpec; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class CouchbaseAssetsManagerProvider implements AssetManagerProvider { @Override @@ -123,8 +124,7 @@ public boolean assetExists() throws Exception { } return false; } catch (Exception e) { - // Handle exceptions if any issues occur - e.printStackTrace(); + log.warn("Failed to check if collection exists", e); return false; } } @@ -133,7 +133,7 @@ public boolean assetExists() throws Exception { public void deployAsset() throws Exception { // Check and create bucket if it doesn't exist if (!bucketExists()) { - System.out.println("Creating bucket " + bucketName); + log.info("Creating bucket {}", bucketName); cluster.buckets() .createBucket( BucketSettings.create(bucketName) @@ -145,13 +145,13 @@ public void deployAsset() throws Exception { // Check and create scope if it doesn't exist if (!scopeExists()) { - System.out.println("Creating scope " + scopeName); + log.info("Creating scope {}", scopeName); bucket.collections().createScope(scopeName); } // Check and create collection if it doesn't exist if (!assetExists()) { - System.out.println("Creating collection " + collectionName); + log.info("Creating collection {}", collectionName); bucket.collections().createCollection(scopeName, collectionName); } // check if search index exists @@ -159,11 +159,10 @@ public void deployAsset() throws Exception { int vectorDimension = getVectorDimension(); - System.out.println( - "Creating vector search index for collection " - + collectionName - + "connection string" - + connectionString); + log.info( + "Creating vector search index for collection {}, connection string {}", + collectionName, + connectionString); createVectorSearchIndex( scopeName, collectionName, @@ -199,8 +198,7 @@ private boolean searchIndexExists() { // Check if there are exactly 2 or 3 scopes if (scopes.size() != 2 && scopes.size() != 3) { - System.out.println( - "There must be exactly 2 or 3 scopes, found: " + scopes.size()); + log.warn("There must be exactly 2 or 3 scopes, found: {}", scopes.size()); return true; } @@ -217,7 +215,7 @@ private boolean searchIndexExists() { } else if (scope.name().equals(scopeName)) { hasCustomScope = true; } else { - System.out.println("Unexpected scope found: " + scope.name()); + log.error("Unexpected scope found: {}", scope.name()); return true; } } @@ -225,22 +223,24 @@ private boolean searchIndexExists() { // Validate scope names based on the number of scopes if (scopes.size() == 2) { if (!hasDefaultScope || !hasSystemScope) { - System.out.println( - "When there are 2 scopes, they must be _default and _system."); + log.warn( + "When there are 2 scopes, they must be _default and _system, found {}", + scopes); return true; } } else if (scopes.size() == 3) { if (!hasDefaultScope || !hasSystemScope || !hasCustomScope) { - System.out.println( - "When there are 3 scopes, they must be _default, _system, and " - + scopeName); + log.warn( + "When there are 3 scopes, they must be _default, _system, and {}, found {}", + scopeName, + scopes); return true; } } return false; } catch (Exception e) { - e.printStackTrace(); + log.error("Failed to check if search index exists, assuming exists", e); return true; } } @@ -298,19 +298,19 @@ private void createVectorSearchIndex( + " }\n" + " }\n" + "}"; - - System.out.println( - "Creating vector search index " + indexName + " on host " + connectionString); - + log.info( + "Creating vector search index: {}, connection string {}", + indexName, + connectionString); String host = connectionString .replace("couchbases://", "") .replace("couchbase://", "") .split(":")[0]; - System.out.println("Extracted host: " + host); + log.info("Extracted host: {}", host); String urlStr = - "https://" // for testing use http + "https://" + host + ":" + port @@ -321,30 +321,24 @@ private void createVectorSearchIndex( + "/index/" + indexLabel; - System.out.println("Constructed URL: " + urlStr); - URL url = new URL(urlStr); - HttpURLConnection httpConn = (HttpURLConnection) url.openConnection(); - httpConn.setDoOutput(true); - httpConn.setRequestMethod("PUT"); - httpConn.setRequestProperty("Content-Type", "application/json"); - httpConn.setRequestProperty( - "Authorization", - "Basic " - + Base64.getEncoder() - .encodeToString( - (username + ":" + password) - .getBytes(StandardCharsets.UTF_8))); - httpConn.getOutputStream().write(indexDefinition.getBytes(StandardCharsets.UTF_8)); - httpConn.getOutputStream().flush(); - httpConn.getOutputStream().close(); - - int responseCode = httpConn.getResponseCode(); + log.info("Using URL: {}", urlStr); + HttpClient httpClient = HttpClient.newHttpClient(); + String basicAuth = + Base64.getEncoder() + .encodeToString( + (username + ":" + password).getBytes(StandardCharsets.UTF_8)); + HttpResponse response = + httpClient.send( + HttpRequest.newBuilder() + .uri(URI.create(urlStr)) + .header("Content-Type", "application/json") + .header("Authorization", "Basic " + basicAuth) + .PUT(HttpRequest.BodyPublishers.ofString(indexDefinition)) + .build(), + HttpResponse.BodyHandlers.ofString()); + int responseCode = response.statusCode(); if (responseCode != 200) { - InputStream errorStream = httpConn.getErrorStream(); - String errorMessage = - new BufferedReader(new InputStreamReader(errorStream)) - .lines() - .collect(Collectors.joining("\n")); + String errorMessage = response.body(); throw new IOException( "Failed to create index: HTTP response code " + responseCode @@ -357,15 +351,13 @@ private void createVectorSearchIndex( public boolean deleteAssetIfExists() throws Exception { try { if (assetExists()) { - System.out.println( - "Deleting collection " + collectionName + " in scope " + scopeName); + log.info("Deleting collection {} in scope {}", collectionName, scopeName); bucket.collections().dropCollection(scopeName, collectionName); return true; } return false; } catch (Exception e) { - // Handle exceptions if any issues occur - e.printStackTrace(); + log.error("Failed to delete collection", e); return false; } } @@ -373,7 +365,6 @@ public boolean deleteAssetIfExists() throws Exception { @Override public void close() { if (cluster != null) { - System.out.println("Closing Couchbase cluster connection"); cluster.disconnect(); } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicConnectionsRuntimeCacheFactory.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicConnectionsRuntimeCacheFactory.java index ec3eb20f1..b8856964d 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicConnectionsRuntimeCacheFactory.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicConnectionsRuntimeCacheFactory.java @@ -16,9 +16,9 @@ package ai.langstream.apigateway.gateways; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; -import ai.langstream.apigateway.MetricsNames; import ai.langstream.apigateway.config.TopicProperties; -import io.micrometer.core.instrument.Metrics; +import ai.langstream.apigateway.metrics.ApiGatewayMetrics; +import ai.langstream.apigateway.metrics.MetricsNames; import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics; import java.util.function.Supplier; import org.springframework.context.annotation.Bean; @@ -29,15 +29,15 @@ public class TopicConnectionsRuntimeCacheFactory { @Bean(destroyMethod = "close") public TopicConnectionsRuntimeCache topicConnectionsRuntimeCache( - TopicProperties topicProperties) { + TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) { if (topicProperties.isConnectionsRuntimeCacheEnabled()) { final LRUTopicConnectionsRuntimeCache cache = new LRUTopicConnectionsRuntimeCache( topicProperties.getConnectionsRuntimeCacheSize()); GuavaCacheMetrics.monitor( - Metrics.globalRegistry, + apiGatewayMetrics.getMeterRegistry(), cache.getCache(), - MetricsNames.TOPIC_CONNECTIONS_RUNTIME_CACHE); + MetricsNames.GUAVA_CACHE_TOPIC_CONNECTIONS_RUNTIME_CACHE); return cache; } else { return new TopicConnectionsRuntimeCache() { diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java index a98b0e278..87ef38f59 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java @@ -16,9 +16,9 @@ package ai.langstream.apigateway.gateways; import ai.langstream.api.runner.topics.TopicProducer; -import ai.langstream.apigateway.MetricsNames; import ai.langstream.apigateway.config.TopicProperties; -import io.micrometer.core.instrument.Metrics; +import ai.langstream.apigateway.metrics.ApiGatewayMetrics; +import ai.langstream.apigateway.metrics.MetricsNames; import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics; import java.util.function.Supplier; import org.springframework.context.annotation.Bean; @@ -28,12 +28,15 @@ public class TopicProducerCacheFactory { @Bean(destroyMethod = "close") - public TopicProducerCache topicProducerCache(TopicProperties topicProperties) { + public TopicProducerCache topicProducerCache( + TopicProperties topicProperties, ApiGatewayMetrics apiGatewayMetrics) { if (topicProperties.isProducersCacheEnabled()) { final LRUTopicProducerCache cache = new LRUTopicProducerCache(topicProperties.getProducersCacheSize()); GuavaCacheMetrics.monitor( - Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE); + apiGatewayMetrics.getMeterRegistry(), + cache.getCache(), + MetricsNames.GUAVA_CACHE_TOPIC_PRODUCER); return cache; } else { return new TopicProducerCache() { diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index f5338ee51..2065f8c47 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -28,6 +28,7 @@ import ai.langstream.apigateway.api.ProduceRequest; import ai.langstream.apigateway.api.ProduceResponse; import ai.langstream.apigateway.gateways.*; +import ai.langstream.apigateway.metrics.ApiGatewayMetrics; import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; import com.fasterxml.jackson.databind.ObjectMapper; @@ -88,6 +89,7 @@ public class GatewayResource { private final TopicConnectionsRuntimeCache topicConnectionsRuntimeCache; private final ApplicationStore applicationStore; private final GatewayRequestHandler gatewayRequestHandler; + private final ApiGatewayMetrics apiGatewayMetrics; private final ExecutorService httpClientThreadPool = Executors.newCachedThreadPool( new BasicThreadFactory.Builder().namingPattern("http-client-%d").build()); @@ -105,6 +107,7 @@ ProduceResponse produce( @NotBlank @PathVariable("gateway") String gateway, @RequestBody String payload) throws ProduceGateway.ProduceException { + io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer(); final Map queryString = computeQueryString(request); final Map headers = computeHeaders(request); @@ -143,6 +146,8 @@ ProduceResponse produce( final ProducePayload producePayload = parseProducePayload(request, payload, producePayloadSchema); produceGateway.produceMessage(producePayload.toProduceRequest()); + apiGatewayMetrics.recordHttpGatewayRequest( + sample, tenant, application, gateway, HttpStatus.OK.value()); return ProduceResponse.OK; } } @@ -238,6 +243,7 @@ private CompletableFuture handleServiceCall( String application, String gateway) throws IOException, ProduceGateway.ProduceException { + io.micrometer.core.instrument.Timer.Sample sample = apiGatewayMetrics.startTimer(); final Map queryString = computeQueryString(request); final Map headers = computeHeaders(request); final GatewayRequestContext context = @@ -269,7 +275,17 @@ public void validateOptions(Map options) {} context.tenant(), context.applicationId(), context.gateway().getServiceOptions().getAgentId()); - return forwardTo(uri, servletRequest.getMethod(), servletRequest); + return forwardTo(uri, servletRequest.getMethod(), servletRequest) + .thenApply( + response -> { + apiGatewayMetrics.recordHttpGatewayRequest( + sample, + tenant, + application, + gateway, + response.getStatusCode().value()); + return response; + }); } else { if (!servletRequest.getMethod().equalsIgnoreCase("post")) { throw new ResponseStatusException( @@ -285,7 +301,17 @@ public void validateOptions(Map options) {} : context.gateway().getServiceOptions().getPayloadSchema(); final ProducePayload producePayload = parseProducePayload(request, payload, producePayloadSchema); - return handleServiceWithTopics(producePayload, authContext); + return handleServiceWithTopics(producePayload, authContext) + .thenApply( + response -> { + apiGatewayMetrics.recordHttpGatewayRequest( + sample, + tenant, + application, + gateway, + response.getStatusCode().value()); + return response; + }); } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetrics.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetrics.java new file mode 100644 index 000000000..7d7a02fce --- /dev/null +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetrics.java @@ -0,0 +1,63 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.apigateway.metrics; + +import static ai.langstream.apigateway.metrics.MetricsNames.METRIC_GATEWAYS_HTTP_REQUESTS; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class ApiGatewayMetrics implements AutoCloseable { + + private static final String TAG_TENANT = "tenant"; + private static final String TAG_APPLICATION_ID = "application"; + private static final String TAG_GATEWAY_ID = "gateway"; + + private final MeterRegistry meterRegistry; + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + public Timer.Sample startTimer() { + return Timer.start(getMeterRegistry()); + } + + public void recordHttpGatewayRequest( + io.micrometer.core.instrument.Timer.Sample sample, + String tenant, + String applicationId, + String gatewayId, + int responseStatusCode) { + Timer timer = + Timer.builder(METRIC_GATEWAYS_HTTP_REQUESTS) + .description("HTTP requests to gateways") + .tag(TAG_TENANT, tenant) + .tag(TAG_APPLICATION_ID, applicationId) + .tag(TAG_GATEWAY_ID, gatewayId) + .tag("response_status_code", responseStatusCode + "") + .publishPercentiles(0.5, 0.95, 0.99) + .register(getMeterRegistry()); + sample.stop(timer); + } + + @Override + public void close() throws Exception { + meterRegistry.clear(); + } +} diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java new file mode 100644 index 000000000..2c6d2b0cd --- /dev/null +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java @@ -0,0 +1,15 @@ +package ai.langstream.apigateway.metrics; + +import io.micrometer.core.instrument.Metrics; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ApiGatewayMetricsProvider { + + @Bean(destroyMethod = "close") + public ApiGatewayMetrics apiGatewayMetrics() { + System.out.println("CALL ApiGatewayMetricsProvider"); + return new ApiGatewayMetrics(Metrics.globalRegistry); + } +} diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/MetricsNames.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/MetricsNames.java similarity index 55% rename from langstream-api-gateway/src/main/java/ai/langstream/apigateway/MetricsNames.java rename to langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/MetricsNames.java index 67b43ccff..e121a2509 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/MetricsNames.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/MetricsNames.java @@ -13,9 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.apigateway; +package ai.langstream.apigateway.metrics; public class MetricsNames { - public static final String TOPIC_PRODUCER_CACHE = "topic_producer_cache"; - public static final String TOPIC_CONNECTIONS_RUNTIME_CACHE = "topic_connections_runtime_cache"; + // guava cache metrics are exposed as label value + public static final String GUAVA_CACHE_TOPIC_PRODUCER = "langstream_topic_producer_cache"; + public static final String GUAVA_CACHE_TOPIC_CONNECTIONS_RUNTIME_CACHE = + "langstream_topic_connections_runtime_cache"; + + // metric names are converted by micrometer to naming conventions + public static final String METRIC_GATEWAYS_HTTP_REQUESTS = "langstream.gateways.http.requests"; } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/ApiGatewayTestUtil.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/ApiGatewayTestUtil.java new file mode 100644 index 000000000..5a63fbbdc --- /dev/null +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/ApiGatewayTestUtil.java @@ -0,0 +1,74 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.apigateway; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.SneakyThrows; + +public class ApiGatewayTestUtil { + + public static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient(); + + @SneakyThrows + public static String getPrometheusMetrics(final int port) { + final String url = "http://localhost:%d/management/prometheus".formatted(port); + final HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build(); + final HttpResponse response = + HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + return response.body(); + } + + public record ParsedMetric(String name, String value, Map labels) {} + + public static List findMetric(String metricName, String metrics) { + return metrics.lines() + .filter( + m -> { + if (m.startsWith("#")) { + return false; + } + return m.startsWith(metricName + "{"); + }) + .map( + line -> { + final String[] parts = line.split(" "); + String name = parts[0]; + final String value = parts[1]; + final String labels = + name.substring(name.indexOf("{") + 1, name.indexOf("}")); + name = name.substring(0, name.indexOf("{")); + final Map labelMap = new HashMap<>(); + for (String label : labels.split(",")) { + final String[] labelParts = label.split("="); + labelMap.put( + labelParts[0], + labelParts[1].substring(1, labelParts[1].length() - 1)); + } + return new ParsedMetric(name, value, labelMap); + }) + .collect(Collectors.toList()); + } +} diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index f2025e352..e38561490 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -15,13 +15,13 @@ */ package ai.langstream.apigateway.http; +import static ai.langstream.apigateway.ApiGatewayTestUtil.findMetric; +import static ai.langstream.apigateway.ApiGatewayTestUtil.getPrometheusMetrics; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import ai.langstream.api.model.*; import ai.langstream.api.runner.code.Header; @@ -35,6 +35,7 @@ import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.storage.ApplicationStore; +import ai.langstream.apigateway.ApiGatewayTestUtil; import ai.langstream.apigateway.api.ConsumePushMessage; import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties; import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean; @@ -398,35 +399,37 @@ void testSimpleProduceCacheProducer() throws Exception { produceJsonAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); - final String metrics = - mockMvc.perform(get("/management/prometheus")) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(); - - final List cacheMetrics = - metrics.lines() - .filter(l -> l.contains("topic_producer_cache")) - .collect(Collectors.toList()); - System.out.println(cacheMetrics); - assertEquals(5, cacheMetrics.size()); - - for (String cacheMetric : cacheMetrics) { - if (cacheMetric.contains("cache_puts_total")) { - assertTrue(cacheMetric.contains("3.0")); - } else if (cacheMetric.contains("hit")) { - assertTrue(cacheMetric.contains("1.0")); - } else if (cacheMetric.contains("miss")) { - assertTrue(cacheMetric.contains("3.0")); - } else if (cacheMetric.contains("cache_size")) { - assertTrue(cacheMetric.contains("2.0")); - } else if (cacheMetric.contains("cache_evictions_total")) { - assertTrue(cacheMetric.contains("1.0")); - } else { - throw new RuntimeException(cacheMetric); - } - } + Awaitility.await() + .untilAsserted( + () -> { + String metrics = getPrometheusMetrics(port); + System.out.println(metrics); + + final List cacheMetrics = + metrics.lines() + .filter( + l -> + l.contains( + "langstream_topic_producer_cache")) + .collect(Collectors.toList()); + assertEquals(5, cacheMetrics.size()); + + for (String cacheMetric : cacheMetrics) { + if (cacheMetric.contains("cache_puts_total")) { + assertTrue(cacheMetric.contains("3.0")); + } else if (cacheMetric.contains("hit")) { + assertTrue(cacheMetric.contains("1.0")); + } else if (cacheMetric.contains("miss")) { + assertTrue(cacheMetric.contains("3.0")); + } else if (cacheMetric.contains("cache_size")) { + assertTrue(cacheMetric.contains("2.0")); + } else if (cacheMetric.contains("cache_evictions_total")) { + assertTrue(cacheMetric.contains("1.0")); + } else { + throw new RuntimeException(cacheMetric); + } + } + }); } @Test @@ -793,6 +796,47 @@ void testService() throws Exception { assertMessageContent( new MsgRecord(null, "{\"key\":\"my-key\",\"value\":\"my-value\"}", Map.of()), produceJsonAndGetBody(valueUrl, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + + Awaitility.await() + .untilAsserted( + () -> { + String metrics = getPrometheusMetrics(port); + System.out.println("got metrics: " + metrics); + + List metricsList = + findMetric( + "langstream_gateways_http_requests_seconds_count", + metrics); + assertEquals(2, metricsList.size()); + for (ApiGatewayTestUtil.ParsedMetric parsedMetric : metricsList) { + assertEquals( + "langstream_gateways_http_requests_seconds_count", + parsedMetric.name()); + assertEquals("tenant1", parsedMetric.labels().get("tenant")); + assertEquals( + "application1", parsedMetric.labels().get("application")); + assertEquals( + "200", parsedMetric.labels().get("response_status_code")); + if (parsedMetric.labels().get("gateway").equals("svc")) { + assertEquals("5.0", parsedMetric.value()); + } else { + assertEquals("svc-value", parsedMetric.labels().get("gateway")); + assertEquals("1.0", parsedMetric.value()); + } + } + assertEquals( + 2, + findMetric( + "langstream_gateways_http_requests_seconds_sum", + metrics) + .size()); + assertEquals( + 2, + findMetric( + "langstream_gateways_http_requests_seconds_max", + metrics) + .size()); + }); } @Test @@ -842,6 +886,27 @@ void testServiceWithError() throws Exception { assertEquals( "{\"type\":\"about:blank\",\"title\":\"Internal Server Error\",\"status\":500,\"detail\":\"the agent failed!\",\"instance\":\"/api/gateways/service/tenant1/application1/svc\"}", response.body()); + Awaitility.await() + .untilAsserted( + () -> { + String metrics = getPrometheusMetrics(port); + System.out.println(metrics); + + List metricsList = + findMetric( + "langstream_gateways_http_requests_seconds_count", + metrics); + assertEquals(1, metricsList.size()); + ApiGatewayTestUtil.ParsedMetric parsedMetric = metricsList.get(0); + assertEquals( + "langstream_gateways_http_requests_seconds_count", + parsedMetric.name()); + assertEquals("tenant1", parsedMetric.labels().get("tenant")); + assertEquals("application1", parsedMetric.labels().get("application")); + assertEquals("500", parsedMetric.labels().get("response_status_code")); + assertEquals("svc", parsedMetric.labels().get("gateway")); + assertEquals("1.0", parsedMetric.value()); + }); } private void startTopicExchange( diff --git a/langstream-runtime/langstream-runtime-tester/pom.xml b/langstream-runtime/langstream-runtime-tester/pom.xml index a0f6e84bd..1597e8700 100644 --- a/langstream-runtime/langstream-runtime-tester/pom.xml +++ b/langstream-runtime/langstream-runtime-tester/pom.xml @@ -183,7 +183,7 @@ https://downloads.apache.org - kafka/3.5.2/kafka_2.13-3.5.2.tgz + kafka/3.6.2/kafka_2.13-3.6.2.tgz ${project.build.directory}/kafka true diff --git a/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile b/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile index bcc5d52e7..1c71a04d2 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile +++ b/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile @@ -22,7 +22,7 @@ USER 0 ADD maven/minio /minio/minio ADD maven/herddb-services-0.28.0.zip /herddb/herddb-services-0.28.0.zip -COPY maven/kafka_2.13-3.5.2.tgz /kafka/kafka.tgz +COPY maven/kafka_2.13-3.6.2.tgz /kafka/kafka.tgz COPY maven/apache-pulsar-3.2.3-bin.tar.gz /pulsar/pulsar.tar.gz