SAMZA-1075: Refactor SystemAdmin Interface to expose a common method to create streams
authorJacob Maes <jmaes@linkedin.com>
Fri, 17 Feb 2017 20:49:19 +0000 (12:49 -0800)
committerJacob Maes <jmaes@linkedin.com>
Fri, 17 Feb 2017 20:49:19 +0000 (12:49 -0800)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>

Closes #53 from jmakes/samza-1075

samza-api/src/main/java/org/apache/samza/system/StreamSpec.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java [new file with mode: 0644]
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala

diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
new file mode 100644 (file)
index 0000000..d8a2144
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * StreamSpec is a blueprint for creating, validating, or simply describing a stream in the runtime environment.
+ *
+ * It has specific attributes for common behaviors that Samza uses.
+ *
+ * It also includes a map of configurations which may be system-specific.
+ *
+ * It is immutable by design.
+ */
+public class StreamSpec {
+
+  private static final int DEFAULT_PARTITION_COUNT = 1;
+
+  /**
+   * Unique identifier for the stream in a Samza application.
+   * This identifier is used as a key for stream properties in the
+   * job config and to distinguish between streams in a graph.
+   */
+  private final String id;
+
+  /**
+   * The System name on which this stream will exist. Corresponds to a named implementation of the
+   * Samza System abstraction.
+   */
+  private final String systemName;
+
+  /**
+   * The physical identifier for the stream. This is the identifier that will be used in remote
+   * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   * might be a file URN.
+   */
+  private final String physicalName;
+
+  /**
+   * The number of partitions for the stream.
+   */
+  private final int partitionCount;
+
+  /**
+   * A set of all system-specific configurations for the stream.
+   */
+  private final Map<String, String> config;
+
+  /**
+   *  @param id           The application-unique logical identifier for the stream. It is used to distinguish between
+   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
+   *                      It does not need to be globally unique or unique with respect to a host.
+   *
+   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
+   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   *                      might be a file URN.
+   *
+   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
+   *                      Samza System abstraction. See {@link SystemFactory}
+   */
+  public StreamSpec(String id, String physicalName, String systemName) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
+  }
+
+  /**
+   *
+   *  @param id           The application-unique logical identifier for the stream. It is used to distinguish between
+   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
+   *                      It does not need to be globally unique or unique with respect to a host.
+   *
+   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
+   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   *                      might be a file URN.
+   *
+   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
+   *                      Samza System abstraction. See {@link SystemFactory}
+   *
+   * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
+   */
+  public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
+    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
+  }
+
+  /**
+   *  @param id           The application-unique logical identifier for the stream. It is used to distinguish between
+   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
+   *                      It does not need to be globally unique or unique with respect to a host.
+   *
+   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
+   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   *                      might be a file URN.
+   *
+   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
+   *                      Samza System abstraction. See {@link SystemFactory}
+   *
+   * @param config        A map of properties for the stream. These may be System-specfic.
+   */
+  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
+  }
+
+  /**
+   *  @param id             The application-unique logical identifier for the stream. It is used to distinguish between
+   *                        streams in a Samza application so it must be unique in the context of one deployable unit.
+   *                        It does not need to be globally unique or unique with respect to a host.
+   *
+   * @param physicalName    The physical identifier for the stream. This is the identifier that will be used in remote
+   *                        systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   *                        might be a file URN.
+   *
+   * @param systemName      The System name on which this stream will exist. Corresponds to a named implementation of the
+   *                        Samza System abstraction. See {@link SystemFactory}
+   *
+   * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
+   *
+   * @param config          A map of properties for the stream. These may be System-specfic.
+   */
+  public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
+    if (id == null) {
+      throw new NullPointerException("Parameter 'id' must not be null");
+    }
+
+    if (systemName == null) {
+      throw new NullPointerException("Parameter 'systemName' must not be null");
+    }
+
+    if (partitionCount < 1) {
+      throw new NullPointerException("Parameter 'partitionCount' must not be greater than 0");
+    }
+
+    this.id = id;
+    this.systemName = systemName;
+    this.physicalName = physicalName;
+    this.partitionCount = partitionCount;
+
+    if (config != null) {
+      this.config = Collections.unmodifiableMap(new HashMap<>(config));
+    } else {
+      this.config = Collections.emptyMap();
+    }
+  }
+
+  /**
+   * Copies this StreamSpec, but applies a new partitionCount.
+   *
+   * This method is not static s.t. subclasses can override it.
+   *
+   * @param partitionCount  The partitionCount for the returned StreamSpec.
+   * @return                A copy of this StreamSpec with the specified partitionCount.
+   */
+  public StreamSpec copyWithPartitionCount(int partitionCount) {
+    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public String getPhysicalName() {
+    return physicalName;
+  }
+
+  public int getPartitionCount() {
+    return partitionCount;
+  }
+
+  public Map<String, String> getConfig() {
+    return config;
+  }
+
+  public String get(String propertyName) {
+    return config.get(propertyName);
+  }
+
+  public String getOrDefault(String propertyName, String defaultValue) {
+    return config.getOrDefault(propertyName, defaultValue);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java b/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java
new file mode 100644 (file)
index 0000000..fef4148
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.SamzaException;
+
+
+public class StreamValidationException extends SamzaException {
+  private static final long serialVersionUID = 1L;
+
+  public StreamValidationException(String s) {
+    super(s);
+  }
+}
index ef99893..b180712 100644 (file)
@@ -28,7 +28,6 @@ import java.util.Set;
  * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
-
   /**
    * Fetches the offsets for the messages immediately after the supplied offsets
    * for a group of SystemStreamPartitions.
@@ -52,11 +51,12 @@ public interface SystemAdmin {
 
   /**
    * An API to create a change log stream
-   * 
+   *
    * @param streamName
    *          The name of the stream to be created in the underlying stream
    * @param numOfPartitions
    *          The number of partitions in the changelog stream
+   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
    */
   void createChangelogStream(String streamName, int numOfPartitions);
 
@@ -67,6 +67,7 @@ public interface SystemAdmin {
    *          The name of the stream to be created in the underlying stream
    * @param numOfPartitions
    *          The number of partitions in the changelog stream
+   * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)}
    */
   void validateChangelogStream(String streamName, int numOfPartitions);
 
@@ -76,6 +77,7 @@ public interface SystemAdmin {
    *
    * @param streamName
    *          The name of the coordinator stream to create.
+   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
    */
   void createCoordinatorStream(String streamName);
 
@@ -89,4 +91,27 @@ public interface SystemAdmin {
    * @return -1 if offset1 &lt; offset2; 0 if offset1 == offset2; 1 if offset1 &gt; offset2. Null if not comparable
    */
   Integer offsetComparator(String offset1, String offset2);
+
+  /**
+   * Create a stream described by the spec.
+   *
+   * @param streamSpec  The spec, or blueprint from which the physical stream will be created on the system.
+   * @return            {@code true} if the stream was actually created and not pre-existing.
+   *                    {@code false} if the stream was pre-existing.
+   *                    A RuntimeException will be thrown if creation fails.
+   */
+  default boolean createStream(StreamSpec streamSpec) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Validates the stream described by the streamSpec on the system.
+   * A {@link StreamValidationException} should be thrown for any validation error.
+   *
+   * @param streamSpec  The spec, or blueprint for the physical stream on the system.
+   * @throws StreamValidationException if validation fails.
+   */
+  default void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
new file mode 100644 (file)
index 0000000..3255f70
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.system.StreamSpec;
+
+
+/**
+ * Extends StreamSpec with the ability to easily get the topic replication factor.
+ */
+public class KafkaStreamSpec extends StreamSpec {
+  private static final int DEFAULT_REPLICATION_FACTOR = 2;
+
+  /**
+   * The number of replicas for stream durability.
+   */
+  private final int replicationFactor;
+
+  /**
+   * Convenience method to convert a config map to Properties.
+   * @param map The Map to convert.
+   * @return    The Properties instance.
+   */
+  private static Properties mapToProperties(Map<String, String> map) {
+    Properties props = new Properties();
+    props.putAll(map);
+    return props;
+  }
+
+  /**
+   * Convenience method to convert Properties to a config map.
+   * @param properties  The Properties to convert.
+   * @return            The Map instance.
+   */
+  private static Map<String, String> propertiesToMap(Properties properties) {
+    Map<String, String> map = new HashMap<String, String>();
+    for (final String name: properties.stringPropertyNames()) {
+      map.put(name, properties.getProperty(name));
+    }
+    return map;
+  }
+
+  /**
+   * Converts any StreamSpec to a KafkaStreamSpec.
+   * If the original spec already is a KafkaStreamSpec, it is simply returned.
+   *
+   * @param originalSpec  The StreamSpec instance to convert to KafkaStreamSpec.
+   * @return              A KafkaStreamSpec instance.
+   */
+  public static KafkaStreamSpec fromSpec(StreamSpec originalSpec) {
+    if (originalSpec instanceof KafkaStreamSpec) {
+      return ((KafkaStreamSpec) originalSpec);
+    }
+
+    int replicationFactor = Integer.parseInt(originalSpec.getOrDefault( KafkaConfig.TOPIC_REPLICATION_FACTOR(),
+                                                                        KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+
+    return new KafkaStreamSpec( originalSpec.getId(),
+                                originalSpec.getPhysicalName(),
+                                originalSpec.getSystemName(),
+                                originalSpec.getPartitionCount(),
+                                replicationFactor,
+                                mapToProperties(originalSpec.getConfig()));
+  }
+
+  /**
+   * Convenience constructor to create a KafkaStreamSpec with just a topicName, systemName, and partitionCount.
+   *
+   * @param topicName       The name of the topic.
+   * @param systemName      The name of the System. See {@link org.apache.samza.system.SystemFactory}
+   * @param partitionCount  The number of partitions.
+   */
+  public KafkaStreamSpec(String topicName, String systemName, int partitionCount) {
+    this(topicName, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
+  }
+
+  /**
+   * Constructs a StreamSpec with a replication factor.
+   *
+   * @param id                The application-unique logical identifier for the stream. It is used to distinguish between
+   *                          streams in a Samza application so it must be unique in the context of one deployable unit.
+   *                          It does not need to be globally unique or unique with respect to a host.
+   *
+   * @param topicName         The physical identifier for the stream. This is the identifier that will be used in remote
+   *                          systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
+   *                          might be a file URN.
+   *
+   * @param systemName        The System name on which this stream will exist. Corresponds to a named implementation of the
+   *                          Samza System abstraction. See {@link org.apache.samza.system.SystemFactory}
+   *
+   * @param partitionCount    The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
+   *
+   * @param replicationFactor The number of topic replicas in the Kafka cluster for durability.
+   *
+   * @param properties        A set of properties for the stream. These may be System-specfic.
+   */
+  public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount, int replicationFactor,
+      Properties properties) {
+    super(id, topicName, systemName, partitionCount, propertiesToMap(properties));
+
+    if (replicationFactor <= 0) {
+      throw new IllegalArgumentException(
+          String.format("Replication factor %d must be greater than 0.", replicationFactor));
+    }
+    this.replicationFactor = replicationFactor;
+  }
+
+  @Override
+  public StreamSpec copyWithPartitionCount(int partitionCount) {
+    return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties());
+  }
+
+  public int getReplicationFactor() {
+    return replicationFactor;
+  }
+
+  public Properties getProperties() {
+    return mapToProperties(getConfig());
+  }
+}
index 770220c..e355e7e 100644 (file)
@@ -24,27 +24,33 @@ import java.util.regex.Pattern
 
 import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
+
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.samza.SamzaException
 import java.util
+
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
+  val TOPIC_REPLICATION_FACTOR = "replication.factor"
+  val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
+
   val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
-  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
 
-  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
+  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
   // The default segment size to use for changelog topics
   val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
@@ -53,20 +59,20 @@ object KafkaConfig {
   val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
 
   /**
-   * Defines how low a queue can get for a single system/stream/partition
-   * combination before trying to fetch more messages for it.
-   */
+    * Defines how low a queue can get for a single system/stream/partition
+    * combination before trying to fetch more messages for it.
+    */
   val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
 
   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
 
   /**
-   * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
-   * The bytes for a single system/stream/partition are computed based on this.
-   * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be
-   * the bytes limit + size of max message in the partition for a given stream.
-   * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
-   */
+    * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
+    * The bytes for a single system/stream/partition are computed based on this.
+    * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be
+    * the bytes limit + size of max message in the partition for a given stream.
+    * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
+    */
   val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
 
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
@@ -75,18 +81,23 @@ object KafkaConfig {
 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // checkpoints
   def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+
   def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+
   def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+
   def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+
   def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
 
 
   /**
-   * Returns a map of topic -> fetch.message.max.bytes value for all streams that
-   * are defined with this property in the config.
-   */
+    * Returns a map of topic -> fetch.message.max.bytes value for all streams that
+    * are defined with this property in the config.
+    */
   def getFetchMessageMaxBytesTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
@@ -98,9 +109,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   /**
-   * Returns a map of topic -> auto.offset.reset value for all streams that
-   * are defined with this property in the config.
-   */
+    * Returns a map of topic -> auto.offset.reset value for all streams that
+    * are defined with this property in the config.
+    */
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
@@ -113,8 +124,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
   // regex resolver
   def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
   def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
   def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+
   def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
 
   // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
@@ -124,16 +138,16 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val storageConfig = new StorageConfig(config)
     val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
 
-    for((changelogConfig, cn) <- changelogConfigs){
+    for ((changelogConfig, cn) <- changelogConfigs) {
       // Lookup the factory for this particular stream and verify if it's a kafka system
 
       val matcher = pattern.matcher(changelogConfig)
-      val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
+      val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
 
       val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
       val systemStream = Util.getSystemStreamFromNames(changelogName)
       val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
-      if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
+      if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
         storeToChangelog += storeName -> systemStream.getStream
       }
     }
@@ -147,16 +161,22 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
-    filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)}
+    filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
 
+  def getTopicKafkaProperties(systemName: String, streamName: String) = {
+    val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+    val topicProperties = new Properties
+    filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) }
+    topicProperties
+  }
+
   // kafka config
-  def getKafkaSystemConsumerConfig(
-    systemName: String,
-    clientId: String,
-    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
-    injectedProps: Map[String, String] = Map()) = {
+  def getKafkaSystemConsumerConfig( systemName: String,
+                                    clientId: String,
+                                    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
+                                    injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.consumer." format systemName, true)
     val consumerProps = new Properties()
@@ -167,10 +187,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     new ConsumerConfig(consumerProps)
   }
 
-  def getKafkaSystemProducerConfig(
-    systemName: String,
-    clientId: String,
-    injectedProps: Map[String, String] = Map()) = {
+  def getKafkaSystemProducerConfig( systemName: String,
+                                    clientId: String,
+                                    injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
     val producerProps = new util.HashMap[String, Object]()
@@ -197,45 +216,45 @@ class KafkaProducerConfig(val systemName: String,
     val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
     producerProperties.putAll(properties)
 
-    if(!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-      debug("%s undefined. Defaulting to %s." format (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+    if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
       producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
     }
 
-    if(!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-      debug("%s undefined. Defaulting to %s." format (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+    if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
       producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
     }
 
-    if(producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
-        && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
-      warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged." 
-          format (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
+    if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
+      && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
+      warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
+        format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
     } else {
       producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
     }
 
-    if(producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG) 
-        && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) {
-        warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
+    if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
+      && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) {
+      warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
     } else {
       // Retries config is set to Max so that when all attempts fail, Samza also fails the send. We do not have any special handler
       // for producer failure
       producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
     }
-    
+
     producerProperties
   }
 
-  val reconnectIntervalMs =  Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
-          .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
+  val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+    .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
 
   val bootsrapServers = {
-    if(properties.containsKey("metadata.broker.list"))
+    if (properties.containsKey("metadata.broker.list"))
       warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated . Samza has been upgraded " +
-             "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
+        "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
     Option(properties.get("bootstrap.servers"))
-            .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
-            .asInstanceOf[String]
+      .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
+      .asInstanceOf[String]
   }
 }
index 955fa44..309b653 100644 (file)
 package org.apache.samza.system.kafka
 
 import java.util
+import java.util.{Properties, UUID}
 
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
-import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging, KafkaUtil }
+import kafka.admin.AdminUtils
 import kafka.api._
-import kafka.consumer.SimpleConsumer
-import kafka.common.{ TopicExistsException, TopicAndPartition }
-import kafka.consumer.ConsumerConfig
+import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils.ZkUtils
-import java.util.{ Properties, UUID }
+import org.apache.samza.config.KafkaConfig
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system._
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
+import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
-import kafka.consumer.ConsumerConfig
-import kafka.admin.AdminUtils
-import org.apache.samza.util.KafkaUtil
 
 
 object KafkaSystemAdmin extends Logging {
@@ -269,12 +267,12 @@ class KafkaSystemAdmin(
   }
 
   /**
-    * Returns the newest offset for the specified SSP.
-    * This method is fast and targeted. It minimizes the number of kafka requests.
-    * It does not retry indefinitely if there is any failure.
-    * It returns null if the topic is empty. To get the offsets for *all*
-    * partitions, it would be more efficient to call getSystemStreamMetadata
-    */
+   * Returns the newest offset for the specified SSP.
+   * This method is fast and targeted. It minimizes the number of kafka requests.
+   * It does not retry indefinitely if there is any failure.
+   * It returns null if the topic is empty. To get the offsets for *all*
+   * partitions, it would be more efficient to call getSystemStreamMetadata
+   */
   override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
     debug("Fetching newest offset for: %s" format ssp)
     var offset: String = null
@@ -334,34 +332,14 @@ class KafkaSystemAdmin(
 
   override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
-    new ExponentialSleepStrategy(initialDelayMs = 500).run(
-      loop => {
-        val zkClient = connectZk()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            streamName,
-            1, // Always one partition for coordinator stream.
-            coordinatorStreamReplicationFactor,
-            coordinatorStreamProperties)
-        } finally {
-          zkClient.close
-        }
 
-        info("Created coordinator stream %s." format streamName)
-        loop.done
-      },
+    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
 
-      (exception, loop) => {
-        exception match {
-          case e: TopicExistsException =>
-            info("Coordinator stream %s already exists." format streamName)
-            loop.done
-          case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format (streamName, e))
-            debug("Exception detail:", e)
-        }
-      })
+    if (createStream(streamSpec)) {
+      info("Created coordinator stream %s." format streamName)
+    } else {
+      info("Coordinator stream %s already exists." format streamName)
+    }
   }
 
   /**
@@ -435,44 +413,57 @@ class KafkaSystemAdmin(
     offsets
   }
 
-  private def createTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
-    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Attempting to create change log topic %s." format topicName)
-    info("Using partition count " + numKafkaChangelogPartitions + " for creating change log topic")
-    val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
-    retryBackoff.run(
+  /**
+   * @inheritdoc
+   */
+  override def createStream(spec: StreamSpec): Boolean = {
+    val kSpec = KafkaStreamSpec.fromSpec(spec);
+    var streamCreated = false
+
+    new ExponentialSleepStrategy(initialDelayMs = 500).run(
       loop => {
         val zkClient = connectZk()
         try {
           AdminUtils.createTopic(
             zkClient,
-            topicName,
-            numKafkaChangelogPartitions,
-            topicMetaInfo.replicationFactor,
-            topicMetaInfo.kafkaProps)
+            kSpec.getPhysicalName,
+            kSpec.getPartitionCount,
+            kSpec.getReplicationFactor,
+            kSpec.getProperties)
         } finally {
           zkClient.close
         }
 
-        info("Created changelog topic %s." format topicName)
+        streamCreated = true
         loop.done
       },
 
       (exception, loop) => {
         exception match {
           case e: TopicExistsException =>
-            info("Changelog topic %s already exists." format topicName)
+            streamCreated = false
             loop.done
           case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format (topicName, e))
+            warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e))
             debug("Exception detail:", e)
         }
       })
+
+    streamCreated
   }
 
-  private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
+  /**
+    * @inheritdoc
+    *
+    * Validates a stream in Kafka. Should not be called before createStream(),
+    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients,
+    * is not read-only and will auto-create a new topic.
+    */
+  override def validateStream(spec: StreamSpec): Unit = {
+    val topicName = spec.getPhysicalName
+    info("Validating topic %s." format topicName)
+
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Validating changelog topic %s." format topicName)
     var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
     retryBackoff.run(
       loop => {
@@ -482,17 +473,17 @@ class KafkaSystemAdmin(
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount < numKafkaChangelogPartitions) {
-          throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, numKafkaChangelogPartitions))
+        if (partitionCount != spec.getPartitionCount) {
+          throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount))
         }
 
-        info("Successfully validated changelog topic %s." format topicName)
+        info("Successfully validated topic %s." format topicName)
         loop.done
       },
 
       (exception, loop) => {
         exception match {
-          case e: KafkaChangelogException => throw e
+          case e: StreamValidationException => throw e
           case e: Exception =>
             warn("While trying to validate topic %s: %s. Retrying." format (topicName, e))
             debug("Exception detail:", e)
@@ -502,24 +493,32 @@ class KafkaSystemAdmin(
   }
 
   /**
-   * Exception to be thrown when the change log stream creation or validation has failed
-   */
+    * Exception to be thrown when the change log stream creation or validation has failed
+    */
   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
 
   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
-    createTopicInKafka(topicName, numKafkaChangelogPartitions)
-    validateChangelogStream(topicName, numKafkaChangelogPartitions)
+    val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
+    val spec = new KafkaStreamSpec(topicName, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+
+    if (createStream(spec)) {
+      info("Created changelog stream %s." format topicName)
+    } else {
+      info("Changelog stream %s already exists." format topicName)
+    }
+
+    validateStream(spec)
   }
 
   /**
-   * Validates change log stream in Kafka. Should not be called before createChangelogStream(),
-   * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and
-   * will auto-create a new topic.
-   */
+    * Validates a stream in Kafka. Should not be called before createStream(),
+    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and
+    * will auto-create a new topic.
+    */
   override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
-    validateTopicInKafka(topicName, numKafkaChangelogPartitions)
+    validateStream(new KafkaStreamSpec(topicName, systemName, numKafkaChangelogPartitions))
   }
 
   /**
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
new file mode 100644 (file)
index 0000000..a786468
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.util.Util;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
+
+  KafkaSystemAdmin basicSystemAdmin = createSystemAdmin();
+
+
+  @Test
+  public void testCreateCoordinatorStreamDelegatesToCreateStream() {
+    KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
+    SystemAdmin admin = Mockito.spy(systemAdmin);
+    StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem");
+
+    admin.createCoordinatorStream(spec.getPhysicalName());
+    admin.validateStream(spec);
+
+    Mockito.verify(admin).createStream(Mockito.any());
+  }
+
+  @Test
+  public void testCreateChangelogStreamDelegatesToCreateStream() {
+    final String STREAM = "testChangeLogStream";
+    final int PARTITIONS = 12;
+    final int REP_FACTOR = 3;
+
+    Properties coordProps = new Properties();
+    Properties changeLogProps = new Properties();
+    changeLogProps.setProperty("cleanup.policy", "compact");
+    changeLogProps.setProperty("segment.bytes", "139");
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+
+    SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS);
+    admin.createChangelogStream(STREAM, PARTITIONS);
+    admin.validateStream(spec);
+
+    ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
+    Mockito.verify(admin).createStream(specCaptor.capture());
+
+    StreamSpec internalSpec = specCaptor.getValue();
+    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+    assertEquals(STREAM, internalSpec.getId());
+    assertEquals(SYSTEM(), internalSpec.getSystemName());
+    assertEquals(STREAM, internalSpec.getPhysicalName());
+    assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+    assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+    assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+  }
+
+  @Test
+  public void testValidateChangelogStreamDelegatesToValidateStream() {
+    final String STREAM = "testChangeLogValidate";
+    Properties coordProps = new Properties();
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
+
+    KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
+    SystemAdmin admin = Mockito.spy(systemAdmin);
+    StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+
+    admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+    admin.validateStream(spec);
+    admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+
+    Mockito.verify(admin).createStream(Mockito.any());
+    Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
+  }
+
+  @Test
+  public void testCreateStream() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
+
+    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
+    admin.validateStream(spec);
+
+    assertFalse("createStream should return false if the stream already exists.", admin.createStream(spec));
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamDoesNotExist() {
+    SystemAdmin admin = this.basicSystemAdmin;
+
+    StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
+
+    admin.validateStream(spec);
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamWrongPartitionCount() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
+    StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
+
+    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+
+    admin.validateStream(spec2);
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamWrongName() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
+    StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
+
+    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+
+    admin.validateStream(spec2);
+  }
+}
index 0e3c9b5..be7db97 100644 (file)
 
 package org.apache.samza.system.kafka
 
-import java.util
-import java.util.Properties
+import java.util.{Properties, UUID}
 
 import kafka.admin.AdminUtils
 import kafka.common.{ErrorMapping, LeaderNotAvailableException}
 import kafka.consumer.{Consumer, ConsumerConfig}
-import kafka.server.{KafkaConfig, KafkaServer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import kafka.utils.{TestUtils, ZkUtils}
 import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.JaasUtils
-
 import org.apache.samza.Partition
 import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -43,7 +41,9 @@ import org.junit._
 
 import scala.collection.JavaConversions._
 
-
+/**
+  * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
+  */
 object TestKafkaSystemAdmin extends KafkaServerTestHarness {
 
   val SYSTEM = "kafka"
@@ -136,6 +136,14 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
     Consumer.create(consumerConfig)
   }
 
+  def createSystemAdmin: KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+  }
+
+  def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation)
+  }
+
 }
 
 /**
@@ -146,7 +154,7 @@ class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
   // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+  val systemAdmin = createSystemAdmin
 
   @Test
   def testShouldAssembleMetadata {