SAMZA-1298; Create zk path.
author Boris Shkolnik <boryas@apache.org>
Mon, 22 May 2017 18:58:53 +0000 (11:58 -0700)
committer vjagadish1989 <jvenkatr@linkedin.com>
Mon, 22 May 2017 18:58:53 +0000 (11:58 -0700)
If the ZK connect string contains a chroot path at the end, that path needs to be created in ZK on first connect.

Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Jagadish <jagadish@apache.org>

Closes #197 from sborya/createZkPath

samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java

index 9971732..661650d 100644 (file)
  */
 package org.apache.samza.zk;
 
+import com.google.common.base.Strings;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
+  private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+
   // TODO - Why should this method be synchronized?
   synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkClient zkClient;
+
+    ZkClient zkClient = createZkClient(zkConfig.getZkConnect(),
+        zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
+    // make sure the 'path' exists
+    createZkPath(zkConfig.getZkConnect(), zkClient);
+
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+  }
+
+  /**
+   * helper method to create zkClient
+   * @param connectString - zkConnect string
+   * @param sessionTimeoutMS - session timeout
+   * @param connectionTimeoutMs - connection timeout
+   * @return zkClient object
+   */
+  public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) {
     try {
-      zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+      return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
     } catch (Exception e) {
-      throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e);
+      // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based.
+      throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e);
     }
+  }
 
-    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
-    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+  /**
+   * If the ZK connect string ends with a chroot path, creates that path in ZK when it does not exist yet.
+   * @param zkConnect - connect string
+   * @param zkClient - zkClient object to talk to the ZK
+   */
+  public static void createZkPath(String zkConnect, ZkClient zkClient) {
+    ConnectStringParser parser = new ConnectStringParser(zkConnect);
+
+    String path = parser.getChrootPath();
+    LOG.info("path =" + path);
+    if (!Strings.isNullOrEmpty(path)) {
+      // create this path in zk
+      LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses());
+      if (!zkClient.exists(path)) {
+        zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists
+      }
+    }
   }
 
 }
index 5c8fcf3..c547901 100644 (file)
 
 package org.apache.samza.zk;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -32,12 +37,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -84,10 +83,6 @@ public class ZkUtils {
     return new ZkConnection(zkConnectString, sessionTimeoutMs);
   }
 
-  public static ZkClient createZkClient(ZkConnection zkConnection, int connectionTimeoutMs) {
-    return new ZkClient(zkConnection, connectionTimeoutMs);
-  }
-
   ZkClient getZkClient() {
     return zkClient;
   }
index 7cfad61..393d733 100644 (file)
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
@@ -32,12 +37,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -433,10 +432,10 @@ public class TestZkLeaderElector {
   }
 
   private ZkUtils getZkUtilsWithNewClient() {
-    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }
index 2385b32..9f089a0 100644 (file)
  */
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkConnection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
@@ -28,13 +34,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * The ZkProcessorLatch uses a shared Znode as a latch. Each participant await existence of a target znode under the
  * shared latch, which is a persistent, sequential target znode with value (latchSize - 1). latchSize is the minimum
@@ -215,10 +214,11 @@ public class TestZkProcessorLatch {
 
   }
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
-    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationServiceFactory
+        .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }
index 63e2361..173b8a6 100644 (file)
  */
 package org.apache.samza.zk;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -35,10 +38,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
-
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -87,6 +86,26 @@ public class TestZkUtils {
     zkServer.teardown();
   }
 
+
+  @Test
+  public void testInitZkPath() {
+    String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1"));
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1/samza2"));
+
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path.
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/"));
+  }
+
   @Test
   public void testRegisterProcessorId() {
     String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));