// in progress.
private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
-
- private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+ private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>();
private Vertex vertex;
private TaskRuntimeEstimator estimator;
if (task.getState() == TaskState.SUCCEEDED) {
return NOT_RUNNING;
}
-
- if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
- acceptableRuntime = estimator.thresholdRuntime(taskID);
- if (acceptableRuntime == Long.MAX_VALUE) {
- return ON_SCHEDULE;
- }
- }
-
- TezTaskAttemptID runningTaskAttemptID = null;
int numberRunningAttempts = 0;
for (TaskAttempt taskAttempt : attempts.values()) {
- if (taskAttempt.getState() == TaskAttemptState.RUNNING
- || taskAttempt.getState() == TaskAttemptState.STARTING) {
+ TaskAttemptState taskAttemptState = taskAttempt.getState();
+ if (taskAttemptState == TaskAttemptState.RUNNING
+ || taskAttemptState == TaskAttemptState.STARTING) {
if (++numberRunningAttempts > 1) {
+ waitingToSpeculate.remove(taskID);
return ALREADY_SPECULATING;
}
+ }
+ }
+
+ // If we are here, there's at most one task attempt.
+ if (numberRunningAttempts == 0) {
+ return NOT_RUNNING;
+ }
+
+ if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) {
+ return ALREADY_SPECULATING;
+ }
+ else {
+ if (!shouldUseTimeout) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+ }
+
+ TezTaskAttemptID runningTaskAttemptID = null;
+
+ for (TaskAttempt taskAttempt : attempts.values()) {
+ TaskAttemptState taskAttemptState = taskAttempt.getState();
+ if (taskAttemptState == TaskAttemptState.RUNNING
+ || taskAttemptState == TaskAttemptState.STARTING) {
+
runningTaskAttemptID = taskAttempt.getID();
long taskAttemptStartTime
}
}
- // If we are here, there's at most one task attempt.
- if (numberRunningAttempts == 0) {
- return NOT_RUNNING;
- }
-
-
-
if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
protected void addSpeculativeAttempt(TezTaskID taskID) {
LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
vertex.scheduleSpeculativeTask(taskID);
- mayHaveSpeculated.add(taskID);
+ waitingToSpeculate.add(taskID);
}
private int maybeScheduleASpeculation() {