Fix loadDefaults error msg.
authorBoris S <boryas@apache.org>
Fri, 8 Jun 2018 21:25:27 +0000 (14:25 -0700)
committerBoris S <boryas@apache.org>
Fri, 8 Jun 2018 21:25:27 +0000 (14:25 -0700)
SAMZA-1744

Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #551 from sborya/loadDefaultsErrorMsg and squashes the following commits:

c3003ad2 [Boris S] Fixed error message
0edf343b [Boris S] Merge branch 'master' of https://github.com/apache/samza
67e611ee [Boris S] Merge branch 'master' of https://github.com/apache/samza
dd39d089 [Boris S] Merge branch 'master' of https://github.com/apache/samza
1ad58d43 [Boris S] Merge branch 'master' of https://github.com/apache/samza
06b1ac36 [Boris Shkolnik] Merge branch 'master' of https://github.com/sborya/samza
5e6f5fb5 [Boris Shkolnik] Merge branch 'master' of https://github.com/apache/samza
010fa168 [Boris S] Merge branch 'master' of https://github.com/apache/samza
bbffb79b [Boris S] Merge branch 'master' of https://github.com/apache/samza
d4620d66 [Boris S] Merge branch 'master' of https://github.com/apache/samza
410ce78b [Boris S] Merge branch 'master' of https://github.com/apache/samza
a31a7aa2 [Boris Shkolnik] reduce debugging from info to debug in KafkaCheckpointManager.java

samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala

index ba2dfd9..53d5e98 100644 (file)
@@ -412,8 +412,8 @@ class OffsetManager(
     val taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]] = systemStreamPartitions
 
     taskNameToSSPs.foreach {
-      case (taskName, systemStreamPartitions) => {
-        systemStreamPartitions.foreach { systemStreamPartition =>
+      case (taskName, systemStreamPartitionsSet) => {
+        systemStreamPartitionsSet.foreach { systemStreamPartition =>
           if (!startingOffsets.contains(taskName) || !startingOffsets(taskName).contains(systemStreamPartition)) {
             val systemStream = systemStreamPartition.getSystemStream
             val partition = systemStreamPartition.getPartition
@@ -445,7 +445,7 @@ class OffsetManager(
               }
 
             } else {
-              throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
+              throw new SamzaException("No metadata available for partition %s." format systemStreamPartition)
             }
           }
         }