SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Thu, 3 Aug 2017 21:32:09 +0000 (14:32 -0700)
committernavina <navina@apache.org>
Thu, 3 Aug 2017 21:32:09 +0000 (14:32 -0700)
Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #253 from shanthoosh/SAMZA-1365

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java

index 2204240..9f64b3a 100644 (file)
@@ -333,6 +333,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   /// listener to handle session expiration
   class ZkSessionStateChangedListener implements IZkStateListener {
 
+    private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
+
     @Override
     public void handleStateChanged(Watcher.Event.KeeperState state)
         throws Exception {
@@ -367,7 +369,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         throws Exception {
       // this means we cannot connect to zookeeper
       LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
-      stop();
+      debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
     }
   }