SAMZA-1387: Unable to Start Samza App Because Regex Check
[samza.git] / samza-kafka / src / main / scala / org / apache / samza / system / kafka / KafkaSystemAdmin.scala
index af77d5b..1e59b61 100644 (file)
@@ -38,8 +38,9 @@ import scala.collection.JavaConverters._
 
 object KafkaSystemAdmin extends Logging {
   // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
-  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and 1387
   val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+  val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id"
 
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
@@ -331,7 +332,7 @@ class KafkaSystemAdmin(
   override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
 
-    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+    val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
 
     if (createStream(streamSpec)) {
       info("Created coordinator stream %s." format streamName)
@@ -496,7 +497,7 @@ class KafkaSystemAdmin(
   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
-  
+
   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
     val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)