SAMZA-1151 - Coordination Service
authorBoris Shkolnik <boryas@apache.org>
Wed, 29 Mar 2017 17:44:49 +0000 (10:44 -0700)
committernavina <navina@apache.org>
Wed, 29 Mar 2017 17:44:49 +0000 (10:44 -0700)
Author: Boris Shkolnik <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu.us@apache.org>, Navina Ramesh <navina@apache.org>

Closes #91 from sborya/CoordinationService

31 files changed:
checkstyle/checkstyle-suppressions.xml [new file with mode: 0644]
checkstyle/checkstyle.xml
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java [moved from samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java with 78% similarity]
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java [moved from samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java with 72% similarity]
samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
settings.gradle

diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
new file mode 100644 (file)
index 0000000..428ac93
--- /dev/null
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+
+<!DOCTYPE suppressions PUBLIC
+     "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+     "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<!--
+     // Licensed to the Apache Software Foundation (ASF) under one or more
+     // contributor license agreements.  See the NOTICE file distributed with
+     // this work for additional information regarding copyright ownership.
+     // The ASF licenses this file to You under the Apache License, Version 2.0
+     // (the "License"); you may not use this file except in compliance with
+     // the License.  You may obtain a copy of the License at
+     //
+     //    http://www.apache.org/licenses/LICENSE-2.0
+     //
+     // Unless required by applicable law or agreed to in writing, software
+     // distributed under the License is distributed on an "AS IS" BASIS,
+     // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     // See the License for the specific language governing permissions and
+     // limitations under the License.
+-->
+
+<suppressions>
+<!-- example
+    <suppress checks="Indentation"
+           files="TestZkProcessorLatch.java"
+           lines="91-275"/>
+           -->
+</suppressions>
+
index 775d674..479896e 100644 (file)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE module PUBLIC
-    "-//Puppy Crawl//DTD Check Configuration 1.3//EN" 
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
      "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
 <!--
 // Licensed to the Apache Software Foundation (ASF) under one or more
@@ -9,29 +9,30 @@
 // The ASF licenses this file to You under the Apache License, Version 2.0
 // (the "License"); you may not use this file except in compliance with
 // the License.  You may obtain a copy of the License at
-// 
+//
 //    http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
---> 
+-->
 <module name="Checker">
   <property name="localeLanguage" value="en"/>
+
   <!-- allow suppression for specific files -->
   <module name="SuppressionCommentFilter"/>
 
   <module name="FileTabCharacter"/>
-  
+
   <!-- header: use one star only -->
   <module name="RegexpHeader">
     <property name="header" value="/\*\nLicensed to the Apache.*"/>
   </module>
-  
+
   <module name="TreeWalker">
-    
+
     <!-- code cleanup -->
     <module name="UnusedImports"/>
     <module name="FileContentsHolder"/>
@@ -42,7 +43,7 @@
     <module name="OneStatementPerLine"/>
     <module name="UnnecessaryParentheses" />
     <module name="SimplifyBooleanReturn"/>
-    
+
     <!-- style -->
     <module name="DefaultComesLast"/>
     <module name="EmptyStatement"/>
@@ -61,7 +62,7 @@
     <module name="ParameterName"/>
     <module name="StaticVariableName"/>
     <module name="TypeName"/>
-    
+
     <!-- whitespace -->
     <module name="GenericWhitespace"/>
     <module name="NoWhitespaceBefore"/>
@@ -82,4 +83,7 @@
     <module name="ParenPad"/>
     <module name="TypecastParenPad"/>
   </module>
+  <module name="SuppressionFilter">
+    <property name="file" value="checkstyle/checkstyle-suppressions.xml"/>
+  </module>
 </module>
