Address review comments.
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Wed, 6 Feb 2019 22:38:41 +0000 (14:38 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Tue, 12 Feb 2019 16:20:24 +0000 (08:20 -0800)
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala

index 5cbe893..b6949b9 100644 (file)
@@ -79,16 +79,16 @@ public class TaskPartitionAssignmentManager {
     // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
     // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
     // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
-    String serializedKey = getKey(partition);
+    String serializedSSPAsJson = serializeSSPToJson(partition);
     if (taskNames == null || taskNames.isEmpty()) {
       LOG.info("Deleting the key: {} from the metadata store.", partition);
-      metadataStore.delete(serializedKey);
+      metadataStore.delete(serializedSSPAsJson);
     } else {
       try {
         String taskNameAsString = taskNameMapper.writeValueAsString(taskNames);
         byte[] taskNameAsBytes = valueSerde.toBytes(taskNameAsString);
-        LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedKey, taskNames);
-        metadataStore.put(serializedKey, taskNameAsBytes);
+        LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
+        metadataStore.put(serializedSSPAsJson, taskNameAsBytes);
       } catch (Exception e) {
         throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
       }
@@ -104,8 +104,7 @@ public class TaskPartitionAssignmentManager {
       Map<SystemStreamPartition, List<String>> sspToTaskNamesMap = new HashMap<>();
       Map<String, byte[]> allMetadataEntries = metadataStore.all();
       for (Map.Entry<String, byte[]> entry : allMetadataEntries.entrySet()) {
-        LOG.info("Trying to deserialize the system stream partition: {}", entry.getKey());
-        SystemStreamPartition systemStreamPartition = getSystemStreamPartition(entry.getKey());
+        SystemStreamPartition systemStreamPartition = deserializeSSPInJsonFormat(entry.getKey());
         String taskNameAsJson = valueSerde.fromBytes(entry.getValue());
         List<String> taskNames = taskNameMapper.readValue(taskNameAsJson, new TypeReference<List<String>>() { });
         sspToTaskNamesMap.put(systemStreamPartition, taskNames);
@@ -123,8 +122,8 @@ public class TaskPartitionAssignmentManager {
   public void delete(Iterable<SystemStreamPartition> systemStreamPartitions) {
     for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
       LOG.info("Deleting the partition: {} from store.", systemStreamPartition);
-      String sspKey = getKey(systemStreamPartition);
-      metadataStore.delete(sspKey);
+      String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition);
+      metadataStore.delete(serializedSSPAsJson);
     }
   }
 
@@ -135,7 +134,12 @@ public class TaskPartitionAssignmentManager {
     metadataStore.close();
   }
 
-  private String getKey(SystemStreamPartition systemStreamPartition) {
+  /**
+   * Serializes the {@param SystemStreamPartition} to json string using Jackson.
+   * @param systemStreamPartition represents the input system stream partition.
+   * @return the SystemStreamPartition serialized to a json string.
+   */
+  private String serializeSSPToJson(SystemStreamPartition systemStreamPartition) {
     try {
       return sspMapper.writeValueAsString(systemStreamPartition);
     } catch (IOException e) {
@@ -143,11 +147,16 @@ public class TaskPartitionAssignmentManager {
     }
   }
 
-  private SystemStreamPartition getSystemStreamPartition(String partitionAsString) {
+  /**
+   * Deserializes the {@param sspAsJson} in json format to {@link SystemStreamPartition}.
+   * @param sspAsJson the serialized SystemStreamPartition in json format.
+   * @return the deserialized SystemStreamPartition.
+   */
+  private SystemStreamPartition deserializeSSPInJsonFormat(String sspAsJson) {
     try {
-      return sspMapper.readValue(partitionAsString, SystemStreamPartition.class);
+      return sspMapper.readValue(sspAsJson, SystemStreamPartition.class);
     } catch (IOException e) {
-      throw new SamzaException(String.format("Exception occurred when deserializing the partition: %s", partitionAsString), e);
+      throw new SamzaException(String.format("Exception occurred when deserializing the partition: %s", sspAsJson), e);
     }
   }
 }
index 5770597..d66b40e 100644 (file)
@@ -327,12 +327,11 @@ object JobModelManager extends Logging {
 
     val isHostAffinityEnabled = new ClusterManagerConfig(config).getHostAffinityEnabled
 
-    var groups: util.Map[TaskName, util.Set[SystemStreamPartition]] = null
-    if (isHostAffinityEnabled) {
+    val groups: util.Map[TaskName, util.Set[SystemStreamPartition]] = if (isHostAffinityEnabled) {
       val sspGrouperProxy: SSPGrouperProxy =  new SSPGrouperProxy(config, grouper)
-      groups = sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata)
+      sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata)
     } else {
-      groups = grouper.group(allSystemStreamPartitions)
+      grouper.group(allSystemStreamPartitions)
     }
     info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups))