SAMZA-1387: Unable to Start Samza App Because Regex Check
authorJacob Maes <jmaes@linkedin.com>
Fri, 11 Aug 2017 16:28:20 +0000 (09:28 -0700)
committerJacob Maes <jmaes@linkedin.com>
Fri, 11 Aug 2017 16:28:20 +0000 (09:28 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Fred Ji <fredji97@yahoo.com>

Closes #266 from jmakes/samza-1387

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java

index af77d5b..1e59b61 100644 (file)
@@ -38,8 +38,9 @@ import scala.collection.JavaConverters._
 
 object KafkaSystemAdmin extends Logging {
   // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
-  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and 1387
   val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+  val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id"
 
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
@@ -331,7 +332,7 @@ class KafkaSystemAdmin(
   override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
 
-    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+    val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
 
     if (createStream(streamSpec)) {
       info("Created coordinator stream %s." format streamName)
@@ -496,7 +497,7 @@ class KafkaSystemAdmin(
   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
-  
+
   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
     val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
index ce59b40..33c4017 100644 (file)
@@ -51,6 +51,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   @Test
+  public void testCreateCoordinatorStreamDelegatesToCreateStream_specialCharsInTopicName() {
+    final String STREAM = "test.Coord_inator.Stream";
+
+    SystemAdmin admin = Mockito.spy(createSystemAdmin());
+    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM());
+    admin.createCoordinatorStream(STREAM);
+    admin.validateStream(spec);
+
+    ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
+    Mockito.verify(admin).createStream(specCaptor.capture());
+
+    StreamSpec internalSpec = specCaptor.getValue();
+    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+    assertEquals(KafkaSystemAdmin.COORDINATOR_STREAMID(), internalSpec.getId());
+    assertEquals(SYSTEM(), internalSpec.getSystemName());
+    assertEquals(STREAM, internalSpec.getPhysicalName());
+  }
+
+  @Test
   public void testCreateChangelogStreamDelegatesToCreateStream() {
     final String STREAM = "testChangeLogStream";
     final int PARTITIONS = 12;