TEZ-3932. TaskSchedulerManager can throw NullPointerException during DAGAppMaster...
authorJason Lowe <jlowe@apache.org>
Wed, 9 May 2018 15:25:32 +0000 (10:25 -0500)
committerJason Lowe <jlowe@apache.org>
Wed, 9 May 2018 15:25:32 +0000 (10:25 -0500)
tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java

index 5777a2a..61e3702 100644 (file)
@@ -420,10 +420,14 @@ public class TaskSchedulerManager extends AbstractService implements
       // Inform the Node - the task has asked to be STOPPED / has already
       // stopped.
       // AMNodeImpl blacklisting logic does not account for KILLED attempts.
-      sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
-          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
-          attemptContainerId,
-          attempt.getID(), event.getState() == TaskAttemptState.FAILED));
+      AMContainer amContainer = appContext.getAllContainers().get(attemptContainerId);
+      // DAG can be shutting down so protect against container cleanup race
+      if (amContainer != null) {
+        Container container = amContainer.getContainer();
+        sendEvent(new AMNodeEventTaskAttemptEnded(container.getNodeId(), event.getSchedulerId(),
+            attemptContainerId,
+            attempt.getID(), event.getState() == TaskAttemptState.FAILED));
+      }
     }
   }
 
@@ -436,9 +440,14 @@ public class TaskSchedulerManager extends AbstractService implements
     if (event.getUsedContainerId() != null) {
       sendEvent(new AMContainerEventTASucceeded(usedContainerId,
           event.getAttemptID()));
-      sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
-          get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
-          event.getAttemptID()));
+      AMContainer amContainer = appContext.getAllContainers().get(usedContainerId);
+      // DAG can be shutting down so protect against container cleanup race
+      if (amContainer != null) {
+        Container container = amContainer.getContainer();
+        sendEvent(new AMNodeEventTaskAttemptSucceeded(container.getNodeId(), event.getSchedulerId(),
+            usedContainerId,
+            event.getAttemptID()));
+      }
     }
 
     boolean wasContainerAllocated = false;
@@ -742,10 +751,15 @@ public class TaskSchedulerManager extends AbstractService implements
     // because the deallocateTask downcall may have raced with the
     // taskAllocated() upcall
     assert task.equals(taskAttempt);
-    if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
-      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
-          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
+
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    // Even though we just added this container,
+    // DAG can be shutting down so protect against container cleanup race
+    if (amContainer != null) {
+      if (amContainer.getState() == AMContainerState.ALLOCATED) {
+        sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
+            event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
+      }
     }
     sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
         event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
@@ -951,7 +965,10 @@ public class TaskSchedulerManager extends AbstractService implements
     // An AMContainer instance should already exist if an attempt is being made to preempt it
     AMContainer amContainer = appContext.getAllContainers().get(containerId);
     try {
-      taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+      // DAG can be shutting down so protect against container cleanup race
+      if (amContainer != null) {
+        taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+      }
     } catch (Exception e) {
       String msg = "Error in TaskScheduler when preempting container"
           + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext)
index 5df25de..dcf9a5d 100644 (file)
@@ -70,6 +70,7 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.impl.VertexImpl;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
@@ -217,6 +219,79 @@ public class TestTaskSchedulerManager {
     assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
   }
 
+  @Test(timeout = 5000)
+  public void testTASucceededAfterContainerCleanup() throws Exception {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+
+    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
+    TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class);
+    when(mockAttemptId.getId()).thenReturn(0);
+    when(mockTaskAttempt.getID()).thenReturn(mockAttemptId);
+    Resource resource = Resource.newInstance(1024, 1);
+    ContainerContext containerContext =
+        new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(),
+            new HashMap<String, String>(), "");
+    int priority = 10;
+    TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(), null);
+
+    ContainerId mockCId = mock(ContainerId.class);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(mockCId);
+
+    AMContainer mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
+    when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE);
+
+    // Returning null container will replicate container cleanup scenario
+    when(mockAMContainerMap.get(mockCId)).thenReturn(null);
+
+    AMSchedulerEventTALaunchRequest lr =
+        new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
+            priority, containerContext, 0, 0, 0);
+    schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
+    assertEquals(1, mockEventHandler.events.size());
+    assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA);
+    AMContainerEventAssignTA assignEvent =
+        (AMContainerEventAssignTA) mockEventHandler.events.get(0);
+    assertEquals(priority, assignEvent.getPriority());
+    assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
+  }
+
+  @Test(timeout = 5000)
+  public void testTAUnsuccessfulAfterContainerCleanup() throws Exception {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+
+    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
+    TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class);
+    when(mockAttemptId.getId()).thenReturn(0);
+    when(mockTaskAttempt.getID()).thenReturn(mockAttemptId);
+
+    ContainerId mockCId = mock(ContainerId.class);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(mockCId);
+
+    AMContainer mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
+    when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE);
+    when(mockTaskAttempt.getAssignedContainerID()).thenReturn(mockCId);
+
+    // Returning null container will replicate container cleanup scenario
+    when(mockAMContainerMap.get(mockCId)).thenReturn(null);
+
+    schedulerHandler.handleEvent(
+        new AMSchedulerEventTAEnded(
+            mockTaskAttempt, mockCId, TaskAttemptState.KILLED, null, null, 0));
+    assertEquals(1, mockEventHandler.events.size());
+    assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventStopRequest);
+    AMContainerEventStopRequest stopEvent =
+        (AMContainerEventStopRequest) mockEventHandler.events.get(0);
+    assertEquals(mockCId, stopEvent.getContainerId());
+  }
+
   @Test (timeout = 5000)
   public void testTaskBasedAffinity() throws Exception {
     Configuration conf = new Configuration(false);
@@ -288,7 +363,7 @@ public class TestTaskSchedulerManager {
     schedulerHandler.stop();
     schedulerHandler.close();
   }
-  
+
   @Test (timeout = 5000)
   public void testContainerInternalPreempted() throws IOException, ServicePluginException {
     Configuration conf = new Configuration(false);
@@ -318,6 +393,37 @@ public class TestTaskSchedulerManager {
     schedulerHandler.stop();
     schedulerHandler.close();
   }
+
+  @Test(timeout = 5000)
+  public void testContainerInternalPreemptedAfterContainerCleanup() throws IOException, ServicePluginException {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+
+    AMContainer mockAmContainer = mock(AMContainer.class);
+    when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0);
+    when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0);
+    when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0);
+    ContainerId mockCId = mock(ContainerId.class);
+    verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any());
+    // Returning null container will replicate container cleanup scenario
+    when(mockAMContainerMap.get(mockCId)).thenReturn(null);
+    schedulerHandler.preemptContainer(0, mockCId);
+    verify(mockTaskScheduler, times(0)).deallocateContainer(mockCId);
+    assertEquals(1, mockEventHandler.events.size());
+    Event event = mockEventHandler.events.get(0);
+    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+    assertEquals(mockCId, completedEvent.getContainerId());
+    assertEquals("Container preempted internally", completedEvent.getDiagnostics());
+    assertTrue(completedEvent.isPreempted());
+    Assert.assertFalse(completedEvent.isDiskFailed());
+    assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+        completedEvent.getTerminationCause());
+
+    schedulerHandler.stop();
+    schedulerHandler.close();
+  }
   
   @Test (timeout = 5000)
   public void testContainerDiskFailed() throws IOException {