index e3839ca..fd1e039 100644 (file)
@@ -284,7 +284,7 @@ public class ConfigManager {
     //killing the current job
     log.info("Killing the current job");
     yarnUtil.killApplication(applicationId);
-    //clear the global variables
+    //reset the global variables
     coordinatorServerURL = null;
 
 
index 946a308..e0c599d 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.base.Strings;
 
 public class JobCoordinatorConfig extends MapConfig {
   public static final String JOB_COORDINATOR_FACTORY = "job-coordinator.factory";
+  public static final String JOB_COORDINATIOIN_SERVICE_FACTORY = "job-coordinationService.factory";
 
   public JobCoordinatorConfig(Config config) {
     super(config);
@@ -37,4 +38,14 @@ public class JobCoordinatorConfig extends MapConfig {
 
     return jobCoordinatorFactoryClassName;
   }
+
+  public String getJobCoordinationServiceFactoryClassName() {
+    String jobCooridanationFactoryClassName = get(JOB_COORDINATIOIN_SERVICE_FACTORY, "org.apache.samza.zk.ZkCoordinationServiceFactory");
+    if (Strings.isNullOrEmpty(jobCooridanationFactoryClassName)) {
+      throw new ConfigException(
+          String.format("config  '%s' is set to empty. Cannot instantiate coordination utils!", JOB_COORDINATOR_FACTORY));
+    }
+
+    return jobCooridanationFactoryClassName;
+  }
 }
index f26b2d9..fc483eb 100644 (file)
@@ -27,6 +27,8 @@ public class ZkConfig extends MapConfig {
 
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
+  public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms";
+  public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000;
 
   public ZkConfig(Config config) {
     super(config);
@@ -46,4 +48,8 @@ public class ZkConfig extends MapConfig {
   public int getZkConnectionTimeoutMs() {
     return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS);
   }
+
+  public int getZkBarrierTimeoutMs() {
+    return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS);
+  }
 }
  * under the License.
  */
 
-package org.apache.samza.zk;
+package org.apache.samza.coordinator;
+
+import java.util.List;
+
 
 /**
  * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
@@ -26,16 +29,19 @@ package org.apache.samza.zk;
 public interface BarrierForVersionUpgrade {
   /**
    * Barrier is usually started by the leader.
+   * @param version - for which the barrier is created
+   * @param participatns - list of participants that need to join for barrier to complete
    */
-  void start();
+  void start(String version, List<String> participatns);
 
   /**
    * Called by the processor.
    * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
    * joined.
    * The call is async. The callback will be invoked when the barrier is reached.
+   * @param version - for which the barrier waits
    * @param thisProcessorsName as it appears in the list of processors.
    * @param callback  will be invoked, when barrier is reached.
    */
-  void waitForBarrier(String thisProcessorsName, Runnable callback);
+  void waitForBarrier(String version, String thisProcessorsName, Runnable callback);
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java
new file mode 100644 (file)
index 0000000..497d3e0
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * factory to instantiate a c{@link CoordinationUtils} service
+ */
+public interface CoordinationServiceFactory {
+  /**
+   * get a unique service instance
+   * @param groupId - unique id to identify the service
+   * @param participantId - a unique id that identifies the participant in the service
+   * @param updatedConfig - configs, to define the details of the service
+   * @return a unique service instance
+   */
+  CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
new file mode 100644 (file)
index 0000000..39bda24
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**  THIS API WILL CHANGE
+ *
+ * Coordination service provides synchronization primitives.
+ * The actual implementation (for example ZK based) is left to each implementation class.
+ * This service provide three primitives:
+ *   - LeaderElection
+ *   - Latch
+ *   - barrier for version upgrades
+ */
+public interface CoordinationUtils {
+
+  /**
+   * reset the internal structure. Does not happen automatically with stop()
+   */
+  void reset();
+
+
+  // facilities for group coordination
+  LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
+
+  Latch getLatch(int size, String latchId);
+
+  BarrierForVersionUpgrade getBarrier(String barrierId);
+}
index 3da70e0..056bdb1 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.processor.SamzaContainerController;
 
+
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
@@ -31,5 +32,6 @@ public interface JobCoordinatorFactory {
    *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+  JobCoordinator getJobCoordinator(int processorId, Config config,
+      SamzaContainerController containerController, CoordinationUtils coordinationUtils);
 }
\ No newline at end of file
  * under the License.
  */
 
-package org.apache.samza.coordinator.leaderelection;
+package org.apache.samza.coordinator;
 
 import org.apache.samza.annotation.InterfaceStability;
 
+
+/**
+ * Leader elector async primitives, implemented based on ZK.
+ * The callback is a async, and run in a separate (common) thread.
+ * So the caller should never block in the callback.
+ * Callbacks will be delivered on callback at a time. Others will wait.
+ *
+ */
 @InterfaceStability.Evolving
 public interface LeaderElector {
   /**
-   * Method that helps the caller participate in leader election and returns when the participation is complete
+   * Async method that helps the caller participate in leader election.
    *
-   * @return True, if caller is chosen as a leader through the leader election process. False, otherwise.
+   * @param leaderElectorListener to be invoked if the caller is chosen as a leader through the leader election process
    */
-  boolean tryBecomeLeader();
+  void tryBecomeLeader(LeaderElectorListener leaderElectorListener);
 
   /**
    * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java
new file mode 100644 (file)
index 0000000..ef61229
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * This call back should be passed to {@link LeaderElector#tryBecomeLeader} and
+ * will be invoked if the caller becomes the leader.
+ */
+public interface LeaderElectorListener {
+  void onBecomingLeader();
+}
index 15d9b9d..3a62275 100644 (file)
@@ -23,6 +23,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.metrics.MetricsReporter;
@@ -124,11 +126,17 @@ public class StreamProcessor {
         String.valueOf(processorId),
         customMetricsReporters);
 
+    CoordinationUtils jobCooridanationService = Util.
+        <CoordinationServiceFactory>getObj(
+            new JobCoordinatorConfig(updatedConfig)
+                .getJobCoordinationServiceFactoryClassName())
+        .getCoordinationService("groupId", String.valueOf(processorId), updatedConfig);
+
     this.jobCoordinator = Util.
         <JobCoordinatorFactory>getObj(
             new JobCoordinatorConfig(updatedConfig)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, updatedConfig, containerController);
+        .getJobCoordinator(processorId, updatedConfig, containerController, jobCooridanationService);
   }
 
   /**
index 7ca85c0..3588dce 100644 (file)
 package org.apache.samza.standalone;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+  public JobCoordinator getJobCoordinator(int processorId, Config config,
+      SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
     return new StandaloneJobCoordinator(processorId, config, containerController);
   }
 }
\ No newline at end of file
index d0332ab..0afd840 100644 (file)
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,34 +44,25 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   private final ZkKeyBuilder keyBuilder;
   private final static String BARRIER_DONE = "done";
   private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
-  private final static long BARRIER_TIMED_OUT_MS = 60 * 1000;
   private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
 
   private final ScheduleAfterDebounceTime debounceTimer;
 
   private final String barrierPrefix;
-  private final String barrierPath;
-  private final String barrierDonePath;
-  private final String barrierProcessors;
-  private final String version;
-  private final List<String> processorsNames;
+  private String barrierPath;
+  private String barrierDonePath;
+  private String barrierProcessors;
   private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout";
+  private final long barrierTimeoutMS;
 
-  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, String version, List<String> processorsNames) {
+  public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) {
     this.zkUtils = zkUtils;
     keyBuilder = zkUtils.getKeyBuilder();
 
-    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
-    this.debounceTimer = debounceTimer;
+    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(barrierId);
 
-    barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-
-    this.version = version;
-    this.processorsNames = processorsNames;
-
-    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+    this.debounceTimer = debounceTimer;
+    this.barrierTimeoutMS = barrierTimeoutMS;
   }
 
   /**
@@ -83,7 +75,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   }
 
   protected long getBarrierTimeOutMs() {
-    return BARRIER_TIMED_OUT_MS;
+    return barrierTimeoutMS;
   }
 
   private void timerOff(final String version, final Stat currentStatOfBarrierDone) {
@@ -91,49 +83,62 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       // write a new value "TIMED_OUT", if the value was changed since previous value, make sure it was changed to "DONE"
       zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion());
     } catch (ZkBadVersionException e) {
-      // failed to write, make sure the value is "DONE"
-      LOG.warn("Barrier timeout write failed");
+      // Expected. failed to write, make sure the value is "DONE"
+      ///LOG.("Barrier timeout write failed");
       String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
+      LOG.info("Barrier timeout expired, but done=" + done);
       if (!done.equals(BARRIER_DONE)) {
         throw new SamzaException("Failed to write to the barrier_done, version=" + version, e);
       }
     }
   }
 
+  private void setPaths(String version) {
+    barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+  }
+
   @Override
-  public void start() {
+  public void start(String version, List<String> participants) {
+
+    setPaths(version);
 
     // subscribe for processor's list changes
     LOG.info("Subscribing for child changes at " + barrierProcessors);
-    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(version, processorsNames));
+    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(participants));
 
     // create a timer for time-out
     Stat currentStatOfBarrierDone = new Stat();
     zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
+
     setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone);
   }
 
   @Override
-  public void waitForBarrier(String processorsName, Runnable callback) {
-    final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
+  public void waitForBarrier(String version, String participantName, Runnable callback) {
 
-    // update the barrier for this processor
-    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
-    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+    setPaths(version);
+    final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName);
 
     // now subscribe for the barrier
     zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
+
+    // update the barrier for this processor
+    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
   }
 
   /**
-   * listener for the subscription.
+   * Listener for the subscription for the list of participants.
+   * This method will identify when all the participants have joined.
    */
   class ZkBarrierChangeHandler implements IZkChildListener {
-    private final String version;
     private final List<String> names;
 
-    public ZkBarrierChangeHandler(String version, List<String> names) {
-      this.version = version;
+    public ZkBarrierChangeHandler(List<String> names) {
       this.names = names;
     }
 
@@ -151,7 +156,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       if (CollectionUtils.containsAll(names, currentChildren)) {
         LOG.info("ALl nodes reached the barrier");
         LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); // this will trigger notifications
       }
     }
   }
@@ -173,16 +178,13 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       String done = (String) data;
       LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
       if (done.equals(BARRIER_DONE)) {
-        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
         debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
       } else if (done.equals(BARRIER_TIMED_OUT)) {
         // timed out
         LOG.warn("Barrier for " + dataPath + " timed out");
-        LOG.info("Barrier for " + dataPath + " timed out");
-        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
       }
-      // we do not need to resubscribe because, ZkClient library does it for us.
-
+      // in any case we unsubscribe
+      zkUtils.unsubscribeDataChanges(barrierPathDone, this);
     }
 
     @Override
index 70c8a37..4570a62 100644 (file)
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.LeaderElectorListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,14 +43,7 @@ public class ZkControllerImpl implements ZkController {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
     this.zkControllerListener = zkControllerListener;
-    this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            onBecomeLeader();
-          }
-        }
-    );
+    this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer);
     this.debounceTimer = debounceTimer;
 
     init();
