Skip to content

Commit

Permalink
Implement ErrorClassification API for program runs
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Dec 19, 2024
1 parent 3d1422a commit 4e18685
Show file tree
Hide file tree
Showing 12 changed files with 647 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,7 @@ public static final class Logging {
// MDC keys used to tag log events with error-classification details. The log appender writes
// them into the event MDC and the error-logs classifier reads them back by these same keys.
public static final String TAG_ERROR_CATEGORY = "errorCategory";
// Populated from ProgramFailureException#getErrorReason().
public static final String TAG_ERROR_REASON = "errorReason";
// Populated with the enum name of ProgramFailureException#getErrorType().
public static final String TAG_ERROR_TYPE = "errorType";
// Stringified boolean taken from ProgramFailureException#isDependency().
public static final String TAG_DEPENDENCY = "dependency";
public static final String TAG_ERROR_CODE_TYPE = "errorCodeType";
public static final String TAG_ERROR_CODE = "errorCode";
// NOTE(review): the value is spelled "supportDocUrl" while the constant says "SUPPORTED" -
// confirm that consumers of this MDC key expect this exact spelling before changing either.
public static final String TAG_SUPPORTED_DOC_URL = "supportDocUrl";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re
|| beginsWith(uriParts, "v3", "credentials")) {
return APP_FABRIC_HTTP;
} else if ((uriParts.length >= 8 && uriParts[7].equals("logs"))
|| (uriParts.length >= 10 && uriParts[9].equals("logs"))
|| (uriParts.length >= 10 && (uriParts[9].equals("logs") || uriParts[9].equals("classify")))
|| (uriParts.length >= 6 && uriParts[5].equals("logs"))) {
//Log Handler Paths:
// /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/logs
// /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/logs
// /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/classify
return LOG_QUERY;
} else if (uriParts.length >= 2 && uriParts[1].equals("metrics")) {
//Metrics Search Handler Path /v3/metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.proto;

/**
 * Immutable value object returned by the error classification API for a program run.
 *
 * <p>Every attribute is an optional string; attributes that were never set on the
 * {@link Builder} are {@code null}. Instances can only be obtained through
 * {@link Builder#build()}.
 */
public class ErrorClassificationResponse {
  // Field names and their declaration order are kept stable on purpose: reflection-based
  // serializers derive the wire format from them.
  private final String stageName;
  private final String errorCategory;
  private final String errorReason;
  private final String errorMessage;
  private final String errorType;
  private final String dependency;
  private final String errorCodeType;
  private final String errorCode;
  private final String supportedDocumentationUrl;

  /**
   * Copies the current state of the given builder into a new immutable response.
   */
  private ErrorClassificationResponse(Builder source) {
    stageName = source.stageName;
    errorCategory = source.errorCategory;
    errorReason = source.errorReason;
    errorMessage = source.errorMessage;
    errorType = source.errorType;
    dependency = source.dependency;
    errorCodeType = source.errorCodeType;
    errorCode = source.errorCode;
    supportedDocumentationUrl = source.supportedDocumentationUrl;
  }

  /**
   * Returns the name of the stage this classification belongs to.
   */
  public String getStageName() {
    return stageName;
  }

  /**
   * Returns the error category.
   */
  public String getErrorCategory() {
    return errorCategory;
  }

  /**
   * Returns the error reason.
   */
  public String getErrorReason() {
    return errorReason;
  }

  /**
   * Returns the error message.
   */
  public String getErrorMessage() {
    return errorMessage;
  }

  /**
   * Returns the error type.
   */
  public String getErrorType() {
    return errorType;
  }

  /**
   * Returns the dependency flag, kept in its string form.
   */
  public String getDependency() {
    return dependency;
  }

  /**
   * Returns the type of the error code.
   */
  public String getErrorCodeType() {
    return errorCodeType;
  }

  /**
   * Returns the error code.
   */
  public String getErrorCode() {
    return errorCode;
  }

  /**
   * Returns the URL of the supporting documentation.
   */
  public String getSupportedDocumentationUrl() {
    return supportedDocumentationUrl;
  }

  /**
   * Fluent builder for {@link ErrorClassificationResponse}.
   *
   * <p>Each setter stores the value and returns this builder so calls can be chained. The
   * builder may be reused; every {@link #build()} call snapshots the values held at that time.
   */
  public static class Builder {
    private String stageName;
    private String errorCategory;
    private String errorReason;
    private String errorMessage;
    private String errorType;
    private String dependency;
    private String errorCodeType;
    private String errorCode;
    private String supportedDocumentationUrl;

    /**
     * Stores the stage name.
     *
     * @return this builder
     */
    public Builder setStageName(String stageName) {
      this.stageName = stageName;
      return this;
    }

    /**
     * Stores the error category.
     *
     * @return this builder
     */
    public Builder setErrorCategory(String errorCategory) {
      this.errorCategory = errorCategory;
      return this;
    }

    /**
     * Stores the error reason.
     *
     * @return this builder
     */
    public Builder setErrorReason(String errorReason) {
      this.errorReason = errorReason;
      return this;
    }

    /**
     * Stores the error message.
     *
     * @return this builder
     */
    public Builder setErrorMessage(String errorMessage) {
      this.errorMessage = errorMessage;
      return this;
    }

    /**
     * Stores the error type.
     *
     * @return this builder
     */
    public Builder setErrorType(String errorType) {
      this.errorType = errorType;
      return this;
    }

    /**
     * Stores the dependency flag in its string form.
     *
     * @return this builder
     */
    public Builder setDependency(String dependency) {
      this.dependency = dependency;
      return this;
    }

    /**
     * Stores the type of the error code.
     *
     * @return this builder
     */
    public Builder setErrorCodeType(String errorCodeType) {
      this.errorCodeType = errorCodeType;
      return this;
    }

    /**
     * Stores the error code.
     *
     * @return this builder
     */
    public Builder setErrorCode(String errorCode) {
      this.errorCode = errorCode;
      return this;
    }

    /**
     * Stores the URL of the supporting documentation.
     *
     * @return this builder
     */
    public Builder setSupportedDocumentationUrl(String supportedDocumentationUrl) {
      this.supportedDocumentationUrl = supportedDocumentationUrl;
      return this;
    }

    /**
     * Creates a new {@link ErrorClassificationResponse} from the values currently held.
     */
    public ErrorClassificationResponse build() {
      return new ErrorClassificationResponse(this);
    }
  }
}
5 changes: 5 additions & 0 deletions cdap-watchdog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import com.google.gson.Gson;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.common.conf.Constants.Logging;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.proto.ErrorClassificationResponse;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.common.Strings;

/**
 * Turns the error logs of a program run into {@link ErrorClassificationResponse} objects.
 * TODO -
 *   - Add rule based classification.
 *   - Handle cases when stage name is not present in the mdc.
 */
public class ErrorLogsClassifier {
  private static final Gson GSON = new Gson();

  /**
   * Scans the given log events and responds with the classifications that were found.
   *
   * <p>Only events whose MDC contains {@code Logging.TAG_FAILED_STAGE} are looked at. For each
   * stage, the first event whose throwable chain yields a non-empty
   * {@link ProgramFailureException} message produces the classification; later events for that
   * stage are skipped. The collected classifications are sent as JSON with HTTP status 200.
   * The iterator is consumed but not closed by this method.
   *
   * @param logIter Logs Iterator that can be closed.
   * @param responder The HttpResponder.
   */
  public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder) {
    Map<String, ErrorClassificationResponse> classifiedByStage = new HashMap<>();
    while (logIter.hasNext()) {
      ILoggingEvent event = logIter.next().getLoggingEvent();
      Map<String, String> tags = event.getMDCPropertyMap();
      boolean hasFailedStage = tags.containsKey(Logging.TAG_FAILED_STAGE);
      String stage = hasFailedStage ? tags.get(Logging.TAG_FAILED_STAGE) : null;
      if (hasFailedStage && !classifiedByStage.containsKey(stage)) {
        String failureMessage = findProgramFailureMessage(event.getThrowableProxy());
        if (!Strings.isNullOrEmpty(failureMessage)) {
          classifiedByStage.put(stage, toResponse(stage, failureMessage, tags));
        }
      }
    }
    String body = GSON.toJson(classifiedByStage.values());
    responder.sendJson(HttpResponseStatus.OK, body);
  }

  /**
   * Walks the cause chain and returns the message of the last {@link ProgramFailureException}
   * encountered, or {@code null} when the chain contains none.
   */
  private static String findProgramFailureMessage(IThrowableProxy head) {
    String targetClassName = ProgramFailureException.class.getName();
    String found = null;
    for (IThrowableProxy current = head; current != null; current = current.getCause()) {
      if (targetClassName.equals(current.getClassName())) {
        // No early exit: a match deeper in the chain overrides an earlier one.
        found = current.getMessage();
      }
    }
    return found;
  }

  /**
   * Assembles the response for a stage from the failure message and the event's MDC tags.
   */
  private static ErrorClassificationResponse toResponse(String stage, String failureMessage,
      Map<String, String> tags) {
    ErrorClassificationResponse.Builder builder = new ErrorClassificationResponse.Builder();
    builder.setStageName(stage);
    builder.setErrorCategory(
        String.format("%s-'%s'", tags.get(Logging.TAG_ERROR_CATEGORY), stage));
    builder.setErrorReason(tags.get(Logging.TAG_ERROR_REASON));
    builder.setErrorMessage(failureMessage);
    builder.setErrorType(tags.get(Logging.TAG_ERROR_TYPE));
    builder.setDependency(tags.get(Logging.TAG_DEPENDENCY));
    builder.setErrorCodeType(tags.get(Logging.TAG_ERROR_CODE_TYPE));
    builder.setErrorCode(tags.get(Logging.TAG_ERROR_CODE));
    builder.setSupportedDocumentationUrl(tags.get(Logging.TAG_SUPPORTED_DOC_URL));
    return builder.build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ private void addErrorClassificationTags(ILoggingEvent event, Map<String, String>
((ProgramFailureException) throwable).getErrorReason());
modifiableMDC.put(Constants.Logging.TAG_ERROR_TYPE,
((ProgramFailureException) throwable).getErrorType().name());
modifiableMDC.put(Constants.Logging.TAG_DEPENDENCY,
String.valueOf(((ProgramFailureException) throwable).isDependency()));
ErrorCodeType errorCodeType = ((ProgramFailureException) throwable).getErrorCodeType();
String errorCode = ((ProgramFailureException) throwable).getErrorCode();
String supportedDocURL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,4 @@ public void handleError(@Nullable Throwable throwable) {
LOG.error("Received error while chunking logs.", throwable);
close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private AbstractChunkedLogProducer getFullLogsProducer(String format,
* If readRange is outside runRecord's range, then the readRange is adjusted to fall within
* runRecords range.
*/
private ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
protected ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
boolean fromTimeSpecified) {
if (runRecord == null) {
return readRange;
Expand Down
Loading

0 comments on commit 4e18685

Please sign in to comment.