TEZ-2645. Provide standard analyzers for job analysis (rbalamohan)
author: Rajesh Balamohan <rbalamohan@apache.org>
Wed, 29 Jul 2015 07:57:51 +0000 (13:27 +0530)
committer: Rajesh Balamohan <rbalamohan@apache.org>
Wed, 29 Jul 2015 07:57:51 +0000 (13:27 +0530)
19 files changed:
CHANGES.txt
pom.xml
tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
tez-tools/analyzers/job-analyzer/findbugs-exclude.xml [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/pom.xml [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java [new file with mode: 0644]
tez-tools/analyzers/pom.xml [new file with mode: 0644]
tez-tools/pom.xml

index f7f2e76..687f996 100644 (file)
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.
   TEZ-2623. Fix module dependencies related to hadoop-auth.
   TEZ-2464. Move older releases to dist archive.
diff --git a/pom.xml b/pom.xml
index ddb7324..a0b82e4 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <version>${pig.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.1.0</version>
+      </dependency>
+      <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
         <version>0.4.9</version>
index 57d1053..73e3581 100644 (file)
@@ -27,4 +27,8 @@ public enum FileSystemCounter {
   READ_OPS,
   LARGE_READ_OPS,
   WRITE_OPS,
+  HDFS_BYTES_READ,
+  HDFS_BYTES_WRITTEN,
+  FILE_BYTES_READ,
+  FILE_BYTES_WRITTEN
 }
diff --git a/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
new file mode 100644 (file)
index 0000000..5bebb05
--- /dev/null
@@ -0,0 +1,28 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+
+</FindBugsFilter>
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
new file mode 100644 (file)
index 0000000..6312a34
--- /dev/null
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-perf-analyzer</artifactId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>job-analyzer</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-history-parser</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>  
+</project>
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
new file mode 100644 (file)
index 0000000..6021c58
--- /dev/null
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+
+public interface Analyzer {
+
+  /**
+   * Analyze Dag
+   *
+   * @param dagInfo
+   * @throws TezException
+   */
+  public void analyze(DagInfo dagInfo) throws TezException;
+
+  /**
+   * Get the result of analysis
+   *
+   * @return analysis result
+   * @throws TezException
+   */
+  public Result getResult() throws TezException;
+
+  /**
+   * Get name of the analyzer
+   *
+   * @return name of the analyzer
+   */
+  public String getName();
+
+  /**
+   * Get description of the analyzer
+   *
+   * @return description of analyzer
+   */
+  public String getDescription();
+
+  /**
+   * Get config properties related to this analyzer
+   *
+   * @return config related to analyzer
+   */
+  public Configuration getConfiguration();
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
new file mode 100644 (file)
index 0000000..5e454b4
--- /dev/null
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.directory.api.util.Strings;
+import org.apache.tez.dag.api.TezException;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Simple placeholder for storing CSV results.
+ * Contains headers and records in string format.
+ */
+public class CSVResult implements Result {
+
+  private final String[] header;
+  private final List<String[]> recordsList;
+  private String comments;
+
+  public CSVResult(String[] header) {
+    this.header = header;
+    recordsList = Lists.newLinkedList();
+  }
+
+  public String[] getHeaders() {
+    return header;
+  }
+
+  public void addRecord(String[] record) {
+    Preconditions.checkArgument(record != null, "Record can't be null");
+    Preconditions.checkArgument(record.length == header.length, "Record length" + record.length +
+        " does not match header length " + header.length);
+    recordsList.add(record);
+  }
+
+  public Iterator<String[]> getRecordsIterator() {
+    return Iterators.unmodifiableIterator(recordsList.iterator());
+  }
+
+
+  public void setComments(String comments) {
+    this.comments = comments;
+  }
+
+  @Override public String toJson() throws TezException {
+    return "";
+  }
+
+  @Override public String getComments() {
+    return comments;
+  }
+
+  @Override public String toString() {
+    return "CSVResult{" +
+        "header=" + Arrays.toString(header) +
+        ", recordsList=" + recordsList +
+        '}';
+  }
+
+  //For testing
+  public void dumpToFile(String fileName) throws IOException {
+    OutputStreamWriter writer = new OutputStreamWriter(
+        new FileOutputStream(new File(fileName)),
+        Charset.forName("UTF-8").newEncoder());
+    BufferedWriter bw = new BufferedWriter(writer);
+    for (String[] record : recordsList) {
+
+      if (record.length != header.length) {
+        continue; //LOG error msg?
+      }
+
+      StringBuilder sb = new StringBuilder();
+      for(int i=0;i<record.length;i++) {
+        sb.append(Strings.isNotEmpty(record[i]) ? record[i] : " ");
+        if (i < record.length - 1) {
+          sb.append(",");
+        }
+      }
+      bw.write(sb.toString());
+      bw.newLine();
+    }
+    bw.flush();
+    bw.close();
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
new file mode 100644 (file)
index 0000000..d1881eb
--- /dev/null
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.tez.dag.api.TezException;
+
+public interface Result {
+
+  /**
+   * Convert result to json format
+   *
+   * @return json
+   * @throws TezException
+   */
+  public String toJson() throws TezException;
+
+  /**
+   * Recommendation / comments about the analysis if any.
+   *
+   * @return comments
+   */
+  public String getComments();
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
new file mode 100644 (file)
index 0000000..905f966
--- /dev/null
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.Container;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+
+
+/**
+ * Get container reuse information at a per vertex level basis.
+ */
+public class ContainerReuseAnalyzer implements Analyzer {
+
+  private final Configuration config;
+
+  private static final String[] headers =
+      { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
+
+  private final CSVResult csvResult;
+
+  public ContainerReuseAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      Multimap<Container, TaskAttemptInfo> containers = vertexInfo.getContainersMapping();
+      for (Container container : containers.keySet()) {
+        List<String> record = Lists.newLinkedList();
+        record.add(vertexInfo.getVertexName());
+        record.add(vertexInfo.getTaskAttempts().size() + "");
+        record.add(container.getHost());
+        record.add(container.getId());
+        record.add(Integer.toString(containers.get(container).size()));
+        csvResult.addRecord(record.toArray(new String[record.size()]));
+      }
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Container Reuse Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Get details on container reuse analysis";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
new file mode 100644 (file)
index 0000000..6748f3f
--- /dev/null
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify a set of vertices which fall in the critical path in a DAG.
+ */
+public class CriticalPathAnalyzer implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  public CriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+    System.out.println();
+    System.out.println();
+
+    for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+      List<String> record = Lists.newLinkedList();
+      record.add(entry.getKey());
+      record.add(entry.getValue() + "");
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+      System.out.println(entry.getKey() + ", " + entry.getValue());
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    //Sort result by time in reverse order
+    final Ordering<String> reversValueOrdering =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering);
+    return orderedMap;
+  }
+
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
+
+    if (dest != null) {
+      time += dest.getTimeTaken();
+      predecessor += destVertexName + "-->";
+
+      for (VertexInfo incomingVertex : dest.getInputVertices()) {
+        getCriticalPath(predecessor, incomingVertex, time, result);
+      }
+
+      result.put(predecessor, time);
+    }
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
new file mode 100644 (file)
index 0000000..67b4c51
--- /dev/null
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Get locality information for tasks for vertices and get their task execution times.
+ * This would be helpful to correlate whether the vertex runtime is in any way related to the data
+ * locality.
+ */
+public class LocalityAnalyzer implements Analyzer {
+
+  private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio",
+      "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime",
+      "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal",
+      "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" };
+
+  private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
+  private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
+
+  private final Configuration config;
+
+  private final CSVResult csvResult;
+
+  public LocalityAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
+      Map<String, TezCounter> dataLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(),
+          DAGCounter.DATA_LOCAL_TASKS.toString());
+      Map<String, TezCounter> rackLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(),
+          DAGCounter.RACK_LOCAL_TASKS.toString());
+
+      long dataLocalTasks = 0;
+      long rackLocalTasks = 0;
+
+      if (!dataLocalTask.isEmpty()) {
+        dataLocalTasks = dataLocalTask.get(DAGCounter.class.getName()).getValue();
+      }
+
+      if (!rackLocalTask.isEmpty()) {
+        rackLocalTasks = rackLocalTask.get(DAGCounter.class.getName()).getValue();
+      }
+
+      long totalVertexTasks = vertexInfo.getNumTasks();
+
+      if (dataLocalTasks > 0 || rackLocalTasks > 0) {
+        //compute locality details.
+        float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks;
+        float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks;
+        float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f /
+            totalVertexTasks;
+
+        List<String> record = Lists.newLinkedList();
+        record.add(vertexName);
+        record.add(totalVertexTasks + "");
+        record.add(dataLocalRatio + "");
+        record.add(rackLocalRatio + "");
+        record.add(othersRatio + "");
+
+        TaskAttemptDetails dataLocalResult = computeAverages(vertexInfo,
+            DAGCounter.DATA_LOCAL_TASKS);
+        TaskAttemptDetails rackLocalResult = computeAverages(vertexInfo,
+            DAGCounter.RACK_LOCAL_TASKS);
+        TaskAttemptDetails otherTaskResult = computeAverages(vertexInfo,
+            DAGCounter.OTHER_LOCAL_TASKS);
+
+        record.add(dataLocalResult.avgRuntime + "");
+        record.add(rackLocalResult.avgRuntime + "");
+        record.add(otherTaskResult.avgRuntime + "");
+
+        //Get the number of inputs to this vertex
+        record.add(vertexInfo.getInputEdges().size() + "");
+
+        //Get the avg HDFS bytes read in this vertex for different type of locality
+        record.add(dataLocalResult.avgHDFSBytesRead + "");
+        record.add(rackLocalResult.avgHDFSBytesRead + "");
+        record.add(otherTaskResult.avgHDFSBytesRead + "");
+
+        String recommendation = "";
+        if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
+          recommendation = "Data locality is poor for this vertex. Try tuning "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED;
+        }
+
+        record.add(recommendation);
+        csvResult.addRecord(record.toArray(new String[record.size()]));
+      }
+    }
+  }
+
+  /**
+   * Compute counter averages for specific vertex
+   *
+   * @param vertexInfo
+   * @param counter
+   * @return task attempt details
+   */
+  private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) {
+    long totalTime = 0;
+    long totalTasks = 0;
+    long totalHDFSBytesRead = 0;
+
+    TaskAttemptDetails result = new TaskAttemptDetails();
+
+    for(TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+      Map<String, TezCounter> localityCounter = attemptInfo.getCounter(DAGCounter.class.getName(),
+          counter.toString());
+
+      if (!localityCounter.isEmpty() &&
+          localityCounter.get(DAGCounter.class.getName()).getValue() > 0) {
+        totalTime += attemptInfo.getTimeTaken();
+        totalTasks++;
+
+        //get HDFSBytes read counter
+        Map<String, TezCounter> hdfsBytesReadCounter = attemptInfo.getCounter(FileSystemCounter
+            .class.getName(), FileSystemCounter.HDFS_BYTES_READ.name());
+        for(Map.Entry<String, TezCounter> entry : hdfsBytesReadCounter.entrySet()) {
+          totalHDFSBytesRead += entry.getValue().getValue();
+        }
+      }
+    }
+    if (totalTasks > 0) {
+      result.avgRuntime = (totalTime * 1.0f / totalTasks);
+      result.avgHDFSBytesRead = (totalHDFSBytesRead * 1.0f / totalTasks);
+    }
+    return result;
+  }
+
+  @Override public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override public String getName() {
+    return "Locality Analyzer";
+  }
+
+  @Override public String getDescription() {
+    return "Analyze for locality information (data local, rack local, off-rack)";
+  }
+
+  @Override public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Placeholder for task attempt details
+   */
+  static class TaskAttemptDetails {
+    float avgHDFSBytesRead;
+    float avgRuntime;
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
new file mode 100644 (file)
index 0000000..8df40ba
--- /dev/null
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Analyze the time taken by merge phase, shuffle phase, time taken to do realistic work etc in
+ * tasks.
+ *
+ * Just dump REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, its ratio and SHUFFLE_BYTES for tasks
+ * grouped by vertices. Provide time taken as well.  Just render it as a table for now.
+ *
+ */
+public class ShuffleTimeAnalyzer implements Analyzer {
+
+  private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
+  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+
+  private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
+  private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
+
+  private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
+      "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
+      "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+      "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
+      "SHUFFLE_BYTES_DISK_DIRECT" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float shuffleTimeRatio;
+  private final long minShuffleRecords;
+
+
+  public ShuffleTimeAnalyzer(Configuration config) {
+    this.config = config;
+
+    shuffleTimeRatio = config.getFloat
+        (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+    minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //counter_group (basically source) --> counter
+        Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_GROUPS.toString());
+        Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_RECORDS.toString());
+
+        if (reduceInputGroups == null) {
+          continue;
+        }
+
+        for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+          String counterGroupName = entry.getKey();
+          long reduceInputGroupsVal = entry.getValue().getValue();
+          long reduceInputRecordsVal = (reduceInputRecords.get(counterGroupName) != null) ?
+          reduceInputRecords.get(counterGroupName).getValue() : 0;
+
+          if (reduceInputRecordsVal <= 0) {
+            continue;
+          }
+          float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal);
+
+          if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) {
+            List<String> result = Lists.newLinkedList();
+            result.add(vertexInfo.getVertexName());
+            result.add(attemptInfo.getTaskAttemptId());
+            result.add(attemptInfo.getNodeId());
+            result.add(counterGroupName);
+
+            //Real work done in the task
+            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
+                Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
+                    attemptInfo));
+
+            String comments = "";
+            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) {
+              comments = "Time taken in shuffle is more than the actual work being done in task. "
+                  + " Check if source/destination machine is a slow node. Check if merge phase "
+                  + "time is more to understand disk bottlenecks in this node.  Check for skew";
+            }
+            result.add(comments);
+
+            result.add(reduceInputGroupsVal + "");
+            result.add(reduceInputRecordsVal + "");
+            result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
+
+            //Total time taken for receiving all events from source tasks
+            result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
+
+
+            result.add(Long.toString(timeTakenForRealWork));
+
+            result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroupName, attemptInfo));
+
+            csvResult.addRecord(result.toArray(new String[result.size()]));
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Time taken to receive all events from source tasks
+   *
+   * @param counterGroupName
+   * @param attemptInfo
+   * @return String
+   */
+  private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
+    long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo));
+    long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo));
+    return Long.toString(lastEventReceived - firstEventReceived);
+  }
+
+  private String getCounterValue(TaskCounter counter, String counterGroupName,
+      TaskAttemptInfo attemptInfo) {
+    Map<String, TezCounter> tezCounterMap = attemptInfo.getCounter(counter.toString());
+    if (tezCounterMap != null) {
+      for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) {
+        String groupName = entry.getKey();
+        long val = entry.getValue().getValue();
+        if (groupName.equals(counterGroupName)) {
+          return Long.toString(val);
+        }
+      }
+    }
+    return "";
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Shuffle time analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze the time taken for shuffle, merge "
+        + "and the real work done in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
new file mode 100644 (file)
index 0000000..8152344
--- /dev/null
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p/>
+ * Identify the skew (RECORD_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for all task attempts
+ * and report if they are below a certain threshold.
+ * <p/>
+ * <p/>
+ * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) < 0.2  && SHUFFLE_BYTES > 1 GB
+ * per task attempt from a source. This means couple of keys having too many records. Either
+ * partitioning is wrong, or we need to increase memory limit for this vertex.
+ * <p/>
+ * - Case 2: Ratio of (reduce_input_groups / reduce_input_records) > 0.6 & Number of reduce input
+ * records in task attempt is closer to say 60% of overall number of records
+ * in vertex level & numTasks in vertex is greater than 1.  This might have any number of reducer
+ * groups.  This means that, partitioning is wrong (can also consider reducing number of tasks
+ * for that vertex). In some cases, too many reducers are launched and this can help find those.
+ * <p/>
+ * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) is between 0.2 & 0.6 per task
+ * attempt & numTasks is greater than 1 & SHUFFLE_BYTES > 1 GB per task attempt from a
+ * source. This means, may be consider increasing parallelism based on the task attempt runtime.
+ * <p/>
+ */
+public class SkewAnalyzer implements Analyzer {
+
+  private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
+      + ".bytes.per.source";
+  private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
+
+  //Min reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO = "tez.skew-analyzer.shuffle.key"
+      + ".group.min.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT = 0.2f;
+
+  //Max reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO = "tez.skew-analyzer.shuffle.key"
+      + ".group.max.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT = 0.4f;
+
+
+
+  private static final String[] headers = { "vertexName", "taskAttemptId", "counterGroup", "node",
+      "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "timeTaken",
+      "observation" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float minRatio;
+  private final float maxRatio;
+  private final long maxShuffleBytesPerSource;
+
+  public SkewAnalyzer(Configuration config) {
+    this.config = config;
+    maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
+    minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT);
+    maxShuffleBytesPerSource = config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE,
+        SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Preconditions.checkArgument(dagInfo != null, "DAG can't be null");
+    analyzeReducers(dagInfo);
+  }
+
+  private void analyzeReducers(DagInfo dagInfo) {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        analyzeGroupSkewPerSource(attemptInfo);
+        analyzeRecordSkewPerSource(attemptInfo);
+        analyzeForParallelism(attemptInfo);
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where couple keys are having too many records per source
+   *
+   * @param attemptInfo
+   */
+  private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+          (counterGroup).getValue() : 0;
+
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 1: Couple of keys having too many records per source.
+      if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
+        if (ratio < minRatio) {
+          List<String> result = Lists.newLinkedList();
+          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+          result.add(attemptInfo.getTaskAttemptId());
+          result.add(counterGroup);
+          result.add(attemptInfo.getNodeId());
+          result.add(inputGroupsCount + "");
+          result.add(inputRecordsCount + "");
+          result.add(ratio + "");
+          result.add(shuffleBytesPerSource + "");
+          result.add(attemptInfo.getTimeTaken() + "");
+          result.add("Please check partitioning. Otherwise consider increasing memLimit");
+
+          csvResult.addRecord(result.toArray(new String[result.size()]));
+        }
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where one task is getting > 60% of the vertex level records
+   *
+   * @param attemptInfo
+   */
+  private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    Map<String, TezCounter> vertexLevelReduceInputRecords =
+        attemptInfo.getTaskInfo().getVertexInfo()
+            .getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());
+
+    int vertexNumTasks = attemptInfo.getTaskInfo().getVertexInfo().getNumTasks();
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ?shuffleBytes.get
+          (counterGroup).getValue() : 0;
+      long vertexLevelInputRecordsCount = (vertexLevelReduceInputRecords.get(counterGroup) !=
+          null) ?
+          vertexLevelReduceInputRecords.get(counterGroup).getValue() : 0;
+
+      float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount);
+
+      if (vertexNumTasks > 1) {
+        if (ratio > maxRatio) {
+          //input records > 60% of vertex level record count
+          if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+            List<String> result = Lists.newLinkedList();
+            result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+            result.add(attemptInfo.getTaskAttemptId());
+            result.add(counterGroup);
+            result.add(attemptInfo.getNodeId());
+            result.add(inputGroupsCount + "");
+            result.add(inputRecordsCount + "");
+            result.add(ratio + "");
+            result.add(shuffleBytesPerSource + "");
+            result.add(attemptInfo.getTimeTaken() + "");
+            result.add("Some task attempts are getting > 60% of reduce input records. "
+                + "Consider adjusting parallelism & check partition logic");
+
+            csvResult.addRecord(result.toArray(new String[result.size()]));
+
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where a vertex would need to increase parallelism
+   *
+   * @param attemptInfo
+   */
+  private void analyzeForParallelism(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+          (counterGroup).getValue() : 0;
+
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 3: Shuffle_Bytes > 1 GB.  Ratio between 0.2 & < 0.6. Consider increasing
+      // parallelism based on task runtime.
+      if (shuffleBytesPerSource > SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT) {
+        if (ratio > minRatio && ratio < maxRatio) {
+          //couple of keys have too many records. Classic case of partition issue.
+          List<String> result = Lists.newLinkedList();
+          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+          result.add(attemptInfo.getTaskAttemptId());
+          result.add(counterGroup);
+          result.add(attemptInfo.getNodeId());
+          result.add(inputGroupsCount + "");
+          result.add(inputRecordsCount + "");
+          result.add(ratio + "");
+          result.add(shuffleBytesPerSource + "");
+          result.add(attemptInfo.getTimeTaken() + "");
+          result.add("Consider increasing parallelism.");
+
+          csvResult.addRecord(result.toArray(new String[result.size()]));
+        }
+      }
+    }
+
+
+  }
+
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Skew Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyzer reducer skews by mining reducer task counters";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return null;
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
new file mode 100644 (file)
index 0000000..407cf47
--- /dev/null
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This will provide the set of nodes participated in the DAG in descending order of task execution
+ * time.
+ * <p/>
+ * Combine it with other counters to understand slow nodes better.
+ */
+public class SlowNodeAnalyzer implements Analyzer {
+
+  private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class);
+
+  private static final String[] headers = { "nodeName", "noOfTasksExecuted", "noOfKilledTasks",
+      "noOfFailedTasks", "avgSucceededTaskExecutionTime", "avgKilledTaskExecutionTime",
+      "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten",
+      "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", "avgCPUTimeMillis" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  public SlowNodeAnalyzer(Configuration config) {
+    this.config = config;
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
+    for (String nodeName : nodeDetails.keySet()) {
+      List<String> record = Lists.newLinkedList();
+
+      Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName);
+
+      record.add(nodeName);
+      record.add(taskAttemptInfos.size() + "");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + "");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + "");
+
+      Iterable<TaskAttemptInfo> succeedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.SUCCEEDED);
+      record.add(getAvgTaskExecutionTime(succeedTasks) + "");
+
+      Iterable<TaskAttemptInfo> killedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.KILLED);
+      record.add(getAvgTaskExecutionTime(killedTasks) + "");
+
+      Iterable<TaskAttemptInfo> failedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.FAILED);
+      record.add(getAvgTaskExecutionTime(failedTasks) + "");
+
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+          .getName(), TaskCounter.GC_TIME_MILLIS.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+              .getName(), TaskCounter.CPU_MILLISECONDS.name()) + "");
+
+          csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+  private Iterable<TaskAttemptInfo> getFilteredTaskAttempts(Collection<TaskAttemptInfo>
+      taskAttemptInfos, final TaskAttemptState status) {
+    return Iterables.filter(taskAttemptInfos, new
+        Predicate<TaskAttemptInfo>() {
+          @Override public boolean apply(TaskAttemptInfo input) {
+            return input.getStatus().equalsIgnoreCase(status.toString());
+          }
+        });
+  }
+
+  private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> taskAttemptInfos) {
+    long totalTime = 0;
+    int size = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      totalTime += attemptInfo.getTimeTaken();
+      size++;
+    }
+    return (size > 0) ? (totalTime * 1.0f / size) : 0;
+  }
+
+  private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, TaskAttemptState
+      status) {
+    int tasks = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) {
+        tasks++;
+      }
+    }
+    return tasks;
+  }
+
+  private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, String
+      counterGroupName, String counterName) {
+    long total = 0;
+    int taskCount = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      TezCounters tezCounters = attemptInfo.getTezCounters();
+      TezCounter counter = tezCounters.findCounter(counterGroupName, counterName);
+      if (counter != null) {
+        total += counter.getValue();
+        taskCount++;
+      } else {
+        LOG.info("Could not find counterGroupName=" + counterGroupName + ", counter=" +
+            counterName + " in " + attemptInfo);
+      }
+    }
+    return (taskCount > 0) ? (total * 1.0f / taskCount) : 0;
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Node Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Analyze node details for the DAG.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+        + "time on average.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+        + "time on average and to understand whether too many tasks got scheduled on a node.")
+        .append("\n");
+    sb.append("One needs to combine the task execution time with other metrics like bytes "
+        + "read/written etc to get better idea of bad nodes. In order to understand the slow "
+        + "nodes due to network, it might be worthwhile to consider the shuffle performance "
+        + "analyzer tool in tez-tools").append("\n");
+    return sb.toString();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
new file mode 100644 (file)
index 0000000..7c7f5c0
--- /dev/null
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Analyze slow tasks in the DAG. Top 100 tasks are listed by default.
+ *
+ * <p/>
+ * //TODO: We do not get counters for killed task attempts yet.
+ */
+public class SlowTaskIdentifier implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "taskDuration", "Status",
+      "NoOfInputs" };
+
+  private final CSVResult csvResult;
+
+  private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
+  private static final int NO_OF_TASKS_DEFAULT = 100;
+
+  private final Configuration config;
+
+  public SlowTaskIdentifier(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    List<TaskAttemptInfo> taskAttempts = Lists.newArrayList();
+    for(VertexInfo vertexInfo : dagInfo.getVertices()) {
+      taskAttempts.addAll(vertexInfo.getTaskAttempts());
+    }
+
+    //sort them by runtime in descending order
+    Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getTimeTaken() > o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+
+    int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT);
+    for(int i=0;i<limit;i++) {
+      List<String> record = Lists.newLinkedList();
+      record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
+      record.add(taskAttempts.get(i).getTaskAttemptId());
+      record.add(taskAttempts.get(i).getContainer().getHost());
+      record.add(taskAttempts.get(i).getTimeTaken() + "");
+      record.add(taskAttempts.get(i).getStatus());
+      record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
+
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Task Identifier";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identifies slow tasks in the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
new file mode 100644 (file)
index 0000000..7364506
--- /dev/null
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the slowest vertex in the DAG.
+ * <p/>
+ * For every vertex, emits one CSV record containing its total runtime, the
+ * slowest shuffle / last-event-received times observed across its task
+ * attempts, attempt-runtime percentiles, and a heuristic comment on where to
+ * look next.
+ */
+public class SlowestVertexAnalyzer implements Analyzer {
+
+  // CSV column headers. NOTE(review): values are appended in the order
+  // vertexName, #attempts, totalTime, shuffleMax, shuffleMaxSource, max,
+  // maxSourceName, (totalTime - max), 75th, 95th, 98th, median, observation,
+  // comments -- so the "shuffleTime_Max" column actually carries the *source
+  // name* of the slowest shuffle and "LastEventReceived" carries a time.
+  // Confirm the header naming matches what report consumers expect.
+  private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
+      "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
+      "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "observation", "comments" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+  // Dropwizard metrics registry; one histogram per vertex, keyed by name.
+  private final MetricRegistry metrics = new MetricRegistry();
+  // Histogram of task-attempt runtimes for the vertex currently being
+  // analyzed. (Field name carries a typo: "Historgram".)
+  private Histogram taskAttemptRuntimeHistorgram;
+
+  public SlowestVertexAnalyzer(Configuration config) {
+    this.config = config;
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+      long totalTime = vertexInfo.getTimeTaken();
+
+      // Slowest "last event received" across all attempts, expressed
+      // relative to the vertex start time.
+      long max = Long.MIN_VALUE;
+      String maxSourceName = "";
+      taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
+
+
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+
+        taskAttemptRuntimeHistorgram.update(attemptInfo.getTimeTaken());
+
+        //Get the last event received from the incoming vertices
+        Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+            TaskCounter.LAST_EVENT_RECEIVED.toString());
+
+        for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+          if (entry.getKey().equals(TaskCounter.class.getName())) {
+            //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+            // getting TaskCounter details as well.
+            continue;
+          }
+          //Find the slowest last event received.
+          //NOTE(review): the comparison uses the raw counter value but the
+          //stored max is (attempt start + counter value - vertex start), so
+          //the guard and the stored quantity are on different scales --
+          //verify this is intentional.
+          if (entry.getValue().getValue() > max) {
+            //w.r.t vertex start time.
+            max =(attemptInfo.getStartTime() +  entry.getValue().getValue()) -
+                (vertexInfo.getStartTime());
+            maxSourceName = entry.getKey();
+          }
+        }
+      }
+
+      // Slowest shuffle-phase time across all attempts, same relative
+      // adjustment as above.
+      long shuffleMax = Long.MIN_VALUE;
+      String shuffleMaxSource = "";
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get the shuffle phase time reported against each incoming source.
+        Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+            TaskCounter.SHUFFLE_PHASE_TIME.toString());
+
+        for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+          if (entry.getKey().equals(TaskCounter.class.getName())) {
+            //ignore. TODO: hack for taskcounter issue
+            continue;
+          }
+          //Find the slowest shuffle phase time (guard/stored-value scale
+          //mismatch as noted above).
+          if (entry.getValue().getValue() > shuffleMax) {
+            //w.r.t vertex start time.
+            shuffleMax =(attemptInfo.getStartTime() +  entry.getValue().getValue()) -
+                (vertexInfo.getStartTime());
+            shuffleMaxSource = entry.getKey();
+          }
+        }
+      }
+
+      String comments = "";
+
+      // Build the CSV record; Math.max(0, ...) clamps the Long.MIN_VALUE
+      // sentinel (no counters seen) to zero.
+      List<String> record = Lists.newLinkedList();
+      record.add(vertexName);
+      record.add(vertexInfo.getTaskAttempts().size() + "");
+      record.add(totalTime + "");
+      record.add(Math.max(0, shuffleMax) + "");
+      record.add(shuffleMaxSource);
+      record.add(Math.max(0, max) + "");
+      record.add(maxSourceName);
+      record.add(Math.max(0,(totalTime - max)) + "");
+
+      StringBuilder sb = new StringBuilder();
+      double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
+      double percentile95 = taskAttemptRuntimeHistorgram.getSnapshot().get95thPercentile();
+      double percentile98 = taskAttemptRuntimeHistorgram.getSnapshot().get98thPercentile();
+      double percentile99 = taskAttemptRuntimeHistorgram.getSnapshot().get99thPercentile();
+      double medianAttemptRuntime = taskAttemptRuntimeHistorgram.getSnapshot().getMedian();
+
+      record.add("75th=" + percentile75);
+      record.add("95th=" + percentile95);
+      record.add("98th=" + percentile98);
+      record.add("median=" + medianAttemptRuntime);
+
+      // Straggler heuristic: a 75th percentile far below the 99th means a
+      // few attempts ran much longer than the rest.
+      if (percentile75 / percentile99 < 0.5) {
+        //looks like some straggler task is there.
+        sb.append("Looks like some straggler task is there");
+      }
+
+      record.add(sb.toString());
+
+      // Heuristic comments: shuffle-dominated vs parent-delayed vs simply
+      // long-running vertex.
+      if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
+        if ((shuffleMax * 1.0f / totalTime) > 0.5) {
+          if ((max * 1.0f / totalTime) > 0.5) {
+            comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
+                + " event received";
+          } else {
+            comments =
+                "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
+          }
+        } else {
+          if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
+            comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+          }
+        }
+      }
+
+      record.add(comments);
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+
+  /** Returns the CSV records accumulated by {@code analyze(DagInfo)}. */
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SlowVertexAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identify the slowest vertex in the DAG, which needs to be looked into first";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
new file mode 100644 (file)
index 0000000..c650104
--- /dev/null
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Find out tasks which have more than 1 spill (ADDITIONAL_SPILL_COUNT).
+ * <p/>
+ * Accompany this with OUTPUT_BYTES (> 1 GB data written)
+ */
+public class SpillAnalyzerImpl implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "counterGroupName",
+      "spillCount", "taskDuration",
+      "OUTPUT_BYTES", "OUTPUT_RECORDS",
+      "SPILLED_RECORDS", "Recommendation" };
+
+  private final CSVResult csvResult;
+
+  private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l;
+
+  private final Configuration config;
+
+  public SpillAnalyzerImpl(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source
+        Map<String, TezCounter> spillCountMap =
+            attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name());
+        Map<String, TezCounter> spilledRecordsMap =
+            attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name());
+        Map<String, TezCounter> outputRecordsMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name());
+
+        Map<String, TezCounter> outputBytesMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name());
+
+        for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) {
+          String source = entry.getKey();
+          long spillCount = entry.getValue().getValue();
+          long outBytes = outputBytesMap.get(source).getValue();
+
+          long outputRecords = outputRecordsMap.get(source).getValue();
+          long spilledRecords = spilledRecordsMap.get(source).getValue();
+
+          if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) {
+            List<String> recorList = Lists.newLinkedList();
+            recorList.add(vertexName);
+            recorList.add(attemptInfo.getTaskAttemptId());
+            recorList.add(attemptInfo.getNodeId());
+            recorList.add(source);
+            recorList.add(spillCount + "");
+            recorList.add(attemptInfo.getTimeTaken() + "");
+            recorList.add(outBytes + "");
+            recorList.add(outputRecords + "");
+            recorList.add(spilledRecords + "");
+            recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
+                + ", try increasing container size.");
+
+            csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SpillAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze spill details in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml
new file mode 100644 (file)
index 0000000..97ce541
--- /dev/null
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-tools</artifactId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-perf-analyzer</artifactId>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>job-analyzer</module>
+  </modules>
+</project>
index bf8fdf8..ed13143 100644 (file)
@@ -26,6 +26,9 @@
   <artifactId>tez-tools</artifactId>
   <packaging>pom</packaging>
 
+  <modules>
+    <module>analyzers</module>
+  </modules>
   <build>
     <plugins>
       <plugin>