SAMZA-1012 Generated changelog mappings are not consistent
authorTommy Becker <tobecker@tivo.com>
Tue, 4 Oct 2016 15:54:10 +0000 (08:54 -0700)
committerJacob Maes <jmaes@linkedin.com>
Tue, 4 Oct 2016 15:54:10 +0000 (08:54 -0700)
samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala

index ba38b5c..df63b97 100644 (file)
@@ -253,24 +253,23 @@ object JobModelManager extends Logging {
     // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
     // mapping.
     var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+    // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
+    val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
 
     // Assign all SystemStreamPartitions to TaskNames.
-    val taskModels =
-    {
-      groups.map
-              { case (taskName, systemStreamPartitions) =>
-                val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
-                {
-                  case Some(changelogPartitionId) => new Partition(changelogPartitionId)
-                  case _ =>
-                    // If we've never seen this TaskName before, then assign it a
-                    // new changelog.
-                    maxChangelogPartitionId += 1
-                    info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
-                    new Partition(maxChangelogPartitionId)
-                }
-                new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-              }.toSet
+    val taskModels = {
+      sortedGroups.map { case (taskName, systemStreamPartitions) =>
+        val changelogPartition = Option(previousChangelogMapping.get(taskName)) match {
+          case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+          case _ =>
+            // If we've never seen this TaskName before, then assign it a
+            // new changelog.
+            maxChangelogPartitionId += 1
+            info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
+            new Partition(maxChangelogPartitionId)
+        }
+        new TaskModel(taskName, systemStreamPartitions, changelogPartition)
+      }.toSet
     }
 
     // Here is where we should put in a pluggable option for the
@@ -283,8 +282,7 @@ object JobModelManager extends Logging {
       else
         containerGrouper.group(taskModels)
     }
-    val containerMap = asScalaSet(containerModels).map
-            { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+    val containerMap = asScalaSet(containerModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
     new JobModel(config, containerMap, localityManager)
   }