Skip to content

Commit

Permalink
Revert YARN applications data retrieval changes (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
frsv authored Jun 22, 2023
1 parent 0dd8a2c commit 98d0db2
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@
],
"default": null
},
{
"name": "YarnApplicationId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DatabasesRead",
"type": [
Expand Down Expand Up @@ -275,14 +283,6 @@
"string"
],
"default": null
},
{
"name": "ApplicationData",
"type": [
"null",
"string"
],
"default": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package com.google.cloud.bigquery.dwhassessment.hooks.logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
Expand All @@ -39,17 +36,17 @@ public class ApplicationIdRetriever {

private static final Logger LOG = LoggerFactory.getLogger(ApplicationIdRetriever.class);

public static List<ApplicationId> determineApplicationIds(
public static Optional<ApplicationId> determineApplicationId(
HiveConf conf, ExecutionMode executionMode) {
switch (executionMode) {
case MR:
return determineMapReduceApplicationIds();
return determineMapReduceApplicationId();
case TEZ:
return determineTezApplicationIds();
return determineTezApplicationId();
case LLAP:
return determineLlapApplicationIds(conf, executionMode);
return determineLlapApplicationId(conf, executionMode);
default:
return new ArrayList<>();
return Optional.empty();
}
}

Expand All @@ -58,28 +55,27 @@ public static List<ApplicationId> determineApplicationIds(
* application always have only one queue – if queue changes in the session, new application is
* created.
*/
private static List<ApplicationId> determineTezApplicationIds() {
private static Optional<ApplicationId> determineTezApplicationId() {
SessionState sessionState = SessionState.get();
if (sessionState != null) {
TezSessionState tezSessionState = sessionState.getTezSession();
if (tezSessionState != null) {
TezClient tezClient = tezSessionState.getSession();
if (tezClient != null) {
ApplicationId applicationId = tezClient.getAppMasterApplicationId();
return applicationId != null ? Collections.singletonList(tezClient.getAppMasterApplicationId()) : new ArrayList<>();
return Optional.ofNullable(tezClient.getAppMasterApplicationId());
}
}
}

LOG.info("Failed to retrieve Application ID from Tez session");
return new ArrayList<>();
return Optional.empty();
}

/**
* Retrieves Application ID from the first MapReduce job as multiple MapReduce jobs created for a
* single query are submitted to the same queue.
*/
private static List<ApplicationId> determineMapReduceApplicationIds() {
private static Optional<ApplicationId> determineMapReduceApplicationId() {
return SessionState.get().getMapRedStats().values().stream()
.map(MapRedStats::getJobId)
.flatMap(
Expand All @@ -93,14 +89,15 @@ private static List<ApplicationId> determineMapReduceApplicationIds() {
jobId);
return Stream.empty();
}
}).collect(Collectors.toList());
})
.findFirst();
}

/**
* Retrieve Application ID for Llap daemon. They are long-living YARN applications, using the same
* queue, so it should be relatively static.
*/
public static List<ApplicationId> determineLlapApplicationIds(
public static Optional<ApplicationId> determineLlapApplicationId(
HiveConf conf, ExecutionMode mode) {
// Note: for now, LLAP is only supported in Tez tasks. Will never come to MR; others may
// be added here, although this is only necessary to have extra debug information.
Expand All @@ -110,7 +107,7 @@ public static List<ApplicationId> determineLlapApplicationIds(
String hosts = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
if (hosts != null && !hosts.isEmpty()) {
try {
return Collections.singletonList(LlapRegistryService.getClient(conf).getApplicationId());
return Optional.of(LlapRegistryService.getClient(conf).getApplicationId());
} catch (IOException e) {
LOG.error("Error trying to get llap instance. Hosts: {}", hosts, e);
}
Expand All @@ -119,6 +116,6 @@ public static List<ApplicationId> determineLlapApplicationIds(
}
}

return new ArrayList<>();
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE;

import com.google.cloud.bigquery.dwhassessment.hooks.logger.utils.TasksRetriever;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -53,7 +49,6 @@
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
Expand Down Expand Up @@ -147,38 +142,19 @@ private GenericRecord getPostHookEvent(HookContext hookContext, EventStatus stat
.set("PerfObject", dumpPerfData(hookContext.getPerfLogger()))
.set("OperationId", hookContext.getOperationId());

List<ImmutableMap<String, String> > listOfApplicationData = new ArrayList<>();
ApplicationIdRetriever.determineApplicationIds(conf, getExecutionMode(plan))
.forEach(applicationId -> {
ImmutableMap.Builder<String, String> applicationDataMap = ImmutableMap.builder();

applicationDataMap.put("YarnApplicationId", applicationId.toString());
yarnApplicationRetriever.retrieve(conf, applicationId)
ApplicationIdRetriever.determineApplicationId(conf, getExecutionMode(plan))
.ifPresent(
applicationId -> {
recordBuilder.set("YarnApplicationId", applicationId.toString());
yarnApplicationRetriever
.retrieve(conf, applicationId)
.ifPresent(
applicationReport -> {
recordBuilder
.set("HiveHostName", applicationReport.getHost())
.set("Queue", applicationReport.getQueue());
applicationDataMap.put("HiveHostName", applicationReport.getHost())
.put("Queue", applicationReport.getQueue())
.put("YarnProcess", String.valueOf(applicationReport.getProgress()))
.put("YarnApplicationType", applicationReport.getApplicationType())
.put("YarnApplicationState", applicationReport.getYarnApplicationState().toString())
.put("YarnDiagnostics", applicationReport.getDiagnostics())
.put("YarnCurrentApplicationAttemptId", applicationReport.getCurrentApplicationAttemptId().toString())
.put("YarnUser", applicationReport.getUser())
.put("YarnStartTime", String.valueOf(applicationReport.getStartTime()))
.put("YarnFinishTime", String.valueOf(applicationReport.getFinishTime()))
.put("YarnFinalApplicationStatus", applicationReport.getFinalApplicationStatus().toString());
retrieveApplicationResourceUsageReport(applicationReport.getApplicationResourceUsageReport(), applicationDataMap);
}
);
listOfApplicationData.add(applicationDataMap.build());
recordBuilder.set("HiveHostName", applicationReport.getHost());
recordBuilder.set("Queue", applicationReport.getQueue());
});
});

generateYarnApplicationJson(listOfApplicationData)
.ifPresent(applications -> recordBuilder.set("ApplicationData", applications));

dumpTezCounters(plan)
.map(Optional::of)
.orElseGet(EventRecordConstructor::dumpMapReduceCounters)
Expand All @@ -187,90 +163,6 @@ private GenericRecord getPostHookEvent(HookContext hookContext, EventStatus stat
return recordBuilder.build();
}

private void retrieveApplicationResourceUsageReport(
ApplicationResourceUsageReport applicationResourceUsageReport,
ImmutableMap.Builder<String, String> applicationDataMap
) {
ImmutableMap<String, String> reportMethods =
ImmutableMap.<String, String>builder()
.put("getNumUsedContainers", "YarnReportNumUsedContainers")
.put("getNumReservedContainers", "YarnReportNumReservedContainers")
.put("getMemorySeconds", "YarnReportMemorySeconds")
.put("getVcoreSeconds", "YarnReportVcoreSeconds")
.put("getQueueUsagePercentage", "YarnReportQueueUsagePercentage")
.put("getClusterUsagePercentage", "YarnReportClusterUsagePercentage")
.put("getPreemptedMemorySeconds", "YarnReportPreemptedMemorySeconds")
.put("getPreemptedVcoreSeconds", "YarnReportPreemptedVcoreSeconds")
.build();

reportMethods.forEach(
(methodName, fieldName) ->
callMethodWithReflection(
methodName, fieldName, applicationDataMap, applicationResourceUsageReport));

applicationDataMap.put(
"YarnReportUsedResources",
String.valueOf(applicationResourceUsageReport.getUsedResources().toString()));
callMethodWithReflection(
"getMemorySize",
"YarnReportUsedResourcesMemory",
applicationDataMap,
applicationResourceUsageReport.getUsedResources());
callMethodWithReflection(
"getVirtualCores",
"YarnReportUsedResourcesVcore",
applicationDataMap,
applicationResourceUsageReport.getUsedResources());

applicationDataMap.put(
"YarnReportReservedResources",
String.valueOf(applicationResourceUsageReport.getReservedResources().toString()));
callMethodWithReflection(
"getMemorySize",
"YarnReportReservedResourcesMemory",
applicationDataMap,
applicationResourceUsageReport.getReservedResources());
callMethodWithReflection(
"getVirtualCores",
"YarnReportReservedResourcesVcore",
applicationDataMap,
applicationResourceUsageReport.getReservedResources());

applicationDataMap.put(
"YarnReportNeededResources",
String.valueOf(applicationResourceUsageReport.getNeededResources().toString()));
callMethodWithReflection(
"getMemorySize",
"YarnReportNeededResourcesMemory",
applicationDataMap,
applicationResourceUsageReport.getNeededResources());
callMethodWithReflection(
"getVirtualCores",
"YarnReportNeededResourcesVcore",
applicationDataMap,
applicationResourceUsageReport.getNeededResources());

}

private <T> void callMethodWithReflection(
String methodName, String fieldNameInAvro, ImmutableMap.Builder<String, String> applicationDataMap, T object) {
try {
Method method = Class.forName(object.getClass().getName()).getMethod(methodName);
applicationDataMap.put(fieldNameInAvro, String.valueOf(method.invoke(object)));
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.warn("Failed to invoke method '{}'", methodName);
} catch (ClassNotFoundException e) {
LOG.warn(
"Failed to find class {} to invoke method '{}'", object.getClass().getName(), methodName);
}
}

private static Optional<String> generateYarnApplicationJson(List<ImmutableMap<String, String> > listOfApplicationData) {
JSONArray jsonArray = new JSONArray();
listOfApplicationData.stream().map(JSONObject::new).forEach(jsonArray::put);
return jsonArray.length() > 0 ? Optional.of(jsonArray.toString()) : Optional.empty();
}

/**
* Retrieves YARN queue name where query is supposed to land. It might be different in reality,
* depending on YARN configuration.
Expand Down
Loading

0 comments on commit 98d0db2

Please sign in to comment.