@@ -76,7 +70,12 @@ public class ZkControllerImpl implements ZkController {
 
     // TODO - make a loop here with some number of attempts.
     // possibly split into two method - becomeLeader() and becomeParticipant()
-    leaderElector.tryBecomeLeader();
+    leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        onBecomeLeader();
+      }
+    });
 
     // subscribe to JobModel version updates
     zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
new file mode 100644 (file)
index 0000000..cc454e3
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+
+
+public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
+
+
+  synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+    ZkConfig zkConfig = new ZkConfig(config);
+    ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
+  }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
new file mode 100644 (file)
index 0000000..e381a41
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.Latch;
+import org.apache.samza.coordinator.LeaderElector;
+
+
+public class ZkCoordinationUtils implements CoordinationUtils {
+  public final ZkConfig zkConfig;
+  public final ZkUtils zkUtils;
+  public final String processorIdStr;
+  public final ScheduleAfterDebounceTime debounceTimer;
+
+  public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils,
+      ScheduleAfterDebounceTime debounceTimer) {
+    this.zkConfig = zkConfig;
+    this.zkUtils = zkUtils;
+    this.processorIdStr = processorId;
+    this.debounceTimer = debounceTimer;
+  }
+
+  @Override
+  public void reset() {
+    zkUtils.deleteRoot();
+  }
+
+  @Override
+  public LeaderElector getLeaderElector() {
+    return new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer);
+  }
+
+  @Override
+  public Latch getLatch(int size, String latchId) {
+    return new ZkProcessorLatch(size, latchId, processorIdStr, zkConfig, zkUtils);
+  }
+
+  @Override
+  public BarrierForVersionUpgrade getBarrier(String barrierId) {
+    return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
+  }
+  @VisibleForTesting
+  public ZkUtils getZkUtils() {
+    return zkUtils;
+  }
+}
index 9e5dd84..87d6bac 100644 (file)
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
@@ -44,22 +45,24 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+  private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
 
   private final ZkUtils zkUtils;
   private final int processorId;
   private final ZkController zkController;
   private final SamzaContainerController containerController;
