SAMZA-1368; make sure new job model will be generated in case of barrier timeout.
author Boris Shkolnik <boryas@apache.org>
Fri, 21 Jul 2017 22:32:58 +0000 (15:32 -0700)
committer Jagadish <jagadish@apache.org>
Fri, 21 Jul 2017 22:32:58 +0000 (15:32 -0700)
Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Shanthoosh V <svenkata@linkedin.com>

Closes #247 from sborya/onBarrierTimeout1

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java

index dd08e3f..e973099 100644 (file)
@@ -313,11 +313,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version));
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
-          // no-op
-          // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
-          // participants failed to join. That means, they should have de-registered from "processors" list
-          // and that would have triggered onProcessorChange action -> a new round of consensus.
-          LOG.info("Barrier for version " + version + " timed out.");
+          // no-op for non-leaders
+          // for leader: make sure we do not stop - so generate a new job model
+          LOG.warn("Barrier for version " + version + " timed out.");
+          if (zkController.isLeader()) {
+            LOG.info("Leader will schedule a new job model generation");
+            debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+              {
+                // actual actions to do are the same as onProcessorChange
+                doOnProcessorChange(new ArrayList<>());
+              });
+          }
         }
       }
     }