diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
index 73e358179b..fdb93f1194 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
@@ -19,16 +19,68 @@
package org.apache.tez.common.counters;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
+/**
+ * FileSystemCounter is an enum for defining which filesystem/storage statistics are exposed in Tez.
+ */
@Private
public enum FileSystemCounter {
- BYTES_READ,
- BYTES_WRITTEN,
- READ_OPS,
- LARGE_READ_OPS,
- WRITE_OPS,
- HDFS_BYTES_READ,
- HDFS_BYTES_WRITTEN,
- FILE_BYTES_READ,
- FILE_BYTES_WRITTEN
+ BYTES_READ("bytesRead"),
+ BYTES_WRITTEN("bytesWritten"),
+ READ_OPS("readOps"),
+ LARGE_READ_OPS("largeReadOps"),
+ WRITE_OPS("writeOps"),
+
+ // Additional counters from HADOOP-13305
+ OP_APPEND(CommonStatisticNames.OP_APPEND),
+ OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE),
+ OP_CREATE(CommonStatisticNames.OP_CREATE),
+ OP_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE),
+ OP_DELETE(CommonStatisticNames.OP_DELETE),
+ OP_EXISTS(CommonStatisticNames.OP_EXISTS),
+ OP_GET_CONTENT_SUMMARY(CommonStatisticNames.OP_GET_CONTENT_SUMMARY),
+ OP_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN),
+ OP_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
+ OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS),
+ OP_GET_STATUS(CommonStatisticNames.OP_GET_STATUS),
+ OP_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS),
+ OP_IS_FILE(CommonStatisticNames.OP_IS_FILE),
+ OP_IS_DIRECTORY(CommonStatisticNames.OP_IS_DIRECTORY),
+ OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES),
+ OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
+ OP_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),
+ OP_MKDIRS(CommonStatisticNames.OP_MKDIRS),
+ OP_MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES),
+ OP_OPEN(CommonStatisticNames.OP_OPEN),
+ OP_REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL),
+ OP_REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES),
+ OP_REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL),
+ OP_RENAME(CommonStatisticNames.OP_RENAME),
+ OP_SET_ACL(CommonStatisticNames.OP_SET_ACL),
+ OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
+ OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
+ OP_SET_TIMES(CommonStatisticNames.OP_SET_TIMES),
+ OP_TRUNCATE(CommonStatisticNames.OP_TRUNCATE),
+
+ // The counters below are not needed in production because the scheme_countername expansion is handled by
+ // FileSystemCounterGroup; they are kept only because some analyzers still depend on them.
+ @Deprecated
+ HDFS_BYTES_READ("hdfsBytesRead"),
+ @Deprecated
+ HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
+ @Deprecated
+ FILE_BYTES_READ("fileBytesRead"),
+ @Deprecated
+ FILE_BYTES_WRITTEN("fileBytesWritten");
+
+ private final String opName;
+
+ FileSystemCounter(String opName) {
+ this.opName = opName;
+ }
+
+ public String getOpName() {
+ return opName;
+ }
}
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index 503bf2b103..e235e933cd 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -49,6 +49,23 @@
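<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>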
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
index bb15ef159f..ad48d0d624 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,9 +17,7 @@
package org.apache.tez.runtime.metrics;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -30,50 +28,22 @@
*/
public class FileSystemStatisticUpdater {
- private List stats;
- private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter,
- writeOpsCounter;
- private String scheme;
- private TezCounters counters;
+ private final StorageStatistics stats;
+ private final TezCounters counters;
- FileSystemStatisticUpdater(TezCounters counters, List stats, String scheme) {
- this.stats = stats;
- this.scheme = scheme;
+ FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) {
+ this.stats = storageStatistics;
this.counters = counters;
}
void updateCounters() {
- if (readBytesCounter == null) {
- readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
- }
- if (writeBytesCounter == null) {
- writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
- }
- if (readOpsCounter == null) {
- readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
- }
- if (largeReadOpsCounter == null) {
- largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
- }
- if (writeOpsCounter == null) {
- writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
- }
- long readBytes = 0;
- long writeBytes = 0;
- long readOps = 0;
- long largeReadOps = 0;
- long writeOps = 0;
- for (FileSystem.Statistics stat : stats) {
- readBytes = readBytes + stat.getBytesRead();
- writeBytes = writeBytes + stat.getBytesWritten();
- readOps = readOps + stat.getReadOps();
- largeReadOps = largeReadOps + stat.getLargeReadOps();
- writeOps = writeOps + stat.getWriteOps();
+ // loop through FileSystemCounter enums as it is a smaller set
+ for (FileSystemCounter fsCounter : FileSystemCounter.values()) {
+ Long val = stats.getLong(fsCounter.getOpName());
+ if (val != null && val != 0) {
+ TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter);
+ counter.setValue(val);
+ }
}
- readBytesCounter.setValue(readBytes);
- writeBytesCounter.setValue(writeBytes);
- readOpsCounter.setValue(readOps);
- largeReadOpsCounter.setValue(largeReadOps);
- writeOpsCounter.setValue(writeOps);
}
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
index 48676e225b..49f8fca25f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -18,17 +18,17 @@
package org.apache.tez.runtime.metrics;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.GcTimeUpdater;
import org.apache.tez.common.counters.TaskCounter;
@@ -49,10 +49,9 @@ public class TaskCounterUpdater {
private final Configuration conf;
/**
- * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+ * A Map where Key-> URIScheme and value-> Map of (statistics name -> FileSystemStatisticUpdater)
*/
- private Map statisticUpdaters =
- new HashMap();
+ private final Map<String, Map<String, FileSystemStatisticUpdater>> statisticUpdaters = new HashMap<>();
protected final GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
@@ -67,34 +66,18 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid)
recordInitialCpuStats();
}
-
+
public void updateCounters() {
- // FileSystemStatistics are reset each time a new task is seen by the
- // container.
- // This doesn't remove the fileSystem, and does not clear all statistics -
- // so there is a potential of an unused FileSystem showing up for a
- // Container, and strange values for READ_OPS etc.
- Map> map = new
- HashMap>();
- for(Statistics stat: FileSystem.getAllStatistics()) {
- String uriScheme = stat.getScheme();
- if (map.containsKey(uriScheme)) {
- List list = map.get(uriScheme);
- list.add(stat);
- } else {
- List list = new ArrayList();
- list.add(stat);
- map.put(uriScheme, list);
- }
- }
- for (Map.Entry> entry: map.entrySet()) {
- FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
- if(updater==null) {//new FileSystem has been found in the cache
- updater =
- new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
- entry.getKey());
- statisticUpdaters.put(entry.getKey(), updater);
- }
+ GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics();
+ Iterator<StorageStatistics> iter = globalStorageStatistics.iterator();
+ while (iter.hasNext()) {
+ StorageStatistics stats = iter.next();
+ // Fetch or initialize the updater set for the scheme
+ Map<String, FileSystemStatisticUpdater> updaterSet = statisticUpdaters
+ .computeIfAbsent(stats.getScheme(), k -> new HashMap<>());
+ // Fetch or create the updater for the specific statistic
+ FileSystemStatisticUpdater updater = updaterSet
+ .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats));
updater.updateCounters();
}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java
new file mode 100644
index 0000000000..b07f811ded
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static final Configuration CONF = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ CONF.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Before
+ public void setup() throws IOException {
+ FileSystem.clearStatistics();
+ try {
+ // tear down the whole cluster before each test to completely get rid of file system statistics
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ dfsCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, CONF, "pid");
+
+ DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz");
+
+ updater.updateCounters();
+ LOG.info("Counters (after first update): {}", counters);
+ assertCounter(counters, FileSystemCounter.OP_MKDIRS, 0); // DFSTestUtil doesn't call separate mkdirs
+ assertCounter(counters, FileSystemCounter.OP_CREATE, 1);
+ assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 3); // "xyz"
+ assertCounter(counters, FileSystemCounter.WRITE_OPS, 1);
+ assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 1); // DFSTestUtil calls fs.exists
+ assertCounter(counters, FileSystemCounter.OP_CREATE, 1);
+
+ DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc1.txt"), "xyz");
+
+ updater.updateCounters();
+ LOG.info("Counters (after second update): {}", counters);
+ assertCounter(counters, FileSystemCounter.OP_CREATE, 2);
+ assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 6); // "xyz" has been written twice
+ assertCounter(counters, FileSystemCounter.WRITE_OPS, 2);
+ assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 2); // DFSTestUtil calls fs.exists again
+ assertCounter(counters, FileSystemCounter.OP_CREATE, 2);
+
+ // Run one more update without any new filesystem activity in between
+ updater.updateCounters();
+ LOG.info("Counters (after third update): {}", counters);
+ // counter holds its previous value after the additional updateCounters call
+ assertCounter(counters, FileSystemCounter.OP_CREATE, 2);
+ }
+
+ private void assertCounter(TezCounters counters, FileSystemCounter fsCounter, int value) {
+ TezCounter counter = counters.findCounter(remoteFs.getScheme(), fsCounter);
+ Assert.assertNotNull(counter);
+ Assert.assertEquals(value, counter.getValue());
+ }
+}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java
new file mode 100644
index 0000000000..aa782396cb
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTaskCounterUpdater.class);
+ private static final Configuration CONF = new Configuration();
+
+ @Test
+ public void basicTest() {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, CONF, "pid");
+
+ updater.updateCounters();
+ LOG.info("Counters (after first update): {}", counters);
+ assertCounter(counters, TaskCounter.GC_TIME_MILLIS);
+ TezCounter cpuCounter = assertCounter(counters, TaskCounter.CPU_MILLISECONDS);
+
+ long oldVal = cpuCounter.getValue();
+ Assert.assertTrue(cpuCounter.getValue() > 0);
+
+ updater.updateCounters();
+ LOG.info("Counters (after second update): {}", counters);
+ Assert.assertTrue("Counter not updated, old=" + oldVal
+ + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal);
+ }
+
+ private TezCounter assertCounter(TezCounters counters, TaskCounter taskCounter) {
+ TezCounter counter = counters.findCounter(taskCounter);
+ Assert.assertNotNull(counter);
+ return counter;
+ }
+}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index aeaec53124..9c26a321ca 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.tez.common.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -181,7 +180,9 @@ public void testMultipleSuccessfulTasks() throws IOException, InterruptedExcepti
assertFalse(TestProcessor.wasAborted());
umbilical.resetTrackedEvents();
TezCounters tezCounters = runtimeTask.getCounters();
- verifySysCounters(tezCounters, 5, 5);
+ // with TEZ-3331, fs counters are not set if the value is 0 (see FileSystemStatisticUpdater), so there can be
+ // a mismatch in task counter count and fs counter count
+ verifySysCounters(tezCounters, 5, 0);
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, false);
@@ -693,10 +694,6 @@ public void testClusterStorageCapacityFatalError() throws IOException {
private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {
- Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
- (minTaskCounterCount <= 0 && minFsCounterCount <= 0),
- "Both targetCounter counts should be postitive or negative. A mix is not expected");
-
int numTaskCounters = 0;
int numFsCounters = 0;
for (CounterGroup counterGroup : tezCounters) {