Skip to content

Commit

Permalink
Implement ErrorClassification API for program runs
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Dec 18, 2024
1 parent 3d1422a commit 2245c19
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.proto;

/**
 * Immutable value object describing the classification of one error.
 *
 * <p>All values are stored exactly as passed to the constructor; no validation or defaulting is
 * performed, so any of them may be {@code null}. The field names are kept stable because
 * reflection-based serializers (for example Gson) use them as the JSON property names.
 */
public class ErrorClassificationResponse {
  private final String stageName;
  private final String errorCategory;
  private final String errorReason;
  private final String errorMessage;
  private final String errorType;
  private final String errorCodeType;
  private final String errorCode;
  private final String supportedDocumentationUrl;

  /**
   * Creates a classification response.
   *
   * @param stageName name of the stage the error is attributed to
   * @param errorCategory category assigned to the error
   * @param errorReason reason reported for the error
   * @param errorMessage message of the error
   * @param errorType type of the error
   * @param errorCodeType type of the error code
   * @param errorCode error code
   * @param supportedDocumentationUrl URL of supporting documentation for the error
   */
  public ErrorClassificationResponse(String stageName, String errorCategory, String errorReason,
      String errorMessage, String errorType, String errorCodeType, String errorCode,
      String supportedDocumentationUrl) {
    this.stageName = stageName;
    this.errorCategory = errorCategory;
    this.errorReason = errorReason;
    this.errorMessage = errorMessage;
    this.errorType = errorType;
    this.errorCodeType = errorCodeType;
    this.errorCode = errorCode;
    this.supportedDocumentationUrl = supportedDocumentationUrl;
  }

  /**
   * Returns the stage name given at construction, possibly {@code null}.
   */
  public String getStageName() {
    return stageName;
  }

  /**
   * Returns the error category given at construction, possibly {@code null}.
   */
  public String getErrorCategory() {
    return errorCategory;
  }

  /**
   * Returns the error reason given at construction, possibly {@code null}.
   */
  public String getErrorReason() {
    return errorReason;
  }

  /**
   * Returns the error message given at construction, possibly {@code null}.
   */
  public String getErrorMessage() {
    return errorMessage;
  }

  /**
   * Returns the error type given at construction, possibly {@code null}.
   */
  public String getErrorType() {
    return errorType;
  }

  /**
   * Returns the error code type given at construction, possibly {@code null}.
   */
  public String getErrorCodeType() {
    return errorCodeType;
  }

  /**
   * Returns the error code given at construction, possibly {@code null}.
   */
  public String getErrorCode() {
    return errorCode;
  }

