SAMZA-1131: RemoteApplicationRunner for cluster-based Samza applications
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 17 Mar 2017 18:54:57 +0000 (11:54 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 17 Mar 2017 18:54:57 +0000 (11:54 -0700)
RemoteApplicationRunner starts the Samza StreamApplication on the remote cluster, e.g. Yarn. It uses ExecutionPlanner for planning physical execution, and JobRunner to start each stage of the application.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jacob Maes <jmakes@apache.org>

Closes #88 from xinyuiscool/SAMZA-1131

14 files changed:
samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java [moved from samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java with 99% similarity]
samza-shell/src/main/bash/run-app.sh [new file with mode: 0644]

index 62c8a02..553b8d4 100644 (file)
@@ -76,9 +76,8 @@ public interface ApplicationRunner {
    * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
    *
    * @param graphBuilder  the user-defined {@link StreamGraphBuilder} object
-   * @param config  the {@link Config} object for this job
    */
-  void run(StreamGraphBuilder graphBuilder, Config config);
+  void run(StreamGraphBuilder graphBuilder);
 
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
index 07cc3d3..d94a9eb 100644 (file)
@@ -138,7 +138,7 @@ public class ProcessorGraph {
    * Returns the processors to be executed in the topological order
    * @return unmodifiable list of {@link ProcessorNode}
    */
-  public List<ProcessorNode> getProcessors() {
+  public List<ProcessorNode> getProcessorNodes() {
     List<ProcessorNode> sortedNodes = topologicalSort();
     return Collections.unmodifiableList(sortedNodes);
   }
index a864868..a5d784a 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.system.StreamSpec;
 
 public abstract class AbstractApplicationRunner implements ApplicationRunner {
 
-  private final Config config;
+  protected final Config config;
 
   public AbstractApplicationRunner(Config config) {
     if (config == null) {
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
new file mode 100644 (file)
index 0000000..e3f320f
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * This class contains the main() method used by run-app.sh.
+ * For a StreamApplication, it creates the {@link ApplicationRunner} based on the config, and then run the application.
+ * For a Samza job using low level task API, it will create the JobRunner to run it.
+ */
+public class ApplicationRunnerMain {
+  // TODO: have the app configs consolidated in one place
+  private static final String STREAM_APPLICATION_CLASS_CONFIG = "'app.class";
+
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    OptionSet options = cmdLine.parser().parse(args);
+    Config config = cmdLine.loadConfig(options);
+
+    if (config.containsKey(STREAM_APPLICATION_CLASS_CONFIG)) {
+      ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+      StreamGraphBuilder app = (StreamGraphBuilder) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
+      runner.run(app);
+    } else {
+      new JobRunner(Util.rewriteConfig(config)).run(true);
+    }
+  }
+}
index eb9a997..75cd3d6 100644 (file)
@@ -32,7 +32,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     super(config);
   }
 
-  @Override public void run(StreamGraphBuilder app, Config config) {
+  @Override
+  public void run(StreamGraphBuilder app) {
     // 1. get logic graph for optimization
     // StreamGraph logicGraph = this.createGraph(app, config);
     // 2. potential optimization....
index 6e33fe8..fa13df8 100644 (file)
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.runtime;
 
+import org.apache.samza.SamzaException;
+import org.apache.samza.execution.ExecutionPlanner;
+import org.apache.samza.execution.ProcessorGraph;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -27,16 +36,36 @@ import org.apache.samza.config.Config;
  */
 public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
+  private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class);
+
   public RemoteApplicationRunner(Config config) {
     super(config);
   }
 
-  @Override public void run(StreamGraphBuilder app, Config config) {
-    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
-    // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
+  /**
+   * Run the {@link StreamGraphBuilder} on the remote cluster
+   * @param app a StreamApplication
+   */
+  @Override
+  public void run(StreamGraphBuilder app) {
+    try {
+      // 1. build stream graph
+      StreamGraph streamGraph = new StreamGraphImpl(this, config);
+      app.init(streamGraph, config);
 
+      // 2. create the physical execution plan
+      ExecutionPlanner planner = new ExecutionPlanner(config);
+      ProcessorGraph processorGraph = planner.plan(streamGraph);
+
+      // 3. submit jobs for remote execution
+      processorGraph.getProcessorNodes().forEach(processor -> {
+          Config processorConfig = processor.generateConfig();
+          log.info("Starting processor {} with config {}", processor.getId(), config);
+          JobRunner runner = new JobRunner(processorConfig);
+          runner.run(true);
+        });
+    } catch (Throwable t) {
+      throw new SamzaException("Fail to run application", t);
+    }
+  }
 }
index e54d01a..7f63f8d 100644 (file)
@@ -71,7 +71,7 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new KeyValueStoreExample(), config);
+    localRunner.run(new KeyValueStoreExample());
   }
 
   class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
index 371294b..d6a50bd 100644 (file)
@@ -122,7 +122,7 @@ public class NoContextStreamExample implements StreamGraphBuilder {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new NoContextStreamExample(), config);
+    localRunner.run(new NoContextStreamExample());
   }
 
 }
index 861120d..43553f5 100644 (file)
@@ -65,7 +65,7 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new OrderShipmentJoinExample(), config);
+    localRunner.run(new OrderShipmentJoinExample());
   }
 
   StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
index 886bf1c..3e8f856 100644 (file)
@@ -58,7 +58,7 @@ public class PageViewCounterExample implements StreamGraphBuilder {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new PageViewCounterExample(), config);
+    localRunner.run(new PageViewCounterExample());
   }
 
   StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
index b8abacf..48e7626 100644 (file)
@@ -82,7 +82,7 @@ public class RepartitionExample implements StreamGraphBuilder {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new RepartitionExample(), config);
+    localRunner.run(new RepartitionExample());
   }
 
   StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
index 46250d7..5b0178d 100644 (file)
@@ -201,7 +201,7 @@ public class TestExecutionPlanner {
 
     runner = new AbstractApplicationRunner(config) {
       @Override
-      public void run(StreamGraphBuilder graphBuilder, Config config) {
+      public void run(StreamGraphBuilder graphBuilder) {
       }
     };
   }
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-public class TestAbstractExecutionEnvironment {
+public class TestAbstractApplicationRunner {
   private static final String STREAM_ID = "t3st-Stream_Id";
   private static final String STREAM_ID_INVALID = "test#Str3amId!";
 
@@ -326,7 +326,7 @@ public class TestAbstractExecutionEnvironment {
     }
 
     @Override
-    public void run(StreamGraphBuilder graphBuilder, Config config) {
+    public void run(StreamGraphBuilder graphBuilder) {
       // do nothing
     }
   }
diff --git a/samza-shell/src/main/bash/run-app.sh b/samza-shell/src/main/bash/run-app.sh
new file mode 100644 (file)
index 0000000..3e43463
--- /dev/null
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.runtime.ApplicationRunnerMain "$@"