TEZ-3934. LegacySpeculator sometime issues wrong number of speculative attempts ...
authorJonathan Eagles <jeagles@yahoo-inc.com>
Fri, 27 Jul 2018 14:56:10 +0000 (09:56 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Fri, 27 Jul 2018 14:56:10 +0000 (09:56 -0500)
tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java

index 9fbea19..c132fb1 100644 (file)
@@ -85,8 +85,7 @@ public class LegacySpeculator {
   // in progress.
   private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
 
-
-  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+  private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>();
 
   private Vertex vertex;
   private TaskRuntimeEstimator estimator;
@@ -229,24 +228,44 @@ public class LegacySpeculator {
     if (task.getState() == TaskState.SUCCEEDED) {
       return NOT_RUNNING;
     }
-    
-    if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
-      acceptableRuntime = estimator.thresholdRuntime(taskID);
-      if (acceptableRuntime == Long.MAX_VALUE) {
-        return ON_SCHEDULE;
-      }
-    }
-
-    TezTaskAttemptID runningTaskAttemptID = null;
 
     int numberRunningAttempts = 0;
 
     for (TaskAttempt taskAttempt : attempts.values()) {
-      if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
+          waitingToSpeculate.remove(taskID);
           return ALREADY_SPECULATING;
         }
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+    if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) {
+      return ALREADY_SPECULATING;
+    }
+    else {
+      if (!shouldUseTimeout) {
+        acceptableRuntime = estimator.thresholdRuntime(taskID);
+        if (acceptableRuntime == Long.MAX_VALUE) {
+          return ON_SCHEDULE;
+        }
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
+
         runningTaskAttemptID = taskAttempt.getID();
 
         long taskAttemptStartTime
@@ -311,13 +330,6 @@ public class LegacySpeculator {
       }
     }
 
-    // If we are here, there's at most one task attempt.
-    if (numberRunningAttempts == 0) {
-      return NOT_RUNNING;
-    }
-
-
-
     if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
       acceptableRuntime = estimator.thresholdRuntime(taskID);
       if (acceptableRuntime == Long.MAX_VALUE) {
@@ -332,7 +344,7 @@ public class LegacySpeculator {
   protected void addSpeculativeAttempt(TezTaskID taskID) {
     LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
     vertex.scheduleSpeculativeTask(taskID);
-    mayHaveSpeculated.add(taskID);
+    waitingToSpeculate.add(taskID);
   }
 
   private int maybeScheduleASpeculation() {