SAMZA-1273: Make StreamConfig.getStreamIds() public
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 9 May 2017 16:27:28 +0000 (09:27 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 9 May 2017 16:27:28 +0000 (09:27 -0700)
Making StreamConfig.getStreamIds() public so config provider can scan through all the configured streams and expand some properties if needed.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apache.org>

Closes #172 from xinyuiscool/SAMZA-1273

samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala

index c490642..389a883 100644 (file)
@@ -152,6 +152,15 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   }
 
   /**
+   * Gets the stream IDs of all the streams defined in the config
+   * @return collection of stream IDs
+   */
+  def getStreamIds(): Iterable[String] = {
+    // StreamIds are not allowed to have '.' so the first index of '.' marks the end of the streamId.
+    subset(StreamConfig.STREAMS_PREFIX).asScala.keys.map(key => key.substring(0, key.indexOf(".")))
+  }
+
+  /**
     * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza
     * interacts with the stream, as opposed to a property of the stream itself.
     *
@@ -246,11 +255,6 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, allProperties))
   }
 
-  private def getStreamIds(): Iterable[String] = {
-    // StreamIds are not allowed to have '.' so the first index of '.' marks the end of the streamId.
-    subset(StreamConfig.STREAMS_PREFIX).asScala.keys.map(key => key.substring(0, key.indexOf(".")))
-  }
-
   private def getStreamIdsForSystem(system: String): Iterable[String] = {
     getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
   }