TEZ-2690. Add critical path analyser (bikas)
authorBikas Saha <bikas@apache.org>
Tue, 25 Aug 2015 18:10:10 +0000 (11:10 -0700)
committerBikas Saha <bikas@apache.org>
Tue, 25 Aug 2015 18:10:10 +0000 (11:10 -0700)
CHANGES.txt
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
tez-tools/analyzers/job-analyzer/pom.xml
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java [new file with mode: 0644]
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java

index 9fa3d33..d2c39e9 100644 (file)
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2690. Add critical path analyser
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
index ba676a2..ccec0db 100644 (file)
@@ -19,7 +19,9 @@
 package org.apache.tez.history.parser.datamodel;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.ATSConstants;
@@ -29,6 +31,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import java.util.Comparator;
 import java.util.Map;
 
 import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -53,6 +56,7 @@ public class TaskAttemptInfo extends BaseInfo {
   private final long lastDataEventTime;
   private final String lastDataEventSourceTA;
   private final String terminationCause;
+  private final long executionTimeInterval;
 
   private TaskInfo taskInfo;
 
@@ -88,6 +92,17 @@ public class TaskAttemptInfo extends BaseInfo {
         otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
     terminationCause = StringInterner
         .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+    executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
+  }
+  
+  public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getAllocationTime() < o2.getAllocationTime() ? -1
+            : o1.getAllocationTime() > o2.getAllocationTime() ? 1 : 0);
+      }
+    });
   }
 
   void setTaskInfo(TaskInfo taskInfo) {
@@ -105,6 +120,22 @@ public class TaskAttemptInfo extends BaseInfo {
   public final long getFinishTimeInterval() {
     return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
   }
+  
+  public final long getExecutionTimeInterval() {
+    return executionTimeInterval;
+  }
+
+  public final long getAllocationToEndTimeInterval() {
+    return (endTime - allocationTime);
+  }
+  
+  public final long getAllocationToStartTimeInterval() {
+    return (startTime - allocationTime);
+  }
+  
+  public final long getCreationToAllocationTimeInterval() {
+    return (allocationTime - creationTime);
+  }
 
   public final long getStartTime() {
     return startTime;
@@ -141,6 +172,11 @@ public class TaskAttemptInfo extends BaseInfo {
   public final long getAllocationTime() {
     return allocationTime;
   }
+  
+  public final String getShortName() {
+    return getTaskInfo().getVertexInfo().getVertexName() + " : " + 
+    taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1);
+  }
 
   @Override
   public final String getDiagnostics() {
@@ -169,6 +205,13 @@ public class TaskAttemptInfo extends BaseInfo {
     }
     return false;
   }
+  
+  public final String getDetailedStatus() {
+    if (!Strings.isNullOrEmpty(getTerminationCause())) {
+      return getStatus() + ":" + getTerminationCause();
+    }
+    return getStatus();
+  }
 
   public final TezCounter getLocalityInfo() {
     Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
index 5e63efa..a30d311 100644 (file)
@@ -28,6 +28,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.codehaus.jettison.json.JSONException;
@@ -204,6 +205,14 @@ public class TaskInfo extends BaseInfo {
    * @return TaskAttemptInfo
    */
   public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+    if (Strings.isNotEmpty(getSuccessfulAttemptId())) {
+      for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+        if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
+          return attemptInfo;
+        }
+      }
+    }
+    // fall back to checking status if successful attemt id is not available
     for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
       if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
         return attemptInfo;
index 6e227a5..35da2d4 100644 (file)
@@ -74,6 +74,8 @@ public class VertexInfo extends BaseInfo {
 
   private final List<AdditionalInputOutputDetails> additionalInputInfoList;
   private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
+  
+  private long avgExecutionTimeInterval = -1;
 
   private DagInfo dagInfo;
 
@@ -143,7 +145,7 @@ public class VertexInfo extends BaseInfo {
     this.additionalInputInfoList.clear();
     this.additionalInputInfoList.addAll(additionalInputInfoList);
   }
-
+  
   void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) {
     this.additionalOutputInfoList.clear();
     this.additionalOutputInfoList.addAll(additionalOutputInfoList);
@@ -192,6 +194,22 @@ public class VertexInfo extends BaseInfo {
     }
     return getLastTaskToFinish().getFinishTimeInterval();
   }
+  
+  public final long getAvgExecutionTimeInterval() {
+    if (avgExecutionTimeInterval == -1) {
+      long totalExecutionTime = 0;
+      long totalAttempts = 0;
+      for (TaskInfo task : getTasks()) {
+        TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
+        totalExecutionTime += attempt.getExecutionTimeInterval();
+        totalAttempts++;
+      }
+      if (totalAttempts > 0) {
+        avgExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
+      }
+    }
+    return avgExecutionTimeInterval;
+  }
 
   public final long getStartTime() {
     return startTime;
index 36b12fe..543ba1b 100644 (file)
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.plutext</groupId>
-      <artifactId>jaxb-svg11</artifactId>
-      <version>1.0.2</version>
-    </dependency>
   </dependencies>
 
   <build>
     <plugins>
+    <plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+     <artifactId>maven-jar-plugin</artifactId>
+      <configuration>
+       <archive>
+         <manifest>
+           <mainClass>org.apache.tez.analyzer.plugins.AnalyzerDriver</mainClass>
+         </manifest>
+       </archive>
+     </configuration>
+    </plugin>
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
new file mode 100644 (file)
index 0000000..33dbead
--- /dev/null
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+public class AnalyzerDriver {
+
+  public static void main(String argv[]){
+    int exitCode = -1;
+    ProgramDriver pgd = new ProgramDriver();
+    try {
+      pgd.addClass("CriticalPath", CriticalPathAnalyzer.class,
+          "Find the critical path of a DAG");
+      exitCode = pgd.run(argv);
+    } catch(Throwable e){
+      e.printStackTrace();
+    }
+
+    System.exit(exitCode);
+  }
+
+}
\ No newline at end of file
index 88d45f3..448e785 100644 (file)
 
 package org.apache.tez.analyzer.plugins;
 
-import com.google.common.base.Functions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.analyzer.Analyzer;
 import org.apache.tez.analyzer.CSVResult;
-import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
+import org.apache.tez.analyzer.utils.SVGUtils;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.Container;
 import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Identify a set of vertices which fall in the critical path in a DAG.
- */
-public class CriticalPathAnalyzer implements Analyzer {
-  private final Configuration config;
-
-  private static final String[] headers = { "CriticalPath", "Score" };
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
-  private final CSVResult csvResult;
+public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
 
-  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
-  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+  String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
+  String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name());
 
-  private final String dotFileLocation;
+  private final static String DATA_DEPENDENCY = "Data-Dependency";
+  private final static String INIT_DEPENDENCY = "Init-Dependency";
+  private final static String COMMIT_DEPENDENCY = "Commit-Dependency";
+  private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency";
+  private final static String OUTPUT_LOST = "Previous version outputs lost";
 
-  private static final String CONNECTOR = "-->";
+  public static class CriticalPathStep {
+    public enum EntityType {
+      ATTEMPT,
+      VERTEX_INIT,
+      DAG_COMMIT
+    }
 
-  public CriticalPathAnalyzer(Configuration config) {
-    this.config = config;
-    this.csvResult = new CSVResult(headers);
-    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+    EntityType type;
+    TaskAttemptInfo attempt;
+    String reason; // reason linking this to the previous step on the critical path
+    long startCriticalPathTime; // time at which attempt is on critical path
+    long stopCriticalPathTime; // time at which attempt is off critical path
+    List<String> notes = Lists.newLinkedList();
+    
+    public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) {
+      this.type = type;
+      this.attempt = attempt;
+    }
+    public EntityType getType() {
+      return type;
+    }
+    public TaskAttemptInfo getAttempt() {
+      return attempt;
+    }
+    public long getStartCriticalTime() {
+      return startCriticalPathTime;
+    }
+    public long getStopCriticalTime() {
+      return stopCriticalPathTime;
+    }
+    public String getReason() {
+      return reason;
+    }
+    public List<String> getNotes() {
+      return notes;
+    }
   }
+  
+  List<CriticalPathStep> criticalPath = Lists.newLinkedList();
+  
+  Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
 
-  @Override public void analyze(DagInfo dagInfo) throws TezException {
-    Map<String, Long> result = Maps.newLinkedHashMap();
-    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+  public CriticalPathAnalyzer() {
+  }
 
-    Map<String, Long> sortedByValues = sortByValues(result);
-    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
-      List<String> record = Lists.newLinkedList();
-      record.add(entry.getKey());
-      record.add(entry.getValue() + "");
-      csvResult.addRecord(record.toArray(new String[record.size()]));
+  @Override 
+  public void analyze(DagInfo dagInfo) throws TezException {
+    // get all attempts in the dag and find the last failed/succeeded attempt.
+    // ignore killed attempt to handle kills that happen upon dag completion
+    TaskAttemptInfo lastAttempt = null;
+    long lastAttemptFinishTime = 0;
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+        attempts.put(attempt.getTaskAttemptId(), attempt);
+        if (attempt.getStatus().equals(succeededState) ||
+            attempt.getStatus().equals(failedState)) {
+          if (lastAttemptFinishTime < attempt.getFinishTime()) {
+            lastAttempt = attempt;
+            lastAttemptFinishTime = attempt.getFinishTime();
+          }
+        }
+      }
+    }
+    
+    if (lastAttempt == null) {
+      System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId());
+      return;
     }
+    
+    createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts);
+    
+    analyzeCriticalPath(dagInfo);
 
-    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
-    try {
-      List<String> criticalVertices = null;
-      if (!sortedByValues.isEmpty()) {
-        String criticalPath = sortedByValues.keySet().iterator().next();
-        criticalVertices = getVertexNames(criticalPath);
-      } else {
-        criticalVertices = Lists.newLinkedList();
+    saveCriticalPathAsSVG(dagInfo);
+  }
+  
+  private void saveCriticalPathAsSVG(DagInfo dagInfo) {
+    SVGUtils svg = new SVGUtils();
+    String outputFileName = getOutputDir() + File.separator + dagInfo.getDagId() + ".svg";
+    System.out.println("Writing output to: " + outputFileName);
+    svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
+  }
+  
+  private void analyzeCriticalPath(DagInfo dag) {
+    if (!criticalPath.isEmpty()) {
+      System.out.println("Walking critical path for dag " + dag.getDagId());
+      long dagStartTime = dag.getStartTime();
+      long dagTime = dag.getFinishTime() - dagStartTime;
+      long totalAttemptCriticalTime = 0;
+      for (int i = 0; i < criticalPath.size(); ++i) {
+        CriticalPathStep step = criticalPath.get(i);
+        totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
+        TaskAttemptInfo attempt = step.attempt;
+        if (step.getType() == EntityType.ATTEMPT) {
+          // analyze execution overhead
+          long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
+              .getAvgExecutionTimeInterval();
+          if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+            step.notes
+                .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval()
+                    + " compared to vertex average of " + avgExecutionTime);
+          }
+          
+          if (attempt.getStartTime() > step.startCriticalPathTime) {
+            // the attempt is critical before launching. So allocation overhead needs analysis
+            // analyzer allocation overhead
+            Container container = attempt.getContainer();
+            if (container != null) {
+              Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
+              if (attempts != null && !attempts.isEmpty()) {
+                // arrange attempts by allocation time
+                List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
+                Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
+                // walk the list to record allocation time before the current attempt
+                long containerPreviousAllocatedTime = 0;
+                for (TaskAttemptInfo containerAttempt : attemptsList) {
+                  if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
+                    break;
+                  }
+                  System.out.println("Container: " + container.getId() + " running att: " + 
+                  containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
+                  containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
+                }
+                if (containerPreviousAllocatedTime == 0) {
+                  step.notes.add("Container " + container.getId() + " newly allocated.");
+                } else {
+                  if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
+                    step.notes.add("Container " + container.getId() + " was fully allocated");
+                  } else {
+                    step.notes.add("Container " + container.getId() + " allocated for " + 
+                    SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
+                        SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + 
+                        " of allocation wait time");
+                  }
+                }
+              }
+            }
+          }
+        }
       }
-      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
-    } catch (IOException e) {
-      throw new TezException(e);
+      System.out
+          .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+              + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
     }
   }
