TEZ-3943. TezClient leaks DAGClient for prewarm (Sergey Shelukhin via jlowe)
author Jason Lowe <jlowe@apache.org>
Tue, 29 May 2018 19:23:03 +0000 (14:23 -0500)
committer Jason Lowe <jlowe@apache.org>
Tue, 29 May 2018 19:23:03 +0000 (14:23 -0500)
tez-api/src/main/java/org/apache/tez/client/TezClient.java
tez-api/src/test/java/org/apache/tez/client/TestTezClient.java

index d2c1af4..9dd4a69 100644 (file)
@@ -142,7 +142,7 @@ public class TezClient {
   @VisibleForTesting
   final ServicePluginsDescriptor servicePluginsDescriptor;
   private JavaOptsChecker javaOptsChecker = null;
-
+  private DAGClient prewarmDagClient = null;
   private int preWarmDAGCounter = 0;
 
   /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
@@ -591,6 +591,25 @@ public class TezClient {
     }
   }
 
+  /**
+   * Best-effort teardown of the DAGClient kept in {@code prewarmDagClient}.
+   * Does nothing when no client is held. Otherwise asks the prewarm DAG to stop,
+   * waits for it to complete, closes the client and clears the field. Failures are
+   * logged rather than rethrown so the caller can continue with its own shutdown.
+   * If the wait was interrupted, the thread's interrupt status is restored before
+   * returning.
+   */
+  private void closePrewarmDagClient() {
+    if (prewarmDagClient == null) {
+      return;
+    }
+    boolean interrupted = false;
+    try {
+      prewarmDagClient.tryKillDAG();
+      LOG.info("Waiting for prewarm DAG to shut down");
+      prewarmDagClient.waitForCompletion();
+    } catch (Exception ex) {
+      // The broad catch would otherwise swallow an interrupt raised while waiting;
+      // remember it so the interrupt status can be re-asserted once cleanup is done.
+      interrupted = ex instanceof InterruptedException;
+      LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
+    }
+    try {
+      prewarmDagClient.close();
+    } catch (Exception e) {
+      LOG.warn("Failed to close prewarm DagClient " + prewarmDagClient, e);
+    }
+    prewarmDagClient = null;
+    if (interrupted) {
+      // Restored last so the close() attempt above still runs un-interrupted.
+      Thread.currentThread().interrupt();
+    }
+  }
+  
   private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
     Preconditions.checkState(isSession == true, 
         "submitDAG with additional resources applies to only session mode. " + 
@@ -693,6 +712,7 @@ public class TezClient {
    * @throws IOException
    */
   public synchronized void stop() throws TezException, IOException {
+    closePrewarmDagClient();
     try {
       if (amKeepAliveService != null) {
         amKeepAliveService.shutdownNow();
@@ -925,7 +945,7 @@ public class TezClient {
           "available", e);
     }
     if(isReady) {
-      submitDAG(dag);
+      prewarmDagClient = submitDAG(dag);
     } else {
       throw new SessionNotReady("Tez AM not ready, could not submit DAG");
     }
index 2c04061..e959a55 100644 (file)
@@ -38,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.atLeast;
@@ -87,10 +88,15 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -405,7 +411,7 @@ public class TestTezClient {
     client.start();
 
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
-    .thenReturn(YarnApplicationState.RUNNING);
+        .thenReturn(YarnApplicationState.RUNNING);
     
     when(
         client.sessionAmProxy.getAMStatus((RpcController) any(), (GetAMStatusRequestProto) any()))
@@ -419,9 +425,21 @@ public class TestTezClient {
     SubmitDAGRequestProto proto = captor1.getValue();
     assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
 
+    setClientToReportStoppedDags(client);
     client.stop();
   }
 
+  /**
+   * Re-stubs the given test client's mocks: the YARN application state is reported as
+   * FINISHED, and DAG status requests issued with a null RpcController are answered
+   * with a DAG_SUCCEEDED status whose progress shows one of one task succeeded.
+   */
+  private void setClientToReportStoppedDags(TezClientForTest client) throws Exception {
+    // Build the canned response up front; these are plain protobuf builders and do
+    // not interact with the mocks being stubbed below.
+    ProgressProto finishedProgress = ProgressProto.newBuilder()
+        .setFailedTaskCount(0)
+        .setKilledTaskCount(0)
+        .setRunningTaskCount(0)
+        .setSucceededTaskCount(1)
+        .setTotalTaskCount(1)
+        .build();
+    DAGStatusProto succeededStatus = DAGStatusProto.newBuilder()
+        .addDiagnostics("Diagnostics_0")
+        .setState(DAGStatusStateProto.DAG_SUCCEEDED)
+        .setDAGProgress(finishedProgress)
+        .build();
+    GetDAGStatusResponseProto cannedResponse = GetDAGStatusResponseProto.newBuilder()
+        .setDagStatus(succeededStatus)
+        .build();
+
+    when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.FINISHED);
+    when(client.sessionAmProxy.getDAGStatus(
+            isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
+        .thenReturn(cannedResponse);
+  }
+
   @Test (timeout=30000)
   public void testPreWarmWithTimeout() throws Exception {
     long startTime = 0 , endTime = 0;
@@ -506,6 +524,7 @@ public class TestTezClient {
     assertTrue("Time taken is not as expected",
         (endTime - startTime) <= timeout);
     verify(spyClient, times(2)).submitDAG(any(DAG.class));
+    setClientToReportStoppedDags(client);
     spyClient.stop();
     client.stop();
   }