Merge branch 'TEZ-3334' into TEZ-3334-MERGE1
authorJonathan Eagles <jeagles@yahoo-inc.com>
Fri, 19 May 2017 19:56:37 +0000 (14:56 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Fri, 19 May 2017 19:56:37 +0000 (14:56 -0500)
1  2 
tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java

@@@ -158,10 -159,8 +158,8 @@@ public class AMContainerHelpers 
      ContainerLaunchContext commonContainerSpec = null;
      synchronized (commonContainerSpecLock) {
        if (!commonContainerSpecs.containsKey(tezDAGID)) {
-         String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-             TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
          commonContainerSpec =
 -            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService);
 +            createCommonContainerLaunchContext(acls, credentials, localResources, auxiliaryService);
          commonContainerSpecs.put(tezDAGID, commonContainerSpec);
        } else {
          commonContainerSpec = commonContainerSpecs.get(tezDAGID);
@@@ -335,19 -329,8 +336,20 @@@ public class AMContainerImpl implement
      this.schedulerId = schedulerId;
      this.launcherId = launcherId;
      this.taskCommId = taskCommId;
 -    this.stateMachine = stateMachineFactory.make(this);
+     this.auxiliaryService = auxiliaryService;
 +    this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this);
 +    augmentStateMachine();
 +  }
 +
 +
 +  private void augmentStateMachine() {
 +    stateMachine
 +        .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK)
 +        .registerStateEnteredCallback(AMContainerState.STOPPING,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK)
 +        .registerStateEnteredCallback(AMContainerState.COMPLETED,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK);
    }
  
    @Override
@@@ -44,8 -42,8 +45,9 @@@ public class AMContainerMap extends Abs
    private final TaskCommunicatorManagerInterface tal;
    private final AppContext context;
    private final ContainerSignatureMatcher containerSignatureMatcher;
 -  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
 +  @VisibleForTesting
 +  final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+   private String auxiliaryService;
  
    public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
        ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
    }
  
    public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
 -    AMContainer amc = new AMContainerImpl(container, chh, tal,
 -      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService);
 +    AMContainer amc = createAmContainer(container, chh, tal,
-         containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
++        containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService);
      return (containerMap.putIfAbsent(container.getId(), amc) == null);
    }
  
-                                 int launcherId, int taskCommId) {
 +  AMContainer createAmContainer(Container container,
 +                                ContainerHeartbeatHandler chh,
 +                                TaskCommunicatorManagerInterface tal,
 +                                ContainerSignatureMatcher signatureMatcher,
 +                                AppContext appContext, int schedulerId,
-         signatureMatcher, appContext, schedulerId, launcherId, taskCommId);
++                                int launcherId, int taskCommId, String auxiliaryService) {
 +    AMContainer amc = new AMContainerImpl(container, chh, tal,
++        signatureMatcher, appContext, schedulerId, launcherId, taskCommId, auxiliaryService);
 +    return amc;
 +  }
 +
    public AMContainer get(ContainerId containerId) {
      return containerMap.get(containerId);
    }
@@@ -63,8 -63,8 +63,9 @@@ import org.apache.hadoop.yarn.event.Eve
  import org.apache.hadoop.yarn.util.SystemClock;
  import org.apache.tez.common.security.JobTokenIdentifier;
  import org.apache.tez.common.security.TokenCache;
+ import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 +import org.apache.tez.dag.app.rm.node.AMNodeEventType;
  import org.apache.tez.serviceplugins.api.ContainerEndReason;
  import org.apache.tez.serviceplugins.api.ServicePluginException;
  import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@@ -1,4 -1,4 +1,4 @@@
