Fix SAMZA-1018.
authorTommy Becker <tobecker@tivo.com>
Mon, 19 Sep 2016 12:21:12 +0000 (08:21 -0400)
committerJacob Maes <jmaes@linkedin.com>
Fri, 23 Sep 2016 21:02:50 +0000 (14:02 -0700)
Check error code from metadata fetch in getSystemStreamPartitionCounts to avoid returning no data for newly created topics.

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala

index ba8de5c..5927cca 100644 (file)
@@ -51,7 +51,7 @@ object KafkaSystemAdmin extends Logging {
           val streamPartitionMetadata = systemStreamPartitions
             .map(systemStreamPartition => {
               val partitionMetadata = new SystemStreamPartitionMetadata(
-                // If the topic/partition is empty then oldest and newest will 
+                // If the topic/partition is empty then oldest and newest will
                 // be stripped of their offsets, so default to null.
                 oldestOffsets.getOrElse(systemStreamPartition, null),
                 newestOffsets.getOrElse(systemStreamPartition, null),
@@ -157,6 +157,7 @@ class KafkaSystemAdmin(
           metadataTTL)
         val result = metadata.map {
           case (topic, topicMetadata) => {
+            KafkaUtil.maybeThrowException(topicMetadata.errorCode)
             val partitionsMap = topicMetadata.partitionsMetadata.map {
               pm =>
                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
@@ -183,8 +184,8 @@ class KafkaSystemAdmin(
    * SystemStreamPartition that was passed in.
    */
   override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    // This is safe to do with Kafka, even if a topic is key-deduped. If the 
-    // offset doesn't exist on a compacted topic, Kafka will return the first 
+    // This is safe to do with Kafka, even if a topic is key-deduped. If the
+    // offset doesn't exist on a compacted topic, Kafka will return the first
     // message AFTER the offset that was specified in the fetch request.
     offsets.mapValues(offset => (offset.toLong + 1).toString)
   }
@@ -376,7 +377,7 @@ class KafkaSystemAdmin(
   private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = {
     val brokersToTopicPartitions = metadata
       .values
-      // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
+      // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
       .flatMap(topicMetadata => {
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
         topicMetadata