Commit

Add EventRecordConstructor responsible for creating GenericRecords based on the hook type (#2)

* Add EventRecordConstructor responsible for creating GenericRecords based on the hook type

* Replaced ZoneId with ZoneOffset

* Add DatePartitionedLogger and RecordsWriter (#3)

* Add DatePartitionedLogger and RecordsWriter

DatePartitionedLogger is part of the Avro logging hook; it manages the filesystem paths and creates RecordsWriter instances.

RecordsWriter is an Avro datum writer that works with HDFS.

* Add EventLogger (#4)

* Add EventLogger

EventLogger is the core handler of the logging hook. It is almost a full copy of the EventLogger in the HiveProtoLoggingHook, with minimal adjustments.

* Integrate EventLogger to a Hook (#5)

Integrate EventLogger to a Hook

This finishes the backbone implementation of the logging hook. The hook logs Avro messages to a file in a date-partitioned folder on GCS (it was tested on Dataproc; it should work the same with HDFS, but that still needs verification). The rollover implementation is not yet tested; it was taken as is from the HiveProtoLoggingHook implementation.

Verified on Dataproc with Hive v2.3.6 and v3.
frsv authored Dec 12, 2022
1 parent bcdd066 commit ed97cdb
Showing 22 changed files with 1,589 additions and 110 deletions.
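
For context before the per-file diffs: a hook implementing ExecuteWithHookContext is enabled through Hive's standard hook properties. A minimal sketch of wiring it up (the property names are stock Hive configuration, but which of the three hook points this project ultimately registers is an assumption):

    hive.exec.pre.hooks=com.google.cloud.bigquery.dwhassessment.hooks.MigrationAssessmentLoggingHook
    hive.exec.post.hooks=com.google.cloud.bigquery.dwhassessment.hooks.MigrationAssessmentLoggingHook
    hive.exec.failure.hooks=com.google.cloud.bigquery.dwhassessment.hooks.MigrationAssessmentLoggingHook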
WORKSPACE (13 additions, 0 deletions)

@@ -23,6 +23,18 @@ maven_install(
"31.0-jre",
neverlink = True,
),
maven.artifact(
"org.apache.commons",
"commons-compress",
"1.10",
neverlink = True,
),
maven.artifact(
"org.apache.hadoop",
"hadoop-common",
"2.2.0",
neverlink = True,
),
maven.artifact(
"org.apache.avro",
"avro",
@@ -70,6 +82,7 @@ maven_install(
"org.apache.hadoop:hadoop-mapreduce-client-core:2.9.0",
"org.apache.hive:hive-exec:2.2.0",
"com.google.truth:truth:1.1.3",
"com.google.truth.extensions:truth-java8-extension:1.1.3",
"org.apache.curator:apache-curator:2.7.1",
"junit:junit:4.13.2",
"org.mockito:mockito-core:3.11.1",
src/java/com/google/cloud/bigquery/dwhassessment/hooks/BUILD

@@ -20,6 +20,7 @@ java_library(
srcs = glob(["*.java"]),
deps = [
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/avro",
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger",
"@maven//:org_apache_hive_hive_exec_2_2_0",
"@maven//:org_slf4j_slf4j_api",
],
src/java/com/google/cloud/bigquery/dwhassessment/hooks/MigrationAssessmentLoggingHook.java

@@ -15,22 +15,9 @@
*/
package com.google.cloud.bigquery.dwhassessment.hooks;

import static org.apache.hadoop.hive.ql.hooks.Entity.Type.PARTITION;
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE;

import com.google.cloud.bigquery.dwhassessment.hooks.avro.AvroSchemaLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import com.google.cloud.bigquery.dwhassessment.hooks.logger.EventLogger;
import java.time.Clock;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.slf4j.Logger;
@@ -41,87 +28,12 @@ public class MigrationAssessmentLoggingHook implements ExecuteWithHookContext {

private static final Logger LOG = LoggerFactory.getLogger(MigrationAssessmentLoggingHook.class);

private static final Schema QUERY_EVENT_SCHEMA = AvroSchemaLoader.loadSchema("QueryEvents.avsc");

// Just for the example. Real hook will write it to the file
public List<GenericRecord> records = new ArrayList<>();

public void run(HookContext hookContext) throws Exception {
processQueryEventAvro(hookContext);
}

private void processQueryEventAvro(HookContext hookContext) {
QueryPlan plan = hookContext.getQueryPlan();

LOG.info("Received hook notification for: {}", plan.getQueryId());

// Make a copy so that we do not modify hookContext conf.
HiveConf conf = new HiveConf(hookContext.getConf());
List<ExecDriver> mrTasks = Utilities.getMRTasks(plan.getRootTasks());
List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
ExecutionMode executionMode = getExecutionMode(mrTasks, tezTasks);

GenericRecord queryEvent =
new GenericRecordBuilder(QUERY_EVENT_SCHEMA)
.set("QueryId", plan.getQueryId())
.set("QueryText", plan.getQueryStr())
.set("EventType", hookContext.getHookType().name())
.set("Timestamp", plan.getQueryStartTime())
.set("User", getUser(hookContext))
.set("RequestUser", getRequestUser(hookContext))
.set("ExecutionMode", executionMode.name())
.set("Queue", getQueueName(executionMode, conf))
.set("TablesRead", getTablesFromEntitySet(plan.getInputs()))
.set("TablesWritten", getTablesFromEntitySet(plan.getOutputs()))
.build();
records.add(queryEvent);
LOG.info("Processed record: {}", queryEvent);
}

private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
List<String> tableNames = new ArrayList<>();
for (Entity entity : entities) {
if (entity.getType() == TABLE || entity.getType() == PARTITION) {
tableNames.add(entity.getTable().getCompleteName());
}
}
return tableNames;
}

private String getUser(HookContext hookContext) {
return hookContext.getUgi().getShortUserName();
}

private String getRequestUser(HookContext hookContext) {
String requestUser = hookContext.getUserName();
return requestUser == null ? hookContext.getUgi().getUserName() : requestUser;
}

private ExecutionMode getExecutionMode(List<ExecDriver> mrTasks, List<TezTask> tezTasks) {
if (tezTasks.size() > 0) {
// Need to go in and check if any of the tasks is running in LLAP mode.
for (TezTask tezTask : tezTasks) {
if (tezTask.getWork().getLlapMode()) {
return ExecutionMode.LLAP;
}
}
return ExecutionMode.TEZ;
}

return mrTasks.size() > 0 ? ExecutionMode.MR : ExecutionMode.NONE;
}

private String getQueueName(ExecutionMode mode, HiveConf conf) {
switch (mode) {
case LLAP:
return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
case MR:
return "TODO_MAPREDUCE";
case TEZ:
return "TODO_TEZ";
case NONE:
default:
return null;
try {
EventLogger logger = EventLogger.getInstance(hookContext.getConf(), Clock.systemUTC());
logger.handle(hookContext);
} catch (Exception e) {
LOG.error("Got exception while processing event", e);
}
}
}
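
EventLogger's own source is part of this commit but not expanded on this page. The commit message describes it as nearly a copy of the HiveProtoLoggingHook logger, so its entry point is plausibly a process-wide singleton; a minimal sketch under that assumption (class shape, field names, and comments are guesses, not the committed code):

package com.google.cloud.bigquery.dwhassessment.hooks.logger;

import java.time.Clock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.hooks.HookContext;

public final class EventLogger {

  private static volatile EventLogger instance;

  private final Clock clock;

  private EventLogger(Configuration conf, Clock clock) {
    this.clock = clock;
    // The real class would wire up the DatePartitionedRecordsWriterFactory
    // and any rollover bookkeeping here.
  }

  // One logger per process, shared by all queries (double-checked locking).
  public static EventLogger getInstance(Configuration conf, Clock clock) {
    if (instance == null) {
      synchronized (EventLogger.class) {
        if (instance == null) {
          instance = new EventLogger(conf, clock);
        }
      }
    }
    return instance;
  }

  public void handle(HookContext hookContext) {
    // Builds a GenericRecord for the hook type and hands it to a RecordsWriter.
  }
}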
src/java/com/google/cloud/bigquery/dwhassessment/hooks/avro/AvroSchemaLoader.java

@@ -35,4 +35,6 @@ public static Schema loadSchema(String name) {
throw new IllegalStateException(String.format("Error reading schema '%s'.", name), e);
}
}

private AvroSchemaLoader() {}
}
QueryEvents.avsc

@@ -92,6 +92,110 @@
"items": "string"
},
"default": []
},
{
"name": "Status",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ErrorMessage",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "PerfObject",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SessionId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "IsTez",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "IsMapReduce",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "InvokerInfo",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ThreadName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "HookVersion",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ClientIpAddress",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "HiveAddress",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "HiveInstanceType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LlapApplicationId",
"type": [
"null",
"string"
],
"default": null
}
]
}
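
All thirteen added fields are ["null", <type>] unions with "default": null, so they are optional and previously written records stay readable. A hedged fragment in the style of the inline builder removed from the hook above (values are illustrative; the existing non-defaulted fields still have to be set as before):

GenericRecord queryEvent =
    new GenericRecordBuilder(QUERY_EVENT_SCHEMA)
        // ... existing fields (QueryId, QueryText, EventType, ...) as before ...
        .set("Status", "SUCCESS")                          // ["null","string"] union
        .set("ErrorMessage", null)                         // null matches the declared default
        .set("ClientIpAddress", hookContext.getIpAddress())
        .set("ThreadName", Thread.currentThread().getName())
        .set("IsTez", executionMode == ExecutionMode.TEZ)
        .build();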
src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/BUILD (new file)

@@ -0,0 +1,30 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@rules_java//java:defs.bzl", "java_library")

package(default_visibility = ["//src:internal"])

java_library(
name = "logger",
srcs = glob(["*.java"]),
deps = [
"//src/java/com/google/cloud/bigquery/dwhassessment/hooks/avro",
"@maven//:com_google_guava_guava",
"@maven//:org_apache_avro_avro",
"@maven//:org_apache_commons_commons_compress",
"@maven//:org_apache_hadoop_hadoop_common",
"@maven//:org_apache_hive_hive_exec_2_2_0",
"@maven//:org_slf4j_slf4j_api",
],
)
src/java/com/google/cloud/bigquery/dwhassessment/hooks/logger/DatePartitionedRecordsWriterFactory.java (new file)

@@ -0,0 +1,93 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.dwhassessment.hooks.logger;

import java.io.IOException;
import java.time.Clock;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Factory for {@link RecordsWriter} instances. Manages them to write to files, partitioned by
* dates.
*/
public class DatePartitionedRecordsWriterFactory {
private static final Logger LOG =
LoggerFactory.getLogger(DatePartitionedRecordsWriterFactory.class);
private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short) 1023);
private final Path basePath;
private final Configuration conf;
private final Schema schema;
private final Clock clock;

public DatePartitionedRecordsWriterFactory(
Path baseDir, Configuration conf, Schema schema, Clock clock) throws IOException {
this.conf = conf;
this.createDirIfNotExists(baseDir);
this.schema = schema;
this.clock = clock;
basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
}

public static LocalDate getDateFromDir(String dirName) {
try {
return LocalDate.parse(dirName, DateTimeFormatter.ISO_LOCAL_DATE);
} catch (DateTimeParseException e) {
throw new IllegalArgumentException("Invalid directory: " + dirName, e);
}
}

public RecordsWriter createWriter(String fileName) throws IOException {
Path filePath = getPathForDate(getNow(), fileName);
return new RecordsWriter(conf, filePath, schema);
}

private void createDirIfNotExists(Path path) throws IOException {
FileSystem fileSystem = path.getFileSystem(conf);

try {
if (!fileSystem.exists(path)) {
fileSystem.mkdirs(path);
fileSystem.setPermission(path, DIR_PERMISSION);
}
} catch (IOException e) {
LOG.warn("Error while trying to set permission", e);
}
}

private Path getPathForDate(LocalDate date, String fileName) throws IOException {
Path path = new Path(basePath, getDirForDate(date));
createDirIfNotExists(path);
return new Path(path, fileName);
}

private String getDirForDate(LocalDate date) {
return DateTimeFormatter.ISO_LOCAL_DATE.format(date);
}

public LocalDate getNow() {
return clock.instant().atOffset(ZoneOffset.UTC).toLocalDate();
}
}
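
A hedged usage sketch of the factory above (the bucket path and file name are placeholders, and the enclosing method would need to declare the IOException the constructor throws):

Configuration conf = new Configuration();
Schema schema = AvroSchemaLoader.loadSchema("QueryEvents.avsc");

// Files land under <base>/<yyyy-MM-dd>/ because createWriter resolves the
// ISO_LOCAL_DATE directory for "now" on the injected clock.
DatePartitionedRecordsWriterFactory writerFactory =
    new DatePartitionedRecordsWriterFactory(
        new Path("gs://example-bucket/query-events"), conf, schema, Clock.systemUTC());
RecordsWriter writer = writerFactory.createWriter("events_1.avro");

The injected Clock keeps the date partitioning testable with a fixed clock. Note also that DIR_PERMISSION's (short) 1023 is decimal for octal 1777, i.e. world-writable directories with the sticky bit set, so multiple Hive users can write their own files without removing each other's.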