SAMZA-1128 : Remove dependency of debounce timer from the CoordinationUtils
authorNavina Ramesh <navina@apache.org>
Thu, 25 May 2017 00:46:12 +0000 (17:46 -0700)
committernramesh <nramesh@linkedin.com>
Thu, 25 May 2017 00:46:12 +0000 (17:46 -0700)
This patch addresses the following, apart from the main bug that barrier implementation uses a different scheduler that the Jobcoordinator's main thread.
* Removes CoordinationUtils#getBarrier, BarrierForVersionUpgrade interface
* Renamed ZkBarrierForVersionUpgrade to ZkBarrier and introduces a listener ZkBarrierListener
* Simplified the ZkBarrier class and its integration test

Author: Navina Ramesh <navina@apache.org>

Reviewers: Boris Shkolnik <boryas@apache.org>, Bharath Kumarasubramanian <bkumarasubramanian@linkedin.com>

Closes #195 from navina/SAMZA-1128

13 files changed:
samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java [deleted file]
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java

index 34d2542..21bfe76 100644 (file)
@@ -29,7 +29,7 @@ public class ZkConfig extends MapConfig {
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
   public static final int DEFAULT_CONSENSUS_TIMEOUT_MS = 40000;
-  
+
   public ZkConfig(Config config) {
     super(config);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
deleted file mode 100644 (file)
index 664cef8..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator;
-
-import java.util.List;
-
-
-/**
- * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
- * JobModel.
- */
-public interface BarrierForVersionUpgrade {
-  /**
-   * Barrier is usually started by the leader. Creates the Barrier paths in ZK
-   *
-   * @param version - String, representing the version of the JobModel for which the barrier is created
-   * @param participants - {@link List} of participants that need to join for barrier to complete
-   */
-  void start(String version, List<String> participants);
-
-  /**
-   * Called by the processor.
-   * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
-   * joined.
-   * The call is async. The callback will be invoked when the barrier is reached.
-   * @param version - for which the barrier waits
-   * @param thisProcessorsName as it appears in the list of processors.
-   * @param callback  will be invoked, when barrier is reached.
-   */
-  void waitForBarrier(String version, String thisProcessorsName, Runnable callback);
-}
index 952aa51..150b3d4 100644 (file)
@@ -27,7 +27,6 @@ import org.apache.samza.annotation.InterfaceStability;
  * This service provide three primitives:
  *   - LeaderElection
  *   - Latch
- *   - barrier for version upgrades
  */
 @InterfaceStability.Evolving
 public interface CoordinationUtils {
@@ -42,6 +41,4 @@ public interface CoordinationUtils {
   LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
 
   Latch getLatch(int size, String latchId);
-
-  BarrierForVersionUpgrade getBarrier(String barrierId);
 }
index c7bfc1d..581387d 100644 (file)
 
 package org.apache.samza.zk;
 
-import java.util.Arrays;
-import java.util.List;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * This class creates a barrier for version upgrade.
- * Barrier is started by the participant responsible for the upgrade. (start())
- * Each participant will mark its readiness and register for a notification when the barrier is reached. (waitFor())
- * If a timer (started in start()) goes off before the barrier is reached, all the participants will unsubscribe
- * from the notification and the barrier becomes invalid.
+ * ZkBarrierForVersionUpgrade is an implementation of distributed barrier, which guarantees that the expected barrier
+ * size and barrier participants match before marking the barrier as complete.
+ * It also allows the caller to expire the barrier.
+ *
+ * This implementation is specifically tailored towards barrier support during jobmodel version upgrades. The participant
+ * responsible for the version upgrade starts the barrier by invoking {@link #create(String, List)}.
+ * Each participant in the list, then, joins the new barrier. When all listed participants {@link #join(String, String)}
+ * the barrier, the creator marks the barrier as {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#DONE}
+ * which signals the end of barrier.
+ * The creator of the barrier can expire the barrier by invoking {@link #expire(String)}. This will mark the barrier
+ * with value {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#TIMED_OUT} and indicates to everyone that it
+ * is no longer valid.
+ *
+ * The caller can listen to events associated with the barrier by registering a {@link ZkBarrierListener}.
+ *
+ * Zk Tree Reference:
+ * /barrierRoot/
+ *  |
+ *  |- barrier_{version1}/
+ *  |   |- barrier_state/
+ *  |   |  ([DONE|TIMED_OUT])
+ *  |   |- barrier_participants/
+ *  |   |   |- {id1}
+ *  |   |   |- {id2}
+ *  |   |   |-  ...
  */
-public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
-  private final ZkUtils zkUtils;
-  private final static String BARRIER_DONE = "done";
-  private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
+public class ZkBarrierForVersionUpgrade {
   private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+  private final ZkUtils zkUtils;
+  private final BarrierKeyBuilder keyBuilder;
+  private final Optional<ZkBarrierListener> barrierListenerOptional;
 
-  private final ScheduleAfterDebounceTime debounceTimer;
+  public enum State {
+    TIMED_OUT, DONE
+  }
 
-  private final String barrierPrefix;
-  private String barrierDonePath;
-  private String barrierProcessors;
-  private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout";
-  private final long barrierTimeoutMS;
 
-  public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) {
+  public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, ZkBarrierListener barrierListener) {
     if (zkUtils == null) {
       throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade without ZkUtils.");
     }
     this.zkUtils = zkUtils;
-    barrierPrefix = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(barrierId);
-    this.debounceTimer = debounceTimer;
-    this.barrierTimeoutMS = barrierTimeoutMS;
-  }
-
-  protected long getBarrierTimeOutMs() {
-    return barrierTimeoutMS;
+    this.keyBuilder = new BarrierKeyBuilder(barrierRoot);
+    this.barrierListenerOptional = Optional.ofNullable(barrierListener);
   }
 
-  private void timerOff(final String version, final Stat currentStatOfBarrierDone) {
-    try {
-      // write a new value "TIMED_OUT", if the value was changed since previous value, make sure it was changed to "DONE"
-      zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion());
-    } catch (ZkBadVersionException e) {
-      // Expected. failed to write, make sure the value is "DONE"
-      String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
-      LOG.info("Barrier timeout expired, but done=" + done);
-      if (!done.equals(BARRIER_DONE)) {
-        throw new SamzaException("Failed to write to the barrier_done, version=" + version, e);
-      }
-    }
-  }
-
-  private void setPaths(String version) {
-    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-
-    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+  /**
+   * Creates a shared barrier sub-tree in ZK
+   *
+   * @param version Version associated with the Barrier
+   * @param participants List of expected participated for this barrier to complete
+   */
+  public void create(final String version, List<String> participants) {
+    String barrierRoot = keyBuilder.getBarrierRoot();
+    String barrierParticipantsPath = keyBuilder.getBarrierParticipantsPath(version);
+    zkUtils.makeSurePersistentPathsExists(new String[]{
+        barrierRoot,
+        keyBuilder.getBarrierPath(version),
+        barrierParticipantsPath,
+        keyBuilder.getBarrierStatePath(version)});
+
+    // subscribe for participant's list changes
+    LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
+    zkUtils.getZkClient().subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
+
+    barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version));
   }
 
-  @Override
-  public void start(String version, List<String> participants) {
-    setPaths(version);
-
-    // subscribe for processor's list changes
-    LOG.info("Subscribing for child changes at " + barrierProcessors);
-    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(participants));
-
-    // create a timer for time-out
-    Stat currentStatOfBarrierDone = new Stat();
-    zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
+  /**
+   * Joins a shared barrier by registering under the barrier sub-tree in ZK
+   *
+   * @param version Version associated with the Barrier
+   * @param participantId Identifier of the participant
+   */
+  public void join(String version, String participantId) {
+    String barrierDonePath = keyBuilder.getBarrierStatePath(version);
+    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
 
-    debounceTimer.scheduleAfterDebounceTime(
-        VERSION_UPGRADE_TIMEOUT_TIMER,
-        getBarrierTimeOutMs(),
-        () -> timerOff(version, currentStatOfBarrierDone));
+    // TODO: Handle ZkNodeExistsException - SAMZA-1304
+    zkUtils.getZkClient().createPersistent(
+        String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId));
   }
 
-  @Override
-  public void waitForBarrier(String version, String participantName, Runnable callback) {
-    setPaths(version);
-    final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName);
-
-    // now subscribe for the barrier
-    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
+  /**
+   * Expires the barrier version by marking it as TIMED_OUT
+   *
+   * @param version Version associated with the Barrier
+   */
+  public void expire(String version) {
+    zkUtils.getZkClient().writeData(
+        keyBuilder.getBarrierStatePath(version),
+        State.TIMED_OUT);
 
-    // update the barrier for this processor
-    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
-    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
   }
-
   /**
-   * Listener for the subscription for the list of participants.
-   * This method will identify when all the participants have joined.
+   * Listener for changes to the list of participants. It is meant to be subscribed only by the creator of the barrier
+   * node. It checks to see when the barrier is ready to be marked as completed.
    */
   class ZkBarrierChangeHandler implements IZkChildListener {
+    private final String barrierVersion;
     private final List<String> names;
 
-    public ZkBarrierChangeHandler(List<String> names) {
+    public ZkBarrierChangeHandler(String barrierVersion, List<String> names) {
+      this.barrierVersion = barrierVersion;
       this.names = names;
     }
 
     @Override
-    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
-
+    public void handleChildChange(String parentPath, List<String> currentChildren) {
       if (currentChildren == null) {
-        LOG.info("Got handleChildChange with null currentChildren");
+        LOG.info("Got ZkBarrierChangeHandler handleChildChange with null currentChildren");
         return;
       }
       LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray()));
       LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray()));
 
-      // check if all the names are in
-      if (CollectionUtils.containsAll(names, currentChildren)) {
-        LOG.info("ALl nodes reached the barrier");
+      // check if all the expected participants are in
+      if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) {
+        String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion);
         LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); // this will trigger notifications
+        zkUtils.getZkClient().writeData(barrierDonePath, State.DONE); // this will trigger notifications
+        zkUtils.getZkClient().unsubscribeChildChanges(barrierDonePath, this);
       }
     }
   }
 