--/**
++/*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@@ -48,104 -39,35 +48,104 @@@ import org.junit.Test
  
  public class TestAMContainerMap {
  
 -  private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
 -    return mock(ContainerHeartbeatHandler.class);
 -  }
  
 -  private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException {
 -    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
 -    TaskCommunicator taskComm = mock(TaskCommunicator.class);
 -    doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
 -    doReturn(taskComm).when(tal).getTaskCommunicator(0);
 -    return tal;
 -  }
 +  @Test (timeout = 10000)
 +  public void testCleanupOnDagComplete() {
  
 -  private AppContext mockAppContext() {
 +    ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
 +    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
      AppContext appContext = mock(AppContext.class);
 -    return appContext;
 -  }
  
 -  @SuppressWarnings("deprecation")
 -  private ContainerId mockContainerId(int cId) {
 -    ApplicationId appId = ApplicationId.newInstance(1000, 1);
 -    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
 -    ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
 -    return containerId;
 +
 +
 +    int numContainers = 7;
 +    WrappedContainer[] wContainers = new WrappedContainer[numContainers];
 +    for (int i = 0 ; i < numContainers ; i++) {
 +      WrappedContainer wc =
 +          new WrappedContainer(false, null, i);
 +      wContainers[i] = wc;
 +    }
 +
 +    AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock(
 +        ContainerSignatureMatcher.class), appContext, wContainers);
 +
 +    for (int i = 0 ; i < numContainers ; i++) {
 +      amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0);
 +    }
 +
 +
 +    // Container 1 in LAUNCHING state
 +    wContainers[0].launchContainer();
 +    wContainers[0].verifyState(AMContainerState.LAUNCHING);
 +
 +    // Container 2 in IDLE state
 +    wContainers[1].launchContainer();
 +    wContainers[1].containerLaunched();
 +    wContainers[1].verifyState(AMContainerState.IDLE);
 +
 +    // Container 3 RUNNING state
 +    wContainers[2].launchContainer();
 +    wContainers[2].containerLaunched();
 +    wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID);
 +    wContainers[2].verifyState(AMContainerState.RUNNING);
 +
 +    // Cointainer 4 STOP_REQUESTED
 +    wContainers[3].launchContainer();
 +    wContainers[3].containerLaunched();
 +    wContainers[3].stopRequest();
 +    wContainers[3].verifyState(AMContainerState.STOP_REQUESTED);
 +
 +    // Container 5 STOPPING
 +    wContainers[4].launchContainer();
 +    wContainers[4].containerLaunched();
 +    wContainers[4].stopRequest();
 +    wContainers[4].nmStopSent();
 +    wContainers[4].verifyState(AMContainerState.STOPPING);
 +
 +    // Container 6 COMPLETED
 +    wContainers[5].launchContainer();
 +    wContainers[5].containerLaunched();
 +    wContainers[5].stopRequest();
 +    wContainers[5].nmStopSent();
 +    wContainers[5].containerCompleted();
 +    wContainers[5].verifyState(AMContainerState.COMPLETED);
 +
 +    // Container 7 STOP_REQUESTED + ERROR
 +    wContainers[6].launchContainer();
 +    wContainers[6].containerLaunched();
 +    wContainers[6].containerLaunched();
 +    assertTrue(wContainers[6].amContainer.isInErrorState());
 +    wContainers[6].verifyState(AMContainerState.STOP_REQUESTED);
 +
 +    // 7 containers present, and registered with AMContainerMap at this point.
 +
 +    assertEquals(7, amContainerMap.containerMap.size());
 +    amContainerMap.dagComplete(mock(DAG.class));
 +    assertEquals(5, amContainerMap.containerMap.size());
    }
  
 -  private Container mockContainer(ContainerId containerId) {
 -    NodeId nodeId = NodeId.newInstance("localhost", 43255);
 -    Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
 -        Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
 -    return container;
 +  private static class AMContainerMapForTest extends AMContainerMap {
 +
 +
 +    private WrappedContainer[] wrappedContainers;
 +
 +    public AMContainerMapForTest(ContainerHeartbeatHandler chh,
 +                                 TaskCommunicatorManagerInterface tal,
 +                                 ContainerSignatureMatcher containerSignatureMatcher,
 +                                 AppContext context, WrappedContainer[] wrappedContainers) {
 +      super(chh, tal, containerSignatureMatcher, context);
 +      this.wrappedContainers = wrappedContainers;
 +    }
 +
 +    @Override
 +    AMContainer createAmContainer(Container container,
 +                                  ContainerHeartbeatHandler chh,
 +                                  TaskCommunicatorManagerInterface tal,
 +                                  ContainerSignatureMatcher signatureMatcher,
 +                                  AppContext appContext, int schedulerId,
-                                   int launcherId, int taskCommId) {
++                                  int launcherId, int taskCommId, String auxiliaryService) {
 +      return wrappedContainers[container.getId().getId()].amContainer;
 +    }
 +
    }
  }