SAMZA-1692: Standalone stability fixes.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Mon, 7 May 2018 20:43:05 +0000 (13:43 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Mon, 7 May 2018 20:43:05 +0000 (13:43 -0700)
- Currently, on session expiration processorListener with incorrect generationId is registered with zookeeper(ZkUtils generationId is incremented on reconnect but the generationId in processorListener is zero all the time). When a session reconnect happens to a processor successive to leader, leader expiration event will be skipped. This will prevent leader re-election on a current leader death and will stall the processors group. Fix is to reinstantiate and then register processorChangeListener on session expiration.
- Add processorId to debounce thread name (this can aid debugging when multiple processors are running within a jvm).
- After ScheduleAfterDebounceTime queue is shutdown, don't accept new schedule requests. Current ZkJobCoordinator shutdown sequence comprise of the following steps
     - Shutdown the ScheduleAfterDebounceTime queue.
     - Stop the zkClient  and relinquish it's resources.

After we shutdown ScheduleAfterDebounceTime and before zkclient is stopped, any new operations can be scheduled in ScheduleAfterDebounceTime queue. This will result in RejectedExecutionException, since executorService is stopped.

```
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask23f962a8 rejected from java.util.concurrent.ScheduledThreadPoolExecutor43408be8
```

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>
Author: Shanthoosh Venkataraman <santhoshvenkat1988@gmail.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #496 from shanthoosh/master

samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java

index f6f2dc9..ec3521b 100644 (file)
 package org.apache.samza.zk;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ScheduleAfterDebounceTime {
   private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
-  private static final String DEBOUNCE_THREAD_NAME_FORMAT = "debounce-thread-%d";
+  private static final String DEBOUNCE_THREAD_NAME_FORMAT = "Samza Debounce Thread-%s";
 
   // timeout to wait for a task to complete.
   private static final int TIMEOUT_MS = 1000 * 10;
@@ -56,11 +56,14 @@ public class ScheduleAfterDebounceTime {
    * A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
    */
   private final Map<String, ScheduledFuture> futureHandles = new ConcurrentHashMap<>();
-  private boolean isShuttingDown;
+  private volatile boolean isShuttingDown;
 
-  public ScheduleAfterDebounceTime() {
-    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
+  public ScheduleAfterDebounceTime(String processorId) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(String.format(DEBOUNCE_THREAD_NAME_FORMAT, processorId))
+        .setDaemon(true).build();
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    isShuttingDown = false;
   }
 
   public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) {
@@ -79,16 +82,29 @@ public class ScheduleAfterDebounceTime {
    * @param runnable the action to execute.
    */
   public synchronized void scheduleAfterDebounceTime(String actionName, long delayInMillis, Runnable runnable) {
-    // 1. Try to cancel any existing scheduled task associated with the action.
-    tryCancelScheduledAction(actionName);
+    if (!isShuttingDown) {
+      // 1. Try to cancel any existing scheduled task associated with the action.
+      tryCancelScheduledAction(actionName);
+
+      // 2. Schedule the action.
+      ScheduledFuture scheduledFuture =
+          scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS);
+
+      LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis);
+      futureHandles.put(actionName, scheduledFuture);
+    } else {
+      LOG.info("Scheduler is stopped. Not scheduling action: {} to run.", actionName);
+    }
+  }
 
-    // 2. Schedule the action.
-    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS);
 
-    LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis);
-    futureHandles.put(actionName, scheduledFuture);
+  public synchronized void cancelAction(String action) {
+    if (!isShuttingDown) {
+      this.tryCancelScheduledAction(action);
+    }
   }
 