+  /**
+   * Listener for changes to the Barrier state. It is subscribed by all participants of the barrier, including the
+   * participant that creates the barrier.
+   * Barrier state values are either DONE or TIMED_OUT. It only registers to receive on valid state change notification.
+   * Once a valid state change notification is received, it will un-subscribe from further notifications.
+   */
   class ZkBarrierReachedHandler implements IZkDataListener {
-    private final ScheduleAfterDebounceTime debounceTimer;
-    private final String barrierPathDone;
-    private final Runnable callback;
-
-    public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
-      this.barrierPathDone = barrierPathDone;
-      this.callback = callback;
-      this.debounceTimer = debounceTimer;
+    private final String barrierStatePath;
+    private final String barrierVersion;
+
+    public ZkBarrierReachedHandler(String barrierStatePath, String version) {
+      this.barrierStatePath = barrierStatePath;
+      this.barrierVersion = version;
     }
 
     @Override
-    public void handleDataChange(String dataPath, Object data)
-        throws Exception {
-      String done = (String) data;
-      LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
-      if (done.equals(BARRIER_DONE)) {
-        debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
-      } else if (done.equals(BARRIER_TIMED_OUT)) {
-        // timed out
-        LOG.warn("Barrier for " + dataPath + " timed out");
-      }
-      // in any case we unsubscribe
-      zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+    public void handleDataChange(String dataPath, Object data) {
+      LOG.info("got notification about barrier " + barrierStatePath + "; done=" + data);
+      zkUtils.unsubscribeDataChanges(barrierStatePath, this);
+      barrierListenerOptional.ifPresent(
+          zkBarrierListener -> zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data));
     }
 
     @Override
@@ -182,4 +185,33 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       LOG.warn("barrier done got deleted at " + dataPath);
     }
   }
