SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys
authorPrateek Maheshwari <pmaheshwari@apache.org>
Wed, 30 May 2018 17:37:23 +0000 (10:37 -0700)
committerPrateek Maheshwari <pmaheshw@LM-LSNSCDW6508.linkedin.biz>
Wed, 30 May 2018 17:37:23 +0000 (10:37 -0700)
Author: Prateek Maheshwari <pmaheshw@LM-LSNSCDW6508.linkedin.biz>

Reviewers: Jagadish Venkatraman <vjagadish1989@gmail.com>

Closes #538 from prateekm/process-exit

samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java [moved from samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java with 75% similarity]
samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java [moved from samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java with 87% similarity]

index 84427ea..f9f7467 100644 (file)
@@ -23,6 +23,7 @@ import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.job.JobRunner$;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.Util;
@@ -52,6 +53,12 @@ public class ApplicationRunnerMain {
   }
 
   public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaUncaughtExceptionHandler(() -> {
+          System.out.println("Exiting process now.");
+          System.exit(1);
+        }));
+
     ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(args);
     Config orgConfig = cmdLine.loadConfig(options);
@@ -78,6 +85,8 @@ public class ApplicationRunnerMain {
     } else {
       JobRunner$.MODULE$.main(args);
     }
+
+    System.exit(0);
   }
 }
 
index 7751241..66176d7 100644 (file)
@@ -31,7 +31,7 @@ import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerExceptionHandler;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
@@ -78,6 +78,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         config,
         ScalaJavaUtil.toScalaMap(new HashMap<>()),
         taskFactory);
+
     container.setContainerListener(
         new SamzaContainerListener() {
           @Override
@@ -96,9 +97,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
             containerRunnerException = t;
           }
         });
+
     startContainerHeartbeatMonitor();
     container.run();
     stopContainerHeartbeatMonitor();
+    
     if (containerRunnerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
       System.exit(1);
@@ -127,10 +130,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   public static void main(String[] args) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(
-        new SamzaContainerExceptionHandler(() -> {
+        new SamzaUncaughtExceptionHandler(() -> {
           log.info("Exiting process now.");
           System.exit(1);
         }));
+
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
     log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
@@ -153,6 +157,8 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
     LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
     localContainerRunner.run(streamApp);
+
+    System.exit(0);
   }
 
   private void startContainerHeartbeatMonitor() {
  * under the License.
  */
 
-package org.apache.samza.container;
+package org.apache.samza.util;
 
 import java.lang.Thread.UncaughtExceptionHandler;
 
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An UncaughtExceptionHandler for SamzaContainer that simply executes the configured {@link #runnable}
- * when any thread throws an uncaught exception.
+ * An UncaughtExceptionHandler that logs the uncaught exception, logs a thread dump, and then
+ * executes the provided {@code runnable}.
+ * <p>
+ * Example usage: Exit process if any thread throws an uncaught exception:
+ * <pre>
+ * Thread.setDefaultUncaughtExceptionHandler(
+ *   new SamzaUncaughtExceptionHandler(() -&gt; {
+ *     System.exit(1);
+ *   })
+ * );
+ * </pre>
  */
-public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class);
+public class SamzaUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaUncaughtExceptionHandler.class);
   private final Runnable runnable;
 
-  public SamzaContainerExceptionHandler(Runnable runnable) {
+  public SamzaUncaughtExceptionHandler(Runnable runnable) {
     this.runnable = runnable;
   }
   /**
 package org.apache.samza.container;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertTrue;
 
-public class TestSamzaContainerExceptionHandler {
+public class TestSamzaUncaughtExceptionHandler {
 
   @Test
   public void testExceptionHandler() {
     final AtomicBoolean exitCalled = new AtomicBoolean(false);
     Thread.UncaughtExceptionHandler exceptionHandler =
-        new SamzaContainerExceptionHandler(() -> exitCalled.getAndSet(true));
+        new SamzaUncaughtExceptionHandler(() -> exitCalled.getAndSet(true));
     exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
     assertTrue(exitCalled.get());
   }