From 5df5c42d21673289f5e8d1892d14cbaa4f8548b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 21 Aug 2024 14:07:00 +0200 Subject: [PATCH] fix: improve logging for cascading errors in agents (#125) --- .../langstream/agents/camel/CamelSource.java | 9 +++- .../agents/grpc/AbstractGrpcAgent.java | 14 ++++-- .../agents/grpc/PythonGrpcAgentProcessor.java | 2 +- .../agents/grpc/PythonGrpcAgentService.java | 2 +- .../agents/grpc/PythonGrpcAgentSink.java | 2 +- .../agents/grpc/PythonGrpcAgentSource.java | 2 +- .../agents/grpc/PythonGrpcServer.java | 10 +++- .../agents/http/HttpRequestAgent.java | 2 +- .../agents/http/LangServeInvokeAgent.java | 2 +- .../agents/pulsardlq/PulsarDLQSource.java | 48 ++++++++++++------- .../ai/langstream/agents/s3/S3Processor.java | 8 +++- .../ai/langstream/agents/s3/S3Source.java | 8 +++- .../agents/webcrawler/WebCrawlerSource.java | 8 +++- .../provider/StorageProviderSource.java | 8 +++- .../langstream/agents/flow/DispatchAgent.java | 2 +- .../langstream/agents/flow/TimerSource.java | 2 +- .../agents/flow/TriggerEventProcessor.java | 2 +- .../GoogleCloudStorageSource.java | 8 +++- .../google/drive/GoogleDriveSource.java | 8 +++- .../ai/agents/GenAIToolKitAgent.java | 8 +++- .../ai/agents/flare/FlareControllerAgent.java | 2 +- .../agents/vector/QueryVectorDBAgent.java | 8 +++- .../agents/vector/VectorDBSinkAgent.java | 8 +++- ...thenticationProviderConfigurationTest.java | 22 ++++----- .../metrics/ApiGatewayMetricsProvider.java | 15 ++++++ .../api/runner/code/AbstractAgentCode.java | 8 +++- .../langstream/api/runner/code/AgentCode.java | 2 +- .../api/runner/code/AgentCodeAndLoader.java | 16 +++---- .../agents/PulsarDLQSourceAgentProvider.java | 35 +++++++++----- .../kafkaconnect/KafkaConnectSourceAgent.java | 8 +++- .../langstream/runtime/agent/AgentRunner.java | 26 ++++++++-- .../agent/CompositeAgentProcessor.java | 8 +++- .../runtime/agent/TopicConsumerSource.java | 8 +++- .../runtime/agent/TopicProducerSink.java | 2 +- .../MockProcessorAgentsCodeProvider.java | 6 ++- 35 files changed, 227 insertions(+), 102 deletions(-) diff --git a/langstream-agents/langstream-agent-camel/src/main/java/ai/langstream/agents/camel/CamelSource.java b/langstream-agents/langstream-agent-camel/src/main/java/ai/langstream/agents/camel/CamelSource.java index 6469b6630..1e9b13ac3 100644 --- a/langstream-agents/langstream-agent-camel/src/main/java/ai/langstream/agents/camel/CamelSource.java +++ b/langstream-agents/langstream-agent-camel/src/main/java/ai/langstream/agents/camel/CamelSource.java @@ -20,6 +20,7 @@ import ai.langstream.api.util.ConfigurationUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; @@ -205,10 +206,14 @@ public void configure() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (camelContext != null) { - camelContext.close(); + try { + camelContext.close(); + } catch (IOException e) { + log.error("Error closing Camel context", e); + } } } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java index 2d1e50382..6fd4315a0 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java @@ -252,10 +252,18 @@ public void stopChannel(boolean wait) throws Exception { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { super.close(); - stopBeforeRestart(); - stopChannel(true); + try { + stopBeforeRestart(); + } catch (Exception e) { + log.error("Error while stopping", e); + } + try { + stopChannel(true); + } catch (Exception e) { + log.error("Error while stopping", e); + } for (TopicProducer topicProducer : topicProducers.values()) { topicProducer.close(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index f6c91d2f1..94ca08cd1 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -47,7 +47,7 @@ public void start() throws Exception { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { super.close(); if (server != null) { server.close(true); diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentService.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentService.java index d479471a6..1f031e09c 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentService.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentService.java @@ -45,7 +45,7 @@ public void start() throws Exception { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { if (server != null) server.close(false); super.close(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index e222403a6..205d222b3 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -45,7 +45,7 @@ public void start() throws Exception { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { super.close(); if (server != null) { server.close(true); diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index 2757e13aa..4f6f6e246 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -45,7 +45,7 @@ public void start() throws Exception { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { super.close(); if (server != null) { server.close(true); diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index a9228d87f..e55c76eea 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -139,10 +139,16 @@ private AgentContextConfiguration computeAgentContextConfiguration() { return agentContextConfiguration; } - public void close(boolean ignoreErrors) throws Exception { + public void close(boolean ignoreErrors) { if (pythonProcess != null) { pythonProcess.destroy(); - int exitCode = pythonProcess.waitFor(); + int exitCode; + try { + exitCode = pythonProcess.waitFor(); + } catch (InterruptedException e) { + log.info("Interrupted while waiting for python process to exit", e); + throw new RuntimeException(e); + } log.info("Python process exited with code {}", exitCode); if (!ignoreErrors) { diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/HttpRequestAgent.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/HttpRequestAgent.java index 519eae0aa..239cf7aff 100644 --- a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/HttpRequestAgent.java +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/HttpRequestAgent.java @@ -226,7 +226,7 @@ private static String encodeParam(String key, String value) { } @Override - public void close() throws Exception { + public void close() { super.close(); if (executor != null) { executor.shutdownNow(); diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/LangServeInvokeAgent.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/LangServeInvokeAgent.java index 68a3fd8f8..d090f5b36 100644 --- a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/LangServeInvokeAgent.java +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/http/LangServeInvokeAgent.java @@ -227,7 +227,7 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (topicProducer != null) { topicProducer.close(); diff --git a/langstream-agents/langstream-agent-pulsardlq/src/main/java/ai/langstream/agents/pulsardlq/PulsarDLQSource.java b/langstream-agents/langstream-agent-pulsardlq/src/main/java/ai/langstream/agents/pulsardlq/PulsarDLQSource.java index 18587127c..3f5bb8361 100644 --- a/langstream-agents/langstream-agent-pulsardlq/src/main/java/ai/langstream/agents/pulsardlq/PulsarDLQSource.java +++ b/langstream-agents/langstream-agent-pulsardlq/src/main/java/ai/langstream/agents/pulsardlq/PulsarDLQSource.java @@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.*; @Slf4j public class PulsarDLQSource extends AbstractAgentCode implements AgentSource { @@ -40,6 +37,7 @@ public class PulsarDLQSource extends AbstractAgentCode implements AgentSource { private Consumer dlqTopicsConsumer; private boolean includePartitioned; private int timeoutMs; + private int autoDiscoveryPeriodSeconds; private static class PulsarRecord implements Record { private final Message message; @@ -106,11 +104,9 @@ public void init(Map configuration) throws Exception { includePartitioned = ConfigurationUtils.getBoolean("include-partitioned", false, configuration); timeoutMs = ConfigurationUtils.getInt("timeout-ms", 0, configuration); - log.info("Initializing PulsarDLQSource with pulsarUrl: {}", pulsarUrl); - log.info("Namespace: {}", namespace); - log.info("Subscription: {}", subscription); - log.info("DLQ Suffix: {}", dlqSuffix); - log.info("Include Partitioned: {}", includePartitioned); + autoDiscoveryPeriodSeconds = + ConfigurationUtils.getInt( + "pattern-auto-discovery-period-seconds", 60, configuration); } @Override @@ -128,22 +124,38 @@ public void start() throws Exception { Pattern dlqTopicsInNamespace = Pattern.compile(patternString); - dlqTopicsConsumer = - pulsarClient - .newConsumer() - .topicsPattern(dlqTopicsInNamespace) - .subscriptionName(subscription) - .subscribe(); + try { + dlqTopicsConsumer = + pulsarClient + .newConsumer() + .consumerName("dlq-source") + .patternAutoDiscoveryPeriod( + autoDiscoveryPeriodSeconds, TimeUnit.SECONDS) + .topicsPattern(dlqTopicsInNamespace) + .subscriptionName(subscription) + .subscribe(); + } catch (PulsarClientException pulsarClientException) { + log.error("Error creating consumer", pulsarClientException); + throw pulsarClientException; + } } @Override - public void close() throws Exception { + public void close() { super.close(); if (dlqTopicsConsumer != null) { - dlqTopicsConsumer.close(); + try { + dlqTopicsConsumer.close(); + } catch (PulsarClientException pulsarClientException) { + log.error("Error closing consumer", pulsarClientException); + } } if (pulsarClient != null) { - pulsarClient.close(); + try { + pulsarClient.close(); + } catch (PulsarClientException pulsarClientException) { + log.error("Error closing client", pulsarClientException); + } } } diff --git a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java index 0700d7387..47eac72d5 100644 --- a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java +++ b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java @@ -201,10 +201,14 @@ public String valueAsString() { } @Override - public void close() throws Exception { + public void close() { super.close(); if (minioClient != null) { - minioClient.close(); + try { + minioClient.close(); + } catch (Exception e) { + log.error("Error closing minioClient", e); + } } } } diff --git a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java index eef4f30b5..8dfd015df 100644 --- a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java +++ b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java @@ -262,10 +262,14 @@ static boolean isExtensionAllowed(String name, Set extensions) { } @Override - public void close() throws Exception { + public void close() { super.close(); if (minioClient != null) { - minioClient.close(); + try { + minioClient.close(); + } catch (Exception e) { + log.error("Error closing minioClient", e); + } } } } diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java index 51aa12977..677e7dae3 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java +++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java @@ -598,7 +598,7 @@ public void onSignal(Record record) throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (deletedDocumentsProducer != null) { deletedDocumentsProducer.close(); @@ -607,7 +607,11 @@ public void close() throws Exception { sourceActivitySummaryProducer.close(); } if (stateStorage != null) { - stateStorage.close(); + try { + stateStorage.close(); + } catch (Exception e) { + log.error("Error closing state storage", e); + } } } diff --git a/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java b/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java index d627ab21f..b168e11cb 100644 --- a/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java +++ b/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java @@ -444,7 +444,7 @@ public void onSignal(Record record) throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (deletedObjectsProducer != null) { deletedObjectsProducer.close(); @@ -453,7 +453,11 @@ public void close() throws Exception { sourceActivitySummaryProducer.close(); } if (stateStorage != null) { - stateStorage.close(); + try { + stateStorage.close(); + } catch (Exception e) { + log.error("Error closing state storage", e); + } } } diff --git a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/DispatchAgent.java b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/DispatchAgent.java index 55c911a57..9f6970c4c 100644 --- a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/DispatchAgent.java +++ b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/DispatchAgent.java @@ -163,7 +163,7 @@ public void processRecord(Record record, RecordSink recordSink) { } @Override - public void close() throws Exception { + public void close() { super.close(); producers.forEach( (destination, producer) -> { diff --git a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TimerSource.java b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TimerSource.java index aecaf1c2d..12b7ca828 100644 --- a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TimerSource.java +++ b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TimerSource.java @@ -121,7 +121,7 @@ public void commit(List records) throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (executorService != null) { executorService.shutdown(); diff --git a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TriggerEventProcessor.java b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TriggerEventProcessor.java index 3329c8b34..c5776d607 100644 --- a/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TriggerEventProcessor.java +++ b/langstream-agents/langstream-agents-flow-control/src/main/java/ai/langstream/agents/flow/TriggerEventProcessor.java @@ -147,7 +147,7 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (topicProducer != null) { topicProducer.close(); diff --git a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java index ef309fdda..31626a33a 100644 --- a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java +++ b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java @@ -258,13 +258,17 @@ static boolean isExtensionAllowed(String name, Set extensions) { } @Override - public void close() throws Exception { + public void close() { super.close(); if (credentials != null) { credentials.close(); } if (gcsClient != null) { - gcsClient.close(); + try { + gcsClient.close(); + } catch (Exception e) { + log.error("Error closing GCS client", e); + } } } } diff --git a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java index 4b0650452..b19c9612d 100644 --- a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java +++ b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java @@ -375,10 +375,14 @@ public boolean isStateStorageRequired() { } @Override - public void close() throws Exception { + public void close() { super.close(); if (client != null) { - client.close(); + try { + client.close(); + } catch (Exception e) { + log.error("Error closing client", e); + } } } } diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java index e3080b632..5d1a3e87a 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java @@ -162,13 +162,17 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (dataSource != null) { dataSource.close(); } if (step != null) { - step.getTransformStep().close(); + try { + step.getTransformStep().close(); + } catch (Exception e) { + log.error("Error closing step", e); + } } if (serviceProvider != null) { serviceProvider.close(); diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/flare/FlareControllerAgent.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/flare/FlareControllerAgent.java index 15f936a2b..6722cd9ac 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/flare/FlareControllerAgent.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/flare/FlareControllerAgent.java @@ -104,7 +104,7 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); if (this.loopTopicProducer != null) { this.loopTopicProducer.close(); diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java index 5648d8eae..12bc288dc 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/QueryVectorDBAgent.java @@ -82,10 +82,14 @@ public List processRecord(Record record) throws Exception { public void start() {} @Override - public void close() throws Exception { + public void close() { super.close(); if (queryExecutor != null) { - queryExecutor.close(); + try { + queryExecutor.close(); + } catch (Exception e) { + log.error("Error closing queryExecutor", e); + } } if (dataSource != null) { dataSource.close(); diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/VectorDBSinkAgent.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/VectorDBSinkAgent.java index 93087c2ac..ee7e4d0f2 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/VectorDBSinkAgent.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/VectorDBSinkAgent.java @@ -41,10 +41,14 @@ public void init(Map configuration) throws Exception { public void start() {} @Override - public void close() throws Exception { + public void close() { super.close(); if (writer != null) { - writer.close(); + try { + writer.close(); + } catch (Exception e) { + log.error("Error closing writer", e); + } } } diff --git a/langstream-api-gateway-auth/langstream-jwt-api-gateway-auth/src/test/java/ai/langstream/apigateway/auth/impl/jwt/admin/JwtAuthenticationProviderConfigurationTest.java b/langstream-api-gateway-auth/langstream-jwt-api-gateway-auth/src/test/java/ai/langstream/apigateway/auth/impl/jwt/admin/JwtAuthenticationProviderConfigurationTest.java index bc1eb324c..516a50c03 100644 --- a/langstream-api-gateway-auth/langstream-jwt-api-gateway-auth/src/test/java/ai/langstream/apigateway/auth/impl/jwt/admin/JwtAuthenticationProviderConfigurationTest.java +++ b/langstream-api-gateway-auth/langstream-jwt-api-gateway-auth/src/test/java/ai/langstream/apigateway/auth/impl/jwt/admin/JwtAuthenticationProviderConfigurationTest.java @@ -1,15 +1,13 @@ package ai.langstream.apigateway.auth.impl.jwt.admin; -import com.fasterxml.jackson.core.JsonProcessingException; +import static org.junit.jupiter.api.Assertions.*; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.SneakyThrows; -import org.junit.jupiter.api.Test; - import java.util.List; import java.util.Map; - -import static org.junit.jupiter.api.Assertions.*; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; class JwtAuthenticationProviderConfigurationTest { @@ -17,7 +15,8 @@ class JwtAuthenticationProviderConfigurationTest { @Test void parseCamelCase() { - final String yaml = """ + final String yaml = + """ adminRoles: - admin - super-admin @@ -28,7 +27,7 @@ void parseCamelCase() { publicKey: --key-- secretKey: --skey-- jwksHostsAllowlist: https://localhost - + """; final JwtAuthenticationProviderConfiguration tokenProperties = parse(yaml); assertEquals(List.of("admin", "super-admin"), tokenProperties.adminRoles()); @@ -43,7 +42,8 @@ void parseCamelCase() { @Test void parseKebabCase() { - final String yaml = """ + final String yaml = + """ admin-roles: - admin - super-admin @@ -54,7 +54,7 @@ void parseKebabCase() { public-key: --key-- secret-key: --skey-- jwks-hosts-allowlist: https://localhost - + """; final JwtAuthenticationProviderConfiguration tokenProperties = parse(yaml); assertEquals(List.of("admin", "super-admin"), tokenProperties.adminRoles()); @@ -74,4 +74,4 @@ private static JwtAuthenticationProviderConfiguration parse(String yaml) { yamlConfigReader.convertValue(map, JwtAuthenticationProviderConfiguration.class); return tokenProperties; } -} \ No newline at end of file +} diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java index 2c6d2b0cd..9aead723b 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/metrics/ApiGatewayMetricsProvider.java @@ -1,3 +1,18 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package ai.langstream.apigateway.metrics; import io.micrometer.core.instrument.Metrics; diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AbstractAgentCode.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AbstractAgentCode.java index 4305503b7..e485666c2 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AbstractAgentCode.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AbstractAgentCode.java @@ -159,13 +159,17 @@ protected AgentCodeRegistry getAgentCodeRegistry() { } @Override - public void close() throws Exception { + public void close() { closed = true; if (signalsExecutor != null) { signalsExecutor.shutdown(); } if (signalsConsumer != null) { - signalsConsumer.close(); + try { + signalsConsumer.close(); + } catch (Exception exception) { + log.error("Error closing signals consumer", exception); + } } } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java index 9cc687019..d257aa140 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java @@ -62,7 +62,7 @@ default void cleanup(Map configuration, AgentContext context) throws Exception {} @Override - default void close() throws Exception {} + default void close() {} /** * Return information about the agent. This is a List because an Agent can be the composition of diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java index 5a0b682f0..63ab76be3 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java @@ -134,8 +134,8 @@ public void start() throws Exception { } @Override - public void close() throws Exception { - executeWithContextClassloader(AgentCode::close); + public void close() { + executeNoExceptionWithContextClassloader(AgentCode::close); } @Override @@ -197,8 +197,8 @@ public void start() throws Exception { } @Override - public void close() throws Exception { - executeWithContextClassloader(AgentCode::close); + public void close() { + executeNoExceptionWithContextClassloader(AgentCode::close); } @Override @@ -259,8 +259,8 @@ public void start() throws Exception { } @Override - public void close() throws Exception { - executeWithContextClassloader(AgentCode::close); + public void close() { + executeNoExceptionWithContextClassloader(AgentCode::close); } @Override @@ -321,8 +321,8 @@ public void join() throws Exception { } @Override - public void close() throws Exception { - executeWithContextClassloader(AgentCode::close); + public void close() { + executeNoExceptionWithContextClassloader(AgentCode::close); } @Override diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/PulsarDLQSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/PulsarDLQSourceAgentProvider.java index 542a91028..1a97353d8 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/PulsarDLQSourceAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/PulsarDLQSourceAgentProvider.java @@ -56,8 +56,8 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - The URL of the Pulsar cluster to connect to. - """, + The URL of the Pulsar cluster to connect to. + """, defaultValue = "pulsar://localhost:6650", required = false) @JsonProperty("pulsar-url") @@ -66,8 +66,8 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - Namespace to listen for DLQ topics. - """, + Namespace to listen for DLQ topics. + """, defaultValue = "public/default", required = false) @JsonProperty("namespace") @@ -76,8 +76,8 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - Subscription name to use for the DLQ topics. - """, + Subscription name to use for the DLQ topics. + """, defaultValue = "langstream-dlq-subscription", required = false) @JsonProperty("subscription") @@ -86,8 +86,8 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - Suffix to use for DLQ topics. - """, + Suffix to use for DLQ topics. + """, defaultValue = "-DLQ", required = false) @JsonProperty("dlq-suffix") @@ -96,8 +96,8 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - Include partitioned topics in the DLQ topics. - """, + Include partitioned topics in the DLQ topics. + """, defaultValue = "false") @JsonProperty("include-partitioned") private boolean includePartitioned; @@ -105,11 +105,20 @@ public static class PulsarDLQSourceConfiguration { @ConfigProperty( description = """ - Timeout in milliseconds to wait for messages from the DLQ topics. - Default is 0, meaning it will wait indefinitely. - """, + Timeout in milliseconds to wait for messages from the DLQ topics. + Default is 0, meaning it will wait indefinitely. + """, defaultValue = "0") @JsonProperty("timeout-ms") private int timeoutMs; + + @ConfigProperty( + description = + """ + Pattern auto discovery period in seconds. + """, + defaultValue = "60") + @JsonProperty("pattern-auto-discovery-period-seconds") + private int autoDiscoveryPeriodSeconds; } } diff --git a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/kafkaconnect/KafkaConnectSourceAgent.java b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/kafkaconnect/KafkaConnectSourceAgent.java index 61ec6cae3..38f1523fd 100644 --- a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/kafkaconnect/KafkaConnectSourceAgent.java +++ b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/kafkaconnect/KafkaConnectSourceAgent.java @@ -347,7 +347,7 @@ public OffsetStorageReader offsetStorageReader() { } @Override - public void close() throws Exception { + public void close() { super.close(); if (sourceTask != null) { sourceTask.stop(); @@ -367,7 +367,11 @@ public void close() throws Exception { } if (topicConsumerFromOffsetStore != null) { - topicConsumerFromOffsetStore.close(); + try { + topicConsumerFromOffsetStore.close(); + } catch (Exception e) { + log.error("Error closing consumer", e); + } } if (topicProducerToOffsetStore != null) { diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java index e6e97e96d..2b657a375 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java @@ -440,7 +440,11 @@ public TopicProducer createProducer( } finally { if (mainProcessor != null) { - mainProcessor.close(); + try { + mainProcessor.close(); + } catch (Throwable tt) { + log.error("Error closing the processor", tt); + } } if (beforeStopSource != null) { @@ -452,15 +456,27 @@ public TopicProducer createProducer( } if (source != null) { - source.close(); + try { + source.close(); + } catch (Throwable tt) { + log.error("Error closing the source", tt); + } } if (sink != null) { - sink.close(); + try { + sink.close(); + } catch (Throwable tt) { + log.error("Error closing the sink", tt); + } } if (mainService != null) { - mainService.close(); + try { + mainService.close(); + } catch (Throwable tt) { + log.error("Error closing the main service", tt); + } } log.info("Agent fully stopped"); @@ -517,7 +533,7 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { wrapped.close(); } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java index 1604cb287..5bcc0e571 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java @@ -114,10 +114,14 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); for (AgentProcessor agent : processors) { - agent.close(); + try { + agent.close(); + } catch (Exception e) { + log.error("Error closing agent: {}", agent.agentId(), e); + } } } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java index e15bacfac..818b300f5 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java @@ -114,10 +114,14 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); log.info("Closing consumer {}", consumer); - consumer.close(); + try { + consumer.close(); + } catch (Exception e) { + log.error("Error closing consumer", e); + } deadLetterQueueProducer.close(); } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicProducerSink.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicProducerSink.java index a5a4566b1..a14714bbb 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicProducerSink.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicProducerSink.java @@ -41,7 +41,7 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); producer.close(); } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockProcessorAgentsCodeProvider.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockProcessorAgentsCodeProvider.java index b4efd5bef..3732fac8c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockProcessorAgentsCodeProvider.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockProcessorAgentsCodeProvider.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -138,7 +139,8 @@ public void start() throws Exception { } @Override - public void close() throws Exception { + @SneakyThrows + public void close() { super.close(); if (executorService != null) { executorService.shutdown(); @@ -243,7 +245,7 @@ public void join() throws Exception { } @Override - public void close() throws Exception { + public void close() { super.close(); closeCounter.incrementAndGet(); }