+
+  class BarrierKeyBuilder {
+    private static final String BARRIER_PARTICIPANTS = "/barrier_participants";
+    private static final String BARRIER_STATE = "/barrier_state";
+    private final String barrierRoot;
+    BarrierKeyBuilder(String barrierRoot) {
+      if (barrierRoot == null || barrierRoot.trim().isEmpty() || !barrierRoot.trim().startsWith("/")) {
+        throw new IllegalArgumentException("Barrier root path cannot be null or empty and the path has to start with '/'");
+      }
+      this.barrierRoot = barrierRoot;
+    }
+
+    String getBarrierRoot() {
+      return barrierRoot;
+    }
+
+    String getBarrierPath(String version) {
+      return String.format("%s/barrier_%s", barrierRoot, version);
+    }
+
+    String getBarrierParticipantsPath(String version) {
+      return getBarrierPath(version) + BARRIER_PARTICIPANTS;
+    }
+
+    String getBarrierStatePath(String version) {
+      return getBarrierPath(version) + BARRIER_STATE;
+    }
+  }
+
 }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java
new file mode 100644 (file)
index 0000000..ecf77b3
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+/**
+ * An interface for listening to {@link ZkBarrierForVersionUpgrade} related events
+ */
+public interface ZkBarrierListener {
+  /**
+   * Invoked when the root of barrier for a given version is created in Zk
+   *
+   * @param version Version associated with the Barrier
+   */
+  void onBarrierCreated(String version);
+
+  /**
+   * Invoked when the data written to the Barrier state changes
+   *
+   * @param version Version associated with the Barrier
+   * @param state {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State} value
+   */
+  void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state);
+
+  /**
+   * Invoked when Barrier encounters error
+   *
+   * @param version Version associated with the Barrier
+   * @param t Throwable describing the cause of the barrier error
+   */
+  void onBarrierError(String version, Throwable t);
+}
index 44d2e37..a8317a8 100644 (file)
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-
 public class ZkControllerImpl implements ZkController {
   private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
 
@@ -115,10 +114,8 @@ public class ZkControllerImpl implements ZkController {
 
   class ZkJobModelVersionChangeHandler implements IZkDataListener {
     /**
-     * called when job model version gets updated
-     * @param dataPath
-     * @param data
-     * @throws Exception
+     * Called when there is a change to the data in JobModel version path
+     * To the subscribers, it signifies that a new version of JobModel is available.
      */
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
index 661650d..b3a2a6f 100644 (file)
@@ -45,7 +45,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
 
     ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
 
-    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
   }
 
   /**
index 965b32a..df0a527 100644 (file)
@@ -19,7 +19,6 @@
 package org.apache.samza.zk;
 
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
@@ -29,14 +28,11 @@ public class ZkCoordinationUtils implements CoordinationUtils {
   public final ZkConfig zkConfig;
   public final ZkUtils zkUtils;
   public final String processorIdStr;
-  public final ScheduleAfterDebounceTime debounceTimer;
 
-  public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils,
-      ScheduleAfterDebounceTime debounceTimer) {
+  public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils) {
     this.zkConfig = zkConfig;
     this.zkUtils = zkUtils;
     this.processorIdStr = processorId;
-    this.debounceTimer = debounceTimer;
   }
 
   @Override
@@ -54,11 +50,6 @@ public class ZkCoordinationUtils implements CoordinationUtils {
     return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils);
   }
 
-  @Override
-  public BarrierForVersionUpgrade getBarrier(String barrierId) {
-    return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
-  }
-
   // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer
   public ZkUtils getZkUtils() {
     return zkUtils;
index 6ad10d2..64395ac 100644 (file)
@@ -21,8 +21,7 @@ package org.apache.samza.zk;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,7 +43,6 @@ import java.util.List;
  */
 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
-  private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
   // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
   // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
   private static final int METADATA_CACHE_TTL_MS = 5000;
@@ -55,7 +52,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final ZkController zkController;
 
   private final Config config;
-  private final CoordinationUtils coordinationUtils;
+  private final ZkBarrierForVersionUpgrade barrier;
 
   private StreamMetadataCache streamMetadataCache = null;
   private ScheduleAfterDebounceTime debounceTimer = null;
@@ -64,13 +61,24 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   public ZkJobCoordinator(Config config) {
     this.config = config;
+    ZkConfig zkConfig = new ZkConfig(config);
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getAppId());
+    this.zkUtils = new ZkUtils(
+        keyBuilder,
+        ZkCoordinationServiceFactory.createZkClient(
+            zkConfig.getZkConnect(),
+            zkConfig.getZkSessionTimeoutMs(),
+            zkConfig.getZkConnectionTimeoutMs()),
+        zkConfig.getZkConnectionTimeoutMs());
+
     this.processorId = createProcessorId(config);
-    this.coordinationUtils = new ZkCoordinationServiceFactory()
-        .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
-    this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
     LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
+    this.barrier =  new ZkBarrierForVersionUpgrade(
+        keyBuilder.getJobModelVersionBarrierPrefix(),
+        zkUtils,
+        new ZkBarrierListenerImpl());
   }
 
   @Override
@@ -121,12 +129,34 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
   }
 