-  private BarrierForVersionUpgrade barrier;
   private final ScheduleAfterDebounceTime debounceTimer;
   private final StreamMetadataCache  streamMetadataCache;
   private final ZkKeyBuilder keyBuilder;
   private final Config config;
+  private final CoordinationUtils coordinationUtils;
 
   private JobModel newJobModel;
   private String newJobModelVersion;  // version published in ZK (by the leader)
   private JobModel jobModel;
 
-  public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
+  public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+      SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
     this.zkUtils = zkUtils;
     this.keyBuilder = zkUtils.getKeyBuilder();
     this.debounceTimer = debounceTimer;
@@ -67,6 +70,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     this.containerController = containerController;
     this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
     this.config = config;
+    this.coordinationUtils = coordinationUtils;
 
     streamMetadataCache = getStreamMetadataCache();
   }
@@ -101,6 +105,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   @Override
   public void stop() {
     zkController.stop();
+    if (containerController != null)
+      containerController.stopContainer();
   }
 
   @Override
@@ -152,7 +158,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
 
     // update ZK and wait for all the processors to get this new version
-    barrier.waitForBarrier(String.valueOf(zkProcessorId), new Runnable() {
+    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
+    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
       @Override
       public void run() {
         onNewJobModelConfirmed(version);
@@ -199,7 +206,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     log.info("generate new job model: processorsIds: " + sb.toString());
 
-    jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds);
+    jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+        containerIds);
 
     log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
 
@@ -208,8 +216,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
 
     // start the barrier for the job model update
-    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer, nextJMVersion, currentProcessors);
-    barrier.start();
+    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
+    barrier.start(nextJMVersion, currentProcessors);
 
     // publish new JobModel version
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
index e211f70..22ead65 100644 (file)
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.processor.SamzaContainerController;
@@ -36,22 +37,24 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
     JobConfig jobConfig = new JobConfig(config);
-    String groupName = String.format("%s-%s", jobConfig.getName(), jobConfig.getJobId());
+    String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
     ZkConfig zkConfig = new ZkConfig(config);
+    String processorIdStr = String.valueOf(processorId);
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
     ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
     return new ZkJobCoordinator(
         processorId,
         config,
         debounceTimer,
         new ZkUtils(
-            String.valueOf(processorId),
+            processorIdStr,
             new ZkKeyBuilder(groupName),
             zkClient,
             zkConfig.getZkConnectionTimeoutMs()
             ),
-        containerController);
+        containerController, coordinationUtils);
   }
 }
index 0a8f37e..44f83e4 100644 (file)
@@ -88,7 +88,7 @@ public class ZkKeyBuilder {
     return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
   }
 
-  public String getJobModelVersionBarrierPrefix() {
-    return String.format("/%s/versionBarriers", pathPrefix);
+  public String getJobModelVersionBarrierPrefix(String barrierId) {
+    return String.format("/%s/%s/versionBarriers", pathPrefix, barrierId);
   }
 }
index b9bdf11..8a027d9 100644 (file)
 package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.leaderelection.LeaderElector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -43,37 +44,32 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * </p>
  * */
 public class ZkLeaderElector implements LeaderElector {
-  public static final Logger LOGGER = LoggerFactory.getLogger(ZkLeaderElector.class);
+  public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);
   private final ZkUtils zkUtils;
   private final String processorIdStr;
   private final ZkKeyBuilder keyBuilder;
   private final String hostName;
+  private final ScheduleAfterDebounceTime debounceTimer;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private final IZkDataListener previousProcessorChangeListener;
-  ZkLeaderElectorListener zkLeaderElectorListener;
+  private IZkDataListener previousProcessorChangeListener;
   private String currentSubscription = null;
   private final Random random = new Random();
 
   @VisibleForTesting
-  ZkLeaderElector(String processorIdStr,
-      ZkUtils zkUtils,
-      ZkLeaderElectorListener zkLeaderElectorListener,
-      IZkDataListener previousProcessorChangeListener) {
+  public void setPreviousProcessorChangeListener(IZkDataListener previousProcessorChangeListener) {
+    this.previousProcessorChangeListener = previousProcessorChangeListener;
+  }
+
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils,  ScheduleAfterDebounceTime debounceTimer) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
-    this.keyBuilder = this.zkUtils.getKeyBuilder();
+    this.keyBuilder = zkUtils.getKeyBuilder();
     this.hostName = getHostName();
