Skip to content

Commit

Permalink
Handle different signatures for some methods through reflection (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
frsv authored Nov 27, 2023
1 parent 99c42ee commit c1b905c
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ java_library(
deps = [
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/avro",
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger",
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception",
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/utils",
"@maven//:org_apache_hadoop_hadoop_mapred_0_22_0",
"@maven//:org_apache_hive_hive_exec_2_2_0",
"@maven//:org_apache_hive_hive_exec",
"@maven//:org_slf4j_slf4j_api",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.VersionValidator.isHiveVersionSupported;

import com.google.cloud.bigquery.dwhassessment.hooks.logger.EventLogger;
import com.google.cloud.bigquery.dwhassessment.hooks.logger.exception.LoggingHookFatalException;
import java.io.File;
import java.time.Clock;
import java.util.Arrays;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void run(HookContext hookContext) throws Exception {
} catch (Throwable e) {
String baseMessage = "Got an exception while processing event.";
// Handles errors such as NoSuchMethodError, NoClassDefFoundError
if (e instanceof LinkageError) {
if (e instanceof LinkageError | e instanceof LoggingHookFatalException) {
String classpath =
Arrays.toString(System.getProperty("java.class.path").split(File.pathSeparator));
String errorMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ java_library(
"@maven//:org_apache_hadoop_hadoop_mapreduce_client_common",
"@maven//:org_apache_hadoop_hadoop_yarn_api",
"@maven//:org_apache_hadoop_hadoop_yarn_client",
"@maven//:org_apache_hive_hive_exec_2_2_0",
"@maven//:org_apache_hive_hive_exec",
"@maven//:org_apache_tez_tez_api_0_8_5",
"@maven//:org_slf4j_slf4j_api",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.PARTITION;
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE;

import com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.ReflectionMethods;
import com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.TasksRetriever;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Clock;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -41,10 +43,13 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.Counters;
Expand Down Expand Up @@ -92,6 +97,8 @@ private GenericRecord getPreHookEvent(HookContext hookContext) {
// Make a copy so that we do not modify hookContext conf.
HiveConf conf = new HiveConf(hookContext.getConf());
ExecutionMode executionMode = getExecutionMode(plan);
Set<ReadEntity> inputs = ReflectionMethods.INSTANCE.getInputs(plan);
Set<WriteEntity> outputs = ReflectionMethods.INSTANCE.getOutputs(plan);

return new GenericRecordBuilder(QUERY_EVENT_SCHEMA)
.set("QueryId", plan.getQueryId())
Expand All @@ -104,10 +111,10 @@ private GenericRecord getPreHookEvent(HookContext hookContext) {
.set("ExecutionMode", executionMode.name())
.set("ExecutionEngine", conf.get("hive.execution.engine"))
.set("Queue", retrieveSessionQueueName(conf, executionMode))
.set("TablesRead", getTablesFromEntitySet(plan.getInputs()))
.set("TablesWritten", getTablesFromEntitySet(plan.getOutputs()))
.set("PartitionsRead", getPartitionsFromEntitySet(plan.getInputs()))
.set("PartitionsWritten", getPartitionsFromEntitySet(plan.getOutputs()))
.set("TablesRead", getTablesFromEntitySet(inputs))
.set("TablesWritten", getTablesFromEntitySet(outputs))
.set("PartitionsRead", getPartitionsFromEntitySet(inputs))
.set("PartitionsWritten", getPartitionsFromEntitySet(outputs))
.set("SessionId", hookContext.getSessionId())
.set("InvokerInfo", conf.getLogIdVar(hookContext.getSessionId()))
.set("ThreadName", hookContext.getThreadId())
Expand All @@ -118,8 +125,8 @@ private GenericRecord getPreHookEvent(HookContext hookContext) {
.set("HiveAddress", getHiveInstanceAddress(hookContext))
.set("HiveInstanceType", getHiveInstanceType(hookContext))
.set("OperationId", hookContext.getOperationId())
.set("DatabasesRead", getDatabasesFromEntitySet(plan.getInputs()))
.set("DatabasesWritten", getDatabasesFromEntitySet(plan.getOutputs()))
.set("DatabasesRead", getDatabasesFromEntitySet(inputs))
.set("DatabasesWritten", getDatabasesFromEntitySet(outputs))
.set("DefaultDatabase", SessionState.get().getCurrentDatabase())
.build();
}
Expand Down Expand Up @@ -201,7 +208,8 @@ private static Optional<String> dumpMapReduceCounters() {
}

private static Optional<String> dumpTezCounters(QueryPlan plan) {
List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
List<Task<?>> rootTasks = ReflectionMethods.INSTANCE.getRootTasks(plan);
List<TezTask> tezTasks = Utilities.getTezTasks(rootTasks);
List<TezCounters> list =
tezTasks.stream().map(TezTask::getTezCounters).collect(Collectors.toList());
return generateCountersJson(
Expand Down Expand Up @@ -245,10 +253,11 @@ private static <C, G extends Iterable<C>> Optional<String> generateCountersJson(
}

private String dumpPerfData(PerfLogger perfLogger) {
Map<String, Long> perfStats = ReflectionMethods.INSTANCE.getStartTimes(perfLogger);
JSONObject perfObj = new JSONObject();
long now = clock.millis();

for (String key : perfLogger.getStartTimes().keySet()) {
for (String key : perfStats.keySet()) {
long duration = perfLogger.getDuration(key);
// Some perf logger entries are finished after the hook. Make the best effort to capture them
// here with the duration at the current time.
Expand Down Expand Up @@ -302,11 +311,13 @@ private static String getRequestUser(HookContext hookContext) {

private static ExecutionMode getExecutionMode(QueryPlan plan) {
// Utilities methods check for null, so possibly it is nullable
if (plan.getRootTasks() != null && plan.getRootTasks().isEmpty()) {
List<Task<?>> rootTasks = ReflectionMethods.INSTANCE.getRootTasks(plan);

if (rootTasks != null && rootTasks.isEmpty()) {
return ExecutionMode.CLIENT_ONLY;
}

List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
List<TezTask> tezTasks = Utilities.getTezTasks(rootTasks);
if (!tezTasks.isEmpty()) {
// Need to go in and check if any of the tasks is running in LLAP mode.
for (TezTask tezTask : tezTasks) {
Expand All @@ -317,11 +328,11 @@ private static ExecutionMode getExecutionMode(QueryPlan plan) {
return ExecutionMode.TEZ;
}

if (!Utilities.getMRTasks(plan.getRootTasks()).isEmpty()) {
if (!Utilities.getMRTasks(rootTasks).isEmpty()) {
return ExecutionMode.MR;
}

if (!Utilities.getSparkTasks(plan.getRootTasks()).isEmpty()) {
if (!Utilities.getSparkTasks(rootTasks).isEmpty()) {
return ExecutionMode.SPARK;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2022-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@rules_java//java:defs.bzl", "java_library")

package(default_visibility = ["//src:internal"])

# Library holding the logging hook's exception types. It is declared as a dep by the
# main hook target and by logger/utils in this change; it has no deps of its own.
java_library(
    name = "exception",
    srcs = glob(["*.java"]),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.dwhassessment.hooks.logger.exception;

/**
 * Exception indicating a critical issue with the hook processing or initialization.
 *
 * <p>Unchecked on purpose: callers treat it as a fatal, non-recoverable condition rather
 * than a case-by-case checked failure.
 */
public class LoggingHookFatalException extends RuntimeException {

  /**
   * Wraps an underlying failure without an additional message.
   *
   * @param cause the exception that triggered the fatal condition
   */
  public LoggingHookFatalException(Exception cause) {
    super(cause);
  }

  /**
   * Wraps an underlying failure together with a descriptive message.
   *
   * @param message human-readable description of the fatal condition
   * @param cause the exception that triggered the fatal condition
   */
  public LoggingHookFatalException(String message, Exception cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ java_library(
name = "utils",
srcs = glob(["*.java"]),
deps = [
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/exception",
"@maven//:commons_lang_commons_lang",
"@maven//:org_apache_hive_hive_exec_2_2_0",
"@maven//:org_apache_hive_hive_exec",
"@maven//:org_slf4j_slf4j_api",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.dwhassessment.hooks.logger.utils;

import com.google.cloud.bigquery.dwhassessment.hooks.logger.exception.LoggingHookFatalException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.log.PerfLogger;

/**
* Component to handle methods and classes, which might differ between Hive versions, with
* reflection.
*
* <p>Although all hive-exec 2.2.x-3.1.3 open source releases are compatible for methods used by the
* logging hook, we saw multiple vendors using their own versions of Hive with version 3.1.3x, but
* using at least part of the codebase shipped in 4.0.0. Normally, the changes are minor, but since
* Java compiles the logging hook JAR against specific signatures, we need to make sure we can
* handle cases when a function returns either HashSet or Set.
*/
public final class ReflectionMethods {

  /** Shared singleton; all reflection lookups happen once, at class-initialization time. */
  public static final ReflectionMethods INSTANCE = new ReflectionMethods();

  private final Method queryPlanGetRootTasks;
  private final Method queryPlanGetInputs;
  private final Method queryPlanGetOutputs;
  private final Method perfLoggerGetStartTimes;
  private final Class<?> ddlTaskClass;

  private ReflectionMethods() {
    try {
      // QueryPlan and PerfLogger are compile-time dependencies (they appear in the public
      // method signatures below), so their class literals can be used directly instead of
      // Class.forName; only the method lookups can vary between Hive versions and fail here.
      queryPlanGetRootTasks = QueryPlan.class.getMethod("getRootTasks");
      queryPlanGetInputs = QueryPlan.class.getMethod("getInputs");
      queryPlanGetOutputs = QueryPlan.class.getMethod("getOutputs");
      perfLoggerGetStartTimes = PerfLogger.class.getMethod("getStartTimes");
    } catch (NoSuchMethodException e) {
      throw new LoggingHookFatalException("Failed to initialize methods with reflection", e);
    }

    // DDLTask genuinely moved packages in Hive v4, so lookup by name is required here.
    // Probe the pre-v4 location first, then fall back to the v4 one.
    Class<?> ddlTaskClassTemp;
    try {
      ddlTaskClassTemp = Class.forName("org.apache.hadoop.hive.ql.exec.DDLTask");
    } catch (ClassNotFoundException unused) {
      try {
        ddlTaskClassTemp = Class.forName("org.apache.hadoop.hive.ql.ddl.DDLTask");
      } catch (ClassNotFoundException e) {
        throw new LoggingHookFatalException("Failed to find DDLTask class with reflection", e);
      }
    }
    ddlTaskClass = ddlTaskClassTemp;
  }

  /**
   * Returns {@code QueryPlan#getRootTasks()} invocation result. Some Hive versions return list of
   * type {@code Task<? extends java.io.Serializable>} and some - {@code Task<?>}.
   */
  @SuppressWarnings("unchecked") // Cast mirrors the declared return type across Hive versions.
  public List<Task<?>> getRootTasks(QueryPlan queryPlan) {
    return (List<Task<?>>) invokeChecked(queryPlanGetRootTasks, queryPlan);
  }

  /**
   * Returns {@code QueryPlan#getInputs()} invocation result. Some Hive versions return {@code
   * java.util.HashSet} and some - {@code java.util.Set}.
   */
  @SuppressWarnings("unchecked") // Cast mirrors the declared return type across Hive versions.
  public Set<ReadEntity> getInputs(QueryPlan queryPlan) {
    return (Set<ReadEntity>) invokeChecked(queryPlanGetInputs, queryPlan);
  }

  /**
   * Returns {@code QueryPlan#getOutputs()} invocation result. Some Hive versions return {@code
   * java.util.HashSet} and some - {@code java.util.Set}.
   */
  @SuppressWarnings("unchecked") // Cast mirrors the declared return type across Hive versions.
  public Set<WriteEntity> getOutputs(QueryPlan queryPlan) {
    return (Set<WriteEntity>) invokeChecked(queryPlanGetOutputs, queryPlan);
  }

  /**
   * Returns {@code PerfLogger#getStartTimes()} invocation result. Some Hive versions return {@code
   * com.google.common.collect.ImmutableMap} and some - {@code java.util.Map}.
   */
  @SuppressWarnings("unchecked") // Cast mirrors the declared return type across Hive versions.
  public Map<String, Long> getStartTimes(PerfLogger perfLogger) {
    return (Map<String, Long>) invokeChecked(perfLoggerGetStartTimes, perfLogger);
  }

  /**
   * Returns {@code DDLTask} class. In Hive v4 it was moved from {@code
   * org.apache.hadoop.hive.ql.exec} to {@code org.apache.hadoop.hive.ql.ddl} package.
   */
  public Class<?> getDdlTaskClass() {
    return ddlTaskClass;
  }

  /**
   * Invokes a no-argument {@code method} on {@code target}, converting reflection failures into
   * the hook's fatal runtime exception so callers need no checked-exception handling.
   */
  private Object invokeChecked(Method method, Object target) {
    try {
      return method.invoke(target);
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new LoggingHookFatalException("Failed to invoke method with reflection", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.Task;

/** Helper for retrieving tasks of the target type from the query plan tasks. */
public final class TasksRetriever {
private TasksRetriever() {}

public static boolean hasDdlTask(List<Task<? extends Serializable>> tasks) {
Class<?> ddlTaskClass = ReflectionMethods.INSTANCE.getDdlTaskClass();
for (Task<? extends Serializable> task : tasks) {
if (task instanceof DDLTask) {
if (ddlTaskClass.isInstance(task)) {
return true;
}

Expand Down

0 comments on commit c1b905c

Please sign in to comment.