TEZ-2739. Improve handling of read errors in critical path analyzer (bikas)
authorBikas Saha <bikas@apache.org>
Mon, 31 Aug 2015 18:10:57 +0000 (11:10 -0700)
committerBikas Saha <bikas@apache.org>
Mon, 31 Aug 2015 18:10:57 +0000 (11:10 -0700)
CHANGES.txt
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
tez-tools/analyzers/job-analyzer/pom.xml
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java [new file with mode: 0644]

index e145916..029d776 100644 (file)
@@ -11,6 +11,7 @@ ALL CHANGES:
   same name
   TEZ-2747. Update master to reflect 0.8.0-alpha release.
   TEZ-2662. Provide a way to check whether AM or task opts are valid and error if not.
+  TEZ-2739. Improve handling of read errors in critical path analyzer
 
 Release 0.8.0-alpha: 2015-08-29 
 
index 0e53d27..737df76 100644 (file)
@@ -433,7 +433,7 @@ public class ATSImportTool extends Configured implements Tool {
   }
 
   @VisibleForTesting
-  static int process(String[] args) {
+  public static int process(String[] args) {
     Options options = buildOptions();
     int result = -1;
     try {
index 7a89166..c6f89d6 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
@@ -205,14 +206,14 @@ public class TaskInfo extends BaseInfo {
    * @return TaskAttemptInfo
    */
   public final TaskAttemptInfo getSuccessfulTaskAttempt() {
-    if (isNotNullOrEmpty(getSuccessfulAttemptId())) {
+    if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) {
       for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
         if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
           return attemptInfo;
         }
       }
     }
-    // fall back to checking status if successful attemt id is not available
+    // fall back to checking status if successful attempt id is not available
     for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
       if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
         return attemptInfo;
@@ -350,8 +351,4 @@ public class TaskInfo extends BaseInfo {
     sb.append("]");
     return sb.toString();
   }
-
-  private static boolean isNotNullOrEmpty(String str) {
-    return str != null && !str.isEmpty();
-  }
 }
index 13cf48d..37a06bf 100644 (file)
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
-    <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
index 448e785..c8d4225 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
 import org.apache.tez.analyzer.utils.SVGUtils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.history.parser.datamodel.Container;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
@@ -49,11 +50,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
   String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
   String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name());
 
-  private final static String DATA_DEPENDENCY = "Data-Dependency";
-  private final static String INIT_DEPENDENCY = "Init-Dependency";
-  private final static String COMMIT_DEPENDENCY = "Commit-Dependency";
-  private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency";
-  private final static String OUTPUT_LOST = "Previous version outputs lost";
+  public enum CriticalPathDependency {
+    DATA_DEPENDENCY,
+    INIT_DEPENDENCY,
+    COMMIT_DEPENDENCY,
+    RETRY_DEPENDENCY,
+    OUTPUT_RECREATE_DEPENDENCY
+  }
+
+  public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg";
 
   public static class CriticalPathStep {
     public enum EntityType {
@@ -64,7 +69,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
 
     EntityType type;
     TaskAttemptInfo attempt;
-    String reason; // reason linking this to the previous step on the critical path
+    CriticalPathDependency reason; // reason linking this to the previous step on the critical path
     long startCriticalPathTime; // time at which attempt is on critical path
     long stopCriticalPathTime; // time at which attempt is off critical path
     List<String> notes = Lists.newLinkedList();
@@ -85,7 +90,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     public long getStopCriticalTime() {
       return stopCriticalPathTime;
     }
-    public String getReason() {
+    public CriticalPathDependency getReason() {
       return reason;
     }
     public List<String> getNotes() {
@@ -128,7 +133,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     
     analyzeCriticalPath(dagInfo);
 
-    saveCriticalPathAsSVG(dagInfo);
+    if (getConf().getBoolean(DRAW_SVG, true)) {
+      saveCriticalPathAsSVG(dagInfo);
+    }
+  }
+  
+  public List<CriticalPathStep> getCriticalPath() {
+    return criticalPath;
   }
   
   private void saveCriticalPathAsSVG(DagInfo dagInfo) {
@@ -212,7 +223,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
       // add the commit step
       currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
       currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
-      currentStep.reason = COMMIT_DEPENDENCY;
+      currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
       tempCP.add(currentStep);
 
       while (true) {
@@ -233,7 +244,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
   
         long startCriticalPathTime = 0;
         String nextAttemptId = null;
-        String reason = null;
+        CriticalPathDependency reason = null;
         if (dataDependency) {
           // last data event was produced after the attempt was scheduled. use
           // data dependency
@@ -242,7 +253,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
             // there is a valid data causal TA. Use it.
             nextAttemptId = currentAttempt.getLastDataEventSourceTA();
-            reason = DATA_DEPENDENCY;
+            reason = CriticalPathDependency.DATA_DEPENDENCY;
             startCriticalPathTime = currentAttempt.getLastDataEventTime();
             System.out.println("Using data dependency " + nextAttemptId);
           } else {
@@ -252,7 +263,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
                 "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
                     + "TA is null for " + currentAttempt.getTaskAttemptId());
             nextAttemptId = null;
-            reason = INIT_DEPENDENCY;
+            reason = CriticalPathDependency.INIT_DEPENDENCY;
             System.out.println("Using init dependency");
           }
         } else {
@@ -262,9 +273,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
             // there is a scheduling causal TA. Use it.
             nextAttemptId = currentAttempt.getCreationCausalTA();
-            reason = NON_DATA_DEPENDENCY;
+            reason = CriticalPathDependency.RETRY_DEPENDENCY;
             TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
-            if (nextAttempt != null) {
+            if (nextAttemptId != null) {
               VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
               VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
               if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
@@ -272,7 +283,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
                 for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
                   if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
                     // next vertex is an output vertex
-                    reason = OUTPUT_LOST;
+                    reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
                     break;
                   }
                 }
@@ -286,7 +297,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
               // there is a data event going to the vertex. Count the time between data event and
               // scheduling time as Initializer/Manager overhead and follow data dependency
               nextAttemptId = currentAttempt.getLastDataEventSourceTA();
-              reason = DATA_DEPENDENCY;
+              reason = CriticalPathDependency.DATA_DEPENDENCY;
               startCriticalPathTime = currentAttempt.getLastDataEventTime();
               long overhead = currentAttempt.getCreationTime()
                   - currentAttempt.getLastDataEventTime();
@@ -299,17 +310,102 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
               // or the vertex has external input but does not use events
               // or the vertex has no external inputs or edges
               nextAttemptId = null;
-              reason = INIT_DEPENDENCY;
+              reason = CriticalPathDependency.INIT_DEPENDENCY;
               System.out.println("Using init dependency");
             }
           }
         }
-  
+
+        
+        if (!Strings.isNullOrEmpty(nextAttemptId)) {
+          TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
+          TaskAttemptInfo attemptToCheck = nextAttempt;
+
+          // check if the next attempt is already on critical path to prevent infinite loop
+          boolean foundLoop = false;
+          CriticalPathDependency prevReason = null;
+          for (CriticalPathStep previousStep : tempCP) {
+            if (previousStep.attempt.equals(attemptToCheck)) {
+              foundLoop = true;
+              prevReason = previousStep.reason;
+            }
+          }
+
+          if (foundLoop) {
+            // found a loop - find the next step based on heuristics
+            /* only lost outputs cause us to backtrack. There are 2 cases
+            * 1) Step N reported last data event to this step 
+            *    -> Step N+1 (current step) is the retry for read error reported
+            *    -> read error was reported by the Step N attempt and it did not exit after the 
+            *       error
+            *    -> So scheduling dependency of Step N points back to step N+1
+            * 2) Step N reported last data event to this step
+            *     -> Step N+1 is a retry for a read error reported
+            *     -> Step N+2 is the attempt that reported the read error
+            *     -> Step N+3 is the last data event of N+2 and points back to N+1
+            */
+            System.out.println("Reset " + currentAttempt.getTaskAttemptId() 
+            + " cause: " + currentAttempt.getTerminationCause() 
+            + " time: " + currentAttempt.getFinishTime()
+            + " reason: " + reason
+            + " because of: " + attemptToCheck.getTaskAttemptId());
+            TaskAttemptInfo attemptWithLostAncestor = currentAttempt;
+            if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
+              // Case 2 above. If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY
+              // then it's Case 1 above
+              Preconditions.checkState(prevReason.equals(
+                  CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason);
+              reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
+              attemptWithLostAncestor = nextAttempt;
+            }
+            System.out.println("Reset " + currentAttempt.getTaskAttemptId() 
+            + " cause: " + currentAttempt.getTerminationCause() 
+            + " time: " + currentAttempt.getFinishTime()
+            + " reason: " + reason
+            + " because of: " + attemptToCheck.getTaskAttemptId()
+            + " looking at: " + attemptWithLostAncestor.getTaskAttemptId());
+            Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY);
+            // we don't track all input events to the consumer. So just jump to
+            // the previous successful version of the current attempt
+            TaskAttemptInfo prevSuccAttempt = null;
+            for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) {
+              System.out.println("Looking at " + prevAttempt.getTaskAttemptId() 
+              + " cause: " + prevAttempt.getTerminationCause() + 
+              " time: " + prevAttempt.getFinishTime());
+              if (prevAttempt.getTerminationCause()
+                  .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) {
+                if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) {
+                  // attempt finished before current attempt
+                  if (prevSuccAttempt == null
+                      || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) {
+                    // keep the latest attempt that had lost outputs
+                    prevSuccAttempt = prevAttempt;
+                  }
+                }
+              }
+            }
+            Preconditions.checkState(prevSuccAttempt != null,
+                attemptWithLostAncestor.getTaskAttemptId());
+            System.out
+                .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId()
+                    + " from " + nextAttempt.getTaskAttemptId());
+            nextAttemptId = prevSuccAttempt.getTaskAttemptId();
+            if (attemptWithLostAncestor == currentAttempt) {
+              startCriticalPathTime = currentAttempt.getCreationTime();
+            } else {
+              startCriticalPathTime = prevSuccAttempt.getFinishTime();
+            }
+          }
+          
+        }
+
         currentStep.startCriticalPathTime = startCriticalPathTime;
         currentStep.reason = reason;
