SAMZA-1507; Create changelog streams in Leader(ZkJobCoordinator) for stateful operators. master
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Wed, 22 Nov 2017 19:54:10 +0000 (11:54 -0800)
committerBoris S <boryas@apache.org>
Wed, 22 Nov 2017 19:54:10 +0000 (11:54 -0800)
Verified with a test standalone job. Will add integration test for this as a part of fixing and reenabling standalone integration tests.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Boris Shkolnik <boryas@apache.org>

Closes #362 from shanthoosh/master

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala

index 9d44ec1..0509474 100644 (file)
@@ -20,12 +20,13 @@ package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.I0Itec.zkclient.IZkStateListener;
+import java.util.Objects;
 import java.util.Set;
+import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -33,11 +34,14 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsReporter;
@@ -88,6 +92,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
   private int debounceTimeMs;
+  private boolean hasCreatedChangeLogStreams = false;
+  private String cachedJobModelVersion = null;
+  private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
 
   ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
@@ -188,6 +195,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
+    if (!hasCreatedChangeLogStreams) {
+      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions);
+      hasCreatedChangeLogStreams = true;
+    }
     // Assign the next version of JobModel
     String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion;
@@ -279,7 +290,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
   private JobModel generateNewJobModel(List<String> processors) {
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, processors);
+    String zkJobModelVersion = zkUtils.getJobModelVersion();
+    // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
+    if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
+      JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
+      for (ContainerModel containerModel : jobModel.getContainers().values()) {
+        containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
+      }
+      cachedJobModelVersion = zkJobModelVersion;
+    }
+    /**
+     * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
+     * to host mapping) is passed in as null when building the jobModel.
+     */
+    return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
index 6ca9052..2f60d52 100644 (file)
@@ -407,7 +407,7 @@ public class ZkUtils {
    * @return jobmodel version as a string
    */
   public String getJobModelVersion() {
-    String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
+    String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), true);
     metrics.reads.inc();
     return jobModelVersion;
   }
index e915a8a..c2e0665 100644 (file)
@@ -276,7 +276,7 @@ object JobModelManager extends Logging {
     systemAdmins
   }
 
-  private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
+  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)