SAMZA-1390; Update SamzaMonitorService to spawn deamon threads
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Tue, 15 Aug 2017 00:01:09 +0000 (17:01 -0700)
committerJagadish <jagadish@apache.org>
Tue, 15 Aug 2017 00:01:09 +0000 (17:01 -0700)
Observed in LinkedIn production setup that samza-rest jvm process doesn’t stop after main
thread death(due to jetty server failures) since non-deamon threads spawned for
`SamzaMonitorService` are alive.

This affects samza-rest jvm process lifecycle management. To fix this, plugging
in ThreadFactory which sets Thread name format & marks them as daemon.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #270 from shanthoosh/make_samza_rest_non_daemon

samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java

index de6febb..2f940e3 100644 (file)
@@ -18,7 +18,9 @@
  */
 package org.apache.samza.rest;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
 import joptsimple.OptionSet;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
@@ -84,6 +86,7 @@ public class SamzaRestService {
    */
   public static void main(String[] args)
       throws Exception {
+    ScheduledExecutorSchedulingProvider schedulingProvider = null;
     try {
       SamzaRestConfig config = parseConfig(args);
       ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
@@ -99,8 +102,11 @@ public class SamzaRestService {
       restService.addServlet(container, "/*");
 
       // Schedule monitors to run
-      ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1);
-      ScheduledExecutorSchedulingProvider schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
+      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                                                              .setNameFormat("MonitorThread-%d")
+                                                              .build();
+      ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1, threadFactory);
+      schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
       SamzaMonitorService monitorService = new SamzaMonitorService(config,
                                                                    metricsRegistry,
                                                                    schedulingProvider);
@@ -110,6 +116,10 @@ public class SamzaRestService {
       monitorService.stop();
     } catch (Throwable t) {
       log.error("Exception in main.", t);
+    } finally {
+      if (schedulingProvider != null){
+        schedulingProvider.stop();
+      }
     }
   }