SAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish
authorBharath Kumarasubramanian <bkumaras@linkedin.com>
Mon, 7 May 2018 18:11:01 +0000 (11:11 -0700)
committerxiliu <xiliu@linkedin.com>
Mon, 7 May 2018 18:11:01 +0000 (11:11 -0700)
Added the following APIs to ApplicationRunner
 `void waitForFinish()`
 `boolean waitForFinish(Duration timeout)`

Implemented the wait for finish methods in remote application runner. Note currently, there is disparity in the APIs in terms of associating runners with stream application. Ideally, we want to decide on the cardinal relation between them and change the APIs accordingly.

The goal of the PR is limited to introduce API (waitForFinish) parity between runners in the current setup.
xinyuiscool

Author: Bharath Kumarasubramanian <bkumaras@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu.us@gmail.com>

Closes #503 from bharathkk/samza-1653

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java [new file with mode: 0644]
samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java

index 440dd33..8339429 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.runtime;
 
+import java.time.Duration;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -107,6 +108,24 @@ public abstract class ApplicationRunner {
   public abstract ApplicationStatus status(StreamApplication streamApp);
 
   /**
+   * Waits until the application finishes.
+   */
+  public void waitForFinish() {
+    throw new UnsupportedOperationException(getClass().getName() + " does not support waitForFinish.");
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  public boolean waitForFinish(Duration timeout) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish.");
+  }
+
+  /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
    *
    * The stream configurations are read from the following properties in the config:
index 8f481cd..1284060 100644 (file)
@@ -19,6 +19,8 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
@@ -194,15 +196,42 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   /**
-   * Block until the application finishes
+   * Waits until the application finishes.
    */
+  @Override
   public void waitForFinish() {
+    waitForFinish(Duration.ofMillis(0));
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   * If timeout &lt; 1, blocks the caller indefinitely.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  @Override
+  public boolean waitForFinish(Duration timeout) {
+    long timeoutInMs = timeout.toMillis();
+    boolean finished = true;
+
     try {
-      shutdownLatch.await();
+      if (timeoutInMs < 1) {
+        shutdownLatch.await();
+      } else {
+        finished = shutdownLatch.await(timeoutInMs, TimeUnit.MILLISECONDS);
+
+        if (!finished) {
+          LOG.warn("Timed out waiting for application to finish.");
+        }
+      }
     } catch (Exception e) {
-      LOG.error("Wait is interrupted by exception", e);
+      LOG.error("Error waiting for application to finish", e);
       throw new SamzaException(e);
     }
+
+    return finished;
   }
 
   /**
@@ -280,4 +309,9 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   Set<StreamProcessor> getProcessors() {
     return processors;
   }
+
+  @VisibleForTesting
+  CountDownLatch getShutdownLatch() {
+    return shutdownLatch;
+  }
 }
index ea218d0..202fa76 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import java.time.Duration;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 
+import static org.apache.samza.job.ApplicationStatus.*;
+
 
 /**
  * This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster
@@ -41,6 +44,7 @@ import java.util.UUID;
 public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class);
+  private static final long DEFAULT_SLEEP_DURATION_MS = 2000;
 
   public RemoteApplicationRunner(Config config) {
     super(config);
@@ -110,9 +114,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
       ExecutionPlan plan = getExecutionPlan(app);
       for (JobConfig jobConfig : plan.getJobConfigs()) {
-        JobRunner runner = new JobRunner(jobConfig);
-        ApplicationStatus status = runner.status();
-        LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
+        ApplicationStatus status = getApplicationStatus(jobConfig);
 
         switch (status.getStatusCode()) {
           case New:
@@ -133,22 +135,79 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
       if (hasNewJobs) {
         // There are jobs not started, report as New
-        return ApplicationStatus.New;
+        return New;
       } else if (hasRunningJobs) {
         // All jobs are started, some are running
-        return ApplicationStatus.Running;
+        return Running;
       } else if (unsuccessfulFinishStatus != null) {
         // All jobs are finished, some are not successful
         return unsuccessfulFinishStatus;
       } else {
         // All jobs are finished successfully
-        return ApplicationStatus.SuccessfulFinish;
+        return SuccessfulFinish;
       }
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
     }
   }
 
+  /* package private */ ApplicationStatus getApplicationStatus(JobConfig jobConfig) {
+    JobRunner runner = new JobRunner(jobConfig);
+    ApplicationStatus status = runner.status();
+    LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
+    return status;
+  }
+
+  /**
+   * Waits until the application finishes.
+   */
+  public void waitForFinish() {
+    waitForFinish(Duration.ofMillis(0));
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   * If timeout &lt; 1, blocks the caller indefinitely.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  public boolean waitForFinish(Duration timeout) {
+    JobConfig jobConfig = new JobConfig(config);
+    boolean finished = true;
+    long timeoutInMs = timeout.toMillis();
+    long startTimeInMs = System.currentTimeMillis();
+    long timeElapsed = 0L;
+
+    long sleepDurationInMs = timeoutInMs < 1 ?
+        DEFAULT_SLEEP_DURATION_MS : Math.min(timeoutInMs, DEFAULT_SLEEP_DURATION_MS);
+    ApplicationStatus status;
+
+    try {
+      while (timeoutInMs < 1 || timeElapsed <= timeoutInMs) {
+        status = getApplicationStatus(jobConfig);
+        if (status == SuccessfulFinish || status == UnsuccessfulFinish) {
+          LOG.info("Application finished with status {}", status);
+          break;
+        }
+
+        Thread.sleep(sleepDurationInMs);
+        timeElapsed = System.currentTimeMillis() - startTimeInMs;
+      }
+
+      if (timeElapsed > timeoutInMs) {
+        LOG.warn("Timed out waiting for application to finish.");
+        finished = false;
+      }
+    } catch (Exception e) {
+      LOG.error("Error waiting for application to finish", e);
+      throw new SamzaException(e);
+    }
+
+    return finished;
+  }
+
   private Config getConfigFromPrevRun() {
     CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     consumer.register();
index b4a2259..84ecc6c 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.runtime;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
@@ -304,6 +306,24 @@ public class TestLocalApplicationRunner {
         planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
   }
 
+  @Test
+  public void testWaitForFinishReturnsBeforeTimeout() {
+    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig());
+    long timeoutInMs = 1000;
+
+    runner.getShutdownLatch().countDown();
+    boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs));
+    assertTrue("Application did not finish before the timeout.", finished);
+  }
+
+  @Test
+  public void testWaitForFinishTimesout() {
+    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig());
+    long timeoutInMs = 100;
+    boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs));
+    assertFalse("Application finished before the timeout.", finished);
+  }
+
   private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
     String intermediateStreamJson =
         updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
