SAMZA-1214: Allow users to set a default replication.factor for intermediate topics
author    Jacob Maes <jmaes@linkedin.com>
          Thu, 27 Apr 2017 22:05:28 +0000 (15:05 -0700)
committer Jacob Maes <jmaes@linkedin.com>
          Thu, 27 Apr 2017 22:05:28 +0000 (15:05 -0700)
* Add a new "systems.sysName.default.stream.*" config structure that allows users to set system-wide defaults for streams (see the sketch below).
* More thorough testing of system defaults and stream defaults
* Removed the old migration config from the config table since there's no code to support it.
* Moved two Kafka-specific config accessors out of JobConfig and into KafkaConfig
* Removed a duplicate implementation of getChangelogStream()
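
A minimal sketch of the new config structure (the system name "kafka-system" and stream ID
"my-stream" are hypothetical):

    # System-wide default: applies to every stream created on kafka-system
    systems.kafka-system.default.stream.replication.factor=2

    # Stream-scoped property: overrides the system-wide default for this stream only
    streams.my-stream.samza.system=kafka-system
    streams.my-stream.replication.factor=3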

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Jagadish <jvenkatr@linkedin.com>

Closes #141 from jmakes/samza-1214

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-api/src/main/java/org/apache/samza/config/MapConfig.java
samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala

docs/learn/documentation/versioned/jobs/configuration-table.html
index df59547..0fc30c5 100644
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
-                    <td class="default" rowspan="2"></td>
+                    <td class="property" id="systems-default-stream">systems.<span class="system">system-name</span>.<br>default.stream.*</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        A set of default properties applied to every stream associated with the system. For example, if
+                        systems.kafka-system.default.stream.replication.factor=2 is configured, then every Kafka stream
+                        created on kafka-system will have a replication factor of 2 unless the property is explicitly
+                        overridden at the stream scope using <a href="#streams-properties">streams properties</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.key.serde</td>
+                    <td class="default"></td>
                     <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
                         the task and the output stream producer.
                     </td>
                 </tr>
-                <tr>
-                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
-                    <td class="description">
-                        This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
-                        streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
-                    </td>
-                </tr>
 
                 <tr>
-                    <td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
-                    <td class="default" rowspan="2"></td>
+                    <td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.msg.serde</td>
+                    <td class="default"></td>
                     <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
                         the task and the output stream producer.
                     </td>
                 </tr>
-                <tr>
-                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
-                    <td class="description">
-                        This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
-                        streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
-                    </td>
-                </tr>
 
                 <tr>
-                    <td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
-                    <td class="default" rowspan="2">upcoming</td>
+                    <td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>default.stream.samza.offset.default</td>
+                    <td class="default">upcoming</td>
                     <td class="description">
                         If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
                         this property determines where in the input stream we should start consuming. The value must be an
                         If both are defined, the stream-level definition takes precedence.
                     </td>
                 </tr>
-                <tr>
-                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
-                    <td class="description">
-                        This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
-                        streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
-                    </td>
-                </tr>
 
                 <tr>
                     <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
                         This property defines a store, Samza's mechanism for efficient
                         <a href="../container/state-management.html">stateful stream processing</a>. You can give a
                         store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span>
-                        <em>default</em> is reserved for defining default store parameters), and use that name to get a 
+                        <em>default</em> is reserved for defining default store parameters), and use that name to get a
                         reference to the store in your stream task (call
                         <a href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
                         in your task's
                     <td class="default"></td>
                     <td class="description">Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you want explicitly use a separate location.</td>
                 </tr>
-
-                <tr>
-                    <th colspan="3" class="section" id="task-migration">
-                        Migrating from Samza 0.9.1 to 0.10.0<br>
-                        <span class="subtitle">
-                            (This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set
-                            <a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> to anything <b> other than </b>
-                            <code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>)
-                        </span>
-                    </th>
-                </tr>
-
-                <tr>
-                    <td class="property" id="task-checkpoint-skip-migration">task.checkpoint.skip-migration</td>
-                    <td class="default">false</td>
-                    <td class="description">
-                        When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream. <br />
-                        If you are using a checkpoint manager other than kafka
-                        (<code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>), you have to
-                        manually migrate taskName-to-changelog partition mapping to the coordinator stream. <br />
-                        This can be achieved with the assistance of the <code>checkpoint-tool.sh</code>.
-                    </td>
-                </tr>
             </tbody>
         </table>
     </body>
samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 0c3f14a..d72d486 100644
@@ -19,6 +19,7 @@
 
 package org.apache.samza.config;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,11 +42,15 @@ public class MapConfig extends Config {
   }
 
   public MapConfig(List<Map<String, String>> maps) {
-    this.map = new HashMap<String, String>();
+    this.map = new HashMap<>();
     for (Map<String, String> m: maps)
       this.map.putAll(m);
   }
 
+  public MapConfig(Map<String, String>... maps) {
+    this(Arrays.asList(maps));
+  }
+
   public String get(Object k) {
     return map.get(k);
   }
samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 5db94a6..a1f0ec0 100644
@@ -58,7 +58,7 @@ public class JavaStorageConfig extends MapConfig {
     // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
     // these values will be combined into <asystem>.<astream>
     String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null);
-    String changelogSystem = get(CHANGELOG_SYSTEM, null);
+    String changelogSystem = getChangelogSystem();
 
     String systemStreamRes;
     if (systemStream != null  && !systemStream.contains(".")) {
@@ -85,4 +85,21 @@ public class JavaStorageConfig extends MapConfig {
   public String getStorageMsgSerde(String storeName) {
     return get(String.format(MSG_SERDE, storeName), null);
   }
+
+  /**
+   * Gets the System to use for reading/writing changelogs. Uses the following precedence.
+   *
+   * 1. If job.changelog.system is defined, that value is used.
+   * 2. If job.default.system is defined, that value is used.
+   * 3. null
+   *
+   * Note: Changelogs can be defined using
+   * stores.storeName.changelog=systemName.streamName  or
+   * stores.storeName.changelog=streamName
+   *
+   * If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used.
+   */
+  public String getChangelogSystem() {
+    return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
+  }
 }
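
To illustrate the precedence implemented by getChangelogSystem() above, a minimal sketch
(store, stream, and system names are hypothetical):

    # No job.changelog.system is set, so getChangelogSystem() falls back to the job default
    job.default.system=kafka-system
    # A changelog defined with the stream-only syntax...
    stores.my-store.changelog=my-changelog
    # ...resolves to "kafka-system.my-changelog" in getChangelogStream("my-store")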
samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
index 47d90a4..6408438 100644
@@ -35,7 +35,8 @@ import org.apache.samza.util.Util;
 public class JavaSystemConfig extends MapConfig {
   private static final String SYSTEM_PREFIX = "systems.";
   private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
-  private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
+  private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
+  private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream.";
   private static final String EMPTY = "";
 
   public JavaSystemConfig(Config config) {
@@ -46,7 +47,7 @@ public class JavaSystemConfig extends MapConfig {
     if (name == null) {
       return null;
     }
-    String systemFactory = String.format(SYSTEM_FACTORY, name);
+    String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
     return get(systemFactory, null);
   }
 
@@ -99,4 +100,11 @@ public class JavaSystemConfig extends MapConfig {
 
     return systemFactories;
   }
+
+  /**
+   * Gets the system-wide defaults for streams.
+   */
+  public Config getDefaultStreamProperties(String systemName) {
+    return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
+  }
 }
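
A short sketch of the new accessor (imports elided; system name hypothetical). Since
subset(prefix, true) strips the prefix, the returned Config is keyed by the bare stream
property names:

    Config config = new MapConfig(Collections.singletonMap(
        "systems.kafka-system.default.stream.replication.factor", "2"));
    Config defaults = new JavaSystemConfig(config).getDefaultStreamProperties("kafka-system");
    defaults.get("replication.factor"); // returns "2"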
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4e86b7c..030d945 100644
@@ -23,7 +23,6 @@ package org.apache.samza.config
 import java.io.File
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.system.{RegexSystemStreamPartitionMatcher, SystemStreamPartitionMatcher}
 import org.apache.samza.util.Logging
 
 object JobConfig {
@@ -47,8 +46,6 @@ object JobConfig {
   val JOB_CONTAINER_COUNT = "job.container.count"
   val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
-  val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
-  val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
 
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
@@ -104,8 +101,22 @@ object JobConfig {
 class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getName = getOption(JobConfig.JOB_NAME)
 
-  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
-      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
+  def getCoordinatorSystemName = {
+    val system = getCoordinatorSystemNameOrNull
+    if (system == null) {
+      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")
+    }
+    system
+  }
+
+  /**
+    * Gets the System to use for reading/writing the coordinator stream. Uses the following precedence.
+    *
+    * 1. If job.coordinator.system is defined, that value is used.
+    * 2. If job.default.system is defined, that value is used.
+    * 3. null
+    */
+  def getCoordinatorSystemNameOrNull = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(getDefaultSystem.orNull)
 
   def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
 
@@ -144,31 +155,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY)
 
-  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
-  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
-
-  def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
-    case Some(rplFactor) => rplFactor
-    case _ =>
-      getOption(CHECKPOINT_REPLICATION_FACTOR) match {
-        case Some(rplFactor) =>
-          info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_REPLICATION_FACTOR, CHECKPOINT_REPLICATION_FACTOR, rplFactor))
-          rplFactor
-        case _ => "3"
-      }
-  }
-
-  def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
-    case Some(segBytes) => segBytes
-    case _ =>
-      getOption(CHECKPOINT_SEGMENT_BYTES) match {
-        case Some(segBytes) =>
-          info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_SEGMENT_BYTES, CHECKPOINT_SEGMENT_BYTES, segBytes))
-          segBytes
-        case _ => "26214400"
-      }
-  }
-
   def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)
 
   def getSSPMatcherConfigRegex = getExcept(JobConfig.SSP_MATCHER_CONFIG_REGEX)
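
A minimal sketch of the coordinator-system fallback added above (system name hypothetical):

    # With no job.coordinator.system set...
    job.default.system=kafka-system
    # ...getCoordinatorSystemNameOrNull returns "kafka-system", and
    # getCoordinatorSystemName no longer throws a ConfigException.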
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index 10b4d1d..8dbf739 100644
@@ -46,24 +46,8 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
   def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
 
   def getChangelogStream(name: String) = {
-    // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
-    // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
-    // these values will be combined into <asystem>.<astream>
-    val systemStream = getOption(CHANGELOG_STREAM format name)
-    val changelogSystem = getOption(CHANGELOG_SYSTEM)
-    val systemStreamRes =
-      if ( systemStream.isDefined  && ! systemStream.getOrElse("").contains('.')) {
-        // contains only stream name
-        if (changelogSystem.isDefined) {
-          Some(changelogSystem.get + "." + systemStream.get)
-        }
-        else {
-          throw new SamzaException("changelog system is not defined:" + systemStream.get)
-        }
-      } else {
-        systemStream
-      }
-    systemStreamRes
+    val javaStorageConfig = new JavaStorageConfig(config)
+    Option(javaStorageConfig.getChangelogStream(name))
   }
 
   def getChangeLogDeleteRetentionInMs(storeName: String) = {
samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index e40994a..43cc9a9 100644
@@ -81,11 +81,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */
   def getSerdeStreams(systemName: String) = {
+    val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName)
+    val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE)
+
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     val legacySystemStreams = subConf
       .asScala
       .keys
-      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
       .map(k => {
         val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
         new SystemStream(systemName, streamName)
@@ -94,7 +97,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
       .asScala
       .keys
-      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
       .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
       .filter(streamId => systemName.equals(getSystem(streamId)))
       .map(streamId => streamIdToSystemStream(streamId)).toSet
@@ -220,10 +223,13 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     * @return           the map of properties for the stream
     */
   private def getSystemStreamProperties(systemName: String, streamName: String) = {
-    if (systemName == null || streamName == null) {
+    if (systemName == null) {
       Map()
     }
-    config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+    val systemConfig = new JavaSystemConfig(config)
+    val defaults = systemConfig.getDefaultStreamProperties(systemName)
+    val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+    new MapConfig(defaults, explicitConfigs)
   }
 
   /**
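
Ordering matters in the merged config above: the new varargs MapConfig applies maps left to
right, so explicit stream-scoped configs overwrite system-wide defaults. A sketch with
hypothetical values:

    // defaults:        {replication.factor=2}
    // explicitConfigs: {replication.factor=3}
    Config merged = new MapConfig(defaults, explicitConfigs);
    merged.get("replication.factor"); // returns "3"; the explicit value wins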
samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
index 639be3d..4580cc4 100644
@@ -41,6 +41,13 @@ public class TestStreamConfig {
   private static final String STREAM2_STREAM_ID = "streamId2";
   private static final SystemStream SYSTEM_STREAM_2 = new SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME);
 
+  private static final String STREAM3_SYSTEM = "Sys3";
+  private static final String STREAM3_PHYSICAL_NAME = "Str3";
+  private static final String STREAM3_STREAM_ID = "streamId3";
+  private static final SystemStream SYSTEM_STREAM_3 = new SystemStream(STREAM3_SYSTEM, STREAM3_PHYSICAL_NAME);
+
+  private static final String SYSTEM_DEFAULT_STREAM_PATTERN = "systems.%s.default.stream.";
+
 
   @Test(expected = IllegalArgumentException.class)
   public void testGetSamzaPropertyThrowsIfInvalidPropertyName() {
@@ -185,6 +192,44 @@ public class TestStreamConfig {
     assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
   }
 
+  @Test
+  public void testStreamPropertyDefaults() {
+    final String nonSamzaProperty = "replication.factor";
+    StreamConfig config = buildConfig(
+        buildSystemDefaultProp(STREAM1_SYSTEM, nonSamzaProperty), "1",
+        buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.KEY_SERDE()), "value1",
+        buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.CONSUMER_OFFSET_DEFAULT()), "newest",
+        buildProp(SYSTEM_STREAM_1, "dummyStreamProperty"), "dummyValue",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME,
+        buildSystemDefaultProp(STREAM2_SYSTEM, nonSamzaProperty), "2",
+        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
+        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME,
+        buildProp(STREAM2_STREAM_ID, nonSamzaProperty), "3",
+        buildSystemDefaultProp(STREAM3_SYSTEM, nonSamzaProperty), "4",
+        buildProp(STREAM3_STREAM_ID, StreamConfig.SYSTEM()), STREAM3_SYSTEM,
+        buildProp(STREAM3_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM3_PHYSICAL_NAME,
+        buildProp(SYSTEM_STREAM_3, nonSamzaProperty), "5",
+        "key3", "value3");
+
+    // Ensure that we can set legacy system properties via the new system-wide default
+    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
+    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals("newest", config.getDefaultStreamOffset(SYSTEM_STREAM_1).get());
+
+    // Property set via systems.x.default.stream.* only
+    assertEquals("1", config.getStreamProperties(STREAM1_STREAM_ID).get(nonSamzaProperty));
+
+    // Property set via systems.x.default.stream.* and streams.y.*
+    assertEquals("3", config.getStreamProperties(STREAM2_STREAM_ID).get(nonSamzaProperty));
+
+    // Property set via systems.x.default.stream.* and systems.x.streams.z.*
+    assertEquals("5", config.getStreamProperties(STREAM3_STREAM_ID).get(nonSamzaProperty));
+  }
+
   private StreamConfig buildConfig(String... kvs) {
     if (kvs.length % 2 != 0) {
       throw new IllegalArgumentException("There must be parity between the keys and values");
@@ -205,6 +250,10 @@ public class TestStreamConfig {
     return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(), systemStream.getStream()) + suffix;
   }
 
+  private String buildSystemDefaultProp(String system, String suffix) {
+    return String.format(SYSTEM_DEFAULT_STREAM_PATTERN, system) + suffix;
+  }
+
   private Config addConfigs(Config original, String... kvs) {
     Map<String, String> result = new HashMap<>();
     result.putAll(original);
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
index 8e0d5fc..bd95d0b 100644
@@ -195,6 +195,26 @@ public class TestAbstractApplicationRunner {
     assertEquals("systemValue2", properties.get("systemProperty2"));
   }
 
+  // Verify that a default specified with systems.x.default.stream.* is used when present
+  @Test
+  public void testStreamConfigOverridesWithSystemDefaults() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "segment.bytes", "5309"),
+        String.format("systems.%s.default.stream.replication.factor", TEST_SYSTEM), "4", // System default property
+        String.format("systems.%s.default.stream.segment.bytest", TEST_SYSTEM), "867"
+        );
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.getStreamSpec(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(2, properties.size());
+    assertEquals("4", properties.get("replication.factor")); // Uses system default
+    assertEquals("5309", properties.get("segment.bytes")); // Overrides system default
+  }
+
   // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
   @Test
   public void testGetStreamPhysicalNameArgSimple() {
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index a8c1f3a..9ac21ef 100644
@@ -47,9 +47,11 @@ object KafkaConfig {
   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
+  val SEGMENT_BYTES = "segment.bytes"
+
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
-  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES
 
   val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
   val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
@@ -60,6 +62,9 @@ object KafkaConfig {
   // Helper regular expression definitions to extract/match configurations
   val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
 
+  val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
+  val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
+
   /**
     * Defines how low a queue can get for a single system/stream/partition
     * combination before trying to fetch more messages for it.
@@ -81,12 +86,83 @@ object KafkaConfig {
 }
 
 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
-  // checkpoints
-  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+  /**
+    * Gets the System to use for reading/writing checkpoints. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.system is defined, that value is used.
+    * 2. If job.default.system is defined, that value is used.
+    * 3. None
+    */
+  def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull))
+
+  /**
+    * Gets the replication factor for the checkpoint topic. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.replication.factor is configured, that value is used.
+    * 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used.
+    * 3. None
+    *
+    * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+    */
+  def getCheckpointReplicationFactor() = {
+    val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3")
+    val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor)
+
+    Option(replicationFactor)
+  }
+
+  private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
+    val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+    defaultReplicationFactor
+  }
+
+  /**
+    * Gets the segment bytes for the checkpoint topic. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.segment.bytes is configured, that value is used.
+    * 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used.
+    * 3. KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
+    *
+    * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+    */
+  def getCheckpointSegmentBytes() = {
+    val defaultSegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+    getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultSegBytes)
+  }
 
-  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+  /**
+    * Gets the replication factor for the coordinator topic. Uses the following precedence.
+    *
+    * 1. If job.coordinator.replication.factor is configured, that value is used.
+    * 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used.
+    * 3. 3
+    *
+    * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+    */
+  def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match {
+    case Some(rplFactor) => rplFactor
+    case _ =>
+      val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+      val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+      systemReplicationFactor
+  }
 
-  def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+  /**
+    * Gets the segment bytes for the coordinator topic. Uses the following precedence.
+    *
+    * 1. If job.coordinator.segment.bytes is configured, that value is used.
+    * 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used.
+    * 3. 26214400
+    *
+    * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+    */
+  def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match {
+    case Some(segBytes) => segBytes
+    case _ =>
+      val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+      val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+      segBytes
+  }
 
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
@@ -95,7 +171,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
   def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
 
-
   /**
     * Returns a map of topic -> fetch.message.max.bytes value for all streams that
     * are defined with this property in the config.
@@ -133,8 +208,21 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
 
+  /**
+    * Gets the replication factor for the changelog topics. Uses the following precedence.
+    *
+    * 1. If stores.myStore.changelog.replication.factor is configured, that value is used.
+    * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used.
+    * 3. 2
+    *
+    * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]]
+    */
   def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
-  def getDefaultChangelogStreamReplicationFactor = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse("2")
+
+  def getDefaultChangelogStreamReplicationFactor() = {
+    val changelogSystem = new JavaStorageConfig(config).getChangelogSystem()
+    getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
+  }
 
   // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
   def getKafkaChangelogEnabledStores() = {
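
Putting the Kafka-side precedence together for the checkpoint topic (coordinator and changelog
topics resolve analogously), a minimal sketch with hypothetical values:

    # 1) The explicit property wins when present:
    task.checkpoint.replication.factor=4
    # 2) Otherwise the checkpoint system's stream default applies:
    task.checkpoint.system=kafka-system
    systems.kafka-system.default.stream.replication.factor=8
    # 3) Otherwise the hard-coded default "3" is used.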
samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 106a0d5..d4b8150 100644
@@ -29,20 +29,20 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.junit.Before
 
 class TestKafkaConfig {
-  
+
   var props : Properties = new Properties
   val SYSTEM_NAME = "kafka";
   val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
   val TEST_CLIENT_ID = "TestClientId"
   val TEST_GROUP_ID = "TestGroupId"
-  
+
   @Before
   def setupProperties() {
     props = new Properties
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
     props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
   }
-  
+
   @Test
   def testIdGeneration = {
     val factory = new PropertiesConfigFactory()
@@ -93,7 +93,7 @@ class TestKafkaConfig {
     val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // shared fetch size
     assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
-    
+
     props.setProperty("systems." + SYSTEM_NAME + ".streams.topic1.consumer.fetch.message.max.bytes", "65536")
     val mapConfig2 = new MapConfig(props.asScala.asJava)
     val kafkaConfig2 = new KafkaConfig(mapConfig2)
@@ -120,7 +120,7 @@ class TestKafkaConfig {
     props.setProperty("job.changelog.system", "kafka")
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
-    
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
@@ -131,14 +131,14 @@ class TestKafkaConfig {
     assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
     assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
   }
-  
+
   @Test
   def testDefaultValuesForProducerProperties() {
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
-    
+
     assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
     assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
     assertEquals(kafkaProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
@@ -148,45 +148,45 @@ class TestKafkaConfig {
   @Test
   def testMaxInFlightRequestsPerConnectionOverride() {
     val expectedValue = "200";
-    
+
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue);
-    
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
-    
+
     assertEquals(expectedValue, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
   }
-  
+
   @Test
   def testRetriesOverride() {
     val expectedValue = "200";
-    
+
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue);
-    
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
-    
+
     assertEquals(expectedValue, producerProperties.get(ProducerConfig.RETRIES_CONFIG))
   }
-  
+
   @Test(expected = classOf[NumberFormatException])
   def testMaxInFlightRequestsPerConnectionWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza");
-    
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
   }
-  
+
   @Test(expected = classOf[NumberFormatException])
   def testRetriesWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza");
-    
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
@@ -195,25 +195,136 @@ class TestKafkaConfig {
 
   @Test
   def testChangeLogReplicationFactor() {
+    props.setProperty("stores.store-with-override.changelog", "kafka-system.changelog-topic")
     props.setProperty("stores.store-with-override.changelog.replication.factor", "3")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
-    assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "3")
-    assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "2")
-    assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "2")
+    assertEquals("3", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
+    assertEquals("2", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
+    assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
   }
 
   @Test
   def testChangeLogReplicationFactorWithOverriddenDefault() {
+    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system")
+    props.setProperty("stores.store-with-override.changelog", "changelog-topic")
     props.setProperty("stores.store-with-override.changelog.replication.factor", "4")
     // Override the "default" default value
     props.setProperty("stores.default.changelog.replication.factor", "5")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
-    assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "4")
-    assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "5")
-    assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "5")
+    assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
+    assertEquals("5", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
+    assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
+  }
+
+  @Test
+  def testChangeLogReplicationFactorWithSystemOverriddenDefault() {
+    props.setProperty(StorageConfig.CHANGELOG_SYSTEM, "kafka-system")
+    props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("stores.store-with-override.changelog.replication.factor", "4")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
+    assertEquals("8", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
+    assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
+  }
+
+  @Test
+  def testCheckpointReplicationFactor() {
+    val emptyConfig = new KafkaConfig(new MapConfig())
+    assertEquals("3", emptyConfig.getCheckpointReplicationFactor.orNull)
+    assertNull(emptyConfig.getCheckpointSystem.orNull)
+
+    props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
+    props.setProperty("task.checkpoint.replication.factor", "4")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
+    assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
+  }
+
+  @Test
+  def testCheckpointReplicationFactorWithSystemDefault() {
+    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
+    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("other-kafka-system", kafkaConfig.getCheckpointSystem.orNull)
+    assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull)
+    assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
+  }
+
+  @Test
+  def testCheckpointReplicationFactorWithSystemOverriddenDefault() {
+    // Defaults
+    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
+    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
+
+    // Overrides
+    props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
+    props.setProperty(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, "4")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
+    assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
+    assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
+  }
+
+  @Test
+  def testCoordinatorReplicationFactor() {
+    val emptyConfig = new KafkaConfig(new MapConfig())
+    assertEquals("3", emptyConfig.getCoordinatorReplicationFactor)
+    assertNull(new JobConfig(new MapConfig()).getCoordinatorSystemNameOrNull)
+
+    props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
+    props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val jobConfig = new JobConfig(mapConfig)
+    assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
+    assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
+  }
+
+  @Test
+  def testCoordinatorReplicationFactorWithSystemDefault() {
+    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
+    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val jobConfig = new JobConfig(mapConfig)
+    assertEquals("other-kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
+    assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor)
+    assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
+  }
+
+  @Test
+  def testCoordinatorReplicationFactorWithSystemOverriddenDefault() {
+    // Defaults
+    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
+    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
+
+    // Overrides
+    props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
+    props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val jobConfig = new JobConfig(mapConfig)
+    assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
+    assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
+    assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
   }
 }