Skip to content

Commit

Permalink
Integrate logs with stackdriver
Browse files Browse the repository at this point in the history
  • Loading branch information
priyanshu-beep committed Dec 31, 2024
1 parent 6657539 commit 3b09406
Show file tree
Hide file tree
Showing 15 changed files with 680 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,12 @@ public static final class Logging {
public static final String LOG_APPENDER_EXT_DIR = "app.program.log.appender.extensions.dir";
public static final String LOG_APPENDER_PROPERTY_PREFIX = "app.program.log.appender.system.properties.";

// Log publisher configs.
public static final String LOG_PUBLISHER_PROVIDER = "app.program.log.publisher.provider";
public static final String LOG_PUBLISHER_ENABLED = "app.program.log.publisher.enabled";
public static final String LOG_PUBLISHER_EXT_DIR = "app.program.log.publisher.extensions.dir";
public static final String LOG_PUBLISHER_PREFIX = "app.program.log.publisher";

// Property key in the logger context to indicate it is performing pipeline validation
public static final String PIPELINE_VALIDATION = "log.pipeline.validation";

Expand Down
10 changes: 10 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2804,6 +2804,16 @@
</description>
</property>

<property>
<name>app.program.log.publisher.enabled</name>
<value>true</value>
</property>

<property>
<name>app.program.log.publisher.extensions.dir</name>
<value>/opt/cdap/master/ext/log-publisher</value>
</property>

<!-- Metrics Configuration -->

<property>
Expand Down
38 changes: 38 additions & 0 deletions cdap-log-publisher-spi/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2024 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>cdap</artifactId>
<groupId>io.cdap.cdap</groupId>
<version>6.11.0-SNAPSHOT</version>
</parent>

<artifactId>cdap-log-publisher-spi</artifactId>
<name>CDAP Log Publisher SPI</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logs;

import ch.qos.logback.classic.spi.ILoggingEvent;
import java.io.Closeable;

/**
* Publishes log events to a destination.
*/
public interface LogPublisher extends Closeable {

/**
* Returns the name of the log publisher.
*
* @return the name of the publisher.
*/
String getName();

/**
* Publishes a logging event.
*
* @param event the logging event to publish.
*/
void publish(ILoggingEvent event);

/**
* Initializes the log publisher with the required context.
*
* @param context the context to initialize the publisher with.
*/
void initialize(LogPublisherContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logs;

import java.util.Map;

/**
* Provides context information for {@link LogPublisher} initialization.
*/
public interface LogPublisherContext {

/**
* Properties are derived from the CDAP configuration. Configuration file path will be added as an
* entry in the properties.
*
* @return unmodifiable properties for the log publisher.
*/
Map<String, String> getProperties();
}
5 changes: 5 additions & 0 deletions cdap-watchdog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-log-publisher-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-watchdog-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging.appender;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Context;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.cdap.logging.framework.local.LocalLogAppender;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.slf4j.LoggerFactory;

/**
* A log appender that delegates logging events to a list of other log appenders.
*/
public class CompositeLogAppender extends LogAppender {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CompositeLogAppender.class);
private final List<LogAppender> appenders;

/**
* Constructs a CompositeLogAppender with the given list of appenders.
*
* @param appenders the list of {@link LogAppender} instances to delegate logging events to
*/
public CompositeLogAppender(List<LogAppender> appenders) {
if (appenders == null) {
throw new IllegalArgumentException("Appenders list cannot be null");
}

this.appenders = Collections.unmodifiableList(new ArrayList<>(appenders));
setName(getClass().getName());
}

@Override
public void start() {
appenders.forEach(appender -> safelyExecute(
appender::start,
String.format("Failed to start appender: %s", appender.getName())
));
super.start();
}

@Override
public void stop() {
super.stop();
appenders.forEach(appender -> safelyExecute(
appender::stop,
String.format("Failed to stop appender: %s", appender.getName())
));
}

@Override
protected void appendEvent(LogMessage logMessage) {
appenders.forEach(appender -> safelyExecute(
() -> appender.appendEvent(logMessage),
String.format("Failed to append log message to appender: %s. Message: %s",
appender.getName(), logMessage.getFormattedMessage())
));
}

@Override
public void setContext(Context context) {
super.setContext(context);
appenders.forEach(appender -> safelyExecute(
() -> appender.setContext(context),
String.format("Failed to set context to appender: %s", appender.getName())
));
}

@Override
public synchronized void doAppend(ILoggingEvent eventObject) {
if (shouldSkipLogging()) {
return;
}
super.doAppend(eventObject);
}

@VisibleForTesting
boolean shouldSkipLogging() {
// If any of the LogAppender is LocalLogAppender then, ignore logs coming from the log process
// pipeline, otherwise it'll become an infinite loop of logs.
// This won't guard against the case that an appender starts a new thread and emit log per
// event (something like what this class does). If that's the case, the appender itself need to
// guard against it, similar to what's being done in here.
// They are still logged via other log appender (e.g. log to cdap.log), but just not being
// collected via the log collection system.
return appenders.stream()
.filter(LocalLogAppender.class::isInstance)
.map(LocalLogAppender.class::cast)
.anyMatch(localAppender -> localAppender.getPipelineThreads()
.get()
.contains(Thread.currentThread()));
}

private void safelyExecute(Runnable action, String errorMessage) {
try {
action.run();
} catch (Exception e) {
LOG.warn(errorMessage, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging.appender;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.logs.LogPublisherContext;
import java.util.Collections;
import java.util.Map;

/**
* A context implementation for log publishers that provides configuration properties
* scoped to a specific log publisher provider.
*/
public class DefaultLogPublisherContext implements LogPublisherContext {

private final Map<String, String> properties;

/**
* Constructs a DefaultLogPublisherContext with configuration properties specific to the provider.
*
* @param cConf The configuration object containing the properties.
* @param providerName The name of the log publisher.
*/
public DefaultLogPublisherContext(CConfiguration cConf, String providerName) {
String prefix = String.format("%s.%s.", Constants.Logging.LOG_PUBLISHER_PREFIX, providerName);
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(prefix));
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
Loading

0 comments on commit 3b09406

Please sign in to comment.