+  
+  private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt,
+      long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
+    List<CriticalPathStep> tempCP = Lists.newLinkedList();
+    if (lastAttempt != null) {
+      TaskAttemptInfo currentAttempt = lastAttempt;
+      CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT);
+      long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
+
+      // add the commit step
+      currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+      currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
+      currentStep.reason = COMMIT_DEPENDENCY;
+      tempCP.add(currentStep);
 
+      while (true) {
+        Preconditions.checkState(currentAttempt != null);
+        Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
+        System.out.println(
+            "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
+        currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
+        currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+        tempCP.add(currentStep);
+  
+        // find the next attempt on the critical path
+        boolean dataDependency = false;
+        // find out predecessor dependency
+        if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) {
+          dataDependency = true;
+        }
+  
+        long startCriticalPathTime = 0;
+        String nextAttemptId = null;
+        String reason = null;
+        if (dataDependency) {
+          // last data event was produced after the attempt was scheduled. use
+          // data dependency
+          // typically case when scheduling ahead of time
+          System.out.println("Has data dependency");
+          if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+            // there is a valid data causal TA. Use it.
+            nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+            reason = DATA_DEPENDENCY;
+            startCriticalPathTime = currentAttempt.getLastDataEventTime();
+            System.out.println("Using data dependency " + nextAttemptId);
+          } else {
+            // there is no valid data causal TA. This means data event came from the same vertex
+            VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
+            Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(),
+                "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
+                    + "TA is null for " + currentAttempt.getTaskAttemptId());
+            nextAttemptId = null;
+            reason = INIT_DEPENDENCY;
+            System.out.println("Using init dependency");
+          }
+        } else {
+          // attempt was scheduled after last data event. use scheduling dependency
+          // typically happens for retries
+          System.out.println("Has scheduling dependency");
+          if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
+            // there is a scheduling causal TA. Use it.
+            nextAttemptId = currentAttempt.getCreationCausalTA();
+            reason = NON_DATA_DEPENDENCY;
+            TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
+            if (nextAttempt != null) {
+              VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
+              VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
+              if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
+                // cause from different vertex. Might be rerun to re-generate outputs
+                for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
+                  if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
+                    // next vertex is an output vertex
+                    reason = OUTPUT_LOST;
+                    break;
+                  }
+                }
+              }
+            }
+            startCriticalPathTime = currentAttempt.getCreationTime();
+            System.out.println("Using scheduling dependency " + nextAttemptId);
+          } else {
+            // there is no scheduling causal TA.
+            if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+              // there is a data event going to the vertex. Count the time between data event and
+              // scheduling time as Initializer/Manager overhead and follow data dependency
+              nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+              reason = DATA_DEPENDENCY;
+              startCriticalPathTime = currentAttempt.getLastDataEventTime();
+              long overhead = currentAttempt.getCreationTime()
+                  - currentAttempt.getLastDataEventTime();
+              currentStep.notes
+                  .add("Initializer/VertexManager scheduling overhead " + overhead + " ms");
+              System.out.println("Using data dependency " + nextAttemptId);
+            } else {
+              // there is no scheduling causal TA and no data event casual TA.
+              // the vertex has external input that sent the last data events
+              // or the vertex has external input but does not use events
+              // or the vertex has no external inputs or edges
+              nextAttemptId = null;
+              reason = INIT_DEPENDENCY;
+              System.out.println("Using init dependency");
+            }
+          }
+        }
+  
+        currentStep.startCriticalPathTime = startCriticalPathTime;
+        currentStep.reason = reason;
+  
+        if (Strings.isNullOrEmpty(nextAttemptId)) {
+          Preconditions.checkState(reason.equals(INIT_DEPENDENCY));
+          Preconditions.checkState(startCriticalPathTime == 0);
+          // no predecessor attempt found. this is the last step in the critical path
+          // assume attempts start critical path time is when its scheduled. before that is 
+          // vertex initialization time
+          currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime();
+          
+          // add vertex init step
+          long initStepStopCriticalTime = currentStep.startCriticalPathTime;
+          currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
+          currentStep.stopCriticalPathTime = initStepStopCriticalTime;
+          currentStep.startCriticalPathTime = dagInfo.getStartTime();
+          currentStep.reason = INIT_DEPENDENCY;
+          tempCP.add(currentStep);
+          
+          if (!tempCP.isEmpty()) {
+            for (int i=tempCP.size() - 1; i>=0; --i) {
+              criticalPath.add(tempCP.get(i));
+            }
+          }
+          return;
+        }
+  
+        currentAttempt = attempts.get(nextAttemptId);
+        currentAttemptStopCriticalPathTime = startCriticalPathTime;
+      }
+    }
+  }
+  
   @Override
   public CSVResult getResult() throws TezException {
+    String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime", 
+        "CriticalStopTime", "Notes" };
+
+    CSVResult csvResult = new CSVResult(headers);
+    for (CriticalPathStep step : criticalPath) {
+      String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
+          : (step.getType() == EntityType.VERTEX_INIT
+              ? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
+      String [] record = {entity, step.getReason(), 
+          step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), 
+          String.valueOf(step.getStopCriticalTime()),
+          Joiner.on(";").join(step.getNotes())};
+      csvResult.addRecord(record);
+    }
     return csvResult;
   }
 
