Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
fix: improve logging for cascading errors in agents (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Aug 21, 2024
1 parent 8e52249 commit 5df5c42
Show file tree
Hide file tree
Showing 35 changed files with 227 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ai.langstream.api.util.ConfigurationUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -205,10 +206,14 @@ public void configure() throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (camelContext != null) {
camelContext.close();
try {
camelContext.close();
} catch (IOException e) {
log.error("Error closing Camel context", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,18 @@ public void stopChannel(boolean wait) throws Exception {
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() {
super.close();
stopBeforeRestart();
stopChannel(true);
try {
stopBeforeRestart();
} catch (Exception e) {
log.error("Error while stopping", e);
}
try {
stopChannel(true);
} catch (Exception e) {
log.error("Error while stopping", e);
}
for (TopicProducer topicProducer : topicProducers.values()) {
topicProducer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void start() throws Exception {
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() {
super.close();
if (server != null) {
server.close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void start() throws Exception {
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() {
if (server != null) server.close(false);
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void start() throws Exception {
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() {
super.close();
if (server != null) {
server.close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void start() throws Exception {
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() {
super.close();
if (server != null) {
server.close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,16 @@ private AgentContextConfiguration computeAgentContextConfiguration() {
return agentContextConfiguration;
}

public void close(boolean ignoreErrors) throws Exception {
public void close(boolean ignoreErrors) {
if (pythonProcess != null) {
pythonProcess.destroy();
int exitCode = pythonProcess.waitFor();
int exitCode;
try {
exitCode = pythonProcess.waitFor();
} catch (InterruptedException e) {
log.info("Interrupted while waiting for python process to exit", e);
throw new RuntimeException(e);
}
log.info("Python process exited with code {}", exitCode);

if (!ignoreErrors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private static String encodeParam(String key, String value) {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (executor != null) {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void start() throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (topicProducer != null) {
topicProducer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.*;

@Slf4j
public class PulsarDLQSource extends AbstractAgentCode implements AgentSource {
Expand All @@ -40,6 +37,7 @@ public class PulsarDLQSource extends AbstractAgentCode implements AgentSource {
private Consumer<byte[]> dlqTopicsConsumer;
private boolean includePartitioned;
private int timeoutMs;
private int autoDiscoveryPeriodSeconds;

private static class PulsarRecord implements Record {
private final Message<byte[]> message;
Expand Down Expand Up @@ -106,11 +104,9 @@ public void init(Map<String, Object> configuration) throws Exception {
includePartitioned =
ConfigurationUtils.getBoolean("include-partitioned", false, configuration);
timeoutMs = ConfigurationUtils.getInt("timeout-ms", 0, configuration);
log.info("Initializing PulsarDLQSource with pulsarUrl: {}", pulsarUrl);
log.info("Namespace: {}", namespace);
log.info("Subscription: {}", subscription);
log.info("DLQ Suffix: {}", dlqSuffix);
log.info("Include Partitioned: {}", includePartitioned);
autoDiscoveryPeriodSeconds =
ConfigurationUtils.getInt(
"pattern-auto-discovery-period-seconds", 60, configuration);
}

@Override
Expand All @@ -128,22 +124,38 @@ public void start() throws Exception {

Pattern dlqTopicsInNamespace = Pattern.compile(patternString);

dlqTopicsConsumer =
pulsarClient
.newConsumer()
.topicsPattern(dlqTopicsInNamespace)
.subscriptionName(subscription)
.subscribe();
try {
dlqTopicsConsumer =
pulsarClient
.newConsumer()
.consumerName("dlq-source")
.patternAutoDiscoveryPeriod(
autoDiscoveryPeriodSeconds, TimeUnit.SECONDS)
.topicsPattern(dlqTopicsInNamespace)
.subscriptionName(subscription)
.subscribe();
} catch (PulsarClientException pulsarClientException) {
log.error("Error creating consumer", pulsarClientException);
throw pulsarClientException;
}
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (dlqTopicsConsumer != null) {
dlqTopicsConsumer.close();
try {
dlqTopicsConsumer.close();
} catch (PulsarClientException pulsarClientException) {
log.error("Error closing consumer", pulsarClientException);
}
}
if (pulsarClient != null) {
pulsarClient.close();
try {
pulsarClient.close();
} catch (PulsarClientException pulsarClientException) {
log.error("Error closing client", pulsarClientException);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,14 @@ public String valueAsString() {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (minioClient != null) {
minioClient.close();
try {
minioClient.close();
} catch (Exception e) {
log.error("Error closing minioClient", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,14 @@ static boolean isExtensionAllowed(String name, Set<String> extensions) {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (minioClient != null) {
minioClient.close();
try {
minioClient.close();
} catch (Exception e) {
log.error("Error closing minioClient", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public void onSignal(Record record) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (deletedDocumentsProducer != null) {
deletedDocumentsProducer.close();
Expand All @@ -607,7 +607,11 @@ public void close() throws Exception {
sourceActivitySummaryProducer.close();
}
if (stateStorage != null) {
stateStorage.close();
try {
stateStorage.close();
} catch (Exception e) {
log.error("Error closing state storage", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public void onSignal(Record record) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (deletedObjectsProducer != null) {
deletedObjectsProducer.close();
Expand All @@ -453,7 +453,11 @@ public void close() throws Exception {
sourceActivitySummaryProducer.close();
}
if (stateStorage != null) {
stateStorage.close();
try {
stateStorage.close();
} catch (Exception e) {
log.error("Error closing state storage", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void processRecord(Record record, RecordSink recordSink) {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
producers.forEach(
(destination, producer) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void commit(List<Record> records) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (executorService != null) {
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void start() throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (topicProducer != null) {
topicProducer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,17 @@ static boolean isExtensionAllowed(String name, Set<String> extensions) {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (credentials != null) {
credentials.close();
}
if (gcsClient != null) {
gcsClient.close();
try {
gcsClient.close();
} catch (Exception e) {
log.error("Error closing GCS client", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,14 @@ public boolean isStateStorageRequired() {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (client != null) {
client.close();
try {
client.close();
} catch (Exception e) {
log.error("Error closing client", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,17 @@ public void start() throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (dataSource != null) {
dataSource.close();
}
if (step != null) {
step.getTransformStep().close();
try {
step.getTransformStep().close();
} catch (Exception e) {
log.error("Error closing step", e);
}
}
if (serviceProvider != null) {
serviceProvider.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void start() throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (this.loopTopicProducer != null) {
this.loopTopicProducer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ public List<Record> processRecord(Record record) throws Exception {
public void start() {}

@Override
public void close() throws Exception {
public void close() {
super.close();
if (queryExecutor != null) {
queryExecutor.close();
try {
queryExecutor.close();
} catch (Exception e) {
log.error("Error closing queryExecutor", e);
}
}
if (dataSource != null) {
dataSource.close();
Expand Down
Loading

0 comments on commit 5df5c42

Please sign in to comment.