samza-1156: Improve ApplicationRunner method naming and class structure
authorJacob Maes <jmaes@linkedin.com>
Wed, 22 Mar 2017 14:59:08 +0000 (07:59 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 22 Mar 2017 14:59:08 +0000 (07:59 -0700)
navina xinyuiscool nickpan47 take a look

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>

Closes #93 from jmakes/app-runner-class-hier

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java

index 6da1242..d148626 100644 (file)
@@ -27,16 +27,15 @@ import org.apache.samza.system.StreamSpec;
 
 
 /**
- * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
- *
- * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
- * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
+ * A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
  */
 @InterfaceStability.Unstable
-public interface ApplicationRunner {
+public abstract class ApplicationRunner {
+
+  private static final String RUNNER_CONFIG = "app.runner.class";
+  private static final String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
 
-  String RUNNER_CONFIG = "app.runner.class";
-  String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
+  protected final Config config;
 
   /**
    * Static method to create the local {@link ApplicationRunner}.
@@ -44,19 +43,17 @@ public interface ApplicationRunner {
    * @param config  configuration passed in to initialize the Samza local process
    * @return  the local {@link ApplicationRunner} to run the user-defined stream applications
    */
-  static ApplicationRunner getLocalRunner(Config config) {
+  public static ApplicationRunner getLocalRunner(Config config) {
     return null;
   }
 
   /**
    * Static method to load the {@link ApplicationRunner}
    *
-   * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
-   *
    * @param config  configuration passed in to initialize the Samza processes
    * @return  the configure-driven {@link ApplicationRunner} to run the user-defined stream applications
    */
-  static ApplicationRunner fromConfig(Config config) {
+  public static ApplicationRunner fromConfig(Config config) {
     try {
       Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS));
       if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
@@ -68,16 +65,25 @@ public interface ApplicationRunner {
           RUNNER_CONFIG)), e);
     }
     throw new ConfigException(String.format(
-        "Class %s does not implement interface ApplicationRunner properly",
+        "Class %s does not extend ApplicationRunner properly",
         config.get(RUNNER_CONFIG)));
   }
 
+
+  public ApplicationRunner(Config config) {
+    if (config == null) {
+      throw new NullPointerException("Parameter 'config' cannot be null.");
+    }
+
+    this.config = config;
+  }
+
   /**
    * Method to be invoked to deploy and run the actual Samza jobs to execute {@link StreamApplication}
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
    */
-  void run(StreamApplication streamApp);
+  public abstract void run(StreamApplication streamApp);
 
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
@@ -98,5 +104,5 @@ public interface ApplicationRunner {
    * @param streamId  The logical identifier for the stream in Samza.
    * @return          The {@link StreamSpec} instance.
    */
-  StreamSpec streamFromConfig(String streamId);
+  public abstract StreamSpec getStream(String streamId);
 }
index 1b36f76..fe86699 100644 (file)
@@ -255,7 +255,7 @@ public class StreamGraphImpl implements StreamGraph {
         config.get(JobConfig.JOB_NAME()),
         config.get(JobConfig.JOB_ID(), "1"),
         opNameWithId);
-    StreamSpec streamSpec = runner.streamFromConfig(streamId);
+    StreamSpec streamSpec = runner.getStream(streamId);
 
     this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
     IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
index a5d784a..5f01ca7 100644 (file)
@@ -24,23 +24,20 @@ import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.StreamSpec;
 
 
-public abstract class AbstractApplicationRunner implements ApplicationRunner {
-
-  protected final Config config;
+/**
+ * Defines common, core behavior for implementations of the {@link ApplicationRunner} API
+ */
+public abstract class AbstractApplicationRunner extends ApplicationRunner {
 
   public AbstractApplicationRunner(Config config) {
-    if (config == null) {
-      throw new NullPointerException("Parameter 'config' cannot be null.");
-    }
-
-    this.config = config;
+    super(config);
   }
 
   @Override
-  public StreamSpec streamFromConfig(String streamId) {
+  public StreamSpec getStream(String streamId) {
     StreamConfig streamConfig = new StreamConfig(config);
     String physicalName = streamConfig.getPhysicalName(streamId);
-    return streamFromConfig(streamId, physicalName);
+    return getStream(streamId, physicalName);
   }
 
   /**
@@ -61,11 +58,11 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner {
    * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
    * @return              The {@link StreamSpec} instance.
    */
-  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+  /*package private*/ StreamSpec getStream(String streamId, String physicalName) {
     StreamConfig streamConfig = new StreamConfig(config);
     String system = streamConfig.getSystem(streamId);
 
-    return streamFromConfig(streamId, physicalName, system);
+    return getStream(streamId, physicalName, system);
   }
 
   /**
@@ -79,7 +76,7 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner {
    * @param system        The name of the System on which this stream will be used.
    * @return              The {@link StreamSpec} instance.
    */
-  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+  /*package private*/ StreamSpec getStream(String streamId, String physicalName, String system) {
     StreamConfig streamConfig = new StreamConfig(config);
     Map<String, String> properties = streamConfig.getStreamProperties(streamId);
 
index 155a47d..8d7db9f 100644 (file)
@@ -52,13 +52,13 @@ public class TestAbstractApplicationRunner {
 
   // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
   @Test
-  public void testStreamFromConfigWithPhysicalNameInConfig() {
+  public void testgetStreamWithPhysicalNameInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
   }
@@ -66,71 +66,71 @@ public class TestAbstractApplicationRunner {
   // The streamId should be used as the physicalName when the physical name is not specified.
   // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
   @Test
-  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+  public void testgetStreamWithoutPhysicalNameInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(STREAM_ID, spec.getPhysicalName());
   }
 
   // If the system is specified at the stream scope, use it
   @Test
-  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+  public void testgetStreamWithSystemAtStreamScopeInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // If system isn't specified at stream scope, use the default system
   @Test
-  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+  public void testgetStreamWithSystemAtDefaultScopeInConfig() {
     Config config = addConfigs(buildStreamConfig(STREAM_ID,
                                                   StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
                                 JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
   }
 
   // Stream scope should override default scope
   @Test
-  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+  public void testgetStreamWithSystemAtBothScopesInConfig() {
     Config config = addConfigs(buildStreamConfig(STREAM_ID,
                                                 StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                                 StreamConfig.SYSTEM(), TEST_SYSTEM),
                                 JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // System is required. Throw if it cannot be determined.
   @Test(expected = Exception.class)
-  public void testStreamFromConfigWithOutSystemInConfig() {
+  public void testgetStreamWithOutSystemInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
   @Test
-  public void testStreamFromConfigPropertiesPassthrough() {
+  public void testgetStreamPropertiesPassthrough() {
     Config config = buildStreamConfig(STREAM_ID,
                                     StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                     StreamConfig.SYSTEM(), TEST_SYSTEM,
@@ -138,8 +138,8 @@ public class TestAbstractApplicationRunner {
                                     "systemProperty2", "systemValue2",
                                     "systemProperty3", "systemValue3");
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -153,7 +153,7 @@ public class TestAbstractApplicationRunner {
 
   // The samza properties (which are invalid for the underlying system) should be filtered out.
   @Test
-  public void testStreamFromConfigSamzaPropertiesOmitted() {
+  public void testgetStreamSamzaPropertiesOmitted() {
     Config config = buildStreamConfig(STREAM_ID,
                               StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                     StreamConfig.SYSTEM(), TEST_SYSTEM,
@@ -161,8 +161,8 @@ public class TestAbstractApplicationRunner {
                                     "systemProperty2", "systemValue2",
                                     "systemProperty3", "systemValue3");
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID);
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -174,13 +174,13 @@ public class TestAbstractApplicationRunner {
 
   // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
   @Test
-  public void testStreamFromConfigPhysicalNameArgSimple() {
+  public void testgetStreamPhysicalNameArgSimple() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME);
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -189,37 +189,37 @@ public class TestAbstractApplicationRunner {
 
   // Special characters are allowed for the physical name
   @Test
-  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+  public void testgetStreamPhysicalNameArgSpecialCharacters() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
     assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
   }
 
   // Null is allowed for the physical name
   @Test
-  public void testStreamFromConfigPhysicalNameArgNull() {
+  public void testgetStreamPhysicalNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID, null);
     assertNull(spec.getPhysicalName());
   }
 
   // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
   @Test
-  public void testStreamFromConfigSystemNameArgValid() {
+  public void testgetStreamSystemNameArgValid() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -228,65 +228,65 @@ public class TestAbstractApplicationRunner {
 
   // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigSystemNameArgInvalid() {
+  public void testgetStreamSystemNameArgInvalid() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
   }
 
   // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigSystemNameArgEmpty() {
+  public void testgetStreamSystemNameArgEmpty() {
     Config config = buildStreamConfig(STREAM_ID,
         StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
         StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, "");
   }
 
   // Null is not allowed for system name.
   @Test(expected = NullPointerException.class)
-  public void testStreamFromConfigSystemNameArgNull() {
+  public void testgetStreamSystemNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, null);
   }
 
   // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigStreamIdInvalid() {
+  public void testgetStreamStreamIdInvalid() {
     Config config = buildStreamConfig(STREAM_ID_INVALID,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig(STREAM_ID_INVALID);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream(STREAM_ID_INVALID);
   }
 
   // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigStreamIdEmpty() {
+  public void testgetStreamStreamIdEmpty() {
     Config config = buildStreamConfig("",
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig("");
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream("");
   }
 
   // Null is not allowed for streamId.
   @Test(expected = NullPointerException.class)
-  public void testStreamFromConfigStreamIdNull() {
+  public void testgetStreamStreamIdNull() {
     Config config = buildStreamConfig(null,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    env.streamFromConfig(null);
+    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+    runner.getStream(null);
   }