-  public void doOnProcessorChange(List<String> processors) {
+  void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 'onBecomeLeader'
-    generateNewJobModel(processors);
-    if (coordinatorListener != null) {
-      coordinatorListener.onJobModelExpired();
+    // TODO: Handle empty currentProcessorIds or duplicate processorIds in the list
+    List<String> currentProcessorIds = getActualProcessorIds(processors);
+
+    // Generate the JobModel
+    JobModel jobModel = generateNewJobModel(currentProcessorIds);
+
+    // Assign the next version of JobModel
+    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String nextJMVersion;
+    if (currentJMVersion == null) {
+      nextJMVersion = "1";
+    } else {
+      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
     }
+    LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
+
+    // Publish the new job model
+    zkUtils.publishJobModel(nextJMVersion, jobModel);
+
+    // Start the barrier for the job model update
+    barrier.create(nextJMVersion, currentProcessorIds);
+
+    // Notify all processors about the new JobModel by updating JobModel Version number
+    zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
+
+    LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
   }
 
   @Override
@@ -138,16 +168,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         if (coordinatorListener != null) {
           coordinatorListener.onJobModelExpired();
         }
-        LOG.info("pid=" + processorId + "new JobModel available.Container stopped.");
-        // get the new job model
+        // get the new job model from ZK
         newJobModel = zkUtils.getJobModel(version);
 
         LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
         // update ZK and wait for all the processors to get this new version
-        ZkBarrierForVersionUpgrade barrier =
-            (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_UPGRADE_BARRIER);
-        barrier.waitForBarrier(version, processorId, () -> onNewJobModelConfirmed(version));
+        barrier.join(version, processorId);
       });
   }
 
