Introduces CoordinationUtilsFactory to create different implementations of Coordinati...
authorBoris Shkolnik <boryas@apache.org>
Tue, 29 Aug 2017 20:23:47 +0000 (13:23 -0700)
committernavina <navina@apache.org>
Tue, 29 Aug 2017 20:23:47 +0000 (13:23 -0700)
Some refactoring and cleanup.

Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #282 from sborya/CoordinationUtilsFactory

16 files changed:
build.gradle
samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtilsFactory.java [moved from samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java with 83% similarity]
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtilsFactory.java [moved from samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java with 67% similarity]
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java [moved from samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java with 92% similarity]
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index 16fe2cf..319e51f 100644 (file)
@@ -155,7 +155,9 @@ project(":samza-core_$scalaVersion") {
 
   dependencies {
     compile project(':samza-api')
-    compile "com.101tec:zkclient:$zkClientVersion"
+    compile("com.101tec:zkclient:$zkClientVersion") {
+      exclude module: 'junit:junit'
+    }
     compile "com.google.guava:guava:$guavaVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
index f76dde3..dbd945f 100644 (file)
@@ -36,10 +36,6 @@ public class AzureCoordinationUtils implements CoordinationUtils {
   }
 
   @Override
-  public void reset() {}
-
-
-  @Override
   public LeaderElector getLeaderElector() throws UnsupportedOperationException {
     return null;
   }
@@ -49,6 +45,11 @@ public class AzureCoordinationUtils implements CoordinationUtils {
     return null;
   }
 
+  @Override
+  public void close() {
+
+  }
+
   public DistributedLock getLock(String initLockName) {
     BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
         azureConfig.getAzureBlobName() + initLockName, azureConfig.getAzureBlobLength());
@@ -21,10 +21,10 @@ package org.apache.samza.coordinator;
 
 import org.apache.samza.config.Config;
 
-public class AzureCoordinationServiceFactory implements CoordinationServiceFactory {
+public class AzureCoordinationUtilsFactory implements CoordinationUtilsFactory {
 
   @Override
-  public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+  public CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) {
     return new AzureCoordinationUtils(config);
   }
 }
index 57632ca..a04038a 100644 (file)
 package org.apache.samza.config;
 
 import com.google.common.base.Strings;
+import org.apache.samza.SamzaException;
+import org.apache.samza.zk.ZkCoordinationUtilsFactory;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
+
 
 public class JobCoordinatorConfig extends MapConfig {
   public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
+  public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory";
+  public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName();
 
   public JobCoordinatorConfig(Config config) {
     super(config);
   }
 
+  public String getJobCoordinationUtilsFactoryClassName() {
+    String jobCoordinatorFactoryClassName = get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
+
+    String className = get(JOB_COORDINATION_UTILS_FACTORY, "");
+
+    if (!Strings.isNullOrEmpty(className)) {
+      return className;
+    }
+
+    // TODO: we will need a better way to package the configs with application runner
+    if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
+      return DEFAULT_COORDINATION_UTILS_FACTORY;
+    }
+
+    throw new SamzaException("Cannot determine which CoordinationUtilsFactory to load");
+  }
+
   public String getJobCoordinatorFactoryClassName() {
     String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
     if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
index 150b3d4..4ba44b5 100644 (file)
@@ -31,14 +31,13 @@ import org.apache.samza.annotation.InterfaceStability;
 @InterfaceStability.Evolving
 public interface CoordinationUtils {
 
-  /**
-   * reset the internal structure. Does not happen automatically with stop()
-   */
-  void reset();
-
-
   // facilities for group coordination
   LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
 
   Latch getLatch(int size, String latchId);
+
+  /**
+   * performs necessary cleanup and closes ALL the utils.
+   */
+  void close();
 }
 package org.apache.samza.coordinator;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.util.*;
 
 
 /**
  * factory to instantiate a c{@link CoordinationUtils} service
  */
-public interface CoordinationServiceFactory {
+public interface CoordinationUtilsFactory {
+
+  public static CoordinationUtilsFactory getCoordinationUtilsFactory(Config config) {
+    // load the class
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+    String coordinationUtilsFactoryClass =   jcConfig.getJobCoordinationUtilsFactoryClassName();
+
+    return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
+  }
+
   /**
    * get a unique service instance
    * @param groupId - unique id to identify the service
@@ -32,5 +43,5 @@ public interface CoordinationServiceFactory {
    * @param updatedConfig - configs, to define the details of the service
    * @return a unique service instance
    */
-  CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig);
+  CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config updatedConfig);
 }
index 588e657..fc11cf5 100644 (file)
@@ -34,9 +34,9 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.execution.ExecutionPlan;
@@ -47,8 +47,6 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
-import org.apache.samza.zk.ZkCoordinationServiceFactory;
-import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -207,23 +205,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   /**
-   * Create the {@link CoordinationUtils} needed by the application runner, or null if it's not configured.
-   * @return an instance of {@link CoordinationUtils}
-   */
-  /* package private */ CoordinationUtils createCoordinationUtils() {
-    String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
-
-    // TODO: we will need a better way to package the configs with application runner
-    if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
-      ApplicationConfig appConfig = new ApplicationConfig(config);
-      return new ZkCoordinationServiceFactory().getCoordinationService(
-          appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config);
-    } else {
-      return null;
-    }
-  }
-
-  /**
    * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}.
    * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
    * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
@@ -236,7 +217,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     if (!intStreams.isEmpty()) {
       // Move the scope of coordination utils within stream creation to address long idle connection problem.
       // Refer SAMZA-1385 for more details
-      CoordinationUtils coordinationUtils = createCoordinationUtils();
+
+      String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX;
+      CoordinationUtils coordinationUtils =
+          CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
       if (coordinationUtils != null) {
         Latch initLatch = coordinationUtils.getLatch(1, planId);
 
@@ -252,7 +236,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
             initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
           }
         } finally {
-          coordinationUtils.reset();
+          if (initLatch != null)
+            coordinationUtils.close();
         }
       } else {
         // each application process will try creating the streams, which
index f5dda2e..05886db 100644 (file)
@@ -41,16 +41,6 @@ public class ZkCoordinationUtils implements CoordinationUtils {
   }
 
   @Override
-  public void reset() {
-    try {
-      zkUtils.close();
-    } catch (ZkInterruptedException ex) {
-      // Swallowing due to occurrence in the last stage of lifecycle(Not actionable).
-      LOG.error("Exception in reset: ", ex);
-    }
-  }
-
-  @Override
   public LeaderElector getLeaderElector() {
     return new ZkLeaderElector(processorIdStr, zkUtils);
   }
@@ -60,6 +50,17 @@ public class ZkCoordinationUtils implements CoordinationUtils {
     return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils);
   }
 
+  @Override
+  public void close() {
+    try {
+      if (zkUtils != null)
+        zkUtils.close();
+    } catch (ZkInterruptedException ex) {
+      // Swallowing due to occurrence in the last stage of lifecycle(Not actionable).
+      LOG.error("Exception in close(): ", ex);
+    }
+  }
+
   // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer
   public ZkUtils getZkUtils() {
     return zkUtils;
@@ -23,7 +23,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.zookeeper.client.ConnectStringParser;
@@ -31,10 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtilsFactory.class);
 
-  public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+  public CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
 
     ZkClient zkClient =
index 563bf4c..c967a21 100644 (file)
@@ -56,7 +56,8 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
     ZkConfig zkConfig = new ZkConfig(config);
     ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
-    ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    ZkClient zkClient = ZkCoordinationUtilsFactory
+        .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
     return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
   }
 
index 77defa4..6e8236e 100644 (file)
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
  * When Nth node is created await() call returns.
  */
 public class ZkProcessorLatch implements Latch {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ZkProcessorLatch.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkProcessorLatch.class);
 
   private final ZkUtils zkUtils;
   private final String participantId;
@@ -50,7 +50,7 @@ public class ZkProcessorLatch implements Latch {
     zkUtils.validatePaths(new String[] {latchPath});
     targetPath =  String.format("%s/%010d", latchPath, size - 1);
 
-    LOGGER.debug("ZkProcessorLatch targetPath " + targetPath);
+    LOG.debug("ZkProcessorLatch targetPath " + targetPath);
   }
 
   @Override
@@ -66,8 +66,8 @@ public class ZkProcessorLatch implements Latch {
 
   @Override
   public void countDown() {
-    // create persistent (should be ephemeral? Probably not)
+    // create persistent node
     String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
-    LOGGER.debug("ZKProcessorLatch countDown created " + path);
+    LOG.debug("ZKProcessorLatch countDown created " + path);
   }
 }
index ecacf25..38f58fd 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
@@ -45,35 +46,35 @@ import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.processor.StreamProcessorLifecycleListener;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(CoordinationUtilsFactory.class)
 public class TestLocalApplicationRunner {
 
-  private static final String PLAN_JSON = "{"
-      + "\"jobs\":[{"
-      + "\"jobName\":\"test-application\","
-      + "\"jobId\":\"1\","
-      + "\"operatorGraph\":{"
-      + "\"intermediateStreams\":{%s},"
-      + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
-  private static final String STREAM_SPEC_JSON_FORMAT = "\"%s\":{"
-      + "\"streamSpec\":{"
-      + "\"id\":\"%s\","
-      + "\"systemName\":\"%s\","
-      + "\"physicalName\":\"%s\","
-      + "\"partitionCount\":2},"
-      + "\"sourceJobs\":[\"test-app\"],"
-      + "\"targetJobs\":[\"test-target-app\"]},";
+  private static final String PLAN_JSON =
+      "{" + "\"jobs\":[{" + "\"jobName\":\"test-application\"," + "\"jobId\":\"1\"," + "\"operatorGraph\":{"
+          + "\"intermediateStreams\":{%s}," + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
+  private static final String STREAM_SPEC_JSON_FORMAT =
+      "\"%s\":{" + "\"streamSpec\":{" + "\"id\":\"%s\"," + "\"systemName\":\"%s\"," + "\"physicalName\":\"%s\","
+          + "\"partitionCount\":2}," + "\"sourceJobs\":[\"test-app\"]," + "\"targetJobs\":[\"test-target-app\"]},";
 
   @Test
-  public void testStreamCreation() throws Exception {
+  public void testStreamCreation()
+      throws Exception {
     Map<String, String> config = new HashMap<>();
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
     StreamApplication app = mock(StreamApplication.class);
@@ -109,6 +110,10 @@ public class TestLocalApplicationRunner {
     };
     when(planner.plan(anyObject())).thenReturn(plan);
 
+    mockStatic(CoordinationUtilsFactory.class);
+    CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+    when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+
     LocalApplicationRunner spy = spy(runner);
     try {
       spy.run(app);
@@ -123,7 +128,8 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testStreamCreationWithCoordination() throws Exception {
+  public void testStreamCreationWithCoordination()
+      throws Exception {
     Map<String, String> config = new HashMap<>();
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
     StreamApplication app = mock(StreamApplication.class);
@@ -176,7 +182,8 @@ public class TestLocalApplicationRunner {
       }
 
       @Override
-      public void resignLeadership() {}
+      public void resignLeadership() {
+      }
 
       @Override
       public boolean amILeader() {
@@ -186,6 +193,7 @@ public class TestLocalApplicationRunner {
 
     Latch latch = new Latch() {
       boolean done = false;
+
       @Override
       public void await(long timeout, TimeUnit tu)
           throws TimeoutException {
@@ -200,9 +208,15 @@ public class TestLocalApplicationRunner {
         done = true;
       }
     };
+
+    mockStatic(CoordinationUtilsFactory.class);
+    CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+    when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+
     when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector);
     when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch);
-    doReturn(coordinationUtils).when(spy).createCoordinationUtils();
+    when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
+        .thenReturn(coordinationUtils);
 
     try {
       spy.run(app);
@@ -217,12 +231,12 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunStreamTask() throws Exception {
+  public void testRunStreamTask()
+      throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.test.processor.IdentityStreamTask");
 
-
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
 
     StreamProcessor sp = mock(StreamProcessor.class);
@@ -243,10 +257,11 @@ public class TestLocalApplicationRunner {
     spy.runTask();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null));
-
   }
+
   @Test
-  public void testRunComplete() throws Exception {
+  public void testRunComplete()
+      throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
@@ -289,7 +304,6 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-
     LocalApplicationRunner spy = spy(runner);
     doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
@@ -299,7 +313,8 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunFailure() throws Exception {
+  public void testRunFailure()
+      throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.PROCESSOR_ID, "0");
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
@@ -335,13 +350,13 @@ public class TestLocalApplicationRunner {
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
         ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
 
-    doAnswer(i -> {
+    doAnswer(i ->
+      {
         StreamProcessorLifecycleListener listener = captor.getValue();
         listener.onFailure(t);
         return null;
       }).when(sp).start();
 
-
     LocalApplicationRunner spy = spy(runner);
     doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
@@ -360,14 +375,12 @@ public class TestLocalApplicationRunner {
    */
   @Test
   public void testPlanIdWithShuffledStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(
-      new StreamSpec("test-stream-1", "stream-1", "testStream"),
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-2", "stream-2", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
     String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
 
-    List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(
-        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+    List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"),
         new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
 
@@ -381,8 +394,7 @@ public class TestLocalApplicationRunner {
    */
   @Test
   public void testGeneratePlanIdWithSameStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(
-        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-2", "stream-2", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
     String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
@@ -398,14 +410,12 @@ public class TestLocalApplicationRunner {
    */
   @Test
   public void testGeneratePlanIdWithDifferentStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(
-        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-2", "stream-2", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
     String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
 
-    List<StreamSpec> updatedStreamSpecs = ImmutableList.of(
-        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+    List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-4", "stream-4", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
 
@@ -414,9 +424,8 @@ public class TestLocalApplicationRunner {
   }
 
   private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
-    String intermediateStreamJson = updatedStreamSpecs.stream()
-        .map(this::streamSpecToJson)
-        .collect(Collectors.joining(","));
+    String intermediateStreamJson =
+        updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
 
     int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
 
@@ -424,10 +433,7 @@ public class TestLocalApplicationRunner {
   }
 
   private String streamSpecToJson(StreamSpec streamSpec) {
-    return String.format(STREAM_SPEC_JSON_FORMAT,
-        streamSpec.getId(),
-        streamSpec.getId(),
-        streamSpec.getSystemName(),
+    return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(),
         streamSpec.getPhysicalName());
   }
 }
index 010d138..74b9abd 100644 (file)
@@ -433,7 +433,8 @@ public class TestZkLeaderElector {
   }
 
   private ZkUtils getZkUtilsWithNewClient() {
-    ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationUtilsFactory
+        .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
         zkClient,
index 3ce203e..1defccb 100644 (file)
@@ -90,7 +90,7 @@ public class TestZkNamespace {
     String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace;
     createNamespace(zkNameSpace);
     initZk(zkConnect);
-    ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+    ZkCoordinationUtilsFactory.validateZkNameSpace(zkConnect, zkClient);
 
     zkClient.createPersistent("/test");
     zkClient.createPersistent("/test/test1");
@@ -106,7 +106,7 @@ public class TestZkNamespace {
     try {
       String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace";
       initZk(zkConnect);
-      ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+      ZkCoordinationUtilsFactory.validateZkNameSpace(zkConnect, zkClient);
       Assert.fail("1.Should fail with exception, because namespace doesn't exist");
     } catch (SamzaException e) {
       // expected
@@ -120,7 +120,7 @@ public class TestZkNamespace {
     try {
       String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz";
       initZk(zkConnect);
-      ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+      ZkCoordinationUtilsFactory.validateZkNameSpace(zkConnect, zkClient);
       Assert.fail("2.Should fail with exception, because namespace doesn't exist");
     } catch (SamzaException e) {
       // expected
@@ -134,7 +134,7 @@ public class TestZkNamespace {
     // should succeed, because no namespace provided
     String zkConnect = "127.0.0.1:" + zkServer.getPort() + "";
     initZk(zkConnect);
-    ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+    ZkCoordinationUtilsFactory.validateZkNameSpace(zkConnect, zkClient);
     tearDownZk();
   }
 
index b2a5533..674287b 100644 (file)
@@ -215,7 +215,7 @@ public class TestZkProcessorLatch {
 
   }
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
-    ZkClient zkClient = ZkCoordinationServiceFactory
+    ZkClient zkClient = ZkCoordinationUtilsFactory
         .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
index 76fd046..fb8f17a 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaServer;
 import kafka.utils.TestUtils;
@@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;