TEZ-3935. DAG aware scheduler should release unassigned new containers rather than...
authorJonathan Eagles <jeagles@yahoo-inc.com>
Tue, 22 May 2018 18:28:55 +0000 (13:28 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Tue, 22 May 2018 18:28:55 +0000 (13:28 -0500)
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java

index 243f278..50b17b9 100644 (file)
@@ -1021,6 +1021,20 @@ public class TezConfiguration extends Configuration {
       TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false;
 
   /**
+   * Boolean value. Whether to reuse new containers that could not be immediately assigned to
+   * pending requests. If enabled then newly assigned containers that cannot be immediately
+   * allocated will be held for potential reuse as if it were a container that had just completed
+   * a task. If disabled then newly assigned containers that cannot be immediately allocated will
+   * be released.  Active only if container reuse is enabled.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED =
+      TEZ_AM_PREFIX + "container.reuse.new-containers.enabled";
+  public static final boolean
+      TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED_DEFAULT = false;
+
+  /**
    * Int value. The amount of time to wait before assigning a container to the next level
    * of locality. NODE -> RACK -> NON_LOCAL. Delay scheduling parameter. Expert level setting.
    */
index dab1cad..167d879 100644 (file)
@@ -147,6 +147,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
   private boolean shouldReuseContainers;
   private boolean reuseRackLocal;
   private boolean reuseNonLocal;
+  private boolean reuseNewContainers;
   private long localitySchedulingDelay;
   private long idleContainerTimeoutMin;
   private long idleContainerTimeoutMax;
@@ -192,6 +193,10 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
       "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
       + " enabled");
 
+    reuseNewContainers = shouldReuseContainers && conf.getBoolean(
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED,
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED_DEFAULT);
+
     localitySchedulingDelay = conf.getLong(
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
@@ -362,7 +367,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
     }
 
     for (HeldContainer hc : unassigned) {
-      if (shouldReuseContainers) {
+      if (reuseNewContainers) {
         idleTracker.add(hc);
         TaskRequest assigned = tryAssignReuseContainer(hc, appState, isSession);
         if (assigned != null) {
index 529f65c..0910ed2 100644 (file)
@@ -1229,7 +1229,7 @@ public class TestDagAwareYarnTaskScheduler {
   }
 
   @Test(timeout=50000)
-  public void testIdleContainerAssignment() throws Exception {
+  public void testContainerAssignmentReleaseNewContainers() throws Exception {
     AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
 
     String appHost = "host";
@@ -1241,6 +1241,77 @@ public class TestDagAwareYarnTaskScheduler {
     conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000);
+    conf.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 5);
+
+    DagInfo mockDagInfo = mock(DagInfo.class);
+    when(mockDagInfo.getTotalVertices()).thenReturn(10);
+    when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo);
+    when(mockApp.isSession()).thenReturn(true);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    MockClock clock = new MockClock(1000);
+    NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback,
+        mockRMClient, clock);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    scheduler.start();
+    drainableAppCallback.drain();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+        regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(),
+        regResponse.getQueue());
+
+    assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
+
+    final String rack1 = "/r1";
+    final String rack2 = "/r2";
+    final String node1Rack1 = "n1r1";
+    final String node2Rack1 = "n2r1";
+    final String node1Rack2 = "n1r2";
+    MockDNSToSwitchMapping.addRackMapping(node1Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node2Rack1, rack1);
+    MockDNSToSwitchMapping.addRackMapping(node1Rack2, rack2);
+
+    Priority priorityv0 = Priority.newInstance(1);
+    MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, node1Rack1, rack1);
+
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    ContainerId cid1 = ContainerId.newContainerId(attemptId, 1);
+    NodeId n2r1 = NodeId.newInstance(node2Rack1, 1);
+    Container container1 = Container.newInstance(cid1, n2r1, null, taskv0t0.capability, priorityv0, null);
+
+    // verify new container is released is not immediately allocated
+    scheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    // app is not notified of the container being released since it never launched
+    verify(mockApp, never()).containerBeingReleased(cid1);
+    verify(mockRMClient).releaseAssignedContainer(eq(cid1));
+  }
+
+  @Test(timeout=50000)
+  public void testIdleContainerAssignmentReuseNewContainers() throws Exception {
+    AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest());
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED, true);
     conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
     conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000);
     conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000);