From c1b905c12edcf5a5a2c6e039589f457d16e829ad Mon Sep 17 00:00:00 2001 From: Roman Fursov Date: Mon, 27 Nov 2023 18:05:14 +0100 Subject: [PATCH] Handle different signatures for some methods through reflection (#63) --- .../cloud/bigquery/dwhassessment/hooks/BUILD | 3 +- .../hooks/MigrationAssessmentLoggingHook.java | 3 +- .../bigquery/dwhassessment/hooks/logger/BUILD | 2 +- .../hooks/logger/EventRecordConstructor.java | 35 +++-- .../hooks/logger/exception/BUILD | 21 +++ .../exception/LoggingHookFatalException.java | 27 ++++ .../dwhassessment/hooks/logger/utils/BUILD | 3 +- .../hooks/logger/utils/ReflectionMethods.java | 123 ++++++++++++++++++ .../hooks/logger/utils/TasksRetriever.java | 4 +- 9 files changed, 203 insertions(+), 18 deletions(-) create mode 100644 src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/BUILD create mode 100644 src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/LoggingHookFatalException.java create mode 100644 src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/ReflectionMethods.java diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD index 4a39ea7..2a8560c 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD @@ -21,9 +21,10 @@ java_library( deps = [ "//src/java/com/google/cloud/bigquery/dwhassessment/hooks/avro", "//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger", + "//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception", "//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils", "@maven//:org_apache_hadoop_hadoop_mapred_0_22_0", - "@maven//:org_apache_hive_hive_exec_2_2_0", + "@maven//:org_apache_hive_hive_exec", "@maven//:org_slf4j_slf4j_api", ], ) diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/MigrationAssessmentLoggingHook.java b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/MigrationAssessmentLoggingHook.java index c83e587..656954f 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/MigrationAssessmentLoggingHook.java +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/MigrationAssessmentLoggingHook.java @@ -20,6 +20,7 @@ import static com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.VersionValidator.isHiveVersionSupported; import com.google.cloud.bigquery.dwhassessment.hooks.logger.EventLogger; +import com.google.cloud.bigquery.dwhassessment.hooks.logger.exception.LoggingHookFatalException; import java.io.File; import java.time.Clock; import java.util.Arrays; @@ -51,7 +52,7 @@ public void run(HookContext hookContext) throws Exception { } catch (Throwable e) { String baseMessage = "Got an exception while processing event."; // Handles errors such as NoSuchMethodError, NoClassDefFoundError - if (e instanceof LinkageError) { + if (e instanceof LinkageError | e instanceof LoggingHookFatalException) { String classpath = Arrays.toString(System.getProperty("java.class.path").split(File.pathSeparator)); String errorMessage = diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/BUILD b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/BUILD index 169fee4..809b3c0 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/BUILD +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/BUILD @@ -30,7 +30,7 @@ java_library( "@maven//:org_apache_hadoop_hadoop_mapreduce_client_common", "@maven//:org_apache_hadoop_hadoop_yarn_api", "@maven//:org_apache_hadoop_hadoop_yarn_client", - "@maven//:org_apache_hive_hive_exec_2_2_0", + "@maven//:org_apache_hive_hive_exec", "@maven//:org_apache_tez_tez_api_0_8_5", "@maven//:org_slf4j_slf4j_api", ], diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/EventRecordConstructor.java b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/EventRecordConstructor.java index d34f0b4..c91d1b7 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/EventRecordConstructor.java +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/EventRecordConstructor.java @@ -25,12 +25,14 @@ import static org.apache.hadoop.hive.ql.hooks.Entity.Type.PARTITION; import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE; +import com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.ReflectionMethods; import com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.TasksRetriever; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Clock; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -41,10 +43,13 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.Counters; @@ -92,6 +97,8 @@ private GenericRecord getPreHookEvent(HookContext hookContext) { // Make a copy so that we do not modify hookContext conf. HiveConf conf = new HiveConf(hookContext.getConf()); ExecutionMode executionMode = getExecutionMode(plan); + Set inputs = ReflectionMethods.INSTANCE.getInputs(plan); + Set outputs = ReflectionMethods.INSTANCE.getOutputs(plan); return new GenericRecordBuilder(QUERY_EVENT_SCHEMA) .set("QueryId", plan.getQueryId()) @@ -104,10 +111,10 @@ private GenericRecord getPreHookEvent(HookContext hookContext) { .set("ExecutionMode", executionMode.name()) .set("ExecutionEngine", conf.get("hive.execution.engine")) .set("Queue", retrieveSessionQueueName(conf, executionMode)) - .set("TablesRead", getTablesFromEntitySet(plan.getInputs())) - .set("TablesWritten", getTablesFromEntitySet(plan.getOutputs())) - .set("PartitionsRead", getPartitionsFromEntitySet(plan.getInputs())) - .set("PartitionsWritten", getPartitionsFromEntitySet(plan.getOutputs())) + .set("TablesRead", getTablesFromEntitySet(inputs)) + .set("TablesWritten", getTablesFromEntitySet(outputs)) + .set("PartitionsRead", getPartitionsFromEntitySet(inputs)) + .set("PartitionsWritten", getPartitionsFromEntitySet(outputs)) .set("SessionId", hookContext.getSessionId()) .set("InvokerInfo", conf.getLogIdVar(hookContext.getSessionId())) .set("ThreadName", hookContext.getThreadId()) @@ -118,8 +125,8 @@ private GenericRecord getPreHookEvent(HookContext hookContext) { .set("HiveAddress", getHiveInstanceAddress(hookContext)) .set("HiveInstanceType", getHiveInstanceType(hookContext)) .set("OperationId", hookContext.getOperationId()) - .set("DatabasesRead", getDatabasesFromEntitySet(plan.getInputs())) - .set("DatabasesWritten", getDatabasesFromEntitySet(plan.getOutputs())) + .set("DatabasesRead", getDatabasesFromEntitySet(inputs)) + .set("DatabasesWritten", getDatabasesFromEntitySet(outputs)) .set("DefaultDatabase", SessionState.get().getCurrentDatabase()) .build(); } @@ -201,7 +208,8 @@ private static Optional dumpMapReduceCounters() { } private static Optional dumpTezCounters(QueryPlan plan) { - List tezTasks = Utilities.getTezTasks(plan.getRootTasks()); + List> rootTasks = ReflectionMethods.INSTANCE.getRootTasks(plan); + List tezTasks = Utilities.getTezTasks(rootTasks); List list = tezTasks.stream().map(TezTask::getTezCounters).collect(Collectors.toList()); return generateCountersJson( @@ -245,10 +253,11 @@ private static > Optional generateCountersJson( } private String dumpPerfData(PerfLogger perfLogger) { + Map perfStats = ReflectionMethods.INSTANCE.getStartTimes(perfLogger); JSONObject perfObj = new JSONObject(); long now = clock.millis(); - for (String key : perfLogger.getStartTimes().keySet()) { + for (String key : perfStats.keySet()) { long duration = perfLogger.getDuration(key); // Some perf logger entries are finished after the hook. Make the best effort to capture them // here with the duration at the current time. @@ -302,11 +311,13 @@ private static String getRequestUser(HookContext hookContext) { private static ExecutionMode getExecutionMode(QueryPlan plan) { // Utilities methods check for null, so possibly it is nullable - if (plan.getRootTasks() != null && plan.getRootTasks().isEmpty()) { + List> rootTasks = ReflectionMethods.INSTANCE.getRootTasks(plan); + + if (rootTasks != null && rootTasks.isEmpty()) { return ExecutionMode.CLIENT_ONLY; } - List tezTasks = Utilities.getTezTasks(plan.getRootTasks()); + List tezTasks = Utilities.getTezTasks(rootTasks); if (!tezTasks.isEmpty()) { // Need to go in and check if any of the tasks is running in LLAP mode. for (TezTask tezTask : tezTasks) { @@ -317,11 +328,11 @@ private static ExecutionMode getExecutionMode(QueryPlan plan) { return ExecutionMode.TEZ; } - if (!Utilities.getMRTasks(plan.getRootTasks()).isEmpty()) { + if (!Utilities.getMRTasks(rootTasks).isEmpty()) { return ExecutionMode.MR; } - if (!Utilities.getSparkTasks(plan.getRootTasks()).isEmpty()) { + if (!Utilities.getSparkTasks(rootTasks).isEmpty()) { return ExecutionMode.SPARK; } diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/BUILD b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/BUILD new file mode 100644 index 0000000..5721722 --- /dev/null +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/BUILD @@ -0,0 +1,21 @@ +# Copyright 2022-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +load("@rules_java//java:defs.bzl", "java_library") + +package(default_visibility = ["//src:internal"]) + +java_library( + name = "exception", + srcs = glob(["*.java"]), +) diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/LoggingHookFatalException.java b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/LoggingHookFatalException.java new file mode 100644 index 0000000..ae95c45 --- /dev/null +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception/LoggingHookFatalException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022-2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.dwhassessment.hooks.logger.exception; + +/** Exception indicating a critical issue with the hook processing or initialization. */ +public class LoggingHookFatalException extends RuntimeException { + public LoggingHookFatalException(Exception e) { + super(e); + } + + public LoggingHookFatalException(String message, Exception e) { + super(message, e); + } +} diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/BUILD b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/BUILD index 9f97a70..e810cfa 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/BUILD +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/BUILD @@ -19,8 +19,9 @@ java_library( name = "utils", srcs = glob(["*.java"]), deps = [ + "//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception", "@maven//:commons_lang_commons_lang", - "@maven//:org_apache_hive_hive_exec_2_2_0", + "@maven//:org_apache_hive_hive_exec", "@maven//:org_slf4j_slf4j_api", ], ) diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/ReflectionMethods.java b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/ReflectionMethods.java new file mode 100644 index 0000000..35e0260 --- /dev/null +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/ReflectionMethods.java @@ -0,0 +1,123 @@ +/* + * Copyright 2022-2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.dwhassessment.hooks.logger.utils; + +import com.google.cloud.bigquery.dwhassessment.hooks.logger.exception.LoggingHookFatalException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.log.PerfLogger; + +/** + * Component to handle methods and classes, which might differ between Hive versions, with + * reflection. + * + *

Although all hive-exec 2.2.x-3.1.3 open source releases are compatible for methods used by the + * logging hook, we saw multiple vendors using their own versions of Hive with version 3.1.3x, but + * using at least part of the codebase shipped in 4.0.0. Normally, the changes are minor, but since + * Java compiles the logging hook JAR against specific signatures, we need to make sure we can + * handle cases when a function returns either HashSet or Set. + */ +public final class ReflectionMethods { + + public static final ReflectionMethods INSTANCE = new ReflectionMethods(); + + private final Method queryPlanGetRootTasks; + private final Method queryPlanGetInputs; + private final Method queryPlanGetOutputs; + private final Method perfLoggerGetStartTimes; + private final Class ddlTaskClass; + + private ReflectionMethods() { + try { + Class queryPlan = Class.forName("org.apache.hadoop.hive.ql.QueryPlan"); + queryPlanGetRootTasks = queryPlan.getMethod("getRootTasks"); + queryPlanGetInputs = queryPlan.getMethod("getInputs"); + queryPlanGetOutputs = queryPlan.getMethod("getOutputs"); + + Class perfLogger = Class.forName("org.apache.hadoop.hive.ql.log.PerfLogger"); + perfLoggerGetStartTimes = perfLogger.getMethod("getStartTimes"); + } catch (ClassNotFoundException | NoSuchMethodException e) { + throw new LoggingHookFatalException("Failed to initialize methods with reflection", e); + } + + Class ddlTaskClassTemp; + try { + ddlTaskClassTemp = Class.forName("org.apache.hadoop.hive.ql.exec.DDLTask"); + } catch (ClassNotFoundException unused) { + try { + ddlTaskClassTemp = Class.forName("org.apache.hadoop.hive.ql.ddl.DDLTask"); + } catch (ClassNotFoundException e) { + throw new LoggingHookFatalException("Failed to find DDLTask class with reflection", e); + } + } + ddlTaskClass = ddlTaskClassTemp; + } + + /** + * Returns {@code QueryPlan#getRootTasks()} invocation result. Some Hive versions return list of + * type {@code Task} and some - {@code Task}. + */ + public List> getRootTasks(QueryPlan queryPlan) { + return (List>) invokeChecked(queryPlanGetRootTasks, queryPlan); + } + + /** + * Returns {@code QueryPlan#getInputs()} invocation result. Some Hive versions return {@code + * java.util.HashSet} and some - {@code java.util.Set}. + */ + public Set getInputs(QueryPlan queryPlan) { + return (Set) invokeChecked(queryPlanGetInputs, queryPlan); + } + + /** + * Returns {@code QueryPlan#getOutputs()} invocation result. Some Hive versions return {@code + * java.util.HashSet} and some - {@code java.util.Set}. + */ + public Set getOutputs(QueryPlan queryPlan) { + return (Set) invokeChecked(queryPlanGetOutputs, queryPlan); + } + + /** + * Returns {@code PerfLogger#getStartTimes()} invocation result. Some Hive versions return {@code + * com.google.common.collect.ImmutableMap} and some - {@code java.util.Map}. + */ + public Map getStartTimes(PerfLogger perfLogger) { + return (Map) invokeChecked(perfLoggerGetStartTimes, perfLogger); + } + + /** + * Returns {@code DDLTask} class. In Hive v4 it was moved from {@code + * org.apache.hadoop.hive.ql.exec} to {@code org.apache.hadoop.hive.ql.ddl} package. + */ + public Class getDdlTaskClass() { + return ddlTaskClass; + } + + private Object invokeChecked(Method method, Object target) { + try { + return method.invoke(target); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new LoggingHookFatalException("Failed to invoke method with reflection", e); + } + } +} diff --git a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/TasksRetriever.java b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/TasksRetriever.java index fe9b548..efa72f0 100644 --- a/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/TasksRetriever.java +++ b/src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils/TasksRetriever.java @@ -18,7 +18,6 @@ import java.io.Serializable; import java.util.List; -import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.Task; /** Helper for retrieving tasks of the target type from the query plan tasks. */ @@ -26,8 +25,9 @@ public final class TasksRetriever { private TasksRetriever() {} public static boolean hasDdlTask(List> tasks) { + Class ddlTaskClass = ReflectionMethods.INSTANCE.getDdlTaskClass(); for (Task task : tasks) { - if (task instanceof DDLTask) { + if (ddlTaskClass.isInstance(task)) { return true; }