Address review comments.
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Mon, 4 Feb 2019 02:18:05 +0000 (18:18 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Tue, 12 Feb 2019 16:20:24 +0000 (08:20 -0800)
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskPartitionMapping.java
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index c38bcd4..5cbe893 100644 (file)
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Used to persisting and reading the task-to-partition assignment information
+ * Used to persist and read the task-to-partition assignment information
  * into the metadata store.
  */
 public class TaskPartitionAssignmentManager {
@@ -76,6 +76,9 @@ public class TaskPartitionAssignmentManager {
    * @param taskNames the task names to which the partition is assigned to.
    */
   public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
+    // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
+    // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
+    // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
     String serializedKey = getKey(partition);
     if (taskNames == null || taskNames.isEmpty()) {
       LOG.info("Deleting the key: {} from the metadata store.", partition);
index 07e0985..b0d5815 100644 (file)
@@ -68,7 +68,7 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
       return String.valueOf(setTaskModeMapping.getTaskMode());
     } else if (type.equalsIgnoreCase(SetTaskPartitionMapping.TYPE)) {
       SetTaskPartitionMapping setTaskPartitionMapping = new SetTaskPartitionMapping(message);
-      return setTaskPartitionMapping.getTaskName();
+      return setTaskPartitionMapping.getTaskNames();
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
index 3b88293..51091bc 100644 (file)
@@ -41,7 +41,7 @@ package org.apache.samza.coordinator.stream.messages;
  */
 public class SetTaskPartitionMapping extends CoordinatorStreamMessage {
 
-  private static final String TASK_NAME_KEY = "taskNames";
+  private static final String TASK_NAMES_KEY = "taskNames";
 
   public static final String TYPE = "set-task-partition-assignment";
 
@@ -59,16 +59,16 @@ public class SetTaskPartitionMapping extends CoordinatorStreamMessage {
    * of a samza job to the coordinator stream.
    * @param source      the source of the message.
    * @param partition   the system stream partition serialized as a string.
-   * @param taskName    the name of the task mapped to the system stream partition.
+   * @param taskNames    the taskNames mapped to the system stream partition.
    */
-  public SetTaskPartitionMapping(String source, String partition, String taskName) {
+  public SetTaskPartitionMapping(String source, String partition, String taskNames) {
     super(source);
     setType(TYPE);
     setKey(partition);
-    putMessageValue(TASK_NAME_KEY, taskName);
+    putMessageValue(TASK_NAMES_KEY, taskNames);
   }
 
-  public String getTaskName() {
-    return getMessageValue(TASK_NAME_KEY);
+  public String getTaskNames() {
+    return getMessageValue(TASK_NAMES_KEY);
   }
 }
index c4a5db7..5770597 100644 (file)
@@ -218,6 +218,8 @@ object JobModelManager extends Logging {
       taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
     }
 
+    // SystemStreamPartition to list of taskNames is stored in the metadata store due to the 1 MB value size limit in kafka.
+    // Conversion to taskName to SystemStreamPartitions is done to wire-in the data to JobModel.
     val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
 
     for (container <- jobModel.getContainers.values()) {
index c773c5b..1f5b24e 100644 (file)
@@ -798,17 +798,18 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Wait for the JobModel version to change due to the increase in the input partition count.
     long jobModelWaitTimeInMillis = 10;
-    while (Objects.equals(zkUtils.getJobModelVersion(), jobModelVersion)) {
+    while (true) {
       LOGGER.info("Waiting for new jobModel to be published");
+      jobModelVersion = zkUtils.getJobModelVersion();
+      jobModel = zkUtils.getJobModel(jobModelVersion);
+      ssps = getSystemStreamPartitions(jobModel);
+
+      if (ssps.size() == 64) {
+        break;
+      }
       Thread.sleep(jobModelWaitTimeInMillis);
-      jobModelWaitTimeInMillis = jobModelWaitTimeInMillis * 2;
     }
 
-    // Read the latest JobModel for validation.
-    jobModelVersion = zkUtils.getJobModelVersion();
-    jobModel = zkUtils.getJobModel(jobModelVersion);
-    ssps = getSystemStreamPartitions(jobModel);
-
     // Validate that the input partition count is 64 in the new JobModel.
     Assert.assertEquals(64, ssps.size());
 
@@ -823,6 +824,15 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Validate that the new JobModel has the expected task assignments.
     actualTaskAssignments = getTaskAssignments(jobModel);
+
+    for (Map.Entry<TaskName, Set<SystemStreamPartition>> entry : expectedTaskAssignments.entrySet()) {
+      TaskName taskName = entry.getKey();
+      Set<SystemStreamPartition> expectedSSPs = entry.getValue();
+      Assert.assertTrue(actualTaskAssignments.containsKey(taskName));
+      Set<SystemStreamPartition> actualSSPs = actualTaskAssignments.get(taskName);
+      Assert.assertEquals(actualSSPs, expectedSSPs);
+    }
+
     Assert.assertEquals(expectedTaskAssignments, actualTaskAssignments);
   }
 
@@ -895,17 +905,19 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Wait for the JobModel version to change due to the increase in the input partition count.
     long jobModelWaitTimeInMillis = 10;
-    while (Objects.equals(zkUtils.getJobModelVersion(), jobModelVersion)) {
+    while (true) {
       LOGGER.info("Waiting for new jobModel to be published");
+      jobModelVersion = zkUtils.getJobModelVersion();
+      jobModel = zkUtils.getJobModel(jobModelVersion);
+      ssps = getSystemStreamPartitions(jobModel);
+
+      if (ssps.size() == 128) {
+        break;
+      }
+
       Thread.sleep(jobModelWaitTimeInMillis);
-      jobModelWaitTimeInMillis = jobModelWaitTimeInMillis * 2;
     }
 
-    // Read the latest JobModel for validation.
-    jobModelVersion = zkUtils.getJobModelVersion();
-    jobModel = zkUtils.getJobModel(jobModelVersion);
-    ssps = getSystemStreamPartitions(jobModel);
-
     // Validate that the input partition count is 128 in the new JobModel.
     Assert.assertEquals(128, ssps.size());
 
@@ -945,6 +957,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   }
 
   private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
+    System.out.println(jobModel);
     Set<SystemStreamPartition> ssps = new HashSet<>();
     jobModel.getContainers().forEach((containerName, containerModel) -> {
         containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));