@@ -179,53 +206,23 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
   }
 
-  /**
-   * Generate new JobModel when becoming a leader or the list of processor changed.
-   */
-  private void generateNewJobModel(List<String> processors) {
-    List<String> currentProcessorsIds;
+  private List<String> getActualProcessorIds(List<String> processors) {
     if (processors.size() > 0) {
       // we should use this list
       // but it needs to be converted into PIDs, which is part of the data
-      currentProcessorsIds = zkUtils.getActiveProcessorsIDs(processors);
+      return zkUtils.getActiveProcessorsIDs(processors);
     } else {
       // get the current list of processors
-      currentProcessorsIds = zkUtils.getSortedActiveProcessorsIDs();
+      return zkUtils.getSortedActiveProcessorsIDs();
     }
+  }
 
-    // get the current version
-    String currentJMVersion  = zkUtils.getJobModelVersion();
-    String nextJMVersion;
-    if (currentJMVersion == null) {
-      LOG.info("pid=" + processorId + "generating first version of the model");
-      nextJMVersion = "1";
-    } else {
-      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
-    }
-    LOG.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
-
-    List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
-    for (String processorPid : currentProcessorsIds) {
-      containerIds.add(processorPid);
-    }
-    LOG.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
-
-    JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
-        containerIds);
-
-    LOG.info("pid=" + processorId + "Generated jobModel: " + jobModel);
-
-    // publish the new job model first
-    zkUtils.publishJobModel(nextJMVersion, jobModel);
-
-    // start the barrier for the job model update
-    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
-        JOB_MODEL_UPGRADE_BARRIER);
-    barrier.start(nextJMVersion, currentProcessorsIds);
-
-    // publish new JobModel version
-    zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
-    LOG.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+  /**
+   * Generate new JobModel when becoming a leader or the list of processor changed.
+   */
+  private JobModel generateNewJobModel(List<String> processors) {
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+        processors);
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
@@ -241,4 +238,40 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         });
     }
   }
