TEZ-3938. Task attempts failing due to not making progress (Kuhu Shukla via jeagles)
author Jonathan Eagles <jeagles@yahoo-inc.com>
Tue, 5 Jun 2018 21:24:58 +0000 (16:24 -0500)
committer Jonathan Eagles <jeagles@yahoo-inc.com>
Tue, 5 Jun 2018 21:24:58 +0000 (16:24 -0500)
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java

index c43bd98..6ad41f8 100644 (file)
@@ -575,8 +575,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
-    //set last notified progress time to current time
-    this.lastNotifyProgressTimestamp = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -1434,6 +1432,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
       ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
           .getNetworkLocation());
+      ta.lastNotifyProgressTimestamp = ta.clock.getTime();
 
       ta.launchTime = ta.clock.getTime();
 
@@ -1585,7 +1584,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.lastNotifyProgressTimestamp = ta.clock.getTime();
       } else {
         long currTime = ta.clock.getTime();
-        if (ta.hungIntervalMax > 0 &&
+        if (ta.hungIntervalMax > 0 && ta.lastNotifyProgressTimestamp > 0 &&
             currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) {
           // task is hung
           String diagnostics = "Attempt failed because it appears to make no progress for " + 
index 2bad2ef..503e418 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -1138,6 +1140,65 @@ public class TestTaskAttempt {
     assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
   }
 
+  @Test (timeout = 60000L)
+  public void testProgressAfterSubmit() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 50);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    MockClock mockClock = new MockClock();
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    mockClock.incrementTime(20L);
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    mockClock.incrementTime(55L);
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    if (arg.getValue() instanceof  TaskAttemptEvent) {
+      taImpl.handle((TaskAttemptEvent) arg.getValue());
+    }
+    Assert.assertEquals("Task Attempt's internal state should be SUBMITTED!",
+        taImpl.getInternalState(), TaskAttemptStateInternal.SUBMITTED);
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);