@@ -105,38 +369,12 @@ public class CriticalPathAnalyzer implements Analyzer {
 
   @Override
   public Configuration getConfiguration() {
-    return config;
+    return getConf();
   }
-
-  private static Map<String, Long> sortByValues(Map<String, Long> result) {
-    //Sort result by time in reverse order
-    final Ordering<String> reversValueOrdering =
-        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
-    Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering);
-    return orderedMap;
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
+    System.exit(res);
   }
 
-  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
-      Map<String, Long> result) {
-    String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
-
-    if (dest != null) {
-      time += dest.getTimeTaken();
-      predecessor += destVertexName + CONNECTOR;
-
-      for (VertexInfo incomingVertex : dest.getInputVertices()) {
-        getCriticalPath(predecessor, incomingVertex, time, result);
-      }
-
-      result.put(predecessor, time);
-    }
-  }
-
-  private static List<String> getVertexNames(String criticalPath) {
-    if (Strings.isNullOrEmpty(criticalPath)) {
-      return Lists.newLinkedList();
-    }
-    return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
-        (criticalPath));
-  }
 }
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
new file mode 100644 (file)
index 0000000..3eb2f57
--- /dev/null
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+import com.google.common.base.Preconditions;
+
+public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
+
+  
+  private static final String ATS_FILE_NAME = "atsFileName";
+  private static final String OUTPUT_DIR = "outputDir";
+  private static final String DAG_ID = "dagId";
+  private static final String HELP = "help";
+
+  private String outputDir;
+  
+  @SuppressWarnings("static-access")
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
+
+    Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
+        .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
+
+    Option inputATSFileNameOption = OptionBuilder.withArgName(ATS_FILE_NAME).withLongOpt
+        (ATS_FILE_NAME)
+        .withDescription("File with ATS data for the DAG").hasArg()
+        .isRequired(true).create();
+    Option help = OptionBuilder.withArgName(HELP).withLongOpt
+        (HELP)
+        .withDescription("print help")
+        .isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(outputDirOption);
+    opts.addOption(inputATSFileNameOption);
+    opts.addOption(help);
+    return opts;
+  }
+  
+  protected String getOutputDir() {
+    return outputDir;
+  }
+  
+  private void printUsage() {
+    System.err.println("Analyzer base options are");
+    Options options = buildOptions();
+    for (Object obj : options.getOptions()) {
+      Option option = (Option) obj;
+      System.err.println(option.getArgName() + " : " + option.getDescription());
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    //Parse downloaded contents
+    CommandLine cmdLine = null;
+    try {
+      cmdLine = new GnuParser().parse(buildOptions(), args);
+    } catch (ParseException e) {
+      System.err.println("Invalid options on command line");
+      printUsage();
+      return -1;
+    }
+    
+    if(cmdLine.hasOption(HELP)) {
+      printUsage();
+      return 0;
+    }
+
+    outputDir = cmdLine.getOptionValue(OUTPUT_DIR);
+    if (outputDir == null) {
+      outputDir = System.getProperty("user.dir");
+    }
+    
+    File file = new File(cmdLine.getOptionValue(ATS_FILE_NAME));
+    String dagId = cmdLine.getOptionValue(DAG_ID);
+    
+    ATSFileParser parser = new ATSFileParser(file);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    Preconditions.checkState(dagInfo.getDagId().equals(dagId));
+    analyze(dagInfo);
+    return 0;
+  }
+
+}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
new file mode 100644 (file)
index 0000000..9661ea3
--- /dev/null
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify a set of vertices which fall in the critical path in a DAG.
+ */
+public class VertexLevelCriticalPathAnalyzer implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
+  public VertexLevelCriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+  }
+
+  @Override public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
+      List<String> record = Lists.newLinkedList();
+      record.add(entry.getKey());
+      record.add(entry.getValue() + "");
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      List<String> criticalVertices = null;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze vertex level critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    //Sort result by time in reverse order
+    final Ordering<String> reversValueOrdering =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering);
+    return orderedMap;
+  }
+
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
+
+    if (dest != null) {
+      time += dest.getTimeTaken();
+      predecessor += destVertexName + CONNECTOR;
+
+      for (VertexInfo incomingVertex : dest.getInputVertices()) {
+        getCriticalPath(predecessor, incomingVertex, time, result);
+      }
+
+      result.put(predecessor, time);
+    }
+  }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+        (criticalPath));
+  }
+}
index 4a582bb..50fe033 100644 (file)
 package org.apache.tez.analyzer.utils;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.commons.io.output.FileWriterWithEncoding;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
