SAMZA-1267: ApplicationRunner#getLocalRunner returns null
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Mon, 8 May 2017 16:39:43 +0000 (09:39 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Mon, 8 May 2017 16:39:43 +0000 (09:39 -0700)
Remove ApplicationRunner#getLocalRunner and clean up any usage examples.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apache.org>

Closes #168 from xinyuiscool/SAMZA-1267

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
samza-core/src/test/java/org/apache/samza/example/WindowExample.java
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala

index bf2c643..eda09a2 100644 (file)
@@ -40,16 +40,6 @@ public abstract class ApplicationRunner {
   protected final Config config;
 
   /**
-   * Static method to create the local {@link ApplicationRunner}.
-   *
-   * @param config  configuration passed in to initialize the Samza local process
-   * @return  the local {@link ApplicationRunner} to run the user-defined stream applications
-   */
-  public static ApplicationRunner getLocalRunner(Config config) {
-    return null;
-  }
-
-  /**
    * Static method to load the {@link ApplicationRunner}
    *
    * @param config  configuration passed in to initialize the Samza processes
index a09247a..73a89af 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
 
 
@@ -53,7 +53,7 @@ public class BroadcastExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new BroadcastExample());
   }
 
index 6b913c4..5be3046 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
@@ -57,7 +57,7 @@ public class KeyValueStoreExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new KeyValueStoreExample());
   }
 
index 80d0e16..f65c4ed 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -50,7 +50,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new OrderShipmentJoinExample());
   }
 
index 547cac6..a3471a2 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -60,7 +60,7 @@ public class PageViewCounterExample implements StreamApplication {
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new PageViewCounterExample());
   }
 
index 37375cd..7bf939b 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -55,7 +55,7 @@ public class RepartitionExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new RepartitionExample());
   }
 
index 159dba2..1fd3be5 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -62,7 +62,7 @@ public class WindowExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
     localRunner.run(new WindowExample());
   }
 
index a3e70b8..980c2a2 100644 (file)
@@ -422,7 +422,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     @volatile var onContainerFailedThrowable: Throwable = null
 
     val mockRunLoop = mock[RunLoop]
-    when(mockRunLoop.run).then(new Answer[Unit] {
+    when(mockRunLoop.run).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         Thread.sleep(100)
       }