new file mode 100644 (file)
index 0000000..2ef2b33
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.time.Duration;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * A test class for {@link RemoteApplicationRunner}.
+ */
+public class TestRemoteApplicationRunner {
+  @Test
+  public void testWaitForFinishReturnsBeforeTimeout() {
+    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig()));
+    doReturn(ApplicationStatus.SuccessfulFinish).when(runner).getApplicationStatus(any(JobConfig.class));
+
+    boolean finished = runner.waitForFinish(Duration.ofMillis(5000));
+    assertTrue("Application did not finish before the timeout.", finished);
+  }
+
+  @Test
+  public void testWaitForFinishTimesout() {
+    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig()));
+    doReturn(ApplicationStatus.Running).when(runner).getApplicationStatus(any(JobConfig.class));
+
+    boolean finished = runner.waitForFinish(Duration.ofMillis(1000));
+    assertFalse("Application finished before the timeout.", finished);
+  }
+}
index 4497a7c..f3093a7 100644 (file)
@@ -110,7 +110,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
     Validate.isTrue(localRunner, "This method can be called only in standalone mode.");
     SamzaSqlApplication app = new SamzaSqlApplication();
     run(app);
-    ((LocalApplicationRunner) appRunner).waitForFinish();
+    appRunner.waitForFinish();
   }
 
   @Override