SAMZA-1082 : Implement Leader Election using ZK
authornavina <navina@apache.org>
Mon, 13 Feb 2017 21:32:06 +0000 (13:32 -0800)
committernavina <navina@apache.org>
Mon, 13 Feb 2017 21:32:06 +0000 (13:32 -0800)
Simple implementation of leader election recipe along with unit tests

Author: navina <navina@apache.org>

Reviewers: Xinyu Liu <xinyuliu.us@gmail.com>, Fred Ji <fji@linkedin.com>

Closes #48 from navina/LeaderElector

13 files changed:
build.gradle
gradle/dependency-versions.gradle
samza-core/src/main/java/org/apache/samza/config/ZkConfig.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java [new file with mode: 0644]

index 5b41c52..0d60970 100644 (file)
@@ -159,6 +159,7 @@ project(":samza-core_$scalaVersion") {
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
+    compile "com.101tec:zkclient:$zkClientVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
index db59672..0193b64 100644 (file)
@@ -26,7 +26,7 @@
   mockitoVersion = "1.8.4"
   scalaTestVersion = "2.2.4"
   zkClientVersion = "0.8"
-  zookeeperVersion = "3.3.4"
+  zookeeperVersion = "3.4.6"
   metricsVersion = "2.2.0"
   kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
new file mode 100644 (file)
index 0000000..f26b2d9
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+public class ZkConfig extends MapConfig {
+  // Connection string for ZK, format: :<hostname>:<port>,..."
+  public static final String ZK_CONNECT = "coordinator.zk.connect";
+  public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms";
+  public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms";
+
+  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
+  public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
+
+  public ZkConfig(Config config) {
+    super(config);
+  }
+
+  public String getZkConnect() {
+    if (!containsKey(ZK_CONNECT)) {
+      throw new ConfigException("Missing " + ZK_CONNECT + " config!");
+    }
+    return get(ZK_CONNECT);
+  }
+
+  public int getZkSessionTimeoutMs() {
+    return getInt(ZK_SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS);
+  }
+
+  public int getZkConnectionTimeoutMs() {
+    return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
new file mode 100644 (file)
index 0000000..94e3311
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.leaderelection;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public interface LeaderElector {
+  /**
+   * Method that helps the caller participate in leader election and returns when the participation is complete
+   *
+   * @return True, if caller is chosen as a leader through the leader election process. False, otherwise.
+   */
+  boolean tryBecomeLeader();
+
+  /**
+   * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various
+   * reasons such as shutdown, connection failures etc.
+   * This method should clear any state created by the leader and clean-up the resources used by the leader.
+   */
+  void resignLeadership();
+
+  /**
+   * Method that can be used to know if the caller is the current leader or not
+   *
+   * @return True, if the caller is the current leader. False, otherwise
+   */
+  boolean amILeader();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
new file mode 100644 (file)
index 0000000..28344e9
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.base.Strings;
+import org.apache.samza.SamzaException;
+
+/**
+ * The following ZK hierarchy is maintained for Standalone jobs:
+ * <pre>
+ *   - /
+ *      |- jobName-jobId/
+ *          |- processors/
+ *              |- 00000001
+ *              |- 00000002
+ *              |- ...
+ * </pre>
+ * Note: ZK Node levels without an ending forward slash ('/') represent a leaf node and non-leaf node, otherwise.
+ *
+ * This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
+ */
+public class ZkKeyBuilder {
+  /**
+   * Prefix generated to uniquely identify a particular deployment of a job.
+   * TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well.
+   */
+  private final String pathPrefix;
+
+  static final String PROCESSORS_PATH = "processors";
+  static final String PROCESSOR_ID_PREFIX = "processor-";
+
+  public ZkKeyBuilder(String pathPrefix) {
+    if (Strings.isNullOrEmpty(pathPrefix)) {
+      throw new SamzaException("Zk PathPrefix cannot be null or empty!");
+    }
+    this.pathPrefix = pathPrefix.trim();
+  }
+
+  public String getProcessorsPath() {
+    return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
+  }
+
+  /**
+   * Static method that helps parse the processorId substring from the ZK path
+   *
+   * Processor ID is prefixed by "processor-" and is an leaf node in ZK tree. Hence, this pattern is used to extract
+   * the processorId.
+   *
+   * @param path Full ZK path of a registered processor
+   * @return String representing the processor ID
+   */
+  public static String parseIdFromPath(String path) {
+    if (!Strings.isNullOrEmpty(path))
+      return path.substring(path.lastIndexOf("/") + 1);
+    return null;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
new file mode 100644 (file)
index 0000000..8cdf8fc
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.leaderelection.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * An implementation of Leader Elector using Zookeeper.
+ *
+ * Each participant in the leader election process creates an instance of this class and tries to become the leader.
+ * The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader
+ * sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number
+ * that is less than the current participant's sequence number.
+ * </p>
+ * */
+public class ZkLeaderElector implements LeaderElector {
+  public static final Logger LOGGER = LoggerFactory.getLogger(ZkLeaderElector.class);
+  private final ZkUtils zkUtils;
+  private final String processorIdStr;
+  private final ZkKeyBuilder keyBuilder;
+  private final String hostName;
+
+  private AtomicBoolean isLeader = new AtomicBoolean(false);
+  private final IZkDataListener zkLeaderElectionListener;
+  private String currentSubscription = null;
+  private final Random random = new Random();
+
+  @VisibleForTesting
+  ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.zkLeaderElectionListener = leaderElectionListener;
+    this.keyBuilder = this.zkUtils.getKeyBuilder();
+    this.hostName = getHostName();
+  }
+
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
+    this.zkLeaderElectionListener = new ZkLeaderElectionListener();
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.keyBuilder = this.zkUtils.getKeyBuilder();
+    this.hostName = getHostName();
+  }
+
+  // TODO: This should go away once we integrate with Zk based Job Coordinator
+  private String getHostName() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOGGER.error("Failed to fetch hostname of the processor", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public boolean tryBecomeLeader() {
+    String currentPath = zkUtils.registerProcessorAndGetId(hostName);
+
+    List<String> children = zkUtils.getSortedActiveProcessors();
+    LOGGER.debug(zLog("Current active processors - " + children));
+    int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
+
+    if (children.size() == 0 || index == -1) {
+      throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
+    }
+
+    if (index == 0) {
+      isLeader.getAndSet(true);
+      LOGGER.info(zLog("Eligible to become the leader!"));
+      return true;
+    }
+
+    isLeader.getAndSet(false);
+    LOGGER.info("Index = " + index + " Not eligible to be a leader yet!");
+    String predecessor = children.get(index - 1);
+    if (!predecessor.equals(currentSubscription)) {
+      if (currentSubscription != null) {
+        LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
+        zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+      }
+      currentSubscription = predecessor;
+      LOGGER.info(zLog("Subscribing data change for " + predecessor));
+      zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+    }
+    /**
+     * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
+     * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't
+     * exist during subscription, it is not going to get created in the future.
+     */
+    boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
+    if (predecessorExists) {
+      LOGGER.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
+    } else {
+      try {
+        Thread.sleep(random.nextInt(1000));
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+      }
+      LOGGER.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
+      return tryBecomeLeader();
+    }
+    return false;
+  }
+
+  @Override
+  public void resignLeadership() {
+    isLeader.compareAndSet(true, false);
+  }
+
+  @Override
+  public boolean amILeader() {
+    return isLeader.get();
+  }
+
+  private String zLog(String logMessage) {
+    return String.format("[Processor-%s] %s", processorIdStr, logMessage);
+  }
+
+  // Only by non-leaders
+  class ZkLeaderElectionListener implements IZkDataListener {
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      LOGGER.debug("Data change on path: " + dataPath + " Data: " + data);
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      LOGGER.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
+      tryBecomeLeader();
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
new file mode 100644 (file)
index 0000000..d0a269d
--- /dev/null
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Util class to help manage Zk connection and ZkClient.
+ * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
+ *
+ * <p>
+ *  <b>Note on ZkClient:</b>
+ *  {@link ZkClient} consists of two threads - I/O thread and Event thread.
+ *  I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods
+ *  in Zookeeper API.
+ *  Event thread typically receives all the Watcher events and delivers to registered listeners. It, also, handles
+ *  responses to asynchronous methods in Zookeeper API.
+ * </p>
+ *
+ * <p>
+ *   <b>Note on Session Management:</b>
+ *   Session management, if needed, should be handled by the caller. This can be done by implementing
+ *   {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
+ *   callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
+ *   processing in the callbacks.
+ * </p>
+ */
+public class ZkUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
+
+  private final ZkClient zkClient;
+  private volatile String ephemeralPath = null;
+  private final ZkKeyBuilder keyBuilder;
+  private final int connectionTimeoutMs;
+
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+    this.keyBuilder = zkKeyBuilder;
+    this.connectionTimeoutMs = connectionTimeoutMs;
+    this.zkClient = zkClient;
+  }
+
+  public void connect() throws ZkInterruptedException {
+    boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
+    if (!isConnected) {
+      throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
+    }
+  }
+
+  public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) {
+    return new ZkConnection(zkConnectString, sessionTimeoutMs);
+  }
+
+  public static ZkClient createZkClient(ZkConnection zkConnection, int connectionTimeoutMs) {
+    return new ZkClient(zkConnection, connectionTimeoutMs);
+  }
+
+  ZkClient getZkClient() {
+    return zkClient;
+  }
+
+  public ZkKeyBuilder getKeyBuilder() {
+    return keyBuilder;
+  }
+
+  /**
+   * Returns a ZK generated identifier for this client.
+   * If the current client is registering for the first time, it creates an ephemeral sequential node in the ZK tree
+   * If the current client has already registered and is still within the same session, it returns the already existing
+   * value for the ephemeralPath
+   *
+   * @param data Object that should be written as data in the registered ephemeral ZK node
+   * @return String representing the absolute ephemeralPath of this client in the current session
+   */
+  public synchronized String registerProcessorAndGetId(final Object data) {
+    if (ephemeralPath == null) {
+      // TODO: Data should be more than just the hostname. Use Json serialized data
+      ephemeralPath =
+          zkClient.createEphemeralSequential(
+              keyBuilder.getProcessorsPath() + "/", data);
+      return ephemeralPath;
+    } else {
+      return ephemeralPath;
+    }
+  }
+
+  public synchronized String getEphemeralPath() {
+    return ephemeralPath;
+  }
+
+  /**
+   * Method is used to get the <i>sorted</i> list of currently active/registered processors
+   *
+   * @return List of absolute ZK node paths
+   */
+  public List<String> getSortedActiveProcessors() {
+    List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath());
+    if (children.size() > 0) {
+      Collections.sort(children);
+      LOG.info("Found these children - " + children);
+    }
+    return children;
+  }
+
+  /* Wrapper for standard I0Itec methods */
+  public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
+    zkClient.unsubscribeDataChanges(path, dataListener);
+  }
+
+  public void subscribeDataChanges(String path, IZkDataListener dataListener) {
+    zkClient.subscribeDataChanges(path, dataListener);
+  }
+
+  public boolean exists(String path) {
+    return zkClient.exists(path);
+  }
+
+  public void close() throws ZkInterruptedException {
+    zkClient.close();
+  }
+}
index 85f4df0..7f5d05d 100644 (file)
@@ -214,7 +214,6 @@ object JobModelManager extends Logging {
 
     // Generate the jobModel
     def jobModelGenerator(): JobModel = refreshJobModel(config,
-                                                        allSystemStreamPartitions,
                                                         groups,
                                                         previousChangelogMapping,
                                                         localityManager)
@@ -247,7 +246,6 @@ object JobModelManager extends Logging {
    * refresh. Hence, there is no need for synchronization as before.
    */
   private def refreshJobModel(config: Config,
-                              allSystemStreamPartitions: util.Set[SystemStreamPartition],
                               groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
                               previousChangelogMapping: util.Map[TaskName, Integer],
                               localityManager: LocalityManager): JobModel = {
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java b/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java
new file mode 100644 (file)
index 0000000..bd0a2d1
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class EmbeddedZookeeper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedZookeeper.class);
+
+  private static final String SNAPSHOT_DIR_RELATIVE_PATH = "zk/snapshot";
+  private static final String LOG_DIR_RELATIVE_PATH = "zk/log";
+  private static final int TICK_TIME = 500;
+  private static final int MAX_CLIENT_CONNECTIONS = 1024;
+  private static final int RANDOM_PORT = 0;
+
+  private ZooKeeperServer zooKeeperServer = null;
+  private ServerCnxnFactory serverCnxnFactory = null;
+  private File snapshotDir = null;
+  private File logDir = null;
+
+  public void setup() {
+    try {
+      snapshotDir = FileUtil.createFileInTempDir(SNAPSHOT_DIR_RELATIVE_PATH);
+      logDir = FileUtil.createFileInTempDir(LOG_DIR_RELATIVE_PATH);
+    } catch (IOException e) {
+      LOGGER.error("Failed to setup Zookeeper Server Environment", e);
+      Assert.fail("Failed to setup Zookeeper Server Environment");
+    }
+
+    try {
+      zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, TICK_TIME);
+      serverCnxnFactory = NIOServerCnxnFactory.createFactory();
+      InetSocketAddress addr = new InetSocketAddress("127.0.0.1", RANDOM_PORT);
+      serverCnxnFactory.configure(addr, MAX_CLIENT_CONNECTIONS);
+
+      serverCnxnFactory.startup(zooKeeperServer);
+    } catch (Exception e) {
+      LOGGER.error("Zookeeper Server failed to start", e);
+      Assert.fail("Zookeeper Server failed to start");
+    }
+  }
+
+  public void teardown() {
+    serverCnxnFactory.shutdown();
+
+    try {
+      serverCnxnFactory.join();
+    } catch (InterruptedException e) {
+      LOGGER.warn("Zookeeper server may not have terminated cleanly!", e);
+    }
+
+    try {
+      FileUtil.deleteDir(snapshotDir);
+      FileUtil.deleteDir(logDir);
+    } catch (FileNotFoundException | NullPointerException e) {
+      LOGGER.warn("Zookeeper Server Environment Cleanup failed!", e);
+    }
+  }
+
+  public int getPort() {
+    return zooKeeperServer.getClientPort();
+  }
+
+  public static void main(String[] args) {
+    EmbeddedZookeeper zk = new EmbeddedZookeeper();
+    zk.setup();
+    System.out.println("Zk Server Started!!");
+    try {
+      Thread.sleep(10000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    zk.teardown();
+  }
+
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java b/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java
new file mode 100644 (file)
index 0000000..b33a9fa
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class FileUtil {
+  public static final String TMP_DIR = System.getProperty("java.io.tmpdir");
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
+
+  private FileUtil() {}
+
+  /**
+   * Creates a file, along with any parents (if any), in the system-specific Java temporary directory
+   * If the file already exists, this method simply returns
+   *
+   * @param path Path relative to the temporary directory
+   * @return True, if the file was created successfully, along with parent files (if any)
+   * @throws IOException
+   */
+  static File createFileInTempDir(String path) throws IOException {
+    if (path == null || path.isEmpty()) {
+      throw new RuntimeException("Unable to create file - Null or empty path!");
+    }
+    File file = new File(TMP_DIR, path);
+    if (!file.exists()) {
+      if (!file.mkdirs()) {
+        throw new IOException("Failed to create file");
+      }
+    }
+    return file;
+  }
+
+  /**
+   * Deletes a given {@link File}, if it exists. If it doesn't exist, it throws a {@link FileNotFoundException}
+   * If the given {@link File} is a directory, it recursively deletes the files in the directory, before deleting the
+   * directory itself.
+   *
+   * @param path Reference to the {@link File} to be deleted
+   * @return True, if it successfully deleted the given {@link File}. False, otherwise.
+   * @throws FileNotFoundException When the given {@link File} does not exist
+   * @throws NullPointerException When the given {@link File} reference is null
+   */
+  static boolean deleteDir(File path) throws FileNotFoundException, NullPointerException {
+    if (path == null) {
+      throw new NullPointerException("Path cannot be null!");
+    }
+    if (!path.exists()) {
+      throw new FileNotFoundException("File not found: " + path);
+    }
+    boolean result = true;
+
+    if (path.isDirectory()) {
+      for (File f: path.listFiles()) {
+        result = result & deleteDir(f);
+      }
+    }
+    return result && path.delete();
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
new file mode 100644 (file)
index 0000000..e04f7c9
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestZkKeyBuilder {
+
+  @Test
+  public void pathPrefixCannotBeNullOrEmpty() {
+    try {
+      new ZkKeyBuilder("");
+      Assert.fail("Key Builder was created with empty path prefix!");
+      new ZkKeyBuilder(null);
+      Assert.fail("Key Builder was created with null path prefix!");
+    } catch (SamzaException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testProcessorsPath() {
+    ZkKeyBuilder builder = new ZkKeyBuilder("test");
+    Assert.assertEquals("/test/" + ZkKeyBuilder.PROCESSORS_PATH, builder.getProcessorsPath());
+  }
+
+  @Test
+  public void testParseIdFromPath() {
+    Assert.assertEquals(
+        ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1",
+        ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1"));
+    Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
+    Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
new file mode 100644 (file)
index 0000000..b999ec5
--- /dev/null
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZkLeaderElector {
+
+  private static EmbeddedZookeeper zkServer = null;
+  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
+  private String testZkConnectionString = null;
+  private ZkUtils testZkUtils = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @Before
+  public void testSetup() {
+    testZkConnectionString = "localhost:" + zkServer.getPort();
+    try {
+      testZkUtils = getZkUtilsWithNewClient();
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests..");
+    }
+    try {
+      testZkUtils.getZkClient().createPersistent(KEY_BUILDER.getProcessorsPath(), true);
+    } catch (ZkNodeExistsException e) {
+      // Do nothing
+    }
+  }
+
+
+  @After
+  public void testTeardown() {
+    testZkUtils.close();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testLeaderElectionRegistersProcessor() {
+    List<String> activeProcessors = new ArrayList<String>() {
+      {
+        add("0000000000");
+      }
+    };
+
+    ZkUtils mockZkUtils = mock(ZkUtils.class);
+    when(mockZkUtils.registerProcessorAndGetId(any())).
+        thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
+    when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils);
+    Assert.assertTrue(leaderElector.tryBecomeLeader());
+  }
+
+  @Test
+  public void testUnregisteredProcessorInLeaderElection() {
+    String processorId = "1";
+    ZkUtils mockZkUtils = mock(ZkUtils.class);
+    when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils);
+    try {
+      leaderElector.tryBecomeLeader();
+      Assert.fail("Was expecting leader election to fail!");
+    } catch (SamzaException e) {
+      // No-op Expected
+    }
+  }
+
+  /**
+   * Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection
+   */
+  @Test
+  public void testLeaderElection() {
+    // Processor-1
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
+        "1",
+        zkUtils1);
+
+    // Processor-2
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
+        "2",
+        zkUtils2);
+
+    // Processor-3
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
+        "3",
+        zkUtils3);
+
+    Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
+
+    Assert.assertTrue(leaderElector1.tryBecomeLeader());
+    Assert.assertFalse(leaderElector2.tryBecomeLeader());
+    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+
+    Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
+
+    // Clean up
+    zkUtils1.close();
+    zkUtils2.close();
+    zkUtils3.close();
+
+    Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessors());
+
+  }
+
+  /**
+   * Tests that Leader Failure automatically promotes the next successor to become the leader
+   */
+  @Test
+  public void testLeaderFailure() {
+    /**
+     * electionLatch and count together verify that:
+     * 1. the registered listeners are actually invoked by the ZkClient on the correct path
+     * 2. for a single participant failure, at-most 1 other participant is notified
+     */
+    final CountDownLatch electionLatch = new CountDownLatch(1);
+    final AtomicInteger count = new AtomicInteger(0);
+
+    // Processor-1
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    zkUtils1.registerProcessorAndGetId("processor1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
+        "1",
+        zkUtils1,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            count.incrementAndGet();
+          }
+        });
+
+
+    // Processor-2
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
+        "2",
+        zkUtils2,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
+            Assert.assertNotNull(registeredIdStr);
+
+            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+            Assert.assertNotNull(predecessorIdStr);
+
+            try {
+              int selfId = Integer.parseInt(registeredIdStr);
+              int predecessorId = Integer.parseInt(predecessorIdStr);
+              Assert.assertEquals(1, selfId - predecessorId);
+            } catch (Exception e) {
+              System.out.println(e.getMessage());
+            }
+            count.incrementAndGet();
+            electionLatch.countDown();
+          }
+        });
+
+    // Processor-3
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    zkUtils3.registerProcessorAndGetId("processor3");
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
+        "3",
+        zkUtils3,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            count.incrementAndGet();
+          }
+        });
+
+    // Join Leader Election
+    Assert.assertTrue(leaderElector1.tryBecomeLeader());
+    Assert.assertFalse(leaderElector2.tryBecomeLeader());
+    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+
+    Assert.assertTrue(leaderElector1.amILeader());
+    Assert.assertFalse(leaderElector2.amILeader());
+    Assert.assertFalse(leaderElector3.amILeader());
+
+    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    Assert.assertEquals(3, currentActiveProcessors.size());
+
+    // Leader Failure
+    zkUtils1.close();
+    currentActiveProcessors.remove(0);
+
+    try {
+      Assert.assertTrue(electionLatch.await(5, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      Assert.fail("Interrupted while waiting for leaderElection listener callback to complete!");
+    }
+
+    Assert.assertEquals(1, count.get());
+    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+
+    // Clean up
+    zkUtils2.close();
+    zkUtils3.close();
+  }
+
+  /**
+   * Tests that a non-leader failure updates the Zk tree and participants' state correctly
+   */
+  @Test
+  public void testNonLeaderFailure() {
+    /**
+     * electionLatch and count together verify that:
+     * 1. the registered listeners are actually invoked by the ZkClient on the correct path
+     * 2. for a single participant failure, at-most 1 other participant is notified
+     */
+    final CountDownLatch electionLatch = new CountDownLatch(1);
+    final AtomicInteger count = new AtomicInteger(0);
+
+    // Processor-1
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    zkUtils1.registerProcessorAndGetId("processor1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
+        "1",
+        zkUtils1,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            count.incrementAndGet();
+          }
+        });
+
+    // Processor-2
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    zkUtils2.registerProcessorAndGetId("processor2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
+        "2",
+        zkUtils2,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            count.incrementAndGet();
+          }
+        });
+
+    // Processor-3
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
+        "3",
+        zkUtils3,
+        new IZkDataListener() {
+          @Override
+          public void handleDataChange(String dataPath, Object data) throws Exception {
+
+          }
+
+          @Override
+          public void handleDataDeleted(String dataPath) throws Exception {
+            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
+            Assert.assertNotNull(registeredIdStr);
+
+            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+            Assert.assertNotNull(predecessorIdStr);
+
+            try {
+              int selfId = Integer.parseInt(registeredIdStr);
+              int predecessorId = Integer.parseInt(predecessorIdStr);
+              Assert.assertEquals(1, selfId - predecessorId);
+            } catch (Exception e) {
+              Assert.fail("Exception in LeaderElectionListener!");
+            }
+            count.incrementAndGet();
+            electionLatch.countDown();
+          }
+        });
+
+    // Join Leader Election
+    Assert.assertTrue(leaderElector1.tryBecomeLeader());
+    Assert.assertFalse(leaderElector2.tryBecomeLeader());
+    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+
+    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    Assert.assertEquals(3, currentActiveProcessors.size());
+
+    zkUtils2.close();
+    currentActiveProcessors.remove(1);
+
+    try {
+      Assert.assertTrue(electionLatch.await(5, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      Assert.fail("Interrupted while waiting for leaderElection listener callback to complete!");
+    }
+
+    Assert.assertEquals(1, count.get());
+    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+
+    // Clean up
+    zkUtils1.close();
+    zkUtils3.close();
+  }
+
+  @Test
+  public void testAmILeader() {
+    // Processor-1
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
+        "1",
+        getZkUtilsWithNewClient());
+
+    // Processor-2
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
+        "2",
+        getZkUtilsWithNewClient());
+
+    // Before Leader Election
+    Assert.assertFalse(leaderElector1.amILeader());
+    Assert.assertFalse(leaderElector2.amILeader());
+
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+
+    // After Leader Election
+    Assert.assertTrue(leaderElector1.amILeader());
+    Assert.assertFalse(leaderElector2.amILeader());
+  }
+
+  private ZkUtils getZkUtilsWithNewClient() {
+    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    return new ZkUtils(
+        KEY_BUILDER,
+        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        CONNECTION_TIMEOUT_MS);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
new file mode 100644 (file)
index 0000000..855d29d
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZkUtils {
+  private static EmbeddedZookeeper zkServer = null;
+  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
+  private ZkConnection zkConnection = null;
+  private ZkClient zkClient = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @Before
+  public void testSetup() {
+    try {
+      zkClient = new ZkClient(
+          new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
+          CONNECTION_TIMEOUT_MS);
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests..");
+    }
+    try {
+      zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true);
+    } catch (ZkNodeExistsException e) {
+      // Do nothing
+    }
+  }
+
+
+  @After
+  public void testTeardown() {
+    zkClient.close();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testRegisterProcessorId() {
+    ZkUtils utils = new ZkUtils(
+        KEY_BUILDER,
+        zkClient,
+        SESSION_TIMEOUT_MS);
+    utils.connect();
+    String assignedPath = utils.registerProcessorAndGetId("0.0.0.0");
+    Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
+
+    // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
+    Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+
+    utils.close();
+  }
+
+  @Test
+  public void testGetActiveProcessors() {
+    ZkUtils utils = new ZkUtils(
+        KEY_BUILDER,
+        zkClient,
+        SESSION_TIMEOUT_MS);
+    utils.connect();
+
+    Assert.assertEquals(0, utils.getSortedActiveProcessors().size());
+    utils.registerProcessorAndGetId("processorData");
+
+    Assert.assertEquals(1, utils.getSortedActiveProcessors().size());
+
+    utils.close();
+  }
+
+}