+
   /**
    * Stops the scheduler. After this invocation no further schedule calls will be accepted
    * and all pending enqueued tasks will be cancelled.
@@ -110,16 +126,13 @@ public class ScheduleAfterDebounceTime {
         .forEach(this::tryCancelScheduledAction);
   }
 
-  public synchronized void cancelAction(String action) {
-    this.tryCancelScheduledAction(action);
-  }
-
   /**
    * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
    *
    * @param actionName the name of action to cancel.
    */
   private void tryCancelScheduledAction(String actionName) {
+    LOG.info("Trying to cancel the action: {}.", actionName);
     ScheduledFuture scheduledFuture = futureHandles.get(actionName);
     if (scheduledFuture != null && !scheduledFuture.isDone()) {
       LOG.info("Attempting to cancel the future of action: {}", actionName);
@@ -146,16 +159,18 @@ public class ScheduleAfterDebounceTime {
   private Runnable getScheduleableAction(String actionName, Runnable runnable) {
     return () -> {
       try {
-        runnable.run();
-        /*
-         * Expects all run() implementations <b>not to swallow the interrupts.</b>
-         * This thread is interrupted from an external source(mostly executor service) to die.
-         */
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.warn("Action: {} is interrupted.", actionName);
-          doCleanUpOnTaskException(new InterruptedException());
-        } else {
-          LOG.debug("Action: {} completed successfully.", actionName);
+        if (!isShuttingDown) {
+          runnable.run();
+          /*
+           * Expects all run() implementations <b>not to swallow the interrupts.</b>
+           * This thread is interrupted from an external source(mostly executor service) to die.
+           */
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("Action: {} is interrupted.", actionName);
+            doCleanUpOnTaskException(new InterruptedException());
+          } else {
+            LOG.info("Action: {} completed successfully.", actionName);
+          }
         }
       } catch (Throwable throwable) {
         LOG.error("Execution of action: {} failed.", actionName, throwable);
index 1134d6f..d6f402f 100644 (file)
@@ -123,7 +123,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
-    debounceTimer = new ScheduleAfterDebounceTime();
+    debounceTimer = new ScheduleAfterDebounceTime(processorId);
     debounceTimer.setScheduledTaskCallback(throwable -> {
         LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
         stop();
index f4c1e94..c9ee1f0 100644 (file)
@@ -51,7 +51,7 @@ public class ZkLeaderElector implements LeaderElector {
   private final String hostName;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private final IZkDataListener previousProcessorChangeListener;
+  private IZkDataListener previousProcessorChangeListener;
   private LeaderElectorListener leaderElectorListener = null;
   private String currentSubscription = null;
   private final Random random = new Random();
@@ -130,6 +130,7 @@ public class ZkLeaderElector implements LeaderElector {
         LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
             previousProcessorChangeListener);
+        previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
       }
       currentSubscription = predecessor;
       LOG.info(zLog("Subscribing data change for " + predecessor));
index 7f687d7..67b2d45 100644 (file)
@@ -34,6 +34,8 @@ public class TestScheduleAfterDebounceTime {
 
   private static final long WAIT_TIME = 500;
 
+  private static final String TEST_PROCESSOR_ID = "TEST_PROCESSOR_ID";
+
   @Rule
   public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
 
@@ -52,7 +54,7 @@ public class TestScheduleAfterDebounceTime {
 
   @Test
   public void testSchedule() throws InterruptedException {
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     final CountDownLatch latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -72,7 +74,7 @@ public class TestScheduleAfterDebounceTime {
 
   @Test
   public void testCancelAndSchedule() throws InterruptedException {
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     final CountDownLatch test1Latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -101,7 +103,7 @@ public class TestScheduleAfterDebounceTime {
   public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
     final CountDownLatch latch = new CountDownLatch(1);
     final Throwable[] taskCallbackException = new Exception[1];
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     scheduledQueue.setScheduledTaskCallback(throwable -> {
         taskCallbackException[0] = throwable;
         latch.countDown();
@@ -121,4 +123,12 @@ public class TestScheduleAfterDebounceTime {
     Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
     scheduledQueue.stopScheduler();
   }
+
+  @Test
+  public void testNewTasksScheduledAfterShutdownDoesNotThrowException() throws InterruptedException {
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
+
+    scheduledQueue.stopScheduler();
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () -> Assert.fail("New event should not be scheduled"));
+  }
 }