+        
+        Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);
   
         if (Strings.isNullOrEmpty(nextAttemptId)) {
-          Preconditions.checkState(reason.equals(INIT_DEPENDENCY));
+          Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY));
           Preconditions.checkState(startCriticalPathTime == 0);
           // no predecessor attempt found. this is the last step in the critical path
           // assume attempts start critical path time is when its scheduled. before that is 
@@ -321,7 +417,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
           currentStep.stopCriticalPathTime = initStepStopCriticalTime;
           currentStep.startCriticalPathTime = dagInfo.getStartTime();
-          currentStep.reason = INIT_DEPENDENCY;
+          currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
           tempCP.add(currentStep);
           
           if (!tempCP.isEmpty()) {
@@ -348,7 +444,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
       String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
           : (step.getType() == EntityType.VERTEX_INIT
               ? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
-      String [] record = {entity, step.getReason(), 
+      String [] record = {entity, step.getReason().name(),
           step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), 
           String.valueOf(step.getStopCriticalTime()),
           Joiner.on(";").join(step.getNotes())};
index 50fe033..44408d4 100644 (file)
@@ -252,19 +252,19 @@ public class SVGUtils {
     // draw legend
     int legendX = 0;
     int legendY = (criticalPath.size() + 2) * STEP_GAP;
-    int legendWidth = 10000;
+    int legendWidth = dagFinishTimeInterval/5;
     
-    addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
-    addTextStr(legendX, legendY + STEP_GAP/2, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "");
-    legendY += STEP_GAP;
-    addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
-    addTextStr(legendX, legendY + STEP_GAP/2, "Task Allocation Overhead", "left", TEXT_SIZE, "");
-    legendY += STEP_GAP;
-    addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
-    addTextStr(legendX, legendY + STEP_GAP/2, "Task Launch Overhead", "left", TEXT_SIZE, "");
-    legendY += STEP_GAP;
-    addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
-    addTextStr(legendX, legendY + STEP_GAP/2, "Task Execution Time", "left", TEXT_SIZE, "");
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/3, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP/2;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/3, "Task Allocation Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP/2;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/3, "Task Launch Overhead", "left", TEXT_SIZE, "");
+    legendY += STEP_GAP/2;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP/3, "Task Execution Time", "left", TEXT_SIZE, "");
     
     Y_MAX += Y_BASE*2;
     X_MAX += X_BASE*2;
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
new file mode 100644 (file)
index 0000000..9a75461
--- /dev/null
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.history.ATSImportTool;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.test.SimpleTestDAG;
+import org.apache.tez.test.SimpleTestDAG3Vertices;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.SimpleReverseVTestDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class TestAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAnalyzer.class);
+
+  private static String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir";
+  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+  private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+
+  private static MiniDFSCluster dfsCluster;
+  private static MiniTezClusterWithTimeline miniTezCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  
+  private static TezClient tezSession = null;
+  
+  private static int numDAGs = 0;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    dfsCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = dfsCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+    numDAGs = 0;
+  }
+  
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    LOG.info("Stopping mini clusters");
+    if (tezSession != null) {
+      tezSession.stop();
+    }
+    if (miniTezCluster != null) {
+      miniTezCluster.stop();
+      miniTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+    
+  public CriticalPathAnalyzer setupCPAnalyzer() {
+    Configuration analyzerConf = new Configuration(false);
+    analyzerConf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false);
+    CriticalPathAnalyzer cp = new CriticalPathAnalyzer();
+    cp.setConf(analyzerConf);
+    return cp;
+  }
+
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+        .class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    
+    Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+
+    tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+    tezSession.start();
+
+  }
+  
+  StepCheck createStep(String attempt, CriticalPathDependency reason) {
+    return new StepCheck(attempt, reason);
+  }
+  
+  class StepCheck {
+    String attempt; // attempt is the TaskAttemptInfo short name with regex
+    CriticalPathDependency reason;
+    StepCheck(String attempt, CriticalPathDependency reason) {
+      this.attempt = attempt;
+      this.reason = reason;
+    }
+    String getAttemptDetail() {
+      return attempt;
+    }
+    CriticalPathDependency getReason() {
+      return reason;
+    }
+  }
+
+  /** Runs {@code dag} to completion, asserts {@code finalState}, then verifies its critical path. */
+  DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
+    tezSession.waitTillReady();
+    numDAGs++;
+    LOG.info("XXX Running DAG name: " + dag.getName());
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    final long pollIntervalMs = 100; // single source of truth for the sleep and the log text
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for " + pollIntervalMs + "ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appContext: " + dagClient.getExecutionContext()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(pollIntervalMs);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    Assert.assertEquals(finalState, dagStatus.getState());
+    // NOTE(review): assumes DAG ids in this session follow submission order, hence numDAGs - confirm
+    String dagId = TezDAGID.getInstance(tezSession.getAppMasterApplicationId(), numDAGs).toString();
+    DagInfo dagInfo = getDagInfo(dagId);
+    verifyCriticalPath(dagInfo, steps);
+    return dagInfo;
+  }
+  
+  /** Exports the ATS data for {@code dagId} into DOWNLOAD_DIR and parses it into a DagInfo. */
+  DagInfo getDagInfo(String dagId) throws Exception {
+    // sleep for a bit to let ATS events be sent from AM
+    Thread.sleep(1000);
+    // Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+    int result = ATSImportTool.process(args);
+    // assertEquals reports the actual exit code on failure, unlike assertTrue(result == 0)
+    Assert.assertEquals("ATSImportTool failed for " + dagId, 0, result);
+
+    // Parse the downloaded archive: DOWNLOAD_DIR/dagId/dagId.zip
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    Assert.assertEquals(dagId, dagInfo.getDagId());
+    return dagInfo;
+  }
+  
+  /**
+   * Runs the critical path analyzer on the DAG and checks the resulting path against
+   * the supplied alternatives.
+   * <p>
+   * The first alternative whose length equals the path size minus two is checked:
+   * the path must start with a VERTEX_INIT step, end with a DAG_COMMIT step, and every
+   * step in between must be an ATTEMPT whose short name matches the expected attempt
+   * detail (used as a regular expression) and whose reason equals the expected reason.
+   *
+   * @param dagInfo DAG data to analyze
+   * @param stepsOptions acceptable step sequences; at least one must match in length
+   * @throws Exception if the analysis fails
+   */
+  void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception {
+    CriticalPathAnalyzer cp = setupCPAnalyzer();
+    cp.analyze(dagInfo);
+
+    List<CriticalPathStep> criticalPath = cp.getCriticalPath();
+
+    // Dump the whole path first so a failing assertion below can be diagnosed from the log.
+    for (CriticalPathStep step : criticalPath) {
+      LOG.info("XXX Step: " + step.getType());
+      if (step.getType() == EntityType.ATTEMPT) {
+        LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+      }
+      LOG.info("XXX Reason: " + step.getReason());
+      String notes = Joiner.on(";").join(step.getNotes());
+      LOG.info("XXX Notes: " + notes);
+    }
+
+    boolean foundMatchingLength = false;
+    for (StepCheck[] steps : stepsOptions) {
+      // The path carries two extra entries (leading VERTEX_INIT, trailing DAG_COMMIT),
+      // so only an option with exactly size - 2 steps is a candidate.
+      if (steps.length + 2 != criticalPath.size()) {
+        continue;
+      }
+      foundMatchingLength = true;
+      Assert.assertEquals(CriticalPathStep.EntityType.VERTEX_INIT, criticalPath.get(0).getType());
+      Assert.assertEquals(criticalPath.get(1).getAttempt().getShortName(),
+          criticalPath.get(0).getAttempt().getShortName());
+
+      for (int i = 1; i < criticalPath.size() - 1; ++i) {
+        CriticalPathStep step = criticalPath.get(i);
+        StepCheck expected = steps[i - 1];
+        Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType());
+        // The expected attempt detail is a regex so a test can accept alternatives.
+        Assert.assertTrue(expected.getAttemptDetail(),
+            step.getAttempt().getShortName().matches(expected.getAttemptDetail()));
+        Assert.assertEquals(expected.getReason(), step.getReason());
+      }
+
+      Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT,
+          criticalPath.get(criticalPath.size() - 1).getType());
+      // Only the first option of matching length is verified.
+      break;
+    }
+
+    Assert.assertTrue("No expected step sequence matches the critical path length "
+        + criticalPath.size(), foundMatchingLength);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicSuccessScatterGather() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    StepCheck[] check = { 
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY)
+        };
+    DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicTaskFailure() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  @Test (timeout=60000)
+  public void testTaskMultipleFailures() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };    
+    
+    DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+    
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithoutExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+
+  @Test (timeout=60000)
+  public void testMultiVersionInputFailureWithExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0,1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+
+    DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+
+  @Test (timeout=60000)
+  public void testMultiVersionInputFailureWithoutExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+
+    DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  /**
+   * Sets configuration for cascading input failure tests that
+   * use SimpleTestDAG3Vertices.
+   * @param testConf configuration
+   * @param failAndExit whether input failure should trigger attempt exit 
+   */
+  private void setCascadingInputFailureConfig(Configuration testConf,
+                                              boolean failAndExit) {
+    testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+
+    // v2: attempt0 succeeds; task0 attempt1 input0 fails up to version 0.
+    testConf.setBoolean(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"),
+        true);
+    testConf.setBoolean(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"),
+        failAndExit);
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"),
+        "0");
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"),
+        "1");
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"),
+        "0");
+    testConf.setInt(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"),
+        0);
+
+    // v3: all tasks, attempt0 input0 fails up to version 0.
+    testConf.setBoolean(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"),
+        true);
+    testConf.setBoolean(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"),
+        failAndExit);
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"),
+        "-1");
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"),
+        "0");
+    testConf.set(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"),
+        "0");
+    testConf.setInt(
+        TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"),
+        0);
+  }
+  
+  /**
+   * Test cascading input failure without exit. Expecting success.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails. Wait. Triggering v2 rerun.
+   * v2 task0 attempt1 input0 fails. Wait. Triggering v1 rerun.
+   * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds.
+   * v3 attempt0 accepts v2 attempt1 output.
+   * 
+   * AM vertex succeeded order is v1, v2, v1, v2, v3.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
+    Configuration testConf = new Configuration(false);
+    setCascadingInputFailureConfig(testConf, false);
+
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    
+    DAG dag = SimpleTestDAG3Vertices.createDAG(
+              "testCascadingInputFailureWithoutExitSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  /**
+   * Test cascading input failure with exit. Expecting success.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails. v3 attempt0 exits. Triggering v2 rerun.
+   * v2 task0 attempt1 input0 fails. v2 attempt1 exits. Triggering v1 rerun.
+   * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds.
+   * v3 attempt1 accepts v2 attempt2 output.
+   * 
+   * AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithExitSuccess() throws Exception {
+    Configuration testConf = new Configuration(false);
+    setCascadingInputFailureConfig(testConf, true);
+    
+    StepCheck[] check = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+
+    DAG dag = SimpleTestDAG3Vertices.createDAG(
+              "testCascadingInputFailureWithExitSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  /**
+   * Input failure of v3 causes rerun of both v1 and v2 vertices.
+   *   v1  v2
+   *    \ /
+   *    v3
+   * 
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1");
+    
+    StepCheck[] check = {
+        // use regex for either vertices being possible on the path
+        createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+      };
+
+    DAG dag = SimpleVTestDAG.createDAG(
+            "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  /**
+   * Attempt failure of a downstream vertex (v3) that is connected to
+   * 2 upstream vertices.
+   *   v1  v2
+   *    \ /
+   *    v3
+   * 
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v3"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1);
+    
+    StepCheck[] check = {
+        // use regex for either vertices being possible on the path
+        createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v3 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY),
+      };
+
+    DAG dag = SimpleVTestDAG.createDAG(
+            "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+    
+  /**
+   * Input failure of v2,v3 trigger v1 rerun.
+   * Both v2 and v3 report an error on v1 and don't exit. So one of them triggers the next
+   * version of v1 and also consumes the output of that next version, while the other
+   * only consumes the output of the next version of v1.
+   * Reruns can send output to 2 downstream vertices. 
+   *     v1
+   *    /  \
+   *   v2   v3 
+   * 
+   * Also covers multiple consumer vertices report failure against same producer task.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 1);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), "0");
+    
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0");
+    
+    List<StepCheck[]> stepsOptions = Lists.newLinkedList();
+    StepCheck[] check1 = {
+        // use regex for either vertices being possible on the path
+      createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+      createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+      createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+      createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    StepCheck[] check2 = {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    stepsOptions.add(check1);
+    stepsOptions.add(check2);
+    DAG dag = SimpleReverseVTestDAG.createDAG(
+            "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions);
+  }
+  
+}
\ No newline at end of file