+
+  class ZkBarrierListenerImpl implements ZkBarrierListener {
+    private final String barrierAction = "BarrierAction";
+    @Override
+    public void onBarrierCreated(String version) {
+      debounceTimer.scheduleAfterDebounceTime(
+          barrierAction,
+        (new ZkConfig(config)).getZkBarrierTimeoutMs(),
+        () -> barrier.expire(version)
+      );
+    }
+
+    public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
+      LOG.info("JobModel version " + version + " obtained consensus successfully!");
+      if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
+        debounceTimer.scheduleAfterDebounceTime(
+            barrierAction,
+          0,
+          () -> onNewJobModelConfirmed(version));
+      } else {
+        if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+          // no-op
+          // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
+          // participants failed to join. That means, they should have de-registered from "processors" list
+          // and that would have triggered onProcessorChange action -> a new round of consensus.
+          LOG.info("Barrier for version " + version + " timed out.");
+        }
+      }
+    }
+
+    @Override
+    public void onBarrierError(String version, Throwable t) {
+      LOG.error("Encountered error while attaining consensus on JobModel version " + version);
+      stop();
+    }
+  }
 }
index 7e4e0d6..37bff6d 100644 (file)
@@ -50,6 +50,7 @@ public class ZkKeyBuilder {
 
   static final String PROCESSORS_PATH = "processors";
   static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
+  static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
 
   public ZkKeyBuilder(String pathPrefix) {
     if (pathPrefix != null && !pathPrefix.trim().isEmpty()) {
@@ -94,7 +95,7 @@ public class ZkKeyBuilder {
     return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
   }
 
-  public String getJobModelVersionBarrierPrefix(String barrierId) {
-    return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, barrierId);
+  public String getJobModelVersionBarrierPrefix() {
+    return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER);
   }
 }
index c547901..677ce54 100644 (file)
@@ -114,10 +114,6 @@ public class ZkUtils {
     }
   }
 
-  public synchronized String getEphemeralPath() {
-    return ephemeralPath;
-  }
-
   /**
    * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
    *
@@ -259,7 +255,7 @@ public class ZkUtils {
 
     if (currentVersion != null && !currentVersion.equals(oldVersion)) {
       throw new SamzaException(
-          "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
+          "Someone changed JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
     }
     // data version is the ZK version of the data from the ZK.
     int dataVersion = stat.getVersion();
@@ -268,7 +264,7 @@ public class ZkUtils {
     } catch (Exception e) {
       String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
       LOG.error(msg, e);
-      throw new SamzaException(msg);
+      throw new SamzaException(msg, e);
     }
     LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) +
         "(actual data version after update = " + stat.getVersion() +    ")");
index 9c91fd3..63d6663 100644 (file)
  */
 package org.apache.samza.zk;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