-import org.apache.tez.history.parser.datamodel.TaskInfo;
-import org.apache.tez.history.parser.datamodel.VertexInfo;
-import org.plutext.jaxb.svg11.Line;
-import org.plutext.jaxb.svg11.ObjectFactory;
-import org.plutext.jaxb.svg11.Svg;
-import org.plutext.jaxb.svg11.Title;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.namespace.QName;
-import java.io.BufferedReader;
+import com.google.common.base.Joiner;
+
 import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.TreeSet;
+import java.text.DecimalFormat;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class SVGUtils {
 
-  private static final String UTF8 = "UTF-8";
-
-  private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
-
-
-  private final ObjectFactory objectFactory;
-  private final Svg svg;
-  private final QName titleName = new QName("title");
-
   private static int MAX_DAG_RUNTIME = 0;
   private static final int SCREEN_WIDTH = 1800;
 
-  private final DagInfo dagInfo;
-
-  //Gap between various components
-  private static final int DAG_GAP = 70;
-  private static final int VERTEX_GAP = 50;
-  private static final int TASK_GAP = 5;
-  private static final int STROKE_WIDTH = 5;
-
-  //To compute the size of the graph.
-  private long MIN_X = Long.MAX_VALUE;
-  private long MAX_X = Long.MIN_VALUE;
+  public SVGUtils() {    
+  }
 
-  private int x1 = 0;
-  private int y1 = 0;
-  private int y2 = 0;
+  private int Y_MAX;
+  private int X_MAX;
+  private static final DecimalFormat secondFormat = new DecimalFormat("#.##");
+  private static final int X_BASE = 100;
+  private static final int Y_BASE = 100;
+  private static final int TICK = 1;
+  private static final int STEP_GAP = 50;
+  private static final int TEXT_SIZE = 20;
+  private static final String RUNTIME_COLOR = "LightGreen";
+  private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod";
+  private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon";
+  private static final String BORDER_COLOR = "Sienna";
+  private static final String VERTEX_INIT_COMMIT_COLOR = "orange";
+  private static final String CRITICAL_COLOR = "IndianRed";
+  private static final float RECT_OPACITY = 1.0f;
+  private static final String TITLE_BR = "&#13;";
 
-  public SVGUtils(DagInfo dagInfo) {
-    this.dagInfo = dagInfo;
-    this.objectFactory = new ObjectFactory();
-    this.svg = objectFactory.createSvg();
+  public static String getTimeStr(final long millis) {
+    long minutes = TimeUnit.MILLISECONDS.toMinutes(millis)
+            - TimeUnit.HOURS.toMinutes(TimeUnit.MILLISECONDS.toHours(millis));
+    long hours = TimeUnit.MILLISECONDS.toHours(millis);
+    StringBuilder b = new StringBuilder();
+    b.append(hours == 0 ? "" : String.valueOf(hours) + "h");
+    b.append(minutes == 0 ? "" : String.valueOf(minutes) + "m");
+    long seconds = millis - TimeUnit.MINUTES.toMillis(
+        TimeUnit.MILLISECONDS.toMinutes(millis));
+    b.append(secondFormat.format(seconds/1000.0) + "s");
+    
+    return b.toString(); 
   }
-
-  private Line createLine(int x1, int y1, int x2, int y2) {
-    Line line = objectFactory.createLine();
-    line.setX1(scaleDown(x1) + "");
-    line.setY1(y1 + "");
-    line.setX2(scaleDown(x2) + "");
-    line.setY2(y2 + "");
-    return line;
+  
+  List<String> svgLines = new LinkedList<>();
+  
+  private final int addOffsetX(int x) {
+    int xOff = x + X_BASE;
+    X_MAX = Math.max(X_MAX, xOff);
+    return xOff;
   }
-
-  private Title createTitle(String msg) {
-    Title t = objectFactory.createTitle();
-    t.setContent(msg);
-    return t;
+  
+  private final int addOffsetY(int y) {
+    int yOff = y + Y_BASE;
+    Y_MAX = Math.max(Y_MAX, yOff);
+    return yOff;
   }
-
-  private Title createTitleForVertex(VertexInfo vertex) {
-    String titleStr = vertex.getVertexName() + ":"
-        + (vertex.getFinishTimeInterval())
-        + " ms, RelativeTimeToDAG:"
-        + (vertex.getInitTime() - this.dagInfo.getStartTime())
-        + " ms, counters:" + vertex.getTezCounters();
-    Title title = createTitle(titleStr);
-    return title;
+  
+  private int scaleDown(int len) {
+    return Math.round((len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH);
   }
-
-  private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
-    String titleStr = "RelativeTimeToVertex:"
-        + (taskAttemptInfo.getStartTime() -
-        taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
-        " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
-    Title title = createTitle(titleStr);
-    return title;
+  
+  private void addRectStr(int x, int width, int y, int height, 
+      String fillColor, String borderColor, float opacity, String title) {
+    String rectStyle = "stroke: " + borderColor + "; fill: " + fillColor + "; opacity: " + opacity;
+    String rectStr = "<rect x=\"" + addOffsetX(scaleDown(x)) + "\""
+               + " y=\"" + addOffsetY(y) + "\""
+               + " width=\"" + scaleDown(width) + "\""
+               + " height=\"" + height + "\""
+               + " style=\"" + rectStyle + "\""
+               + " >"
+               + " <title>" + title +"</title>"
+               + " </rect>";
+    svgLines.add(rectStr);    
   }
-
-  /**
-   * Draw DAG from dagInfo
-   *
-   * @param dagInfo
-   */
-  private void drawDAG(DagInfo dagInfo) {
-    Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
-    int duration = (int) dagInfo.getFinishTimeInterval();
-    MAX_DAG_RUNTIME = duration;
-    MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
-    MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
-    Line line = createLine(x1, y1, x1 + duration, y2);
-    line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
-    line.setStyle("stroke: black; stroke-width:20");
-    line.setOpacity("0.3");
-    svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
-    drawVertex();
+  
+  private void addTextStr(int x, int y, String text, String anchor, int size, String title) {
+    String textStyle = "text-anchor: " + anchor + "; font-size: " + size + "px;";
+    String textStr = "<text x=\"" + addOffsetX(scaleDown(x)) + "\" "
+        + "y=\"" + addOffsetY(y) + "\" "
+        + "style=\"" + textStyle + "\" transform=\"\">"
+        + text
+        + " <title>" + title +"</title>"
+        + "</text>";
+    svgLines.add(textStr);
   }
-
-  private Collection<VertexInfo> getSortedVertices() {
-    Collection<VertexInfo> vertices = this.dagInfo.getVertices();
-    // Add corresponding vertex details
-    TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>(
-        new Comparator<VertexInfo>() {
-          @Override
-          public int compare(VertexInfo o1, VertexInfo o2) {
-            return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval());
-          }
-        });
-    vertexSet.addAll(vertices);
-    return  vertexSet;
+  
+  private void addLineStr(int x1, int y1, int x2, int y2, String color, String title, int width) {
+    String style = "stroke: " + color + "; stroke-width:" + width;
+    String str = "<line x1=\"" + addOffsetX(scaleDown(x1)) + "\""
+               + " y1=\"" + addOffsetY(y1) + "\""
+               + " x2=\"" + addOffsetX(scaleDown(x2)) + "\""
+               + " y2=\"" + addOffsetY(y2) + "\""
+               + " style=\"" + style + "\""
+               + " >"
+               + " <title>" + title +"</title>"
+               + " </line>";
+    svgLines.add(str);
   }
-
-  private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
-    Collection<TaskInfo> tasks = vertexInfo.getTasks();
-    // Add corresponding task details
-    TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() {
-      @Override
-      public int compare(TaskInfo o1, TaskInfo o2) {
-        return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval()
-            - o2.getSuccessfulTaskAttempt().getStartTimeInterval());
+  
+  public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
+    if (step.getType() != EntityType.ATTEMPT) {
+      // draw initial vertex or final commit overhead
+      StringBuilder title = new StringBuilder();
+      String text = null;
+      if (step.getType() == EntityType.VERTEX_INIT) {
+        String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(); 
+        text = vertex + " : Init";
+        title.append(text).append(TITLE_BR);
+      } else {
+        text = "Output Commit";
+        title.append(text).append(TITLE_BR);
       }
-    });
-    taskSet.addAll(tasks);
-    return taskSet;
-  }
+      title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR);
+      title.append(
+          "Critical Time: " + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime()))
+          .append("");
+      title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
+      String titleStr = title.toString();
+      int stopTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      int startTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      addRectStr(startTimeInterval,
+          (stopTimeInterval - startTimeInterval), yOffset * STEP_GAP, STEP_GAP,
+          VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+      addTextStr((stopTimeInterval + startTimeInterval) / 2,
+          (yOffset * STEP_GAP + STEP_GAP / 2),
+          text, "middle",
+          TEXT_SIZE, titleStr);
+    } else {
+      TaskAttemptInfo attempt = step.getAttempt();
+      int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime);
+      int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime);
+      int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime);
+      int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime);
+      System.out.println(attempt.getTaskAttemptId() + " " + creationTimeInterval + " "
+          + allocationTimeInterval + " " + launchTimeInterval + " " + finishTimeInterval);
+
+      StringBuilder title = new StringBuilder();
+      title.append("Attempt: " + attempt.getTaskAttemptId()).append(TITLE_BR);
+      title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR);
+      title.append("Completion Status: " + attempt.getDetailedStatus()).append(TITLE_BR);
+      title.append(
+          "Critical Time Contribution: " + 
+              getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())).append(TITLE_BR);
+      title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR);
+      title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR);
+      title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR);
+      title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
+      title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+      title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR);
+      title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
+      String titleStr = title.toString();
 
