diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 06fb2503b56d..b2b80d4265bd 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -1533,6 +1533,7 @@ public static final class Logging { public static final String TAG_ERROR_CATEGORY = "errorCategory"; public static final String TAG_ERROR_REASON = "errorReason"; public static final String TAG_ERROR_TYPE = "errorType"; + public static final String TAG_DEPENDENCY = "dependency"; public static final String TAG_ERROR_CODE_TYPE = "errorCodeType"; public static final String TAG_ERROR_CODE = "errorCode"; public static final String TAG_SUPPORTED_DOC_URL = "supportDocUrl"; diff --git a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java index c94c335751cc..481d0f6cae55 100644 --- a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java +++ b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java @@ -162,11 +162,12 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re || beginsWith(uriParts, "v3", "credentials")) { return APP_FABRIC_HTTP; } else if ((uriParts.length >= 8 && uriParts[7].equals("logs")) - || (uriParts.length >= 10 && uriParts[9].equals("logs")) + || (uriParts.length >= 10 && (uriParts[9].equals("logs") || uriParts[9].equals("classify"))) || (uriParts.length >= 6 && uriParts[5].equals("logs"))) { //Log Handler Paths: // /v3/namespaces//apps////logs // /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/logs + // /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/classify return LOG_QUERY; } else if (uriParts.length >= 2 && uriParts[1].equals("metrics")) { //Metrics Search Handler Path /v3/metrics diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/ErrorClassificationResponse.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/ErrorClassificationResponse.java new file mode 100644 index 000000000000..93c875ed106a --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/ErrorClassificationResponse.java @@ -0,0 +1,204 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto; + +/** + * Represents the response for classifying error logs. + */ +public class ErrorClassificationResponse { + private final String stageName; + private final String errorCategory; + private final String errorReason; + private final String errorMessage; + private final String errorType; + private final String dependency; + private final String errorCodeType; + private final String errorCode; + private final String supportedDocumentationUrl; + + private ErrorClassificationResponse(String stageName, String errorCategory, String errorReason, + String errorMessage, String errorType, String dependency, String errorCodeType, + String errorCode, String supportedDocumentationUrl) { + this.stageName = stageName; + this.errorCategory = errorCategory; + this.errorReason = errorReason; + this.errorMessage = errorMessage; + this.errorType = errorType; + this.dependency = dependency; + this.errorCodeType = errorCodeType; + this.errorCode = errorCode; + this.supportedDocumentationUrl = supportedDocumentationUrl; + } + + /** + * Gets the stage name for ErrorClassificationResponse. + */ + public String getStageName() { + return stageName; + } + + /** + * Gets the error category for ErrorClassificationResponse. + */ + public String getErrorCategory() { + return errorCategory; + } + + /** + * Gets the error reason for ErrorClassificationResponse. + */ + public String getErrorReason() { + return errorReason; + } + + /** + * Gets the error message for ErrorClassificationResponse. + */ + public String getErrorMessage() { + return errorMessage; + } + + /** + * Gets the error type for ErrorClassificationResponse. + */ + public String getErrorType() { + return errorType; + } + + /** + * Gets the dependency flag for ErrorClassificationResponse. + */ + public String getDependency() { + return dependency; + } + + /** + * Gets the error code type for ErrorClassificationResponse. + */ + public String getErrorCodeType() { + return errorCodeType; + } + + /** + * Gets the error code for ErrorClassificationResponse. + */ + public String getErrorCode() { + return errorCode; + } + + /** + * Gets the supported documentation URL for ErrorClassificationResponse. + */ + public String getSupportedDocumentationUrl() { + return supportedDocumentationUrl; + } + + /** + * Builder for {@link ErrorClassificationResponse}. + */ + public static class Builder { + private String stageName; + private String errorCategory; + private String errorReason; + private String errorMessage; + private String errorType; + private String dependency; + private String errorCodeType; + private String errorCode; + private String supportedDocumentationUrl; + + /** + * Sets the stage name for ErrorClassificationResponse. + */ + public Builder setStageName(String stageName) { + this.stageName = stageName; + return this; + } + + /** + * Sets the error category for ErrorClassificationResponse. + */ + public Builder setErrorCategory(String errorCategory) { + this.errorCategory = errorCategory; + return this; + } + + /** + * Sets the error reason for ErrorClassificationResponse. + */ + public Builder setErrorReason(String errorReason) { + this.errorReason = errorReason; + return this; + } + + /** + * Sets the error message for ErrorClassificationResponse. + */ + public Builder setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } + + /** + * Sets the error type for ErrorClassificationResponse. + */ + public Builder setErrorType(String errorType) { + this.errorType = errorType; + return this; + } + + /** + * Sets the dependency flag for ErrorClassificationResponse. + */ + public Builder setDependency(String dependency) { + this.dependency = dependency; + return this; + } + + /** + * Sets the error code type for ErrorClassificationResponse. + */ + public Builder setErrorCodeType(String errorCodeType) { + this.errorCodeType = errorCodeType; + return this; + } + + /** + * Sets the error code for ErrorClassificationResponse. + */ + public Builder setErrorCode(String errorCode) { + this.errorCode = errorCode; + return this; + } + + /** + * Sets the supported documentation URL for ErrorClassificationResponse. + */ + public Builder setSupportedDocumentationUrl(String supportedDocumentationUrl) { + this.supportedDocumentationUrl = supportedDocumentationUrl; + return this; + } + + /** + * Builds and returns a new instance of ErrorClassificationResponse. + */ + public ErrorClassificationResponse build() { + return new ErrorClassificationResponse(stageName, errorCategory, errorReason, errorMessage, + errorType, dependency, errorCodeType, errorCode, supportedDocumentationUrl); + } + } +} diff --git a/cdap-watchdog/pom.xml b/cdap-watchdog/pom.xml index bc2a797429b1..04f1a85174d7 100644 --- a/cdap-watchdog/pom.xml +++ b/cdap-watchdog/pom.xml @@ -110,6 +110,11 @@ junit junit + + org.mockito + mockito-core + test + pl.pragmatists JUnitParams diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java new file mode 100644 index 000000000000..21b2f4027264 --- /dev/null +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java @@ -0,0 +1,87 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.logging; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import com.google.gson.Gson; +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.common.conf.Constants.Logging; +import io.cdap.cdap.logging.read.LogEvent; +import io.cdap.cdap.proto.ErrorClassificationResponse; +import io.cdap.http.HttpResponder; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.common.Strings; + +/** + * Classifies error logs and returns {@link ErrorClassificationResponse}. + * TODO - + * - Add rule based classification. + * - Handle cases when stage name is not present in the mdc. + */ +public class ErrorLogsClassifier { + private static final Gson GSON = new Gson(); + + /** + * Classifies error logs and returns {@link ErrorClassificationResponse}. + * + * @param logIter Logs Iterator that can be closed. + * @param responder The HttpResponder. + */ + public void classify(CloseableIterator logIter, HttpResponder responder) { + Map responseMap = new HashMap<>(); + while(logIter.hasNext()) { + ILoggingEvent logEvent = logIter.next().getLoggingEvent(); + Map mdc = logEvent.getMDCPropertyMap(); + if (!mdc.containsKey(Logging.TAG_FAILED_STAGE)) { + continue; + } + String stageName = mdc.get(Logging.TAG_FAILED_STAGE); + String errorMessage = null; + if (responseMap.containsKey(stageName)) { + continue; + } + IThrowableProxy throwableProxy = logEvent.getThrowableProxy(); + while (throwableProxy != null) { + if (ProgramFailureException.class.getName().equals(throwableProxy.getClassName())) { + errorMessage = throwableProxy.getMessage(); + } + throwableProxy = throwableProxy.getCause(); + } + if (!Strings.isNullOrEmpty(errorMessage)) { + ErrorClassificationResponse classificationResponse = + new ErrorClassificationResponse.Builder() + .setStageName(stageName) + .setErrorCategory(String.format("%s-'%s'",mdc.get(Logging.TAG_ERROR_CATEGORY), + stageName)) + .setErrorReason(mdc.get(Logging.TAG_ERROR_REASON)) + .setErrorMessage(errorMessage) + .setErrorType(mdc.get(Logging.TAG_ERROR_TYPE)) + .setDependency(mdc.get(Logging.TAG_DEPENDENCY)) + .setErrorCodeType(mdc.get(Logging.TAG_ERROR_CODE_TYPE)) + .setErrorCode(mdc.get(Logging.TAG_ERROR_CODE)) + .setSupportedDocumentationUrl(mdc.get(Logging.TAG_SUPPORTED_DOC_URL)) + .build(); + responseMap.put(stageName, classificationResponse); + } + } + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responseMap.values())); + } +} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java index bed735b72d88..a561bd7b3abd 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java @@ -141,6 +141,8 @@ private void addErrorClassificationTags(ILoggingEvent event, Map ((ProgramFailureException) throwable).getErrorReason()); modifiableMDC.put(Constants.Logging.TAG_ERROR_TYPE, ((ProgramFailureException) throwable).getErrorType().name()); + modifiableMDC.put(Constants.Logging.TAG_DEPENDENCY, + String.valueOf(((ProgramFailureException) throwable).isDependency())); ErrorCodeType errorCodeType = ((ProgramFailureException) throwable).getErrorCodeType(); String errorCode = ((ProgramFailureException) throwable).getErrorCode(); String supportedDocURL = diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractChunkedLogProducer.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractChunkedLogProducer.java index 0cb001e7ec11..b914ca480ff6 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractChunkedLogProducer.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractChunkedLogProducer.java @@ -96,5 +96,4 @@ public void handleError(@Nullable Throwable throwable) { LOG.error("Received error while chunking logs.", throwable); close(); } - } diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractLogHttpHandler.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractLogHttpHandler.java index 068b2a99da29..c591ac9997dc 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractLogHttpHandler.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/AbstractLogHttpHandler.java @@ -204,7 +204,7 @@ private AbstractChunkedLogProducer getFullLogsProducer(String format, * If readRange is outside runRecord's range, then the readRange is adjusted to fall within * runRecords range. */ - private ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord, + protected ReadRange adjustReadRange(ReadRange readRange, @Nullable RunRecordDetail runRecord, boolean fromTimeSpecified) { if (runRecord == null) { return readRange; diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/ErrorClassificationHttpHandler.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/ErrorClassificationHttpHandler.java new file mode 100644 index 000000000000..d8aa986ad5a0 --- /dev/null +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/gateway/handlers/ErrorClassificationHttpHandler.java @@ -0,0 +1,136 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.logging.gateway.handlers; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.common.NotFoundException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.logging.LoggingContext; +import io.cdap.cdap.internal.app.store.RunRecordDetail; +import io.cdap.cdap.logging.ErrorLogsClassifier; +import io.cdap.cdap.logging.context.LoggingContextHelper; +import io.cdap.cdap.logging.filter.Filter; +import io.cdap.cdap.logging.filter.FilterParser; +import io.cdap.cdap.logging.read.LogEvent; +import io.cdap.cdap.logging.read.LogOffset; +import io.cdap.cdap.logging.read.LogReader; +import io.cdap.cdap.logging.read.ReadRange; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.proto.id.ProgramReference; +import io.cdap.cdap.proto.security.StandardPermission; +import io.cdap.cdap.security.spi.authentication.AuthenticationContext; +import io.cdap.cdap.security.spi.authorization.AccessEnforcer; +import io.cdap.cdap.security.spi.authorization.UnauthorizedException; +import io.cdap.http.HttpHandler; +import io.cdap.http.HttpResponder; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.IOException; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * v3 {@link HttpHandler} to handle /classify requests + */ +@Singleton +@Path(Constants.Gateway.API_VERSION_3) +public class ErrorClassificationHttpHandler extends AbstractLogHttpHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ErrorClassificationHttpHandler.class); + private final LogReader logReader; + private final ProgramRunRecordFetcher programRunRecordFetcher; + private final AccessEnforcer accessEnforcer; + private final AuthenticationContext authenticationContext; + private final ErrorLogsClassifier errorLogsClassifier; + + /** + * Constructor for ErrorClassificationHttpHandler. + */ + @Inject + public ErrorClassificationHttpHandler(AccessEnforcer accessEnforcer, + AuthenticationContext authenticationContext, + LogReader logReader, + ProgramRunRecordFetcher programRunFetcher, + ErrorLogsClassifier errorLogsClassifier, + CConfiguration cConf) { + super(cConf); + this.logReader = logReader; + this.programRunRecordFetcher = programRunFetcher; + this.accessEnforcer = accessEnforcer; + this.authenticationContext = authenticationContext; + this.errorLogsClassifier = errorLogsClassifier; + } + + + private RunRecordDetail getRunRecordMeta(ProgramReference programRef, String runId) + throws IOException, NotFoundException, UnauthorizedException { + RunRecordDetail runRecordMeta = programRunRecordFetcher.getRunRecordMeta(programRef, runId); + if (runRecordMeta == null) { + throw new NotFoundException( + String.format("No run record found for program %s and runID: %s", programRef, runId)); + } + return runRecordMeta; + } + + private void ensureVisibilityOnProgram(String namespace, String application, String programType, + String program) { + ApplicationId appId = new ApplicationId(namespace, application); + ProgramId programId = new ProgramId(appId, ProgramType.valueOfCategoryName(programType), + program); + accessEnforcer.enforce(programId, authenticationContext.getPrincipal(), StandardPermission.GET); + } + + @POST + @Path("/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/classify") + public void classifyRunIdLogs(HttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId, + @PathParam("app-id") String appId, @PathParam("program-type") String programType, + @PathParam("program-id") String programId, + @PathParam("run-id") String runId) throws Exception { + ensureVisibilityOnProgram(namespaceId, appId, programType, programId); + ProgramType type = ProgramType.valueOfCategoryName(programType); + ProgramReference programRef = new ProgramReference(namespaceId, appId, type, programId); + RunRecordDetail runRecord = getRunRecordMeta(programRef, runId); + if (runRecord.getStatus() != ProgramRunStatus.FAILED) { + throw new IllegalArgumentException("Classification is only supported for failed runs"); + } + LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(programRef, + runId, runRecord.getSystemArgs()); + + Filter filter = FilterParser.parse("loglevel=ERROR"); + ReadRange readRange = new ReadRange(0, System.currentTimeMillis(), + LogOffset.INVALID_KAFKA_OFFSET); + readRange = adjustReadRange(readRange, runRecord, true); + try (CloseableIterator logIter = logReader.getLog(loggingContext, + readRange.getFromMillis(), readRange.getToMillis(), filter)) { + // the iterator is closed by the BodyProducer passed to the HttpResponder + errorLogsClassifier.classify(logIter, responder); + } catch (Exception ex) { + LOG.debug("Exception while classifying logs for logging context {}", loggingContext, ex); + responder.sendStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + } +} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/guice/LogQueryRuntimeModule.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/guice/LogQueryRuntimeModule.java index 2d504ddb1331..b2fa0b131524 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/guice/LogQueryRuntimeModule.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/guice/LogQueryRuntimeModule.java @@ -25,6 +25,7 @@ import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.runtime.RuntimeModule; import io.cdap.cdap.gateway.handlers.CommonHandlers; +import io.cdap.cdap.logging.gateway.handlers.ErrorClassificationHttpHandler; import io.cdap.cdap.logging.gateway.handlers.LocalProgramRunRecordFetcher; import io.cdap.cdap.logging.gateway.handlers.LogHttpHandler; import io.cdap.cdap.logging.gateway.handlers.ProgramRunRecordFetcher; @@ -41,6 +42,7 @@ private static void bindHandlers(Binder binder) { Multibinder handlerBinder = Multibinder.newSetBinder(binder, HttpHandler.class, Names.named(Constants.Service.LOG_QUERY)); handlerBinder.addBinding().to(LogHttpHandler.class); + handlerBinder.addBinding().to(ErrorClassificationHttpHandler.class); CommonHandlers.add(handlerBinder); } diff --git a/cdap-watchdog/src/test/java/io/cdap/cdap/logging/ErrorLogsClassifierTest.java b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/ErrorLogsClassifierTest.java new file mode 100644 index 000000000000..168449534ff0 --- /dev/null +++ b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/ErrorLogsClassifierTest.java @@ -0,0 +1,103 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.logging; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.logging.read.LogEvent; +import io.cdap.cdap.logging.read.LogOffset; +import io.cdap.cdap.proto.ErrorClassificationResponse; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test class for {@link ErrorLogsClassifier}. + */ +public class ErrorLogsClassifierTest { + + private static final Gson GSON = new Gson(); + private final MockResponder responder = new MockResponder(); + private final ErrorLogsClassifier classifier = new ErrorLogsClassifier(); + + @Test + public void testClassifyLogs() { + LogEvent logEvent = new LogEvent(getEvent(), LogOffset.LATEST_OFFSET); + Iterator iterator = Collections.singletonList(logEvent).iterator(); + CloseableIterator closeableIterator = new CloseableIterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public LogEvent next() { + return iterator.next(); + } + + @Override + public void close() { + // no-op + } + }; + classifier.classify(closeableIterator, responder); + Type listType = new TypeToken>() {}.getType(); + List responses = + GSON.fromJson(responder.getResponseContentAsString(), listType); + Assert.assertEquals(1, responses.size()); + Assert.assertEquals("stageName", responses.get(0).getStageName()); + Assert.assertEquals("errorCategory-'stageName'", responses.get(0).getErrorCategory()); + Assert.assertEquals("errorReason", responses.get(0).getErrorReason()); + Assert.assertEquals("some error occurred", responses.get(0).getErrorMessage()); + Assert.assertEquals("errorType", responses.get(0).getErrorType()); + Assert.assertEquals("dependency", responses.get(0).getDependency()); + Assert.assertEquals("errorCodeType", responses.get(0).getErrorCodeType()); + Assert.assertEquals("errorCode", responses.get(0).getErrorCode()); + Assert.assertEquals("supportedDocumentationUrl", + responses.get(0).getSupportedDocumentationUrl()); + } + + private ILoggingEvent getEvent() { + Map map = new HashMap<>(); + map.put(Constants.Logging.TAG_FAILED_STAGE, "stageName"); + map.put(Constants.Logging.TAG_ERROR_CATEGORY, "errorCategory"); + map.put(Constants.Logging.TAG_ERROR_REASON, "errorReason"); + map.put(Constants.Logging.TAG_ERROR_TYPE, "errorType"); + map.put(Constants.Logging.TAG_DEPENDENCY, "dependency"); + map.put(Constants.Logging.TAG_ERROR_CODE_TYPE, "errorCodeType"); + map.put(Constants.Logging.TAG_ERROR_CODE, "errorCode"); + map.put(Constants.Logging.TAG_SUPPORTED_DOC_URL, "supportedDocumentationUrl"); + IThrowableProxy throwableProxy = Mockito.mock(IThrowableProxy.class); + Mockito.when(throwableProxy.getMessage()).thenReturn("some error occurred"); + Mockito.when(throwableProxy.getClassName()).thenReturn(ProgramFailureException.class.getName()); + ILoggingEvent event = Mockito.mock(ILoggingEvent.class); + Mockito.when(event.getThrowableProxy()).thenReturn(throwableProxy); + Mockito.when(event.getMDCPropertyMap()).thenReturn(map); + return event; + } +} diff --git a/cdap-watchdog/src/test/java/io/cdap/cdap/logging/MockResponder.java b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/MockResponder.java new file mode 100644 index 000000000000..d37cf0aa4fc1 --- /dev/null +++ b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/MockResponder.java @@ -0,0 +1,104 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.logging; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import io.cdap.http.AbstractHttpResponder; +import io.cdap.http.BodyProducer; +import io.cdap.http.ChunkResponder; +import io.cdap.http.HttpResponder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * A mock implementation of {@link HttpResponder} that only record the response status. + */ +public final class MockResponder extends AbstractHttpResponder { + + private static final Gson GSON = new Gson(); + + private HttpResponseStatus status; + private ByteBuf content; + + public HttpResponseStatus getStatus() { + return status; + } + + public String getResponseContentAsString() { + return content.toString(StandardCharsets.UTF_8); + } + + public T decodeResponseContent(Type type) { + return decodeResponseContent(type, GSON); + } + + public T decodeResponseContent(Type type, Gson gson) { + JsonReader jsonReader = new JsonReader(new InputStreamReader + (new ByteBufInputStream(content), StandardCharsets.UTF_8)); + return gson.fromJson(jsonReader, type); + } + + @Override + public ChunkResponder sendChunkStart(HttpResponseStatus status, HttpHeaders headers) { + this.status = status; + return new ChunkResponder() { + @Override + public void sendChunk(ByteBuffer chunk) throws IOException { + sendChunk(Unpooled.wrappedBuffer(chunk)); + } + + @Override + public void sendChunk(ByteBuf chunk) throws IOException { + if (content == null) { + content = Unpooled.buffer(chunk.readableBytes()); + } + content.writeBytes(chunk); + } + + @Override + public void close() throws IOException { + // No-op + } + }; + } + + @Override + public void sendContent(HttpResponseStatus status, ByteBuf content, HttpHeaders headers) { + this.content = content.copy(); + this.status = status; + } + + @Override + public void sendFile(File file, HttpHeaders headers) { + this.status = HttpResponseStatus.OK; + } + + @Override + public void sendContent(HttpResponseStatus httpResponseStatus, BodyProducer bodyProducer, HttpHeaders headers) { + this.status = HttpResponseStatus.OK; + } +} diff --git a/cdap-watchdog/src/test/java/io/cdap/cdap/logging/appender/ErrorClassificationLoggingTest.java b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/appender/ErrorClassificationLoggingTest.java index 3e4cb20945a2..582568b893c7 100644 --- a/cdap-watchdog/src/test/java/io/cdap/cdap/logging/appender/ErrorClassificationLoggingTest.java +++ b/cdap-watchdog/src/test/java/io/cdap/cdap/logging/appender/ErrorClassificationLoggingTest.java @@ -77,6 +77,7 @@ public void testErrorClassificationTagsArePresent() { .withErrorCategory(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN)) .withErrorReason("error Reason") .withErrorType(ErrorType.USER) + .withDependency(true) .withErrorCodeType(ErrorCodeType.HTTP) .withErrorCode("403") .withSupportedDocumentationUrl("http://www.example.com") @@ -97,6 +98,7 @@ public void testErrorClassificationTagsArePresent() { String errorCategory = mdc.get(Logging.TAG_ERROR_CATEGORY); String errorReason = mdc.get(Logging.TAG_ERROR_REASON); String errorType = mdc.get(Logging.TAG_ERROR_TYPE); + String dependency = mdc.get(Logging.TAG_DEPENDENCY); String errorCodeType = mdc.get(Logging.TAG_ERROR_CODE_TYPE); String errorCode = mdc.get(Logging.TAG_ERROR_CODE); String supportedDocumentationUrl = mdc.get(Logging.TAG_SUPPORTED_DOC_URL); @@ -104,6 +106,7 @@ public void testErrorClassificationTagsArePresent() { Assert.assertEquals("Plugin", errorCategory); Assert.assertEquals("error Reason", errorReason); Assert.assertEquals("USER", errorType); + Assert.assertEquals(Boolean.TRUE.toString(), dependency); Assert.assertEquals("HTTP", errorCodeType); Assert.assertEquals("403", errorCode); Assert.assertEquals("http://www.example.com", supportedDocumentationUrl);