TEZ-3951. TezClient wait too long for the DAGClient for prewarm; tries to shut down...
authorHarish JP <harishjp@gmail.com>
Mon, 11 Jun 2018 13:27:22 +0000 (18:57 +0530)
committerHarish JP <harishjp@gmail.com>
Mon, 11 Jun 2018 13:27:22 +0000 (18:57 +0530)
tez-api/src/main/java/org/apache/tez/client/TezClient.java
tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java

index 9dd4a69..ad00592 100644 (file)
@@ -112,6 +112,7 @@ public class TezClient {
 
   private static final String appIdStrPrefix = "application";
   private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_';
+  private static final long PREWARM_WAIT_MS = 500;
   
   @VisibleForTesting
   static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";
@@ -584,23 +585,33 @@ public class TezClient {
    *           if submission timed out
    */  
   public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
-    if (isSession) {
-      return submitDAGSession(dag);
-    } else {
-      return submitDAGApplication(dag);
+    DAGClient result = isSession ? submitDAGSession(dag) : submitDAGApplication(dag);
+    if (result != null) {
+      closePrewarmDagClient(); // Assume the current DAG replaced the prewarm one; no need to kill.
     }
+    return result;
   }
 
-  private void closePrewarmDagClient() {
+  private void killAndClosePrewarmDagClient(long waitTimeMs) {
     if (prewarmDagClient == null) {
       return;
     }
     try {
-       prewarmDagClient.tryKillDAG();
-       LOG.info("Waiting for prewarm DAG to shut down");
-       prewarmDagClient.waitForCompletion();
-    } catch (Exception ex) {
-       LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
+      prewarmDagClient.tryKillDAG();
+      if (waitTimeMs > 0) {
+        LOG.info("Waiting for prewarm DAG to shut down");
+        prewarmDagClient.waitForCompletion(waitTimeMs);
+      }
+    }
+    catch (Exception ex) {
+      LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
+    }
+    closePrewarmDagClient();
+  }
+
+  private void closePrewarmDagClient() {
+    if (prewarmDagClient == null) {
+      return;
     }
     try {
       prewarmDagClient.close();
@@ -705,6 +716,11 @@ public class TezClient {
         frameworkClient);
   }
 
+  @VisibleForTesting
+  protected long getPrewarmWaitTimeMs() {
+    return PREWARM_WAIT_MS;
+  }
+
   /**
    * Stop the client. This terminates the connection to the YARN cluster.
    * In session mode, this shuts down the session DAG App Master
@@ -712,7 +728,7 @@ public class TezClient {
    * @throws IOException
    */
   public synchronized void stop() throws TezException, IOException {
-    closePrewarmDagClient();
+    killAndClosePrewarmDagClient(getPrewarmWaitTimeMs());
     try {
       if (amKeepAliveService != null) {
         amKeepAliveService.shutdownNow();
index c70da75..6c0ebbd 100644 (file)
@@ -103,7 +103,7 @@ public abstract class DAGClient implements Closeable {
   public abstract void tryKillDAG() throws IOException, TezException;
 
   /**
-   * Wait for DAG to complete without printing any vertex statuses
+   * Wait forever for DAG to complete without printing any vertex statuses
    * 
    * @return Final DAG Status
    * @throws IOException
@@ -113,6 +113,17 @@ public abstract class DAGClient implements Closeable {
   public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException;
 
   /**
+   * Wait for DAG to complete without printing any vertex statuses
+   *
+   * @param timeMs Maximum wait duration
+   * @return Final DAG Status, or null on timeout or if DAG is no longer running
+   * @throws IOException
+   * @throws TezException
+   * @throws InterruptedException
+   */
+  public abstract DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException;
+
+  /**
    * Wait for DAG to complete and periodically print *all* vertices' status.
    * 
    * @param statusGetOpts
@@ -125,4 +136,5 @@ public abstract class DAGClient implements Closeable {
    */
   public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
       throws IOException, TezException, InterruptedException;
+
 }
index 1cf0bfc..9e17b9b 100644 (file)
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api.client;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.Collections;
@@ -28,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@@ -338,15 +338,20 @@ public class DAGClientImpl extends DAGClient {
   }
 
   @Override
+  public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException {
+    return _waitForCompletionWithStatusUpdates(timeMs, false, EnumSet.noneOf(StatusGetOpts.class));
+  }
+
+  @Override
   public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
-    return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class));
+    return _waitForCompletionWithStatusUpdates(-1, false, EnumSet.noneOf(StatusGetOpts.class));
   }
 
   @Override
   public DAGStatus waitForCompletionWithStatusUpdates(
       @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException,
       InterruptedException {
-    return _waitForCompletionWithStatusUpdates(true, statusGetOpts);
+    return _waitForCompletionWithStatusUpdates(-1, true, statusGetOpts);
   }
 
   @Override
@@ -504,15 +509,21 @@ public class DAGClientImpl extends DAGClient {
     return dagStatus;
   }
 
-  private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates,
+  private DAGStatus _waitForCompletionWithStatusUpdates(long timeMs,
+                                                        boolean vertexUpdates,
                                                         @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
     DAGStatus dagStatus;
     boolean initPrinted = false;
     boolean runningPrinted = false;
     double dagProgress = -1.0; // Print the first one
     // monitoring
+    Long maxNs = timeMs >= 0 ? (System.nanoTime() + (timeMs * 1000000L)) : null;
     while (true) {
-      dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
+      try {
+        dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
+      } catch (DAGNotRunningException ex) {
+        return null;
+      }
       if (!initPrinted
           && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
         initPrinted = true; // Print once
@@ -525,6 +536,9 @@ public class DAGClientImpl extends DAGClient {
           || dagStatus.getState() == DAGStatus.State.ERROR) {
         break;
       }
+      if (maxNs != null && System.nanoTime() > maxNs) {
+        return null;
+      }
     }// End of while(true)
 
     Set<String> vertexNames = Collections.emptySet();
@@ -537,7 +551,14 @@ public class DAGClientImpl extends DAGClient {
         vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
       }
       dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
-      dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
+      try {
+        dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
+      } catch (DAGNotRunningException ex) {
+        return null;
+      }
+      if (maxNs != null && System.nanoTime() > maxNs) {
+        return null;
+      }
     }// end of while
     // Always print the last status irrespective of progress change
     monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
index e959a55..0c297d3 100644 (file)
@@ -115,6 +115,7 @@ public class TestTezClient {
     YarnClient mockYarnClient;
     ApplicationId mockAppId;
     boolean callRealGetSessionAMProxy;
+    Long prewarmTimeoutMs;
 
     public TezClientForTest(String name, TezConfiguration tezConf,
         @Nullable Map<String, LocalResource> localResources,
@@ -135,6 +136,15 @@ public class TestTezClient {
       }
       return super.getAMProxy(appId);
     }
+
+    public void setPrewarmTimeoutMs(Long prewarmTimeoutMs) {
+      this.prewarmTimeoutMs = prewarmTimeoutMs;
+    }
+
+    @Override
+    protected long getPrewarmWaitTimeMs() {
+      return prewarmTimeoutMs == null ? super.getPrewarmWaitTimeMs() : prewarmTimeoutMs;
+    }
   }
   
   TezClientForTest configureAndCreateTezClient() throws YarnException, IOException, ServiceException {
@@ -429,6 +439,25 @@ public class TestTezClient {
     client.stop();
   }
 
+
+  @Test (timeout=5000)
+  public void testPreWarmCloseStuck() throws Exception {
+    TezClientForTest client = configureAndCreateTezClient();
+    client.setPrewarmTimeoutMs(10L); // Don't wait too long.
+    client.start();
+
+    when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.RUNNING);
+    when(client.sessionAmProxy.getAMStatus((RpcController) any(), (GetAMStatusRequestProto) any()))
+        .thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.READY).build());
+
+    PreWarmVertex vertex = PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1));
+    client.preWarm(vertex);
+    // Keep prewarm in "running" state. Client should give up waiting; if it doesn't, the test will time out.
+    client.stop();
+  }
+
+
   private void setClientToReportStoppedDags(TezClientForTest client) throws Exception {
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
       .thenReturn(YarnApplicationState.FINISHED);
index 42b52e0..16dc2f8 100644 (file)
@@ -86,6 +86,11 @@ public class MRDAGClient extends DAGClient {
   }
 
   @Override
+  public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException {
+    return realClient.waitForCompletion(timeMs);
+  }
+
+  @Override
   public DAGStatus waitForCompletionWithStatusUpdates(
       @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
     return realClient.waitForCompletionWithStatusUpdates(statusGetOpts);