Code cleanup.
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Thu, 7 Feb 2019 16:47:46 +0000 (08:47 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Tue, 12 Feb 2019 16:20:24 +0000 (08:20 -0800)
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala

index b6949b9..c40d591 100644 (file)
@@ -47,7 +47,7 @@ public class TaskPartitionAssignmentManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskPartitionAssignmentManager.class);
 
-  private final ObjectMapper taskNameMapper = SamzaObjectMapper.getObjectMapper();
+  private final ObjectMapper taskNamesMapper = SamzaObjectMapper.getObjectMapper();
   private final ObjectMapper sspMapper = SamzaObjectMapper.getObjectMapper();
 
   private final Serde<String> valueSerde;
@@ -85,10 +85,10 @@ public class TaskPartitionAssignmentManager {
       metadataStore.delete(serializedSSPAsJson);
     } else {
       try {
-        String taskNameAsString = taskNameMapper.writeValueAsString(taskNames);
-        byte[] taskNameAsBytes = valueSerde.toBytes(taskNameAsString);
+        String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
+        byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
         LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
-        metadataStore.put(serializedSSPAsJson, taskNameAsBytes);
+        metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
       } catch (Exception e) {
         throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
       }
@@ -104,9 +104,9 @@ public class TaskPartitionAssignmentManager {
       Map<SystemStreamPartition, List<String>> sspToTaskNamesMap = new HashMap<>();
       Map<String, byte[]> allMetadataEntries = metadataStore.all();
       for (Map.Entry<String, byte[]> entry : allMetadataEntries.entrySet()) {
-        SystemStreamPartition systemStreamPartition = deserializeSSPInJsonFormat(entry.getKey());
-        String taskNameAsJson = valueSerde.fromBytes(entry.getValue());
-        List<String> taskNames = taskNameMapper.readValue(taskNameAsJson, new TypeReference<List<String>>() { });
+        SystemStreamPartition systemStreamPartition = deserializeSSPFromJson(entry.getKey());
+        String taskNamesAsJson = valueSerde.fromBytes(entry.getValue());
+        List<String> taskNames = taskNamesMapper.readValue(taskNamesAsJson, new TypeReference<List<String>>() { });
         sspToTaskNamesMap.put(systemStreamPartition, taskNames);
       }
       return sspToTaskNamesMap;
@@ -135,9 +135,9 @@ public class TaskPartitionAssignmentManager {
   }
 
   /**
-   * Serializes the {@param SystemStreamPartition} to json string using Jackson.
+   * Serializes the {@param SystemStreamPartition} to json string.
    * @param systemStreamPartition represents the input system stream partition.
-   * @return the SystemStreamPartition serialized to a json string.
+   * @return the SystemStreamPartition serialized to json.
    */
   private String serializeSSPToJson(SystemStreamPartition systemStreamPartition) {
     try {
@@ -152,7 +152,7 @@ public class TaskPartitionAssignmentManager {
    * @param sspAsJson the serialized SystemStreamPartition in json format.
    * @return the deserialized SystemStreamPartition.
    */
-  private SystemStreamPartition deserializeSSPInJsonFormat(String sspAsJson) {
+  private SystemStreamPartition deserializeSSPFromJson(String sspAsJson) {
     try {
       return sspMapper.readValue(sspAsJson, SystemStreamPartition.class);
     } catch (IOException e) {
index d66b40e..3243947 100644 (file)
@@ -130,6 +130,9 @@ object JobModelManager extends Logging {
     val sspToTaskMapping: util.Map[SystemStreamPartition, util.List[String]] = taskPartitionAssignmentManager.readTaskPartitionAssignments()
     val taskPartitionAssignments: util.Map[TaskName, util.List[SystemStreamPartition]] = new util.HashMap[TaskName, util.List[SystemStreamPartition]]()
 
+    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
+    // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
     sspToTaskMapping foreach { case (systemStreamPartition: SystemStreamPartition, taskNames: util.List[String]) =>
       for (task <- taskNames) {
         val taskName: TaskName = new TaskName(task)
@@ -218,8 +221,9 @@ object JobModelManager extends Logging {
       taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
     }
 
-    // SystemStreamPartition to list of taskNames is stored in the metadata store due to the 1 MB value size limit in kafka.
-    // Conversion to taskName to SystemStreamPartitions is done to wire-in the data to JobModel.
+    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
+    // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
     val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
 
     for (container <- jobModel.getContainers.values()) {