-  /**
-   * Draw the vertices
-   *
-   */
-  public void drawVertex() {
-    Collection<VertexInfo> vertices = getSortedVertices();
-    for (VertexInfo vertex : vertices) {
-      //Set vertex start time as the one when its first task attempt started executing
-      x1 = (int) vertex.getStartTimeInterval();
-      y1 += VERTEX_GAP;
-      int duration = ((int) (vertex.getTimeTaken()));
-      Line line = createLine(x1, y1, x1 + duration, y1);
-      line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
-      line.setOpacity("0.3");
+      addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
+          yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+          titleStr);
 
-      Title vertexTitle = createTitleForVertex(vertex);
-      line.getSVGDescriptionClass().add(
-          new JAXBElement<Title>(titleName, Title.class, vertexTitle));
-      svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
-      // For each vertex, draw the tasks
-      drawTask(vertex);
+      addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
+          yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+          titleStr);
+
+      addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
+          STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+
+      addTextStr((finishTimeInterval + creationTimeInterval) / 2,
+          (yOffset * STEP_GAP + STEP_GAP / 2),   attempt.getShortName(), "middle", TEXT_SIZE, titleStr);
     }
-    x1 = x1 + (int) dagInfo.getFinishTimeInterval();
-    y1 = y1 + DAG_GAP;
-    y2 = y1;
   }
 
