Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21099] Implement ErrorClassification API for program runs #15774

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,7 @@ public static final class Logging {
// MDC tag keys carrying error classification details on log events.
// These keys are read back from the event MDC when building an ErrorClassificationResponse.

// Error category of the failure.
public static final String TAG_ERROR_CATEGORY = "errorCategory";
// Reason for the failure.
public static final String TAG_ERROR_REASON = "errorReason";
// Error type; written as the name() of ProgramFailureException#getErrorType().
public static final String TAG_ERROR_TYPE = "errorType";
// Dependency flag; written as String.valueOf(ProgramFailureException#isDependency()).
public static final String TAG_DEPENDENCY = "dependency";
// Type of the error code (see TAG_ERROR_CODE).
public static final String TAG_ERROR_CODE_TYPE = "errorCodeType";
// Error code associated with the failure.
public static final String TAG_ERROR_CODE = "errorCode";
// Documentation URL for the error.
// NOTE(review): the key string is "supportDocUrl" while the constant says SUPPORTED; the string
// is what gets emitted, so do not "correct" it without checking existing consumers.
public static final String TAG_SUPPORTED_DOC_URL = "supportDocUrl";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re
|| beginsWith(uriParts, "v3", "credentials")) {
return APP_FABRIC_HTTP;
} else if ((uriParts.length >= 8 && uriParts[7].equals("logs"))
|| (uriParts.length >= 10 && uriParts[9].equals("logs"))
|| (uriParts.length >= 10 && (uriParts[9].equals("logs") || uriParts[9].equals("classify")))
|| (uriParts.length >= 6 && uriParts[5].equals("logs"))) {
//Log Handler Paths:
// /v3/namespaces/<namespaceid>/apps/<appid>/<programid-type>/<programid>/logs
// /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/logs
// /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/classify
return LOG_QUERY;
} else if (uriParts.length >= 2 && uriParts[1].equals("metrics")) {
//Metrics Search Handler Path /v3/metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.proto;

/**
 * Represents the response for classifying error logs.
 *
 * <p>Instances are immutable and are created through {@link Builder}. Every field is optional:
 * a value that was never set on the builder stays {@code null}.
 *
 * <p>NOTE: the private field names are kept stable on purpose; reflection-based serializers
 * (such as Gson) derive the JSON property names from them.
 */
public class ErrorClassificationResponse {
  private final String stageName;
  private final String errorCategory;
  private final String errorReason;
  private final String errorMessage;
  private final String errorType;
  private final String dependency;
  private final String errorCodeType;
  private final String errorCode;
  private final String supportedDocumentationUrl;

  // Private: instances are only created via Builder#build().
  private ErrorClassificationResponse(String stageName, String errorCategory, String errorReason,
      String errorMessage, String errorType, String dependency, String errorCodeType,
      String errorCode, String supportedDocumentationUrl) {
    this.stageName = stageName;
    this.errorCategory = errorCategory;
    this.errorReason = errorReason;
    this.errorMessage = errorMessage;
    this.errorType = errorType;
    this.dependency = dependency;
    this.errorCodeType = errorCodeType;
    this.errorCode = errorCode;
    this.supportedDocumentationUrl = supportedDocumentationUrl;
  }

  /**
   * Gets the stage name for ErrorClassificationResponse.
   *
   * @return the stage name, or {@code null} if it was not set
   */
  public String getStageName() {
    return stageName;
  }

  /**
   * Gets the error category for ErrorClassificationResponse.
   *
   * @return the error category, or {@code null} if it was not set
   */
  public String getErrorCategory() {
    return errorCategory;
  }

  /**
   * Gets the error reason for ErrorClassificationResponse.
   *
   * @return the error reason, or {@code null} if it was not set
   */
  public String getErrorReason() {
    return errorReason;
  }

  /**
   * Gets the error message for ErrorClassificationResponse.
   *
   * @return the error message, or {@code null} if it was not set
   */
  public String getErrorMessage() {
    return errorMessage;
  }

  /**
   * Gets the error type for ErrorClassificationResponse.
   *
   * @return the error type, or {@code null} if it was not set
   */
  public String getErrorType() {
    return errorType;
  }

  /**
   * Gets the dependency flag for ErrorClassificationResponse.
   *
   * @return the dependency flag in its string form, or {@code null} if it was not set
   */
  public String getDependency() {
    return dependency;
  }

  /**
   * Gets the error code type for ErrorClassificationResponse.
   *
   * @return the error code type, or {@code null} if it was not set
   */
  public String getErrorCodeType() {
    return errorCodeType;
  }

  /**
   * Gets the error code for ErrorClassificationResponse.
   *
   * @return the error code, or {@code null} if it was not set
   */
  public String getErrorCode() {
    return errorCode;
  }

  /**
   * Gets the supported documentation URL for ErrorClassificationResponse.
   *
   * @return the documentation URL, or {@code null} if it was not set
   */
  public String getSupportedDocumentationUrl() {
    return supportedDocumentationUrl;
  }

  /**
   * Builder for {@link ErrorClassificationResponse}.
   *
   * <p>Each setter returns this builder so calls can be chained. Fields that are never set are
   * passed to the response as {@code null}.
   */
  public static class Builder {
    private String stageName;
    private String errorCategory;
    private String errorReason;
    private String errorMessage;
    private String errorType;
    private String dependency;
    private String errorCodeType;
    private String errorCode;
    private String supportedDocumentationUrl;

    /**
     * Sets the stage name for ErrorClassificationResponse.
     *
     * @param stageName the stage name
     * @return this builder
     */
    public Builder setStageName(String stageName) {
      this.stageName = stageName;
      return this;
    }

    /**
     * Sets the error category for ErrorClassificationResponse.
     *
     * @param errorCategory the error category
     * @return this builder
     */
    public Builder setErrorCategory(String errorCategory) {
      this.errorCategory = errorCategory;
      return this;
    }

    /**
     * Sets the error reason for ErrorClassificationResponse.
     *
     * @param errorReason the error reason
     * @return this builder
     */
    public Builder setErrorReason(String errorReason) {
      this.errorReason = errorReason;
      return this;
    }

    /**
     * Sets the error message for ErrorClassificationResponse.
     *
     * @param errorMessage the error message
     * @return this builder
     */
    public Builder setErrorMessage(String errorMessage) {
      this.errorMessage = errorMessage;
      return this;
    }

    /**
     * Sets the error type for ErrorClassificationResponse.
     *
     * @param errorType the error type
     * @return this builder
     */
    public Builder setErrorType(String errorType) {
      this.errorType = errorType;
      return this;
    }

    /**
     * Sets the dependency flag for ErrorClassificationResponse.
     *
     * @param dependency the dependency flag in its string form
     * @return this builder
     */
    public Builder setDependency(String dependency) {
      this.dependency = dependency;
      return this;
    }

    /**
     * Sets the error code type for ErrorClassificationResponse.
     *
     * @param errorCodeType the error code type
     * @return this builder
     */
    public Builder setErrorCodeType(String errorCodeType) {
      this.errorCodeType = errorCodeType;
      return this;
    }

    /**
     * Sets the error code for ErrorClassificationResponse.
     *
     * @param errorCode the error code
     * @return this builder
     */
    public Builder setErrorCode(String errorCode) {
      this.errorCode = errorCode;
      return this;
    }

    /**
     * Sets the supported documentation URL for ErrorClassificationResponse.
     *
     * @param supportedDocumentationUrl the documentation URL
     * @return this builder
     */
    public Builder setSupportedDocumentationUrl(String supportedDocumentationUrl) {
      this.supportedDocumentationUrl = supportedDocumentationUrl;
      return this;
    }

    /**
     * Builds and returns a new instance of ErrorClassificationResponse.
     *
     * @return a new response holding the values currently set on this builder
     */
    public ErrorClassificationResponse build() {
      return new ErrorClassificationResponse(stageName, errorCategory, errorReason, errorMessage,
          errorType, dependency, errorCodeType, errorCode, supportedDocumentationUrl);
    }
  }
}
5 changes: 5 additions & 0 deletions cdap-watchdog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import com.google.gson.Gson;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.common.conf.Constants.Logging;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.proto.ErrorClassificationResponse;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.common.Strings;

/**
* Classifies error logs and returns {@link ErrorClassificationResponse}.
itsankit-google marked this conversation as resolved.
Show resolved Hide resolved
* TODO -
* - Add rule based classification.
* - Handle cases when stage name is not present in the mdc.
*/
public class ErrorLogsClassifier {
private static final Gson GSON = new Gson();

/**
* Classifies error logs and returns {@link ErrorClassificationResponse}.
*
* @param logIter Logs Iterator that can be closed.
* @param responder The HttpResponder.
*/
public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder) {
itsankit-google marked this conversation as resolved.
Show resolved Hide resolved
Map<String, ErrorClassificationResponse> responseMap = new HashMap<>();
while(logIter.hasNext()) {
ILoggingEvent logEvent = logIter.next().getLoggingEvent();
Map<String, String> mdc = logEvent.getMDCPropertyMap();
if (!mdc.containsKey(Logging.TAG_FAILED_STAGE)) {
continue;
}
String stageName = mdc.get(Logging.TAG_FAILED_STAGE);
String errorMessage = null;
if (responseMap.containsKey(stageName)) {
continue;
}
IThrowableProxy throwableProxy = logEvent.getThrowableProxy();
while (throwableProxy != null) {
if (ProgramFailureException.class.getName().equals(throwableProxy.getClassName())) {
errorMessage = throwableProxy.getMessage();
}
throwableProxy = throwableProxy.getCause();
}
if (!Strings.isNullOrEmpty(errorMessage)) {
ErrorClassificationResponse classificationResponse =
new ErrorClassificationResponse.Builder()
.setStageName(stageName)
.setErrorCategory(String.format("%s-'%s'",mdc.get(Logging.TAG_ERROR_CATEGORY),
stageName))
.setErrorReason(mdc.get(Logging.TAG_ERROR_REASON))
.setErrorMessage(errorMessage)
.setErrorType(mdc.get(Logging.TAG_ERROR_TYPE))
.setDependency(mdc.get(Logging.TAG_DEPENDENCY))
.setErrorCodeType(mdc.get(Logging.TAG_ERROR_CODE_TYPE))
.setErrorCode(mdc.get(Logging.TAG_ERROR_CODE))
.setSupportedDocumentationUrl(mdc.get(Logging.TAG_SUPPORTED_DOC_URL))
.build();
responseMap.put(stageName, classificationResponse);
}
}
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responseMap.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ private void addErrorClassificationTags(ILoggingEvent event, Map<String, String>
((ProgramFailureException) throwable).getErrorReason());
modifiableMDC.put(Constants.Logging.TAG_ERROR_TYPE,
((ProgramFailureException) throwable).getErrorType().name());
modifiableMDC.put(Constants.Logging.TAG_DEPENDENCY,
String.valueOf(((ProgramFailureException) throwable).isDependency()));
ErrorCodeType errorCodeType = ((ProgramFailureException) throwable).getErrorCodeType();
String errorCode = ((ProgramFailureException) throwable).getErrorCode();
String supportedDocURL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,4 @@ public void handleError(@Nullable Throwable throwable) {
LOG.error("Received error while chunking logs.", throwable);
close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private AbstractChunkedLogProducer getFullLogsProducer(String format,
* If readRange is outside runRecord's range, then the readRange is adjusted to fall within
* runRecords range.
*/
private ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
protected ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
boolean fromTimeSpecified) {
if (runRecord == null) {
return readRange;
Expand Down
Loading
Loading