  /**
   * Returns the supporting documentation URL given at construction, possibly {@code null}.
   */
  public String getSupportedDocumentationUrl() {
    return supportedDocumentationUrl;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import com.google.gson.Gson;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.common.conf.Constants.Logging;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.proto.ErrorClassificationResponse;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import org.elasticsearch.common.Strings;

/**
 * Classifies error logs and returns {@link ErrorClassificationResponse}.
 *
 * <p>Only log events whose MDC carries {@link Logging#TAG_FAILED_STAGE} and whose throwable chain
 * contains a {@link ProgramFailureException} with a non-empty message are classified. At most one
 * classification is produced per failed stage.
 */
public class ErrorLogsClassifier {
  private static final Gson GSON = new Gson();

  /**
   * Reads every log event from the iterator, builds one {@link ErrorClassificationResponse} per
   * failed stage and sends the collected responses as a JSON array with status {@code 200 OK}.
   *
   * <p>This method does not close {@code logIter}; the caller remains responsible for closing it.
   *
   * @param logIter iterator over the log events to classify
   * @param responder responder used to send the JSON result
   */
  public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder) {
    // Insertion-ordered so that stages are reported in the order their failures are first
    // classified while reading the logs, instead of in hash order.
    Map<String, ErrorClassificationResponse> responseMap = new LinkedHashMap<>();
    while (logIter.hasNext()) {
      ILoggingEvent logEvent = logIter.next().getLoggingEvent();
      Map<String, String> mdc = logEvent.getMDCPropertyMap();
      if (!mdc.containsKey(Logging.TAG_FAILED_STAGE)) {
        continue;
      }
      String stageName = mdc.get(Logging.TAG_FAILED_STAGE);
      if (responseMap.containsKey(stageName)) {
        // The first classified failure of a stage wins; later events for it are ignored.
        continue;
      }
      String errorMessage = findProgramFailureMessage(logEvent.getThrowableProxy());
      if (Strings.isNullOrEmpty(errorMessage)) {
        // No usable ProgramFailureException on this event; a later event for the same stage
        // may still provide one.
        continue;
      }
      String errorCategory =
          String.format("%s-'%s'", mdc.get(Logging.TAG_ERROR_CATEGORY), stageName);
      responseMap.put(stageName,
          new ErrorClassificationResponse(stageName, errorCategory,
              mdc.get(Logging.TAG_ERROR_REASON), errorMessage,
              mdc.get(Logging.TAG_ERROR_TYPE), mdc.get(Logging.TAG_ERROR_CODE_TYPE),
              mdc.get(Logging.TAG_ERROR_CODE), mdc.get(Logging.TAG_SUPPORTED_DOC_URL)));
    }
    responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responseMap.values()));
  }

  /**
   * Walks the cause chain starting at the given throwable and returns the message of the last
   * {@link ProgramFailureException} found in it.
   *
   * @param throwableProxy head of the throwable chain, may be {@code null}
   * @return the message of the last matching throwable in the chain, or {@code null} if the chain
   *     is empty or contains no {@link ProgramFailureException}
   */
  private static String findProgramFailureMessage(IThrowableProxy throwableProxy) {
    String failureClassName = ProgramFailureException.class.getName();
    String message = null;
    for (IThrowableProxy current = throwableProxy; current != null;
        current = current.getCause()) {
      if (failureClassName.equals(current.getClassName())) {
        message = current.getMessage();
      }
    }
    return message;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,4 @@ public void handleError(@Nullable Throwable throwable) {
LOG.error("Received error while chunking logs.", throwable);
close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private AbstractChunkedLogProducer getFullLogsProducer(String format,
* If readRange is outside runRecord's range, then the readRange is adjusted to fall within
* runRecord's range.
*/
private ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
protected ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord,
boolean fromTimeSpecified) {
if (runRecord == null) {
return readRange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.internal.app.store.RunRecordDetail;
import io.cdap.cdap.logging.ErrorLogsClassifier;
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.filter.Filter;
import io.cdap.cdap.logging.filter.FilterParser;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.logging.read.LogOffset;
import io.cdap.cdap.logging.read.LogReader;
import io.cdap.cdap.logging.read.ReadRange;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.NamespaceId;
Expand All @@ -38,13 +46,17 @@
import io.cdap.http.HttpHandler;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.List;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* v3 {@link HttpHandler} to handle /logs requests
Expand All @@ -53,6 +65,7 @@
@Path(Constants.Gateway.API_VERSION_3)
public class LogHttpHandler extends AbstractLogHttpHandler {

private static final Logger LOG = LoggerFactory.getLogger(LogHttpHandler.class);
private final AccessEnforcer accessEnforcer;
private final AuthenticationContext authenticationContext;
private final LogReader logReader;
Expand Down Expand Up @@ -115,6 +128,40 @@ public void getRunIdLogs(HttpRequest request, HttpResponder responder,
escape, filterStr, runRecord, format, suppress);
}

/**
 * Classifies the ERROR-level logs of a failed program run and responds with the result.
 *
 * <p>The logs are read over a range that starts at time 0 and ends at the current time, adjusted
 * by {@code adjustReadRange} to fall within the run record's range. They are then handed to an
 * {@link ErrorLogsClassifier}, which writes the response. If reading or classifying the logs
 * throws, the failure is logged and {@code 500 Internal Server Error} is sent instead.
 *
 * @param request the HTTP request
 * @param responder the responder used to send the classification result or the error status
 * @param namespaceId namespace of the program
 * @param appId application the program belongs to
 * @param programType category name of the program type
 * @param programId name of the program
 * @param runId id of the run whose logs are classified
 * @throws IllegalArgumentException if the run is not in {@link ProgramRunStatus#FAILED} state
 * @throws Exception if the visibility check or the run record lookup fails
 */
@POST
@Path("/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/logs/classify")
public void classifyRunIdLogs(HttpRequest request, HttpResponder responder,
    @PathParam("namespace-id") String namespaceId,
    @PathParam("app-id") String appId, @PathParam("program-type") String programType,
    @PathParam("program-id") String programId,
    @PathParam("run-id") String runId) throws Exception {
  ensureVisibilityOnProgram(namespaceId, appId, programType, programId);
  ProgramType type = ProgramType.valueOfCategoryName(programType);
  ProgramReference programRef = new ProgramReference(namespaceId, appId, type, programId);
  RunRecordDetail runRecord = getRunRecordMeta(programRef, runId);
  if (runRecord.getStatus() != ProgramRunStatus.FAILED) {
    throw new IllegalArgumentException("Classification is only supported for failed runs");
  }
  LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(programRef,
      runId, runRecord.getSystemArgs());

  Filter filter = FilterParser.parse("loglevel=ERROR");
  ReadRange readRange = new ReadRange(0, System.currentTimeMillis(),
      LogOffset.INVALID_KAFKA_OFFSET);
  readRange = adjustReadRange(readRange, runRecord, true);
  // The classifier consumes the iterator and sends the JSON response itself without closing
  // the iterator, so it must be closed here once classification is done.
  try (CloseableIterator<LogEvent> logIter = logReader.getLog(loggingContext,
      readRange.getFromMillis(), readRange.getToMillis(), filter)) {
    ErrorLogsClassifier classifier = new ErrorLogsClassifier();
    classifier.classify(logIter, responder);
  } catch (Exception ex) {
    // Logged at error level because the request ends in a 500.
    LOG.error("Exception while classifying logs for logging context {}", loggingContext, ex);
    responder.sendStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
  }
}

@GET
@Path("/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/logs/next")
public void next(HttpRequest request, HttpResponder responder,
Expand Down

0 comments on commit 2245c19

Please sign in to comment.