-import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+
+// TODO: Rename this such that it is clear that it is an integration test and NOT unit test
 public class TestZkBarrierForVersionUpgrade {
   private static EmbeddedZookeeper zkServer = null;
   private static String testZkConnectionString = null;
-  private static CoordinationUtils coordinationUtils;
-
-  private static AtomicInteger counter = new AtomicInteger(1);
-
-
-  @Before
-  public void testSetup() {
+  private ZkUtils zkUtils;
+  private ZkUtils zkUtils1;
 
+  @BeforeClass
+  public static void test() {
     zkServer = new EmbeddedZookeeper();
     zkServer.setup();
     testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
+  }
 
-    String groupId = "group" + counter.getAndAdd(1);
-    String processorId = "p1";
-    Map<String, String> map = new HashMap<>();
-    map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
-    map.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, "200");
-    Config config = new MapConfig(map);
-
-    CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
-    coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
+  @Before
+  public void testSetup() {
+    ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @After
   public void testTearDown() {
-    coordinationUtils.reset();
+    zkUtils.close();
+    zkUtils1.close();
+  }
+
+  @AfterClass
+  public static void teardown() {
     zkServer.teardown();
   }
 
   @Test
   public void testZkBarrierForVersionUpgrade() {
-    String barrierId = "b1";
+    String barrierId = zkUtils.getKeyBuilder().getRootPath() + "/b1";
     String ver = "1";
     List<String> processors = new ArrayList<>();
     processors.add("p1");
     processors.add("p2");
-
-    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
-
-    class Status {
-      boolean p1 = false;
-      boolean p2 = false;
+    final CountDownLatch latch = new CountDownLatch(2);
+    final AtomicInteger stateChangedCalled = new AtomicInteger(0);
+
+    ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, new ZkBarrierListener() {
+      @Override
+      public void onBarrierCreated(String version) {
+      }
+
+      @Override
+      public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+        if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
+          latch.countDown();
+          stateChangedCalled.incrementAndGet();
+        }
+      }
+
+      @Override
+      public void onBarrierError(String version, Throwable t) {
+
+      }
+    });
+
+    processor1Barrier.create(ver, processors);
+    processor1Barrier.join(ver, "p1");
+
+    ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, new ZkBarrierListener() {
+      @Override
+      public void onBarrierCreated(String version) {
+      }
+
+      @Override
+      public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+        if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
+          latch.countDown();
+          stateChangedCalled.incrementAndGet();
+        }
+      }
+
+      @Override
+      public void onBarrierError(String version, Throwable t) {
+
+      }
+    });
+    processor2Barrier.join(ver, "p2");
+
+    boolean result = false;
+    try {
+      result = latch.await(10000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
-    final Status s = new Status();
-
-    barrier.start(ver, processors);
-
-    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
-    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
-  }
-
-  @Test
-  public void testNegativeZkBarrierForVersionUpgrade() {
-    String barrierId = "negativeZkBarrierForVersionUpgrade";
-    String ver = "1";
-    List<String> processors = new ArrayList<>();
-    processors.add("p1");
-    processors.add("p2");
-    processors.add("p3");
-
-    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
-
-    class Status {
-      boolean p1 = false;
-      boolean p2 = false;
-      boolean p3 = false;
+    Assert.assertTrue("Barrier failed to complete within test timeout.", result);
+
+    try {
+      List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants");
+      Assert.assertNotNull(children);
+      Assert.assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
+      Assert.assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children);
+    } catch (Exception e) {
+      // no-op
     }
-    final Status s = new Status();
-
-    barrier.start(ver, processors);
-
-    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
-    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+    Assert.assertEquals(2, stateChangedCalled.get());
   }
 
   @Test
   public void testZkBarrierForVersionUpgradeWithTimeOut() {
-    String barrierId = "barrierTimeout";
+    String barrierId = zkUtils1.getKeyBuilder().getRootPath() + "/barrierTimeout";
     String ver = "1";
     List<String> processors = new ArrayList<>();
     processors.add("p1");
     processors.add("p2");
-    processors.add("p3");
-
-    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
-
-    class Status {
-      boolean p1 = false;
-      boolean p2 = false;
-      boolean p3 = false;
+    processors.add("p3"); // Simply to prevent barrier from completion for testing purposes
+
+    final AtomicInteger timeoutStateChangeCalled = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(2);
+    final ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(
+        barrierId,
+        zkUtils,
+        new ZkBarrierListener() {
+          @Override
+          public void onBarrierCreated(String version) {
+          }
+
+          @Override
+          public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+              timeoutStateChangeCalled.incrementAndGet();
+              latch.countDown();
+            }
+          }
+
+          @Override
+          public void onBarrierError(String version, Throwable t) {
+
+          }
+
+        });
+    processor1Barrier.create(ver, processors);
+    processor1Barrier.join(ver, "p1");
+
+    final ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(
+        barrierId,
+        zkUtils1,
+        new ZkBarrierListener() {
+          @Override
+          public void onBarrierCreated(String version) {
+          }
+
+          @Override
+          public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+              timeoutStateChangeCalled.incrementAndGet();
+              latch.countDown();
+            }
+          }
+
+          @Override
+          public void onBarrierError(String version, Throwable t) {
+
+          }
+
+        });
+
+    processor2Barrier.join(ver, "p2");
+
+    processor1Barrier.expire(ver);
+    boolean result = false;
+    try {
+      result = latch.await(10000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
-    final Status s = new Status();
-
-    barrier.start(ver, processors);
-
-    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
-    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
-    // this node will join "too late"
-    barrier.waitForBarrier(ver, "p3", () -> {
-        TestZkUtils.sleepMs(300);
-        s.p3 = true;
-      });
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
+    Assert.assertTrue("Barrier Timeout test failed to complete within test timeout.", result);
+
+    try {
+      List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants");
+      Assert.assertNotNull(children);
+      Assert.assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
+      Assert.assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children);
+    } catch (Exception e) {
+      // no-op
+    }
+    Assert.assertEquals(2, timeoutStateChangeCalled.get());
   }
 }
index 6d0bc0b..8ddd688 100644 (file)
@@ -56,7 +56,6 @@ public class TestZkKeyBuilder {
     Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
     String version = "2";
     Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
-    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix(
-        "testBarrier"));
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
   }
 }