SAMZA-1309; Debounce time config
authorBoris Shkolnik <boryas@apache.org>
Fri, 26 May 2017 23:31:45 +0000 (16:31 -0700)
committernavina <navina@apache.org>
Fri, 26 May 2017 23:31:45 +0000 (16:31 -0700)
Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #203 from sborya/DebounceConfig

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala

index 3eea37d..7a1b56e 100644 (file)
                     </td>
                 </tr>
                 <tr>
+                    <td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
+                    <td class="default"> 2000 </td>
+                    <td class="description">
+                        How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
+                    </td>
+                </tr>
+                <tr>
                     <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
                 </tr>
 
+
                 <tr>
                     <td class="property" id="task-class">task.class</td>
                     <td class="default"></td>
index 5cfd37a..9b8ea66 100644 (file)
@@ -48,8 +48,6 @@ public class ScheduleAfterDebounceTime {
   // Action name when the Processor membership changes
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
 
-  public static final int DEBOUNCE_TIME_MS = 2000;
-
   private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
 
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
index 64395ac..5e46ce5 100644 (file)
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -34,10 +38,6 @@ import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -47,6 +47,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
   private static final int METADATA_CACHE_TTL_MS = 5000;
 
+
   private final ZkUtils zkUtils;
   private final String processorId;
   private final ZkController zkController;
@@ -59,6 +60,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
 
+  private int debounceTimeMs;
+
   public ZkJobCoordinator(Config config) {
     this.config = config;
     ZkConfig zkConfig = new ZkConfig(config);
@@ -79,11 +82,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         keyBuilder.getJobModelVersionBarrierPrefix(),
         zkUtils,
         new ZkBarrierListenerImpl());
+    this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
+
   }
 
   @Override
   public void start() {
     streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+
     debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
@@ -126,7 +132,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   public void onProcessorChange(List<String> processors) {
     LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
+        debounceTimeMs, () -> doOnProcessorChange(processors));
   }
 
   void doOnProcessorChange(List<String> processors) {
@@ -232,8 +238,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       zkController.subscribeToProcessorChange();
       debounceTimer.scheduleAfterDebounceTime(
         ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> {
-          // actual actions to do are the same as onProcessorChange()
+        debounceTimeMs, () -> {
+          // actual actions to do are the same as onProcessorChange
           doOnProcessorChange(new ArrayList<>());
         });
     }
index 030d945..2545194 100644 (file)
@@ -47,6 +47,8 @@ object JobConfig {
   val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
   val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
+  val JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms"
+  val DEFAULT_DEBOUNCE_TIME_MS = 2000
 
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 
@@ -172,4 +174,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
+
+  def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS)
 }