-    this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
-    if (previousProcessorChangeListener == null)
-      this.previousProcessorChangeListener =  new PreviousProcessorChangeListener();
-    else
-      this.previousProcessorChangeListener = previousProcessorChangeListener;
-  }
-
-  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
-    this(processorIdStr, zkUtils, zkLeaderElectorListener, null);
+    this.debounceTimer = (debounceTimer != null) ? debounceTimer : new ScheduleAfterDebounceTime();
 
+//    String [] paths = new String[]{keyBuilder.getProcessorsPath()};
+//    zkUtils.makeSurePersistentPathsExists(paths);
   }
 
   // TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,45 +77,47 @@ public class ZkLeaderElector implements LeaderElector {
     try {
       return InetAddress.getLocalHost().getHostName();
     } catch (UnknownHostException e) {
-      LOGGER.error("Failed to fetch hostname of the processor", e);
+      LOG.error("Failed to fetch hostname of the processor", e);
       throw new SamzaException(e);
     }
   }
 
-  public interface ZkLeaderElectorListener {
-    void onBecomingLeader();
-  }
-
   @Override
-  public boolean tryBecomeLeader() {
-    String currentPath = zkUtils.registerProcessorAndGetId(hostName);
+  public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) {
+    String currentPath = zkUtils.registerProcessorAndGetId(hostName + " " + processorIdStr);
 
     List<String> children = zkUtils.getSortedActiveProcessors();
-    LOGGER.debug(zLog("Current active processors - " + children));
+    LOG.debug(zLog("Current active processors - " + children));
     int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
 
+    LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
     if (children.size() == 0 || index == -1) {
       throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
     }
 
     if (index == 0) {
       isLeader.getAndSet(true);
-      LOGGER.info(zLog("Eligible to become the leader!"));
-      zkLeaderElectorListener.onBecomingLeader(); // inform the caller
-      return true;
+      LOG.info(zLog("Eligible to become the leader!"));
+      debounceTimer.scheduleAfterDebounceTime("ON_BECOMING_LEADER", 1, () -> leaderElectorListener.onBecomingLeader()); // inform the caller
+      return;
     }
 
     isLeader.getAndSet(false);
-    LOGGER.info("Index = " + index + " Not eligible to be a leader yet!");
+    LOG.info("Index = " + index + " Not eligible to be a leader yet!");
     String predecessor = children.get(index - 1);
     if (!predecessor.equals(currentSubscription)) {
       if (currentSubscription != null) {
-        LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
+
+        // callback in case if the previous node gets deleted (when previous processor dies)
+        if (previousProcessorChangeListener == null)
+          previousProcessorChangeListener =  new PreviousProcessorChangeListener(leaderElectorListener);
+
+        LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
             previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
-      LOGGER.info(zLog("Subscribing data change for " + predecessor));
+      LOG.info(zLog("Subscribing data change for " + predecessor));
       zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
           previousProcessorChangeListener);
     }
@@ -130,17 +128,16 @@ public class ZkLeaderElector implements LeaderElector {
      */
     boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
     if (predecessorExists) {
-      LOGGER.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
+      LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
     } else {
       try {
         Thread.sleep(random.nextInt(1000));
       } catch (InterruptedException e) {
         Thread.interrupted();
       }
-      LOGGER.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
-      return tryBecomeLeader();
+      LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
+      tryBecomeLeader(leaderElectorListener);
     }
-    return false;
   }
 
   @Override
@@ -159,16 +156,21 @@ public class ZkLeaderElector implements LeaderElector {
 
   // Only by non-leaders
   class PreviousProcessorChangeListener implements IZkDataListener {
+    private final LeaderElectorListener leaderElectorListener;
+    PreviousProcessorChangeListener(LeaderElectorListener leaderElectorListener) {
+      this.leaderElectorListener = leaderElectorListener;
+    }
 
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
-      LOGGER.debug("Data change on path: " + dataPath + " Data: " + data);
+      LOG.debug("Data change on path: " + dataPath + " Data: " + data);
     }
 
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
-      LOGGER.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
-      tryBecomeLeader();
+      LOG.info(
+          zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
+      tryBecomeLeader(leaderElectorListener);
     }
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
new file mode 100644 (file)
index 0000000..4394302
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.Latch;
+
+
+/*
+ * Latch of the sizeN is open when countDown() was called N times.
+ * In this implementation a sequential node is created on every call of countDown().
+ * When Nth node is created await() call returns.
+ */
+public class ZkProcessorLatch implements Latch {
+
+  private final ZkConfig zkConfig;
+  private final ZkUtils zkUtils;
+  private final String processorIdStr;
+  private final ZkKeyBuilder keyBuilder;
+  private final String latchId;
+
+  private final String latchPath;
+  private final String targetPath;
+
+  private final static String LATCH_PATH = "latch";
+  private final int size; // latch size
+
+  public ZkProcessorLatch(int size, String latchId, String participantId, ZkConfig zkConfig, ZkUtils zkUtils) {
+    this.zkConfig = zkConfig;
+    this.zkUtils = zkUtils;
+    this.processorIdStr = participantId;
+    this.latchId = latchId;
+    this.keyBuilder = this.zkUtils.getKeyBuilder();
+    this.size = size;
+
+    latchPath = String.format("%s/%s", keyBuilder.getRootPath(), LATCH_PATH + "_" + latchId);
+    zkUtils.makeSurePersistentPathsExists(new String[] {latchPath});
+    targetPath =  String.format("%s/%010d", latchPath, size - 1);
+    System.out.println("targetPath " + targetPath);
+  }
+
+  @Override
+  public void await(long timeout, TimeUnit tu) {
+    zkUtils.getZkClient().waitUntilExists(targetPath, TimeUnit.MILLISECONDS, timeout);
+  }
+
+  @Override
+  public void countDown() {
+    // create persistent (should be ephemeral? Probably not)
+    String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", processorIdStr);
+    System.out.println("countDown created " + path);
+  }
+}
index e8170e3..7a9b4d5 100644 (file)
@@ -111,8 +111,11 @@ public class ZkUtils {
       ephemeralPath =
           zkClient.createEphemeralSequential(
               keyBuilder.getProcessorsPath() + "/", data);
+
+      LOG.info("newly generated path for " + data +  " is " +  ephemeralPath);
       return ephemeralPath;
     } else {
+      LOG.info("existing path for " + data +  " is " +  ephemeralPath);
       return ephemeralPath;
     }
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java
new file mode 100644 (file)
index 0000000..5ca9138
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * latch implementation for the coordination service.
+ * Supports different size latches.
+ * await() returns when either latch reaches N (N participants call countDown()) or timeout.
+ */
+public interface Latch {
+  void await(long timeout, TimeUnit tu) throws TimeoutException;
+  void countDown();
+}
index 35f9eb3..31cbe79 100644 (file)
@@ -521,6 +521,7 @@ public class TestAsyncRunLoop {
   }
 
   @Test
+  @Ignore
   public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
     commitRequest = TaskCoordinator.RequestScope.CURRENT_TASK;
     maxMessagesInFlight = 2;
index a1af782..c0c0e6a 100644 (file)
 package org.apache.samza.zk;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import junit.framework.Assert;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -32,33 +39,34 @@ import org.junit.Test;
 
 public class TestZkBarrierForVersionUpgrade {
   private static EmbeddedZookeeper zkServer = null;
-  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
-  private String testZkConnectionString = null;
-  private ZkUtils testZkUtils = null;
-  private static final int SESSION_TIMEOUT_MS = 20000;
-  private static final int CONNECTION_TIMEOUT_MS = 10000;
+  private static String testZkConnectionString = null;
+  private static CoordinationUtils coordinationUtils;
+
 
   @BeforeClass
   public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
     zkServer.setup();
+    testZkConnectionString = "localhost:" + zkServer.getPort();
   }
 
   @Before
   public void testSetup() {
-    testZkConnectionString = "localhost:" + zkServer.getPort();
-    try {
-      testZkUtils = getZkUtilsWithNewClient();
-    } catch (Exception e) {
-      Assert.fail("Client connection setup failed. Aborting tests..");
-    }
+    String groupId = "group1";
+    String processorId = "p1";
+    Map<String, String> map = new HashMap<>();
+    map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
+    map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200");
+    Config config = new MapConfig(map);
+
+    CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
+    coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
+    coordinationUtils.reset();
   }
 
   @After
-  public void testTeardown() {
-    testZkUtils.deleteRoot();
-    testZkUtils.close();
-    testZkUtils = null;
+  public void testTearDown() {
+    coordinationUtils.reset();
   }
 
   @AfterClass
@@ -68,13 +76,13 @@ public class TestZkBarrierForVersionUpgrade {
 
   @Test
   public void testZkBarrierForVersionUpgrade() {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    String barrierId = "b1";
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
 
-    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
 
     class Status {
       boolean p1 = false;
@@ -82,16 +90,16 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.start();
+    barrier.start(ver, processors);
 
-    barrier.waitForBarrier("p1", new Runnable() {
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier("p2", new Runnable() {
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -103,14 +111,15 @@ public class TestZkBarrierForVersionUpgrade {
 
   @Test
   public void testNegativeZkBarrierForVersionUpgrade() {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+
+    String barrierId = "b1";
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
 
-    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
 
     class Status {
       boolean p1 = false;
@@ -119,16 +128,16 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.start();
+    barrier.start(ver, processors);
 
-    barrier.waitForBarrier("p1", new Runnable() {
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier("p2", new Runnable() {
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -140,20 +149,14 @@ public class TestZkBarrierForVersionUpgrade {
 
   @Test
   public void testZkBarrierForVersionUpgradeWithTimeOut() {
-    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-
+    String barrierId = "b1";
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
 
-    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors) {
-      @Override
-      protected long getBarrierTimeOutMs() {
-        return 200;
-      }
-    };
+    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
 
     class Status {
       boolean p1 = false;
@@ -162,16 +165,16 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.start();
+    barrier.start(ver, processors);
 
-    barrier.waitForBarrier("p1", new Runnable() {
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier("p2", new Runnable() {
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -179,7 +182,7 @@ public class TestZkBarrierForVersionUpgrade {
     });
 
     // this node will join "too late"
-    barrier.waitForBarrier("p3", new Runnable() {
+    barrier.waitForBarrier(ver, "p3", new Runnable() {
       @Override
       public void run() {
         TestZkUtils.sleepMs(300);
@@ -187,15 +190,6 @@ public class TestZkBarrierForVersionUpgrade {
       }
     });
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
-  }
-
 
-  private ZkUtils getZkUtilsWithNewClient() {
-    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
-    return new ZkUtils(
-        "1",
-        KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
-        CONNECTION_TIMEOUT_MS);
   }
 }
index b56d279..ef271f0 100644 (file)
@@ -60,6 +60,6 @@ public class TestZkKeyBuilder {
     Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
     String version = "2";
     Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
-    Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+    Assert.assertEquals("/test/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix("testBarrier"));
   }
 }
index b48bc70..fb31054 100644 (file)
@@ -19,7 +19,9 @@
 package org.apache.samza.zk;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,6 +29,12 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -34,12 +42,15 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestZkLeaderElector {
+  private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(TestZkLeaderElector.class);
 
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -47,6 +58,7 @@ public class TestZkLeaderElector {
   private ZkUtils testZkUtils = null;
   private static final int SESSION_TIMEOUT_MS = 20000;
   private static final int CONNECTION_TIMEOUT_MS = 10000;
+  private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
 
   @BeforeClass
   public static void setup() throws InterruptedException {
@@ -58,7 +70,7 @@ public class TestZkLeaderElector {
   public void testSetup() {
     testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
     try {
-      testZkUtils = getZkUtilsWithNewClient();
+      testZkUtils = getZkUtilsWithNewClient("testProcessorId");
     } catch (Exception e) {
       Assert.fail("Client connection setup failed. Aborting tests..");
     }
@@ -96,18 +108,22 @@ public class TestZkLeaderElector {
     when(mockZkUtils.registerProcessorAndGetId(any())).
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
 
+    ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+    when(kb.getProcessorsPath()).thenReturn("");
+    when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null);
     BooleanResult isLeader = new BooleanResult();
-    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
-      new ZkLeaderElector.ZkLeaderElectorListener() {
-        @Override
-        public void onBecomingLeader() {
-          isLeader.res = true;
-        }
+
+    leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader.res = true;
       }
-    );
-    leaderElector.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
+    });
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100));
   }
 
   @Test
@@ -115,22 +131,37 @@ public class TestZkLeaderElector {
     String processorId = "1";
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+
+    ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+    when(kb.getProcessorsPath()).thenReturn("");
+    when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null);
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-          }
-        }
-    );
     try {
-      leaderElector.tryBecomeLeader();
+      leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+        }
+      });
       Assert.fail("Was expecting leader election to fail!");
     } catch (SamzaException e) {
       // No-op Expected
     }
   }
 
+  private CoordinationUtils getZkCoordinationService(String groupId, String processorId) {
+
+    Map<String, String> map = new HashMap<>();
+    map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
+    Config config = new MapConfig(map);
+
+    CoordinationUtils coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+    
+    return coordinationUtils;
+  }
+
   /**
    * Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection
    */
@@ -139,50 +170,49 @@ public class TestZkLeaderElector {
     BooleanResult isLeader1 = new BooleanResult();
     BooleanResult isLeader2 = new BooleanResult();
     BooleanResult isLeader3 = new BooleanResult();
+
+
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
-      new ZkLeaderElector.ZkLeaderElectorListener() {
-        @Override
-        public void onBecomingLeader() {
-          isLeader1.res = true;
-        }
-      }
-    );
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        }
-    );
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        });
+    ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
 
     Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
 
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
 
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
     Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
 
+
     // Clean up
     zkUtils1.close();
     zkUtils2.close();
@@ -211,104 +241,102 @@ public class TestZkLeaderElector {
 
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
     zkUtils1.registerProcessorAndGetId("processor1");
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
 
+    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
     final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
-            Assert.assertNotNull(registeredIdStr);
-
-            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
-            Assert.assertNotNull(predecessorIdStr);
-
-            try {
-              int selfId = Integer.parseInt(registeredIdStr);
-              int predecessorId = Integer.parseInt(predecessorIdStr);
-              Assert.assertEquals(1, selfId - predecessorId);
-            } catch (Exception e) {
-              System.out.println(e.getMessage());
-            }
-            count.incrementAndGet();
-            electionLatch.countDown();
-          }
-        });
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
+        Assert.assertNotNull(registeredIdStr);
+
+        String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+        Assert.assertNotNull(predecessorIdStr);
+
+        try {
+          int selfId = Integer.parseInt(registeredIdStr);
+          int predecessorId = Integer.parseInt(predecessorIdStr);
+          Assert.assertEquals(1, selfId - predecessorId);
+        } catch (Exception e) {
+          LOG.error(e.getLocalizedMessage());
+        }
+        count.incrementAndGet();
+        electionLatch.countDown();
+      }
+    });
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
     zkUtils3.registerProcessorAndGetId("processor3");
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
     Assert.assertFalse(leaderElector3.amILeader());
 
-    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     // Leader Failure
@@ -322,11 +350,12 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors());
 
     // Clean up
     zkUtils2.close();
     zkUtils3.close();
+
   }
 
   /**
@@ -347,100 +376,101 @@ public class TestZkLeaderElector {
     BooleanResult isLeader3 = new BooleanResult();
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
     zkUtils1.registerProcessorAndGetId("processor1");
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
+
+    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
+
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
     zkUtils2.registerProcessorAndGetId("processor2");
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
     final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
-            Assert.assertNotNull(registeredIdStr);
-
-            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
-            Assert.assertNotNull(predecessorIdStr);
-
-            try {
-              int selfId = Integer.parseInt(registeredIdStr);
-              int predecessorId = Integer.parseInt(predecessorIdStr);
-              Assert.assertEquals(1, selfId - predecessorId);
-            } catch (Exception e) {
-              Assert.fail("Exception in LeaderElectionListener!");
-            }
-            count.incrementAndGet();
-            electionLatch.countDown();
-          }
-        });
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
+        Assert.assertNotNull(registeredIdStr);
+
+        String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+        Assert.assertNotNull(predecessorIdStr);
+
+        try {
+          int selfId = Integer.parseInt(registeredIdStr);
+          int predecessorId = Integer.parseInt(predecessorIdStr);
+          Assert.assertEquals(1, selfId - predecessorId);
+        } catch (Exception e) {
+          Assert.fail("Exception in LeaderElectionListener!");
+        }
+        count.incrementAndGet();
+        electionLatch.countDown();
+      }
+    });
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
-
-    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
+
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     zkUtils2.close();
@@ -453,7 +483,7 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors());
 
     // Clean up
     zkUtils1.close();
@@ -465,43 +495,43 @@ public class TestZkLeaderElector {
     BooleanResult isLeader1 = new BooleanResult();
     BooleanResult isLeader2 = new BooleanResult();
     // Processor-1
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        getZkUtilsWithNewClient(),
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        });
+
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
 
     // Processor-2
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        getZkUtilsWithNewClient(),
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        });
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
 
     // Before Leader Election
     Assert.assertFalse(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
 
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
 
     // After Leader Election
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
+
+    zkUtils1.close();
+    zkUtils2.close();
   }
 
-  private ZkUtils getZkUtilsWithNewClient() {
+  private ZkUtils getZkUtilsWithNewClient(String processorId) {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
-        "processorId1",
+        processorId,
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
new file mode 100644 (file)
index 0000000..ec7e830
--- /dev/null
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.Latch;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class TestZkProcessorLatch {
+  private static EmbeddedZookeeper zkServer = null;
+  private static String zkConnectionString;
+  private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
+  private CoordinationUtils coordinationUtils;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+
+    zkConnectionString = "localhost:" + zkServer.getPort();
+    System.out.println("ZK port = " + zkServer.getPort());
+  }
+
+  @Before
+  public void testSetup() {
+    String groupId = "group1";
+    String processorId = "p1";
+    Map<String, String> map = new HashMap<>();
+    map.put(ZkConfig.ZK_CONNECT, zkConnectionString);
+    Config config = new MapConfig(map);
+
+
+    coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+    coordinationUtils.reset();
+  }
+
+  @After
+  public void testTearDown() {
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testSingleLatch1() {
+    System.out.println("Started 1");
+    int latchSize = 1;
+    String latchId = "l2";
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(30000, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future." + e.getLocalizedMessage());
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testSingleLatch2() {
+    System.out.println("Started 1");
+    int latchSize = 1;
+    String latchId = "l2";
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        //latch.countDown(); only one thread counts down
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(30000, TimeUnit.MILLISECONDS);
+      f2.get(30000, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future." + e.getLocalizedMessage());
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testNSizeLatch() {
+    System.out.println("Started N");
+    String latchId = "l1";
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(300, TimeUnit.MILLISECONDS);
+      f2.get(300, TimeUnit.MILLISECONDS);
+      f3.get(300, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future. " + e.getLocalizedMessage());
+    }
+  }
+
+  @Test
+  public void testLatchExpires() {
+    System.out.println("Started expiring");
+    String latchId = "l4";
+
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        // This processor never completes its task
+        //latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(300, TimeUnit.MILLISECONDS);
+      f2.get(300, TimeUnit.MILLISECONDS);
+      f3.get(300, TimeUnit.MILLISECONDS);
+      Assert.fail("Latch should've timeout.");
+    } catch (Exception e) {
+      f1.cancel(true);
+      f2.cancel(true);
+      f3.cancel(true);
+      // expected
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testSingleCountdown() {
+    System.out.println("Started single countdown");
+    String latchId = "l1";
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    // Only one thread invokes countDown
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        TestZkUtils.sleepMs(100);
+        latch.countDown();
+        TestZkUtils.sleepMs(100);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    try {
+      f1.get(600, TimeUnit.MILLISECONDS);
+      f2.get(600, TimeUnit.MILLISECONDS);
+      f3.get(600, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("Failed to get.");
+    }
+    pool.shutdownNow();
+  }
+}
index a1ad363..2c44aea 100644 (file)
 
 package org.apache.samza.test.processor;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.task.AsyncStreamTaskAdapter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
@@ -37,17 +48,6 @@ import org.apache.samza.test.StandaloneTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
@@ -144,21 +144,14 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
   private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) {
     Map<String, String> configs = new HashMap<>();
     configs.putAll(
-        StandaloneTestUtils.getStandaloneConfigs(
-            "test-job",
-            "org.apache.samza.test.processor.IdentityStreamTask"));
-    configs.putAll(
-        StandaloneTestUtils.getKafkaSystemConfigs(
-            testSystem,
-            bootstrapServers(),
-            zkConnect(),
-            null,
-            StandaloneTestUtils.SerdeAlias.STRING,
-            true));
+        StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
+    configs.putAll(StandaloneTestUtils.getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null,
+            StandaloneTestUtils.SerdeAlias.STRING, true));
     configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
     configs.put("app.messageCount", String.valueOf(messageCount));
     configs.put("app.outputTopic", outputTopic);
     configs.put("app.outputSystem", testSystem);
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
     return configs;
   }
 
index 5de30d8..417ada4 100644 (file)
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 include \
   'samza-api',
   'samza-elasticsearch',