SAMZA-1695: Clear events in ScheduleAfterDebounceTime on session expiration
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Tue, 8 May 2018 02:40:03 +0000 (19:40 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Tue, 8 May 2018 02:40:03 +0000 (19:40 -0700)
Scenario:
Let's assume there're three processors in the group [P1, P2, P3] and P1 is the leader.

1. Leader processor(P1) loses connectivity with a zookeeper server in the ensemble and it's ephemeral processor node is deleted(due to session expiration).
2. Immediate successor(P2) to the leader(P1) finds out that the leader is dead and declares itself as leader. Processor P2 Schedules onProcessorChange to publish JobModel.
3. ZkClient connection retry logic helps the Leader(P1) to reconnect to another zkServer in the ensemble and it joins as follower.
4. Processor P1 acts on the stale buffered event in the debounce queue(which it received when it's a leader) and acts as leader. At this point, there're two processors acting as leader(P1 & P2). If P1 proceeds to execute leader actions before P2, P2 will fail(and in worst case can cause state corruption).

Sample exception logs:
https://gist.github.com/shanthoosh/55410fe4ebf3cfb65281b35f16397cad

Author: Shanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Author: Shanthoosh Venkataraman <shanthoosh@users.noreply.github.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #499 from shanthoosh/remove_events_from_debounce_queue_on_session_expiry

samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java

index ec3521b..9abc26d 100644 (file)
@@ -122,8 +122,14 @@ public class ScheduleAfterDebounceTime {
     scheduledExecutorService.shutdown();
 
     // should clear out the future handles as well
-    futureHandles.keySet()
-        .forEach(this::tryCancelScheduledAction);
+    cancelAllScheduledActions();
+  }
+
+  public synchronized void cancelAllScheduledActions() {
+    if (!isShuttingDown) {
+      futureHandles.keySet().forEach(this::tryCancelScheduledAction);
+      futureHandles.clear();
+    }
   }
 
   /**
index d6f402f..4977bff 100644 (file)
@@ -33,7 +33,6 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
@@ -41,7 +40,6 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -92,10 +90,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final ZkBarrierForVersionUpgrade barrier;
   private final ZkJobCoordinatorMetrics metrics;
   private final Map<String, MetricsReporter> reporters;
+  private final ZkLeaderElector leaderElector;
 
   private StreamMetadataCache streamMetadataCache = null;
   private SystemAdmins systemAdmins = null;
-  private ScheduleAfterDebounceTime debounceTimer = null;
+
+  @VisibleForTesting
+  ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
   private int debounceTimeMs;
@@ -114,7 +115,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     // setup a listener for a session state change
     // we are mostly interested in "session closed" and "new session created" events
     zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
-    LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
+    leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
     this.barrier =  new ZkBarrierForVersionUpgrade(
@@ -218,16 +219,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   //////////////////////////////////////////////// LEADER stuff ///////////////////////////
   @Override
   public void onProcessorChange(List<String> processors) {
-    LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
-    debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs,
-        () -> doOnProcessorChange(processors));
+    if (leaderElector.amILeader()) {
+      LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors));
+    }
   }
 
   void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 'onBecomeLeader'
     // TODO: Handle empty currentProcessorIds.
-    List<String> currentProcessorIds = getActualProcessorIds(processors);
-    Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
+    List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
+    Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
 
     if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
       LOG.info("Processors: {} has duplicates. Not generating JobModel.", currentProcessorIds);
@@ -350,8 +352,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
      * to host mapping) is passed in as null when building the jobModel.
      */
     JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
-    // Nuke the configuration in JobModel.
-    return new JobModel(new MapConfig(), model.getContainers());
+    return model;
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
@@ -379,11 +380,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       startTime = System.nanoTime();
 
       metrics.barrierCreation.inc();
-      debounceTimer.scheduleAfterDebounceTime(
-          barrierAction,
-        (new ZkConfig(config)).getZkBarrierTimeoutMs(),
-        () -> barrier.expire(version)
-      );
+      if (leaderElector.amILeader()) {
+        debounceTimer.scheduleAfterDebounceTime(barrierAction, (new ZkConfig(config)).getZkBarrierTimeoutMs(), () -> barrier.expire(version));
+      }
     }
 
     public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
@@ -397,7 +396,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
           // no-op for non-leaders
           // for leader: make sure we do not stop - so generate a new job model
           LOG.warn("Barrier for version " + version + " timed out.");
-          if (zkController.isLeader()) {
+          if (leaderElector.amILeader()) {
             LOG.info("Leader will schedule a new job model generation");
             debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
               {
@@ -418,13 +417,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   }
 
   /// listener to handle ZK state change events
+  @VisibleForTesting
   class ZkSessionStateChangedListener implements IZkStateListener {
 
     private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
+    private static final String ZK_SESSION_EXPIRED = "ZK_SESSION_EXPIRED";
 
     @Override
-    public void handleStateChanged(Watcher.Event.KeeperState state)
-        throws Exception {
+    public void handleStateChanged(Watcher.Event.KeeperState state) {
       switch (state) {
         case Expired:
           // if the session has expired it means that all the registration's ephemeral nodes are gone.
@@ -433,12 +433,26 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
           // increase generation of the ZK session. All the callbacks from the previous generation will be ignored.
           zkUtils.incGeneration();
 
-          if (coordinatorListener != null) {
-            coordinatorListener.onJobModelExpired();
-          }
-
           // reset all the values that might have been from the previous session (e.g ephemeral node path)
           zkUtils.unregister();
+          if (leaderElector.amILeader()) {
+            leaderElector.resignLeadership();
+          }
+          /**
+           * After this event, one amongst the following two things could potentially happen:
+           * A. On successful reconnect to another zookeeper server in ensemble, this processor is going to
+           * join the group again as new processor. In this case, retaining buffered events in debounceTimer will be unnecessary.
+           * B. If zookeeper server is unreachable, handleSessionEstablishmentError callback will be triggered indicating
+           * a error scenario. In this case, retaining buffered events in debounceTimer will be unnecessary.
+           */
+          LOG.info("Cancelling all scheduled actions in session expiration for processorId: {}.", processorId);
+          debounceTimer.cancelAllScheduledActions();
+          debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_EXPIRED, 0, () -> {
+              if (coordinatorListener != null) {
+                coordinatorListener.onJobModelExpired();
+              }
+            });
+
           return;
         case Disconnected:
           // if the session has expired it means that all the registration's ephemeral nodes are gone.
@@ -460,21 +474,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         default:
           // received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. NoOp
           LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". Continue");
+          return;
       }
     }
 
     @Override
-    public void handleNewSession()
-        throws Exception {
+    public void handleNewSession() {
       LOG.info("Got new session created event for processor=" + processorId);
-
+      debounceTimer.cancelAllScheduledActions();
       LOG.info("register zk controller for the new session");
       zkController.register();
     }
 
     @Override
-    public void handleSessionEstablishmentError(Throwable error)
-        throws Exception {
+    public void handleSessionEstablishmentError(Throwable error) {
       // this means we cannot connect to zookeeper to establish a session
       LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
       debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
index 117d458..c8367fb 100644 (file)
@@ -23,9 +23,14 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
+import org.apache.zookeeper.Watcher;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.Mockito.*;
+
+
 public class TestZkJobCoordinator {
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
@@ -34,16 +39,40 @@ public class TestZkJobCoordinator {
   public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() {
     ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
     ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    Mockito.when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 
     ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    Mockito.when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    Mockito.when(zkUtils.getZkClient()).thenReturn(mockZkClient);
-    Mockito.when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
     zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
 
-    Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop();
+    verify(zkJobCoordinator, Mockito.atMost(1)).stop();
+  }
+
+  @Test
+  public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+    ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
+
+    zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
+
+    verify(zkUtils).incGeneration();
+    verify(mockDebounceTimer).cancelAllScheduledActions();
+    verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
   }
 }