-  /**
-   * Draw tasks
-   *
-   * @param vertex
-   */
-  public void drawTask(VertexInfo vertex) {
-    Collection<TaskInfo> tasks = getSortedTasks(vertex);
-    for (TaskInfo task : tasks) {
-      for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
-        x1 = (int) taskAttemptInfo.getStartTimeInterval();
-        y1 += TASK_GAP;
-        int duration = (int) taskAttemptInfo.getTimeTaken();
-        Line line = createLine(x1, y1, x1 + duration, y1);
-        String color =
-            taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
-                ? "green" : "red";
-        line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
-        Title title = createTitleForTaskAttempt(taskAttemptInfo);
-        line.getSVGDescriptionClass().add(
-            new JAXBElement<Title>(titleName, Title.class, title));
-        svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass()
-            .add(line);
+  private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) {
+    int duration = (int) dagInfo.getFinishTimeInterval();
+    MAX_DAG_RUNTIME = duration;
+    long dagStartTime = dagInfo.getStartTime();
+    int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time
+    int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime);
+    
+    // draw grid
+    addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);
+    int yGrid = (criticalPath.size() + 2)*STEP_GAP;
+    for (int i=0; i<11; ++i) {
+      int x = Math.round(((dagFinishTimeInterval - dagStartTimeInterval)/10.0f)*i);
+      addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK);  
+      addTextStr(x, 0, getTimeStr(x), "left", TEXT_SIZE, "");
+    }
+    addLineStr(dagStartTimeInterval, yGrid, dagFinishTimeInterval, yGrid, BORDER_COLOR, "", TICK);
+    addTextStr((dagFinishTimeInterval + dagStartTimeInterval) / 2, yGrid + STEP_GAP,
+        "Critical Path for " + dagInfo.getName() + " (" + dagInfo.getDagId() + ")", "middle",
+        TEXT_SIZE, "");
+
+    // draw steps
+    for (int i=1; i<=criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i-1); 
+      drawStep(step, dagStartTime, i);      
+    }
+    
+    // draw critical path on top
+    for (int i=1; i<=criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i-1); 
+      boolean isLast = i == criticalPath.size(); 
+      
+      // draw critical path for step
+      int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      addLineStr(startCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval,
+          (i + 1) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5);
+      
+      if (isLast) {
+        // last step. add commit overhead
+        int stepStopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+        addLineStr(stepStopCriticalTimeInterval, (i + 1) * STEP_GAP, dagFinishTimeInterval,
+            (i + 1) * STEP_GAP, CRITICAL_COLOR,
+            "Critical Time " + step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(), TICK*5);
+      } else {
+        // connect to next step in critical path
+        addLineStr(stopCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval,
+            (i + 2) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5);
       }
     }
