SAMZA-1752: Pass full config to the IO resolver master
authorSrinivasulu Punuru <spunuru@linkedin.com>
Fri, 10 Aug 2018 23:45:12 +0000 (16:45 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Fri, 10 Aug 2018 23:45:12 +0000 (16:45 -0700)
SQL IO Resolver needs full configs so that it can filter out the configs specific to the source that the SQL application is interested in. This change provides the IO resolver with the full config.

Author: Srinivasulu Punuru <spunuru@linkedin.com>

Reviewers: Aditya Toomula <atoomula@linkedin.com>

Closes #557 from srinipunuru/fullconfig.1

samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java

index c604e71..1ada813 100644 (file)
@@ -47,8 +47,8 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
   public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
 
   @Override
-  public SqlIOResolver create(Config config) {
-    return new ConfigBasedIOResolver(config);
+  public SqlIOResolver create(Config resolverConfig, Config fullConfig) {
+    return new ConfigBasedIOResolver(resolverConfig);
   }
 
   private class ConfigBasedIOResolver implements SqlIOResolver {
index 6efa57c..aa9fe48 100644 (file)
@@ -29,8 +29,9 @@ public interface SqlIOResolverFactory {
 
   /**
    * Create the {@link SqlIOResolver}. This is called during the application initialization.
-   * @param config config for the SqlIOResolver
+   * @param resolverConfig config specifically supplied for this SqlIOResolver
+   * @param fullConfig the full config object received by the application.
    * @return Returns the created {@link SqlIOResolver}
    */
-  SqlIOResolver create(Config config);
+  SqlIOResolver create(Config resolverConfig, Config fullConfig);
 }
index b7d9a59..316d174 100644 (file)
@@ -187,7 +187,7 @@ public class SamzaSqlApplicationConfig {
     String sourceResolveValue = config.get(CFG_IO_RESOLVER);
     Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty");
     return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
-        (o, c) -> ((SqlIOResolverFactory) o).create(c));
+        (o, c) -> ((SqlIOResolverFactory) o).create(c, config));
   }
 
   private UdfResolver createUdfResolver(Map<String, String> config) {
index 7068e9b..574076e 100644 (file)
@@ -52,7 +52,7 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
   public static final String TEST_TABLE_ID = "testDbId";
 
   @Override
-  public SqlIOResolver create(Config config) {
+  public SqlIOResolver create(Config config, Config fullConfig) {
     return new TestIOResolver(config);
   }