SAMZA-1214: Bug - system-scoped stream default configs may not be honored
authorJacob Maes <jmaes@linkedin.com>
Mon, 1 May 2017 20:52:13 +0000 (13:52 -0700)
committerJacob Maes <jmaes@linkedin.com>
Mon, 1 May 2017 20:52:13 +0000 (13:52 -0700)
* Re-introduced deprecated system-stream configs into config table
* Fixed position of task.consumer.batch.size in config table
* Moved system-scoped defaults from StreamConfig to SystemConfig

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #150 from jmakes/samza-1214

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java

index 0fc30c5..afa42f5 100644 (file)
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
+                    <td class="default">1</td>
+                    <td class="description">
+                        If set to a positive integer, the task will try to consume
+                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
+                        from each input stream, rather than consuming round-robin from all the input streams on
+                        each individual message. Setting this property can improve performance in some cases.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="systems">Systems</th>
                 </tr>
 
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
-                    <td class="default">1</td>
+                    <td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
+                    <td class="default" rowspan="2"></td>
                     <td class="description">
-                        If set to a positive integer, the task will try to consume
-                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
-                        from each input stream, rather than consuming round-robin from all the input streams on
-                        each individual message. Setting this property can improve performance in some cases.
+                        This is deprecated in favor of <a href="#systems-samza-key-serde" class="property">
+                        systems.<span class="system">system-name</span>.default.stream.samza.key.serde</a>.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-msg-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
+                    <td class="default" rowspan="2"></td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#systems-samza-msg-serde" class="property">
+                        systems.<span class="system">system-name</span>.default.stream.samza.msg.serde</a>.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-offset-default-legacy">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
+                    <td class="default" rowspan="2">upcoming</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#systems-samza-offset-default" class="property">
+                        systems.<span class="system">system-name</span>.default.stream.samza.offset.default</a>.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-streams-samza-reset-offset-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td>
+                    <td>false</td>
+                    <td>
+                        This is deprecated in favor of <a href="#streams-samza-reset-offset" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.reset.offset</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-streams-samza-priority-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td>
+                    <td>-1</td>
+                    <td>
+                        This is deprecated in favor of <a href="#streams-samza-priority" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.priority</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-streams-samza-bootstrap-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td>
+                    <td>false</td>
+                    <td>
+                        This is deprecated in favor of <a href="#streams-samza-bootstrap" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.bootstrap</a>.
                     </td>
                 </tr>
 
                 </tr>
 
                 <tr>
-                    <td class="property" id="streams-streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
+                    <td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
                     <td class="default">false</td>
                     <td class="description">
                         If set to <code>true</code>, when a Samza container starts up, it ignores any
                 </tr>
 
                 <tr>
-                    <td class="property" id="streams-streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
+                    <td class="property" id="streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
                     <td class="default">-1</td>
                     <td class="description">
                         If one or more streams have a priority set (any positive integer), they will be processed
                 </tr>
 
                 <tr>
-                    <td class="property" id="streams-streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
+                    <td class="property" id="streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
                     <td class="default">false</td>
                     <td class="description">
                         If set to <code>true</code>, this stream will be processed as a
index a1f0ec0..c26601c 100644 (file)
@@ -98,6 +98,8 @@ public class JavaStorageConfig extends MapConfig {
    * stores.storeName.changelog=streamName
    *
    * If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used.
+   *
+   * @return the name of the system to use by default for all changelogs, if defined. 
    */
   public String getChangelogSystem() {
     return get(CHANGELOG_SYSTEM,  get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
index 6408438..350f20c 100644 (file)
@@ -103,6 +103,9 @@ public class JavaSystemConfig extends MapConfig {
 
   /**
    * Gets the system-wide defaults for streams.
+   *
+   * @param systemName the name of the system for which the defaults will be returned.
+   * @return a subset of the config with the system prefix removed.
    */
   public Config getDefaultStreamProperties(String systemName) {
     return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
index 43cc9a9..c490642 100644 (file)
@@ -81,14 +81,11 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */
   def getSerdeStreams(systemName: String) = {
-    val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName)
-    val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE)
-
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     val legacySystemStreams = subConf
       .asScala
       .keys
-      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => {
         val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
         new SystemStream(systemName, streamName)
@@ -97,7 +94,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
       .asScala
       .keys
-      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
       .filter(streamId => systemName.equals(getSystem(streamId)))
       .map(streamId => streamIdToSystemStream(streamId)).toSet
index 69fc383..804955c 100644 (file)
@@ -29,8 +29,6 @@ object SystemConfig {
   // system config constants
   val SYSTEM_PREFIX = "systems.%s."
   val SYSTEM_FACTORY = "systems.%s.samza.factory"
-  val KEY_SERDE = "systems.%s.samza.key.serde"
-  val MSG_SERDE = "systems.%s.samza.msg.serde"
   val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
 
   implicit def Config2System(config: Config) = new SystemConfig(config)
@@ -39,9 +37,9 @@ object SystemConfig {
 class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name)
 
-  def getSystemKeySerde(name: String) = getNonEmptyOption(SystemConfig.KEY_SERDE format name)
+  def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
 
-  def getSystemMsgSerde(name: String) = getNonEmptyOption(SystemConfig.MSG_SERDE format name)
+  def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE)
 
   def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
 
@@ -54,4 +52,14 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     // find all .samza.factory keys, and strip the suffix
     subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
   }
+
+  private def getSystemDefaultStreamProperty(name: String, property: String) = {
+    val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name)
+    val streamDefault = defaultStreamProperties.get(property)
+    if (!(streamDefault == null || streamDefault.isEmpty)) {
+      Option(streamDefault)
+    } else {
+      getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name)
+    }
+  }
 }
index 4580cc4..f6a617c 100644 (file)
@@ -216,7 +216,8 @@ public class TestStreamConfig {
 
     // Ensure that we can set legacy system properties via the new system wide default
     assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
-    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals(0, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals("value1", new SystemConfig(config).getSystemKeySerde(STREAM1_SYSTEM).get());
     assertEquals("newest", config.getDefaultStreamOffset(SYSTEM_STREAM_1).get());
 
     // Property set via systems.x.default.stream.* only