+    
+    // draw legend
+    int legendX = 0;
+    int legendY = (criticalPath.size() + 2) * STEP_GAP;
+    int legendWidth = 10000;
+    
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/2, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/2, "Task Allocation Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/2, "Task Launch Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/2, "Task Execution Time", "left", TEXT_SIZE, "");
+    
+    Y_MAX += Y_BASE*2;
+    X_MAX += X_BASE*2;
   }
-
-  /**
-   * Convert DAG to graph
-   *
-   * @throws java.io.IOException
-   * @throws javax.xml.bind.JAXBException
-   */
-  public void saveAsSVG(String fileName) throws IOException, JAXBException {
-    drawDAG(dagInfo);
-    svg.setHeight("" + y2);
-    svg.setWidth("" + (MAX_X - MIN_X));
-    String tempFileName = System.nanoTime() + ".svg";
-    File file = new File(tempFileName);
-    JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
-    Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
-    jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
-    jaxbMarshaller.marshal(svg, file);
-    //TODO: dirty workaround to get rid of XMLRootException issue
-    BufferedReader reader = new BufferedReader(
-        new InputStreamReader(new FileInputStream(file), UTF8));
-    BufferedWriter writer = new BufferedWriter(
-        new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+  
+  public void saveCriticalPathAsSVG(DagInfo dagInfo, 
+      String fileName, List<CriticalPathStep> criticalPath) {
+    drawCritical(dagInfo, criticalPath);
+    saveFileStr(fileName);
+  }
+  
+  private void saveFileStr(String fileName) {
+    String header = "<?xml version=\"1.0\" standalone=\"no\"?> "
+        + "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\" "
+        + "\"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">"
+        + "<svg xmlns=\"http://www.w3.org/2000/svg\" version=\"1.1\" "
+        + "xmlns:xlink=\"http://www.w3.org/1999/xlink\" "
+        + "height=\"" + Y_MAX + "\" "
+        + "width=\""  + X_MAX + "\"> "
+        + "<script type=\"text/ecmascript\" "
+        + "xlink:href=\"http://code.jquery.com/jquery-2.1.1.min.js\" />";
+    String footer = "</svg>";
+    String newline = System.getProperty("line.separator");
+    BufferedWriter writer = null;
     try {
-      while (reader.ready()) {
-        String line = reader.readLine();
-        if (line != null) {
-          line = line.replaceAll(
-              " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", "");
-          writer.write(line);
-          writer.newLine();
-        }
+      writer = new BufferedWriter(new FileWriterWithEncoding(fileName, "UTF-8"));
+      writer.write(header);
+      writer.write(newline);
+      for (String str : svgLines) {
+        writer.write(str);
+        writer.write(newline);
       }
+      writer.write(footer);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     } finally {
-      IOUtils.closeQuietly(reader);
-      IOUtils.closeQuietly(writer);
-      if (file.exists()) {
-        boolean deleted = file.delete();
-        LOG.debug("Deleted {}" + file.getAbsolutePath());
+      if (writer != null) {
+        IOUtils.closeQuietly(writer);
       }
     }
-  }
 
-  private float scaleDown(int len) {
-    return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH;
   }
+
 }
\ No newline at end of file