TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
author Rajesh Balamohan <rbalamohan@apache.org>
Tue, 11 Aug 2015 12:49:55 +0000 (18:19 +0530)
committer Rajesh Balamohan <rbalamohan@apache.org>
Tue, 11 Aug 2015 12:49:55 +0000 (18:19 +0530)
17 files changed:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java [moved from tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java with 69% similarity]
tez-tools/analyzers/job-analyzer/pom.xml
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java [new file with mode: 0644]

index b37eb9e..3de9fb7 100644 (file)
@@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES
   TEZ-2699. Internalize strings in ATF parser
 
 ALL CHANGES:
+  TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
   TEZ-2630. TezChild receives IP address instead of FQDN.
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
index 6de9c59..eb2bd41 100644 (file)
@@ -62,6 +62,7 @@ public class Graph {
     List<Edge> outs;
     String label;
     String shape;
+    String color;
 
     public Node(String id) {
       this(id, null);
@@ -104,6 +105,10 @@ public class Graph {
     public void setShape(String shape) {
       this.shape = shape;
     }
+
+    public void setColor(String color) {
+      this.color = color;
+    }
   }
 
   private String name;
@@ -196,17 +201,19 @@ public class Graph {
     for (Node n : nodes) {
       if (n.shape != null && !n.shape.isEmpty()) {
         sb.append(String.format(
-            "%s%s [ label = %s, shape = %s ];",
+            "%s%s [ label = %s, shape = %s , color= %s];",
             indent,
             wrapSafeString(n.getUniqueId()),
             wrapSafeString(n.getLabel()),
-            wrapSafeString(n.shape)));
+            wrapSafeString(n.shape),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       } else {
         sb.append(String.format(
-            "%s%s [ label = %s ];",
+            "%s%s [ label = %s , color= %s ];",
             indent,
             wrapSafeString(n.getUniqueId()),
-            wrapSafeString(n.getLabel())));
+            wrapSafeString(n.getLabel()),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       }
       sb.append(System.getProperty("line.separator"));
       List<Edge> combinedOuts = combineEdges(n.outs);
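
With the new color attribute, a node declaration in the emitted DOT output looks like the line
below (a sketch, assuming wrapSafeString quotes its argument; the node name and values are
hypothetical). Nodes that never call setColor() fall back to "black".

    "v1" [ label = "Tokenizer", shape = "box", color = "red" ];
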
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
new file mode 100644 (file)
index 0000000..09c010a
--- /dev/null
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Parser utility to parse data generated by SimpleHistoryLogging into the in-memory data model
+ * provided in org.apache.tez.history.parser.datamodel
+ * <p/>
+ * <p/>
+ * Most of the information should be available. Minor details like VersionInfo may not be
+ * available, as they are not captured in SimpleHistoryLogging.
+ */
+public class SimpleHistoryParser extends BaseParser {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class);
+  private static final String UTF8 = "UTF-8";
+  private final File historyFile;
+
+
+  public SimpleHistoryParser(File historyFile) {
+    super();
+    Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist");
+    this.historyFile = historyFile;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo
+   *
+   * @return DagInfo
+   * @throws TezException
+   */
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(historyFile, dagId);
+      linkParsedContents();
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException {
+    if (source == null || destination == null) {
+      return;
+    }
+    for (Iterator it = source.keys(); it.hasNext(); ) {
+      String key = (String) it.next();
+      Object val = source.get(key);
+      destination.put(key, val);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, String entityName,
+      Map<String, JSONObject> destMap) throws JSONException {
+    JSONObject destinationJson = destMap.get(entityName);
+    JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO);
+    populateOtherInfo(source, destOtherInfo);
+  }
+
+  private void parseContents(File historyFile, String dagId)
+      throws JSONException, FileNotFoundException, TezException {
+    Scanner scanner = new Scanner(historyFile, UTF8);
+    scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
+    JSONObject dagJson = null;
+    Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    while (scanner.hasNext()) {
+      String line = scanner.next();
+      JSONObject jsonObject = new JSONObject(line);
+      String entity = jsonObject.getString(Constants.ENTITY);
+      String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
+      switch (entityType) {
+      case Constants.TEZ_DAG_ID:
+        if (!dagId.equals(entity)) {
+          LOG.warn(dagId + " does not match " + entity);
+          continue;
+        }
+        // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them
+        // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish
+        // time etc).
+        if (dagJson == null) {
+          dagJson = jsonObject;
+        }
+        JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, dagOtherInfo);
+        break;
+      case Constants.TEZ_VERTEX_ID:
+        String vertexName = entity;
+        TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
+        if (!tezDAGID.equals(tezVertexID.getDAGId())) {
+          LOG.warn(vertexName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!vertexJsonMap.containsKey(vertexName)) {
+          vertexJsonMap.put(vertexName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+        break;
+      case Constants.TEZ_TASK_ID:
+        String taskName = entity;
+        TezTaskID tezTaskID = TezTaskID.fromString(taskName);
+        if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
+          LOG.warn(taskName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!taskJsonMap.containsKey(taskName)) {
+          taskJsonMap.put(taskName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskName, taskJsonMap);
+        break;
+      case Constants.TEZ_TASK_ATTEMPT_ID:
+        String taskAttemptName = entity;
+        TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName);
+        if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
+          LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!attemptJsonMap.containsKey(taskAttemptName)) {
+          attemptJsonMap.put(taskAttemptName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+        break;
+      default:
+        break;
+      }
+    }
+    scanner.close();
+    if (dagJson != null) {
+      this.dagInfo = DagInfo.create(dagJson);
+    } else {
+      LOG.error("Dag is not yet parsed. Looks like partial file.");
+      throw new TezException(
+          "Please provide a valid/complete history log file containing " + dagId);
+    }
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      /**
+       * For converting SimpleHistoryLogging to in-memory representation
+       *
+       * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+       * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+       * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory
+       * representation can parse it correctly
+       */
+      JSONObject subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+          .optJSONObject(0);
+      if (subJsonObject != null) {
+        String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && nodeIdVal != null) {
+            otherInfo.put(Constants.NODE_ID, nodeIdVal);
+          }
+        }
+      }
+
+      subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+          .optJSONObject(1);
+      if (subJsonObject != null) {
+        String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(containerId) && containerId.equalsIgnoreCase(Constants.CONTAINER_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && containerIdVal != null) {
+            otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+}
\ No newline at end of file
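
A minimal usage sketch for the new parser (the file path and DAG id below are hypothetical;
getDAGData filters records by the supplied DAG id, so a log file containing several DAGs yields
only the requested one):

    File historyFile = new File("history.txt.appattempt_1438652049951_0008_000001"); // hypothetical path
    SimpleHistoryParser parser = new SimpleHistoryParser(historyFile);
    DagInfo dagInfo = parser.getDAGData("dag_1438652049951_0008_1"); // hypothetical DAG id; throws TezException
    System.out.println(dagInfo.getName() + " has " + dagInfo.getNumVertices() + " vertices");
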
index d2dac7d..6e227a5 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -164,13 +165,25 @@ public class VertexInfo extends BaseInfo {
     updateEdgeInfo();
   }
 
+  public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+    return Collections.unmodifiableList(additionalInputInfoList);
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+    return Collections.unmodifiableList(additionalOutputInfoList);
+  }
+
   @Override
   public final long getStartTimeInterval() {
     return startTime - (dagInfo.getStartTime());
   }
 
   public final long getFirstTaskStartTimeInterval() {
-    return getFirstTaskToStart().getStartTimeInterval();
+    TaskInfo firstTask = getFirstTaskToStart();
+    if (firstTask == null) {
+      return 0;
+    }
+    return firstTask.getStartTimeInterval();
   }
 
   public final long getLastTaskFinishTimeInterval() {
@@ -270,14 +283,32 @@ public class VertexInfo extends BaseInfo {
 
   }
 
+
+  private List<TaskInfo> getTasksInternal() {
+    return Lists.newLinkedList(taskInfoMap.values());
+  }
+
   /**
    * Get all tasks
    *
    * @return list of taskInfo
    */
   public final List<TaskInfo> getTasks() {
-    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
-    Collections.sort(taskInfoList, orderingOnStartTime());
+    return Collections.unmodifiableList(getTasksInternal());
+  }
+
+  /**
+   * Get all tasks in sorted order
+   *
+   * @param sorted
+   * @param ordering
+   * @return list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) {
+    List<TaskInfo> taskInfoList = getTasksInternal();
+    if (sorted) {
+      Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskInfoList);
   }
 
@@ -352,12 +383,36 @@ public class VertexInfo extends BaseInfo {
     return Collections.unmodifiableList(outputVertices);
   }
 
-  public List<TaskAttemptInfo> getTaskAttempts() {
+  private List<TaskAttemptInfo> getTaskAttemptsInternal() {
     List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
     for (TaskInfo taskInfo : getTasks()) {
       taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
     }
-    Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime());
+    return taskAttemptInfos;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return List<TaskAttemptInfo> list of attempts
+   */
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    return Collections.unmodifiableList(getTaskAttemptsInternal());
+  }
+
+  /**
+   * Get all task attempts in sorted order
+   *
+   * @param sorted whether to sort the returned attempts
+   * @param ordering ordering to sort by; when null, attempts are sorted by attempt start time
+   * @return list of TaskAttemptInfo
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+      @Nullable Ordering<TaskAttemptInfo> ordering) {
+    List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+    if (sorted) {
+      Collections.sort(taskAttemptInfos, ((ordering == null) ? orderingOnAttemptStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskAttemptInfos);
   }
 
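A short sketch of the new sorted accessors on VertexInfo (caller code is hypothetical; Ordering
and Function are Guava classes):

    // passing a null ordering falls back to the default sort by start time
    List<TaskInfo> byStartTime = vertexInfo.getTasks(true, null);

    // custom ordering, e.g. by task start time interval
    Ordering<TaskInfo> byInterval = Ordering.natural().onResultOf(
        new Function<TaskInfo, Long>() {
          @Override public Long apply(TaskInfo taskInfo) {
            return taskInfo.getStartTimeInterval();
          }
        });
    List<TaskInfo> tasks = vertexInfo.getTasks(true, byInterval);

Both accessors return unmodifiable lists, so callers can no longer mutate the vertex's internal
task collections.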
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.DAGCounter;
@@ -53,9 +54,12 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.examples.WordCount;
 import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.BaseInfo;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.EdgeInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
@@ -80,12 +84,16 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class TestATSFileParser {
+public class TestHistoryParser {
 
   private static MiniDFSCluster miniDFSCluster;
   private static MiniTezClusterWithTimeline miniTezCluster;
@@ -97,19 +105,17 @@ public class TestATSFileParser {
   private final static String OUTPUT = "Output";
   private final static String TOKENIZER = "Tokenizer";
   private final static String SUMMATION = "Summation";
+  private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+  private final static String HISTORY_TXT = "history.txt";
 
   private static Configuration conf = new Configuration();
   private static FileSystem fs;
   private static String TEST_ROOT_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tmpDir";
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir";
   private static String TEZ_BASE_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez";
   private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
 
-  private static TezClient tezClient;
-
-  private static int dagNumber;
-
   @BeforeClass
   public static void setupCluster() throws Exception {
     conf = new Configuration();
@@ -127,15 +133,6 @@ public class TestATSFileParser {
   @AfterClass
   public static void shutdownCluster() {
     try {
-      if (tezClient != null) {
-        try {
-          tezClient.stop();
-        } catch (TezException e) {
-          //ignore
-        } catch (IOException e) {
-          //ignore
-        }
-      }
       if (miniDFSCluster != null) {
         miniDFSCluster.shutdown();
       }
@@ -165,6 +162,8 @@ public class TestATSFileParser {
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
         .class.getName());
 
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
     miniTezCluster =
         new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
 
@@ -180,15 +179,13 @@ public class TestATSFileParser {
     tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
         ATSHistoryLoggingService.class.getName());
 
-    tezClient = TezClient.create("WordCount", tezConf, true);
-    tezClient.start();
-    tezClient.waitTillReady();
   }
 
 
   /**
    * Run a word count example in mini cluster and check if it is possible to download
-   * data from ATS and parse it.
+   * data from ATS and parse it. Also run with the SimpleHistoryLogging option and verify
+   * that it matches the ATS data.
    *
    * @throws Exception
    */
@@ -196,7 +193,7 @@ public class TestATSFileParser {
   public void testParserWithSuccessfulJob() throws Exception {
     //Run basic word count example.
     String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount");
+        WordCount.SumProcessor.class.getName(), "WordCount", true);
 
     //Export the data from ATS
     String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
@@ -204,12 +201,45 @@ public class TestATSFileParser {
     int result = ATSImportTool.process(args);
     assertTrue(result == 0);
 
-    //Parse ATS data
-    DagInfo dagInfo = getDagInfo(dagId);
+    //Parse ATS data and verify results
+    DagInfo dagInfoFromATS = getDagInfo(dagId);
+    verifyDagInfo(dagInfoFromATS, true);
+    verifyJobSpecificInfo(dagInfoFromATS);
 
-    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
+    //Now run with SimpleHistoryLogging
+    dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", false);
+    Thread.sleep(10000); //Wait for all flushes to complete, to avoid a partially written file.
+
+    DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+    verifyDagInfo(shDagInfo, false);
+    verifyJobSpecificInfo(shDagInfo);
+
+    //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing SimpleHistoryLog
+    isDAGEqual(dagInfoFromATS, shDagInfo);
+  }
+
+  private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException {
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID
+        .getApplicationId(), 1);
+    Path historyPath = new Path(conf.get("fs.defaultFS")
+        + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+        + applicationAttemptId);
+    FileSystem fs = historyPath.getFileSystem(conf);
+
+    Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+    fs.copyToLocalFile(historyPath, localPath);
+    File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+    //Now parse via SimpleHistory
+    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
 
+  private void verifyJobSpecificInfo(DagInfo dagInfo) {
     //Job specific
     assertTrue(dagInfo.getNumVertices() == 2);
     assertTrue(dagInfo.getName().equals("WordCount"));
@@ -297,7 +327,7 @@ public class TestATSFileParser {
   public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
     //Run basic word count example.
     String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL");
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true);
 
     //Export the data from ATS
     String atsAddress = "--atsAddress=http://atsHost:8188";
@@ -317,7 +347,7 @@ public class TestATSFileParser {
   public void testParserWithFailedJob() throws Exception {
     //Run a job which would fail
     String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
-        .getName(), "WordCount-With-Exception");
+        .getName(), "WordCount-With-Exception", true);
 
     //Export the data from ATS
     String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
@@ -329,7 +359,7 @@ public class TestATSFileParser {
     DagInfo dagInfo = getDagInfo(dagId);
 
     //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
+    verifyDagInfo(dagInfo, true);
 
     //Dag specific
     VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
@@ -357,7 +387,7 @@ public class TestATSFileParser {
         20); //Every line has 2 words. 10 lines x 2 words = 20
     verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
         "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
-    
+
     for (TaskInfo taskInfo : summationVertex.getTasks()) {
       String lastAttemptId = null;
       for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
@@ -378,6 +408,151 @@ public class TestATSFileParser {
   }
 
   /**
+   * Adding explicit equals here instead of in DAG/Vertex/Edge where hashCode also needs to
+   * change. Also, some custom comparisons are done here for unit testing.
+   */
+  private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
+    assertNotNull(dagInfo1);
+    assertNotNull(dagInfo2);
+    assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
+    isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
+    isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
+  }
+
+  private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+    assertTrue(vertexInfo1 != null);
+    assertTrue(vertexInfo2 != null);
+    assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+    assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount());
+    assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+    isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+    isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+    assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size());
+    assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size());
+
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+  }
+
+  private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) {
+    assertTrue("Vertices sizes should be the same", vertexList1.size() == vertexList2.size());
+    Iterator<VertexInfo> it1 = vertexList1.iterator();
+    Iterator<VertexInfo> it2 = vertexList2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      VertexInfo info1 = it1.next();
+      VertexInfo info2 = it2.next();
+      isVertexEqual(info1, info2);
+    }
+  }
+
+  private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+    assertTrue(edgeInfo1 != null);
+    assertTrue(edgeInfo2 != null);
+    String info1 = edgeInfo1.toString();
+    String info2 = edgeInfo2.toString();
+    assertTrue(info1.equals(info2));
+  }
+
+  private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<EdgeInfo> it1 = info1.iterator();
+    Iterator<EdgeInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isEdgeEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskInfo> it1 = info1.iterator();
+    Iterator<TaskInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+    assertTrue(taskInfo1 != null);
+    assertTrue(taskInfo2 != null);
+    assertTrue(taskInfo1.getVertexInfo() != null);
+    assertTrue(taskInfo2.getVertexInfo() != null);
+    assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+    assertTrue(
+        taskInfo1.getVertexInfo().getVertexName()
+            .equals(taskInfo2.getVertexInfo().getVertexName()));
+    isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts());
+
+    //Verify counters
+    isCountersSame(taskInfo1, taskInfo2);
+  }
+
+  private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+    isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+        info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+        info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+        info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+  }
+
+  private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) {
+    for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+      String source = entry.getKey();
+      long val = entry.getValue().getValue();
+
+      //check if other counter has the same value
+      assertTrue(counter2.containsKey(entry.getKey()));
+      assertTrue(counter2.get(entry.getKey()).getValue() == val);
+    }
+  }
+
+  private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+      Collection<TaskAttemptInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskAttemptInfo> it1 = info1.iterator();
+    Iterator<TaskAttemptInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskAttemptEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) {
+    assertTrue(info1 != null);
+    assertTrue(info2 != null);
+    assertTrue(info1.getTaskInfo() != null);
+    assertTrue(info2.getTaskInfo() != null);
+    assertTrue(info1.getStatus().equals(info2.getStatus()));
+    assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+        .getVertexInfo().getVertexName()));
+
+    //Verify counters
+    isCountersSame(info1, info2);
+  }
+
+
+  /**
    * Create sample file for wordcount program
    *
    * @param inputLoc
@@ -419,11 +594,28 @@ public class TestATSFileParser {
     }
   }
 
+  TezClient getTezClient(boolean withTimeline) throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    if (withTimeline) {
+      tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline);
+      tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+    } else {
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          SimpleHistoryLoggingService.class.getName());
+    }
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+
+    TezClient tezClient = TezClient.create("WordCount", tezConf, false);
+    tezClient.start();
+    tezClient.waitTillReady();
+    return tezClient;
+  }
+
   private String runWordCount(String tokenizerProcessor, String summationProcessor,
-      String dagName)
+      String dagName, boolean withTimeline)
       throws Exception {
-    dagNumber++;
-
     //HDFS path
     Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
 
@@ -448,10 +640,14 @@ public class TestATSFileParser {
     dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
         Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
 
+    TezClient tezClient = getTezClient(withTimeline);
     DAGClient client = tezClient.submitDAG(dag);
     client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber);
+    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
 
+    if (tezClient != null) {
+      tezClient.stop();
+    }
     return tezDAGID.toString();
   }
 
@@ -469,12 +665,14 @@ public class TestATSFileParser {
     }
   }
 
-  private void verifyDagInfo(DagInfo dagInfo) {
-    VersionInfo versionInfo = dagInfo.getVersionInfo();
-    assertTrue(versionInfo != null); //should be present post 0.5.4
-    assertTrue(versionInfo.getVersion() != null);
-    assertTrue(versionInfo.getRevision() != null);
-    assertTrue(versionInfo.getBuildTime() != null);
+  private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+    if (ats) {
+      VersionInfo versionInfo = dagInfo.getVersionInfo();
+      assertTrue(versionInfo != null); //should be present post 0.5.4
+      assertTrue(versionInfo.getVersion() != null);
+      assertTrue(versionInfo.getRevision() != null);
+      assertTrue(versionInfo.getBuildTime() != null);
+    }
 
     assertTrue(dagInfo.getStartTime() > 0);
     assertTrue(dagInfo.getFinishTimeInterval() > 0);
index fe28b14..36b12fe 100644 (file)
       <artifactId>tez-history-parser</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.plutext</groupId>
+      <artifactId>jaxb-svg11</artifactId>
+      <version>1.0.2</version>
+    </dependency>
   </dependencies>
 
   <build>
index 4151a90..27ad95e 100644 (file)
 package org.apache.tez.analyzer;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.base.Strings;
 import org.apache.tez.dag.api.TezException;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -99,7 +98,7 @@ public class CSVResult implements Result {
 
       StringBuilder sb = new StringBuilder();
       for(int i=0;i<record.length;i++) {
-        sb.append(Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
+        sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
         if (i < record.length - 1) {
           sb.append(",");
         }
index 6748f3f..88d45f3 100644 (file)
@@ -19,6 +19,8 @@
 package org.apache.tez.analyzer.plugins;
 
 import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -26,10 +28,13 @@ import com.google.common.collect.Ordering;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
 import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -43,24 +48,43 @@ public class CriticalPathAnalyzer implements Analyzer {
 
   private final CSVResult csvResult;
 
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
   public CriticalPathAnalyzer(Configuration config) {
     this.config = config;
     this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
   }
 
   @Override public void analyze(DagInfo dagInfo) throws TezException {
     Map<String, Long> result = Maps.newLinkedHashMap();
     getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
 
-    System.out.println();
-    System.out.println();
-
-    for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
       List<String> record = Lists.newLinkedList();
       record.add(entry.getKey());
       record.add(entry.getValue() + "");
       csvResult.addRecord(record.toArray(new String[record.size()]));
-      System.out.println(entry.getKey() + ", " + entry.getValue());
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      List<String> criticalVertices = null;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
     }
   }
 
@@ -98,7 +122,7 @@ public class CriticalPathAnalyzer implements Analyzer {
 
     if (dest != null) {
       time += dest.getTimeTaken();
-      predecessor += destVertexName + "-->";
+      predecessor += destVertexName + CONNECTOR;
 
       for (VertexInfo incomingVertex : dest.getInputVertices()) {
         getCriticalPath(predecessor, incomingVertex, time, result);
@@ -107,4 +131,12 @@ public class CriticalPathAnalyzer implements Analyzer {
       result.put(predecessor, time);
     }
   }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+        (criticalPath));
+  }
 }
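
A sketch of driving the analyzer with the new DOT output knob (the key is DOT_FILE_DIR from this
patch; dagInfo would come from one of the history parsers above):

    Configuration conf = new Configuration();
    conf.set("tez.critical-path.analyzer.dot.output.loc", "/tmp/critical-path"); // defaults to "."
    CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer(conf);
    analyzer.analyze(dagInfo); // emits CSV records and writes <dagId>.dot via Utils.generateDAGVizFile
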
index 67b4c51..7ed52da 100644 (file)
@@ -108,7 +108,8 @@ public class LocalityAnalyzer implements Analyzer {
         record.add(otherTaskResult.avgRuntime + "");
 
         //Get the number of inputs to this vertex
-        record.add(vertexInfo.getInputEdges().size() + "");
+        record.add(vertexInfo.getInputEdges().size()
+            + vertexInfo.getAdditionalInputInfoList().size() + "");
 
         //Get the avg HDFS bytes read in this vertex for different type of locality
         record.add(dataLocalResult.avgHDFSBytesRead + "");
index 8df40ba..a570493 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.apache.tez.analyzer.plugins;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
@@ -43,15 +44,21 @@ import java.util.Map;
  */
 public class ShuffleTimeAnalyzer implements Analyzer {
 
-  private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
-  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+  /**
+   * Ratio of real work done in the task: (total time taken by task - merge phase time) /
+   * (total time taken by task)
+   */
+  private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+  private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
 
+  /**
+   * Minimum number of reduce input records needed for an attempt to be considered for analysis.
+   */
   private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
   private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
 
   private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
       "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
-      "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+      "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
       "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
       "SHUFFLE_BYTES_DISK_DIRECT" };
 
@@ -59,15 +66,15 @@ public class ShuffleTimeAnalyzer implements Analyzer {
 
   private final Configuration config;
 
-  private final float shuffleTimeRatio;
+  private final float realWorkDoneRatio;
   private final long minShuffleRecords;
 
 
   public ShuffleTimeAnalyzer(Configuration config) {
     this.config = config;
 
-    shuffleTimeRatio = config.getFloat
-        (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+    realWorkDoneRatio = config.getFloat
+        (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
     minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
   }
 
@@ -105,15 +112,20 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add(counterGroupName);
 
             //Real work done in the task
-            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
-                Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
-                    attemptInfo));
-
             String comments = "";
-            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) {
-              comments = "Time taken in shuffle is more than the actual work being done in task. "
-                  + " Check if source/destination machine is a slow node. Check if merge phase "
-                  + "time is more to understand disk bottlenecks in this node.  Check for skew";
+            String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+                counterGroupName, attemptInfo);
+            String timeTakenForRealWork = "";
+            if (!Strings.isNullOrEmpty(mergePhaseTime)) {
+              long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+              if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) {
+                comments = "Time taken in shuffle is more than the actual work being done in task. "
+                    + " Check if source/destination machine is a slow node. Check if merge phase "
+                    + "time is more to understand disk bottlenecks in this node.  Check for skew";
+              }
+
+              timeTakenForRealWork = Long.toString(realWorkDone);
             }
             result.add(comments);
 
@@ -122,13 +134,14 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
             result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
 
+            result.add(Long.toString(attemptInfo.getTimeTaken()));
+
             //Total time taken for receiving all events from source tasks
             result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
 
-
-            result.add(Long.toString(timeTakenForRealWork));
+            result.add(timeTakenForRealWork);
 
             result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
@@ -150,11 +163,16 @@ public class ShuffleTimeAnalyzer implements Analyzer {
    * @return String
    */
   private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
-    long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    return Long.toString(lastEventReceived - firstEventReceived);
+    String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+    String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+
+    if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) {
+      return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+    } else {
+      return "";
+    }
   }
 
   private String getCounterValue(TaskCounter counter, String counterGroupName,
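
To make the new ratio concrete: with the 0.5 default, an attempt that ran 10,000 ms in total with
a MERGE_PHASE_TIME of 7,000 ms has timeTakenForRealWork = 3,000 ms and a ratio of 0.3, so it is
flagged as shuffle-dominated. Attempts missing the MERGE_PHASE_TIME or event counters now produce
empty columns instead of the NumberFormatException an unconditional Long.parseLong would throw.
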
index 8152344..f09380d 100644 (file)
@@ -57,6 +57,10 @@ import java.util.Map;
  */
 public class SkewAnalyzer implements Analyzer {
 
+  /**
+   * Number of bytes received as shuffle bytes from a source. If it is below this threshold,
+   * the attempt is not considered for analysis.
+   */
   private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
       + ".bytes.per.source";
   private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
index 7c7f5c0..1a8d9d3 100644 (file)
@@ -41,7 +41,7 @@ import java.util.List;
 public class SlowTaskIdentifier implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttemptId",
-      "Node", "taskDuration", "Status",
+      "Node", "taskDuration", "Status", "diagnostics",
       "NoOfInputs" };
 
   private final CSVResult csvResult;
@@ -72,14 +72,21 @@ public class SlowTaskIdentifier implements Analyzer {
       }
     });
 
-    int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT);
-    for(int i=0;i<limit;i++) {
+    int limit = Math.min(taskAttempts.size(),
+        Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+    if (limit == 0) {
+      return;
+    }
+
+    for (int i = 0; i < limit; i++) {
       List<String> record = Lists.newLinkedList();
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
       record.add(taskAttempts.get(i).getTaskAttemptId());
       record.add(taskAttempts.get(i).getContainer().getHost());
       record.add(taskAttempts.get(i).getTimeTaken() + "");
       record.add(taskAttempts.get(i).getStatus());
+      record.add(taskAttempts.get(i).getDiagnostics());
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
 
       csvResult.addRecord(record.toArray(new String[record.size()]));
index b7fca0b..c8d9695 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
 import java.util.List;
@@ -41,7 +42,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
       "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
-      "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "75thPercentile", "95thPercentile", "98thPercentile", "Median",
       "observation", "comments" };
 
   private final CSVResult csvResult = new CSVResult(headers);
@@ -50,8 +51,27 @@ public class SlowestVertexAnalyzer implements Analyzer {
   private final MetricRegistry metrics = new MetricRegistry();
   private Histogram taskAttemptRuntimeHistorgram;
 
+  private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+  private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+  private final long vertexRuntimeThreshold;
+
   public SlowestVertexAnalyzer(Configuration config) {
     this.config = config;
+    this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+        MAX_VERTEX_RUNTIME_DEFAULT));
+
+  }
+
+  private long getTaskRuntime(VertexInfo vertexInfo) {
+    TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+    TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+    DagInfo dagInfo = vertexInfo.getDagInfo();
+    long totalTime = ((lastTaskToFinish == null) ?
+        dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+        ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+    return totalTime;
   }
 
   @Override
@@ -59,9 +79,13 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
     for (VertexInfo vertexInfo : dagInfo.getVertices()) {
       String vertexName = vertexInfo.getVertexName();
-      long totalTime = vertexInfo.getTimeTaken();
+      if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) {
+        continue;
+      }
+
+      long totalTime = getTaskRuntime(vertexInfo);
 
-      long max = Long.MIN_VALUE;
+      long slowestLastEventTime = Long.MIN_VALUE;
       String maxSourceName = "";
       taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
 
@@ -81,10 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer {
             continue;
           }
           //Find the slowest last event received
-          if (entry.getValue().getValue() > max) {
-            //w.r.t vertex start time.
-            max =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue()) -
-                (vertexInfo.getStartTimeInterval());
+          if (entry.getValue().getValue() > slowestLastEventTime) {
+            slowestLastEventTime = entry.getValue().getValue();
             maxSourceName = entry.getKey();
           }
         }
@@ -104,9 +126,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
           }
           //Find the slowest last event received
           if (entry.getValue().getValue() > shuffleMax) {
-            //w.r.t vertex start time.
-            shuffleMax =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue()) -
-                (vertexInfo.getStartTimeInterval());
+            shuffleMax = entry.getValue().getValue();
             shuffleMaxSource = entry.getKey();
           }
         }
@@ -120,9 +140,10 @@ public class SlowestVertexAnalyzer implements Analyzer {
       record.add(totalTime + "");
       record.add(Math.max(0, shuffleMax) + "");
       record.add(shuffleMaxSource);
-      record.add(Math.max(0, max) + "");
+      record.add(Math.max(0, slowestLastEventTime) + "");
       record.add(maxSourceName);
-      record.add(Math.max(0,(totalTime - max)) + "");
+      //Finding out real work done at vertex level might be meaningless (as it is quite possible
+      // that it went to starvation).
 
       StringBuilder sb = new StringBuilder();
       double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
@@ -145,7 +166,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
       if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
         if ((shuffleMax * 1.0f / totalTime) > 0.5) {
-          if ((max * 1.0f / totalTime) > 0.5) {
+          if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
             comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
                 + " event received";
           } else {
@@ -153,8 +174,9 @@ public class SlowestVertexAnalyzer implements Analyzer {
                 "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
           }
         } else {
-          if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
-            comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+          if (totalTime > vertexRuntimeThreshold) { //greater than the configured threshold (in ms).
+            comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+                + " ms)";
           }
         }
       }
index c650104..83b1bb0 100644 (file)
@@ -49,12 +49,21 @@ public class SpillAnalyzerImpl implements Analyzer {
 
   private final CSVResult csvResult;
 
-  private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l;
+  /**
+   * Minimum output bytes that should be churned out by a task
+   */
+  private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+      + ".threshold";
+  private static long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l;
+
+  private final long minOutputBytesPerTask;
 
   private final Configuration config;
 
   public SpillAnalyzerImpl(Configuration config) {
     this.config = config;
+    minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+        OUTPUT_BYTES_THRESHOLD_DEFAULT));
     this.csvResult = new CSVResult(headers);
   }
 
@@ -83,7 +92,7 @@ public class SpillAnalyzerImpl implements Analyzer {
           long outputRecords = outputRecordsMap.get(source).getValue();
           long spilledRecords = spilledRecordsMap.get(source).getValue();
 
-          if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) {
+          if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
             List<String> recorList = Lists.newLinkedList();
             recorList.add(vertexName);
             recorList.add(attemptInfo.getTaskAttemptId());
@@ -95,7 +104,7 @@ public class SpillAnalyzerImpl implements Analyzer {
             recorList.add(outputRecords + "");
             recorList.add(spilledRecords + "");
             recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
-                + ", try increasing container size.");
+                + ". Try increasing container size.");
 
             csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
           }
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644 (file)
index 0000000..c07ff83
--- /dev/null
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Analyze the number of tasks running concurrently in every vertex, computed at each
+ * task start/finish event.
+ */
+public class TaskConcurrencyAnalyzer implements Analyzer {
+
+  private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+  private final CSVResult csvResult;
+  private final Configuration config;
+
+  public TaskConcurrencyAnalyzer(Configuration conf) {
+    this.csvResult = new CSVResult(headers);
+    this.config = conf;
+  }
+
+  private enum EventType {START, FINISH}
+
+  static class TimeInfo {
+    EventType eventType;
+    long timestamp;
+    int concurrentTasks;
+
+    public TimeInfo(EventType eventType, long timestamp) {
+      this.eventType = eventType;
+      this.timestamp = timestamp;
+    }
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    //For each vertex find the concurrent tasks running at any point
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      List<TaskAttemptInfo> taskAttempts =
+          Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+      String vertexName = vertexInfo.getVertexName();
+
+      /**
+       * - Get a sorted multiset of timestamps (S1, S2,...E1, E2..). Multiple tasks can
+       * start/finish at the same time.
+       * - Walk through the set:
+       * - Increment concurrent tasks when a start event is encountered
+       * - Decrement concurrent tasks when a finish event is encountered
+       */
+      TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+        @Override public int compare(TimeInfo o1, TimeInfo o2) {
+          if (o1.timestamp != o2.timestamp) {
+            return (o1.timestamp < o2.timestamp) ? -1 : 1;
+          }
+          //Tie-break on event type so a START and a FINISH at the same timestamp
+          //are not merged into a single multiset element.
+          return o1.eventType.compareTo(o2.eventType);
+        }
+      });
+
+      for (TaskAttemptInfo attemptInfo : taskAttempts) {
+        TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+        TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+        timeInfoSet.add(startTimeInfo);
+        timeInfoSet.add(stopTimeInfo);
+      }
+
+      //Compute concurrent tasks in the list now.
+      int concurrentTasks = 0;
+      for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
+        switch (timeInfo.eventType) {
+        case START:
+          concurrentTasks += timeInfoSet.count(timeInfo);
+          break;
+        case FINISH:
+          concurrentTasks -= timeInfoSet.count(timeInfo);
+          break;
+        default:
+          break;
+        }
+        timeInfo.concurrentTasks = concurrentTasks;
+        addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+      }
+    }
+  }
+
+  private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+    String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "TaskConcurrencyAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze how many tasks were running in every vertex at given point in time. This "
+        + "would be helpful in understanding whether any starvation was there or not.";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
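
A hedged usage sketch for the new analyzer (assumes a DagInfo already obtained from the history parser; TezException handling omitted):

    Configuration conf = new Configuration();
    TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(conf);
    analyzer.analyze(dagInfo);                // sweep over task start/finish events
    CSVResult result = analyzer.getResult();  // time, vertexName, concurrentTasksRunning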
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
new file mode 100644 (file)
index 0000000..4a582bb
--- /dev/null
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.plutext.jaxb.svg11.Line;
+import org.plutext.jaxb.svg11.ObjectFactory;
+import org.plutext.jaxb.svg11.Svg;
+import org.plutext.jaxb.svg11.Title;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.namespace.QName;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+public class SVGUtils {
+
+  private static final String UTF8 = "UTF-8";
+
+  private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
+
+
+  private final ObjectFactory objectFactory;
+  private final Svg svg;
+  private final QName titleName = new QName("title");
+
+  private static int MAX_DAG_RUNTIME = 0;
+  private static final int SCREEN_WIDTH = 1800;
+
+  private final DagInfo dagInfo;
+
+  //Gap between various components
+  private static final int DAG_GAP = 70;
+  private static final int VERTEX_GAP = 50;
+  private static final int TASK_GAP = 5;
+  private static final int STROKE_WIDTH = 5;
+
+  //To compute the size of the graph.
+  private long MIN_X = Long.MAX_VALUE;
+  private long MAX_X = Long.MIN_VALUE;
+
+  private int x1 = 0;
+  private int y1 = 0;
+  private int y2 = 0;
+
+  public SVGUtils(DagInfo dagInfo) {
+    this.dagInfo = dagInfo;
+    this.objectFactory = new ObjectFactory();
+    this.svg = objectFactory.createSvg();
+  }
+
+  private Line createLine(int x1, int y1, int x2, int y2) {
+    Line line = objectFactory.createLine();
+    line.setX1(scaleDown(x1) + "");
+    line.setY1(y1 + "");
+    line.setX2(scaleDown(x2) + "");
+    line.setY2(y2 + "");
+    return line;
+  }
+
+  private Title createTitle(String msg) {
+    Title t = objectFactory.createTitle();
+    t.setContent(msg);
+    return t;
+  }
+
+  private Title createTitleForVertex(VertexInfo vertex) {
+    String titleStr = vertex.getVertexName() + ":"
+        + (vertex.getFinishTimeInterval())
+        + " ms, RelativeTimeToDAG:"
+        + (vertex.getInitTime() - this.dagInfo.getStartTime())
+        + " ms, counters:" + vertex.getTezCounters();
+    Title title = createTitle(titleStr);
+    return title;
+  }
+
+  private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
+    String titleStr = "RelativeTimeToVertex:"
+        + (taskAttemptInfo.getStartTime() -
+        taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
+        " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
+    Title title = createTitle(titleStr);
+    return title;
+  }
+
+  /**
+   * Draw DAG from dagInfo
+   *
+   * @param dagInfo
+   */
+  private void drawDAG(DagInfo dagInfo) {
+    Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
+    int duration = (int) dagInfo.getFinishTimeInterval();
+    MAX_DAG_RUNTIME = duration;
+    MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
+    MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
+    Line line = createLine(x1, y1, x1 + duration, y2);
+    line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
+    line.setStyle("stroke: black; stroke-width:20");
+    line.setOpacity("0.3");
+    svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+    drawVertex();
+  }
+
+  private Collection<VertexInfo> getSortedVertices() {
+    Collection<VertexInfo> vertices = this.dagInfo.getVertices();
+    // Add corresponding vertex details
+    TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>(
+        new Comparator<VertexInfo>() {
+          @Override
+          public int compare(VertexInfo o1, VertexInfo o2) {
+            return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval());
+          }
+        });
+    vertexSet.addAll(vertices);
+    return vertexSet;
+  }
+
+  private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
+    Collection<TaskInfo> tasks = vertexInfo.getTasks();
+    // Add corresponding task details
+    TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval()
+            - o2.getSuccessfulTaskAttempt().getStartTimeInterval());
+      }
+    });
+    taskSet.addAll(tasks);
+    return taskSet;
+  }
+
+  /**
+   * Draw the vertices
+   *
+   */
+  public void drawVertex() {
+    Collection<VertexInfo> vertices = getSortedVertices();
+    for (VertexInfo vertex : vertices) {
+      //Set vertex start time as the one when its first task attempt started executing
+      x1 = (int) vertex.getStartTimeInterval();
+      y1 += VERTEX_GAP;
+      int duration = ((int) (vertex.getTimeTaken()));
+      Line line = createLine(x1, y1, x1 + duration, y1);
+      line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
+      line.setOpacity("0.3");
+
+      Title vertexTitle = createTitleForVertex(vertex);
+      line.getSVGDescriptionClass().add(
+          new JAXBElement<Title>(titleName, Title.class, vertexTitle));
+      svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+      // For each vertex, draw the tasks
+      drawTask(vertex);
+    }
+    x1 = x1 + (int) dagInfo.getFinishTimeInterval();
+    y1 = y1 + DAG_GAP;
+    y2 = y1;
+  }
+
+  /**
+   * Draw tasks
+   *
+   * @param vertex
+   */
+  public void drawTask(VertexInfo vertex) {
+    Collection<TaskInfo> tasks = getSortedTasks(vertex);
+    for (TaskInfo task : tasks) {
+      for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
+        x1 = (int) taskAttemptInfo.getStartTimeInterval();
+        y1 += TASK_GAP;
+        int duration = (int) taskAttemptInfo.getTimeTaken();
+        Line line = createLine(x1, y1, x1 + duration, y1);
+        String color =
+            taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
+                ? "green" : "red";
+        line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
+        Title title = createTitleForTaskAttempt(taskAttemptInfo);
+        line.getSVGDescriptionClass().add(
+            new JAXBElement<Title>(titleName, Title.class, title));
+        svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass()
+            .add(line);
+      }
+    }
+  }
+
+  /**
+   * Draw the DAG and save it as an SVG file
+   *
+   * @param fileName
+   * @throws java.io.IOException
+   * @throws javax.xml.bind.JAXBException
+   */
+  public void saveAsSVG(String fileName) throws IOException, JAXBException {
+    drawDAG(dagInfo);
+    svg.setHeight("" + y2);
+    svg.setWidth("" + (MAX_X - MIN_X));
+    String tempFileName = System.nanoTime() + ".svg";
+    File file = new File(tempFileName);
+    JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
+    Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
+    jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+    jaxbMarshaller.marshal(svg, file);
+    //TODO: dirty workaround to get rid of XMLRootException issue
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(new FileInputStream(file), UTF8));
+    BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+    try {
+      //reader.ready() can return false before EOF; read until readLine() returns null.
+      String line;
+      while ((line = reader.readLine()) != null) {
+        line = line.replaceAll(
+            " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", "");
+        writer.write(line);
+        writer.newLine();
+      }
+    } finally {
+      IOUtils.closeQuietly(reader);
+      IOUtils.closeQuietly(writer);
+      if (file.exists()) {
+        boolean deleted = file.delete();
+        LOG.debug("Deleted {}: {}", file.getAbsolutePath(), deleted);
+      }
+    }
+  }
+
+  private float scaleDown(int len) {
+    return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH;
+  }
+}
\ No newline at end of file
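
A hedged usage sketch for the SVG timeline (assumes a parsed DagInfo; the output file name is illustrative; IOException/JAXBException handling omitted):

    SVGUtils svgUtils = new SVGUtils(dagInfo);
    svgUtils.saveAsSVG("dag-timeline.svg");  // one horizontal bar per DAG/vertex/attempt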
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
new file mode 100644 (file)
index 0000000..8bcf265
--- /dev/null
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.utils;
+
+import com.sun.istack.Nullable;
+import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Utils {
+
+  private static final Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+
+  public static String getShortClassName(String className) {
+    int pos = className.lastIndexOf(".");
+    if (pos != -1 && pos < className.length() - 1) {
+      return className.substring(pos + 1);
+    }
+    return className;
+  }
+
+  public static String sanitizeLabelForViz(String label) {
+    Matcher m = sanitizeLabelPattern.matcher(label);
+    return m.replaceAll("_");
+  }
+
+  public static void generateDAGVizFile(DagInfo dagInfo, String fileName,
+      @Nullable List<String> criticalVertices) throws IOException {
+    Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName()));
+
+    for (VertexInfo v : dagInfo.getVertices()) {
+      String nodeLabel = sanitizeLabelForViz(v.getVertexName())
+          + "[" + getShortClassName(v.getProcessorClassName())
+          + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() + " ms]";
+      Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getVertexName()), nodeLabel);
+
+      boolean criticalVertex = (criticalVertices != null)
+          && criticalVertices.contains(v.getVertexName());
+      if (criticalVertex) {
+        n.setColor("red");
+      }
+
+      for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) {
+        Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+            + "_" + sanitizeLabelForViz(input.getName()));
+        inputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+            + "[" + sanitizeLabelForViz(input.getName()) + "]");
+        inputNode.setShape("box");
+        inputNode.addEdge(n, "Input name=" + input.getName()
+            + " [inputClass=" + getShortClassName(input.getClazz())
+            + ", initializer=" + getShortClassName(input.getInitializer()) + "]");
+      }
+      for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) {
+        Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+            + "_" + sanitizeLabelForViz(output.getName()));
+        outputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+            + "[" + sanitizeLabelForViz(output.getName()) + "]");
+        outputNode.setShape("box");
+        n.addEdge(outputNode, "Output name=" + output.getName()
+            + " [outputClass=" + getShortClassName(output.getClazz())
+            + ", committer=" + getShortClassName(output.getInitializer()) + "]");
+      }
+
+    }
+
+    for (EdgeInfo e : dagInfo.getEdges()) {
+      Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
+      n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
+          "[input=" + getShortClassName(e.getEdgeSourceClass())
+              + ", output=" + getShortClassName(e.getEdgeDestinationClass())
+              + ", dataMovement=" + e.getDataMovementType().trim() + "]");
+    }
+
+    graph.save(fileName);
+  }
+}
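
A hedged usage sketch for the new graphviz generator (the file name and vertex names are illustrative; pass null for the critical-vertex list when no critical path is available; IOException handling omitted):

    List<String> critical = Arrays.asList("Map 1", "Reducer 2");  // raw vertex names
    Utils.generateDAGVizFile(dagInfo, "dag.dot", critical);       // critical vertices drawn in red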