Minor: KafkaConfig should treat empty changelog name as no changelog.
authorPrateek Maheshwari <pmaheshwari@linkedin.com>
Wed, 6 Jun 2018 18:00:06 +0000 (11:00 -0700)
committerPrateek Maheshwari <pmaheshwari@linkedin.com>
Wed, 6 Jun 2018 18:00:06 +0000 (11:00 -0700)
If a store changelog stream name is empty, treat is as a non-changelogged store instead of throwing an exception.

Author: Prateek Maheshwari <pmaheshwari@linkedin.com>

Reviewers: Jagadish Venkatraman <vjagadish1989@gmail.com>

Closes #546 from prateekm/kafka-changelog

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

index 124c85a..07f4710 100644 (file)
@@ -236,10 +236,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
       val matcher = pattern.matcher(changelogConfig)
       val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
 
-      val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
-      val systemStream = Util.getSystemStreamFromNames(changelogName)
-      val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
-      storeToChangelog += storeName -> systemStream.getStream
+      storageConfig.getChangelogStream(storeName).foreach(changelogName => {
+        val systemStream = Util.getSystemStreamFromNames(changelogName)
+        val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
+        storeToChangelog += storeName -> systemStream.getStream
+      })
     }
     storeToChangelog
   }