SAMZA-1089: Runner should support kill and status commands
authorJacob Maes <jmaes@linkedin.com>
Mon, 3 Apr 2017 16:50:27 +0000 (09:50 -0700)
committerJacob Maes <jmaes@linkedin.com>
Mon, 3 Apr 2017 16:50:27 +0000 (09:50 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>,Xinyu Liu <xiliu@linkedin.com>

Closes #106 from jmakes/samza-1089-2

18 files changed:
samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
samza-core/src/main/java/org/apache/samza/execution/StreamManager.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerOperation.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala

index d148626..d4f5b00 100644 (file)
@@ -20,9 +20,10 @@ package org.apache.samza.runtime;
 
 import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 
 
@@ -79,13 +80,28 @@ public abstract class ApplicationRunner {
   }
 
   /**
-   * Method to be invoked to deploy and run the actual Samza jobs to execute {@link StreamApplication}
+   * Deploy and run the Samza jobs to execute {@link StreamApplication}
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
    */
   public abstract void run(StreamApplication streamApp);
 
   /**
+   * Kill the Samza jobs represented by {@link StreamApplication}
+   *
+   * @param streamApp  the user-defined {@link StreamApplication} object
+   */
+  public abstract void kill(StreamApplication streamApp);
+
+  /**
+   * Get the collective status of the Samza jobs represented by {@link StreamApplication}.
+   * Returns {@link ApplicationStatus#Running} if any of the jobs are running.
+   *
+   * @param streamApp  the user-defined {@link StreamApplication} object
+   */
+  public abstract ApplicationStatus status(StreamApplication streamApp);
+
+  /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
    *
    * The stream configurations are read from the following properties in the config:
@@ -104,5 +120,5 @@ public abstract class ApplicationRunner {
    * @param streamId  The logical identifier for the stream in Samza.
    * @return          The {@link StreamSpec} instance.
    */
-  public abstract StreamSpec getStream(String streamId);
+  public abstract StreamSpec getStreamSpec(String streamId);
 }
index cf8d640..47d90a4 100644 (file)
@@ -22,6 +22,12 @@ package org.apache.samza.config;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.Util;
+
 
 /**
  * a java version of the system config
@@ -60,4 +66,37 @@ public class JavaSystemConfig extends MapConfig {
     }
     return systemNames;
   }
+
+  /**
+   * Get {@link SystemAdmin} instances for all the systems defined in this config.
+   *
+   * @return map of system name to {@link SystemAdmin}
+   */
+  public Map<String, SystemAdmin> getSystemAdmins() {
+    return getSystemFactories().entrySet()
+        .stream()
+        .collect(Collectors.toMap(systemNameToFactoryEntry -> systemNameToFactoryEntry.getKey(),
+            systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
+                .getAdmin(systemNameToFactoryEntry.getKey(), this)));
+  }
+
+  /**
+   * Get {@link SystemFactory} instances for all the systems defined in this config.
+   *
+   * @return a map from system name to {@link SystemFactory}
+   */
+  public Map<String, SystemFactory> getSystemFactories() {
+    Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
+      systemName -> systemName,
+      systemName -> {
+        String systemFactoryClassName = getSystemFactory(systemName);
+        if (systemFactoryClassName == null) {
+          throw new SamzaException(
+              String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+        }
+        return Util.getObj(systemFactoryClassName);
+      }));
+
+    return systemFactories;
+  }
 }
index ca2e71e..f9e44cf 100644 (file)
@@ -28,10 +28,8 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -39,11 +37,7 @@ import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,23 +50,20 @@ public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
   private final Config config;
+  private final StreamManager streamManager;
 
-  public ExecutionPlanner(Config config) {
+  public ExecutionPlanner(Config config, StreamManager streamManager) {
     this.config = config;
+    this.streamManager = streamManager;
   }
 
   public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
-    Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
-
     // create physical processors based on stream graph
     ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
 
     if (!processorGraph.getIntermediateStreams().isEmpty()) {
       // figure out the partitions for internal streams
-      calculatePartitions(streamGraph, processorGraph, sysAdmins);
-
-      // create the streams
-      createStreams(processorGraph, sysAdmins);
+      calculatePartitions(streamGraph, processorGraph);
     }
 
     return processorGraph;
@@ -110,9 +101,9 @@ public class ExecutionPlanner {
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+  /* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
     // fetch the external streams partition info
-    updateExistingPartitions(processorGraph, sysAdmins);
+    updateExistingPartitions(processorGraph, streamManager);
 
     // calculate the partitions for the input streams of join operators
     calculateJoinInputPartitions(streamGraph, processorGraph);
@@ -127,9 +118,9 @@ public class ExecutionPlanner {
   /**
    * Fetch the partitions of source/sink streams and update the StreamEdges.
    * @param processorGraph ProcessorGraph
-   * @param sysAdmins mapping from system name to the {@link SystemAdmin}
+   * @param streamManager the {@StreamManager} to interface with the streams.
    */
-  /* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+  /* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
     existingStreams.addAll(processorGraph.getSources());
     existingStreams.addAll(processorGraph.getSinks());
@@ -146,14 +137,12 @@ public class ExecutionPlanner {
       Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
       // create the stream name to StreamEdge mapping for this system
       streamEdges.forEach(streamEdge -> streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
-      SystemAdmin systemAdmin = sysAdmins.get(systemName);
-      // retrieve the metadata for the streams in this system
-      Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamToStreamEdge.keySet());
+      // retrieve the partition counts for the streams in this system
+      Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(systemName, streamToStreamEdge.keySet());
       // set the partitions of a stream to its StreamEdge
-      streamToMetadata.forEach((stream, data) -> {
-          int partitions = data.getSystemStreamPartitionMetadata().size();
-          streamToStreamEdge.get(stream).setPartitionCount(partitions);
-          log.debug("Partition count is {} for stream {}", partitions, stream);
+      streamToPartitionCount.forEach((stream, partitionCount) -> {
+          streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
+          log.debug("Partition count is {} for stream {}", partitionCount, stream);
         });
     }
   }
@@ -283,55 +272,8 @@ public class ExecutionPlanner {
     }
   }
 
-  private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
-    Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
-    graph.getIntermediateStreams().forEach(edge -> {
-        StreamSpec streamSpec = createStreamSpec(edge);
-        streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
-      });
-
-    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
-      String systemName = entry.getKey();
-      SystemAdmin systemAdmin = sysAdmins.get(systemName);
-
-      for (StreamSpec stream : entry.getValue()) {
-        log.info("Creating stream {} with partitions {} on system {}",
-            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
-        systemAdmin.createStream(stream);
-      }
-    }
-  }
-
   /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
     return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
 
-  private static StreamSpec createStreamSpec(StreamEdge edge) {
-    StreamSpec orgSpec = edge.getStreamSpec();
-    return orgSpec.copyWithPartitionCount(edge.getPartitionCount());
-  }
-
-  private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
-    return getSystemFactories(config).entrySet()
-        .stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().getAdmin(entry.getKey(), config)));
-  }
-
-  private static Map<String, SystemFactory> getSystemFactories(Config config) {
-    Map<String, SystemFactory> systemFactories =
-        getSystemNames(config).stream().collect(Collectors.toMap(systemName -> systemName, systemName -> {
-            String systemFactoryClassName = new JavaSystemConfig(config).getSystemFactory(systemName);
-            if (systemFactoryClassName == null) {
-              throw new SamzaException(
-                  String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-            }
-            return Util.getObj(systemFactoryClassName);
-          }));
-
-    return systemFactories;
-  }
-
-  private static Collection<String> getSystemNames(Config config) {
-    return new JavaSystemConfig(config).getSystemNames();
-  }
 }
index 5dc4178..3215a22 100644 (file)
@@ -54,8 +54,12 @@ public class StreamEdge {
     targetNodes.add(targetNode);
   }
 
-  StreamSpec getStreamSpec() {
-    return streamSpec;
+  public StreamSpec getStreamSpec() {
+    if (partitions == PARTITIONS_UNKNOWN) {
+      return streamSpec;
+    } else {
+      return streamSpec.copyWithPartitionCount(partitions);
+    }
   }
 
   SystemStream getSystemStream() {
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
new file mode 100644 (file)
index 0000000..3c13382
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StreamManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
+
+  private final Map<String, SystemAdmin> sysAdmins;
+
+  public StreamManager(Map<String, SystemAdmin> sysAdmins) {
+    this.sysAdmins = sysAdmins;
+  }
+
+  public void createStreams(List<StreamSpec> streams) {
+    Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
+    streams.forEach(streamSpec ->
+      streamsGroupedBySystem.put(streamSpec.getSystemName(), streamSpec));
+
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+
+      for (StreamSpec stream : entry.getValue()) {
+        LOGGER.info("Creating stream {} with partitions {} on system {}",
+            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
+        systemAdmin.createStream(stream);
+      }
+    }
+  }
+
+  Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
+    Map<String, Integer> streamToPartitionCount = new HashMap<>();
+
+    SystemAdmin systemAdmin = sysAdmins.get(systemName);
+    // retrieve the metadata for the streams in this system
+    Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames);
+    // set the partitions of a stream to its StreamEdge
+    streamToMetadata.forEach((stream, data) ->
+      streamToPartitionCount.put(stream, data.getSystemStreamPartitionMetadata().size()));
+
+    return streamToPartitionCount;
+  }
+}
index fe86699..6f7377b 100644 (file)
@@ -255,7 +255,7 @@ public class StreamGraphImpl implements StreamGraph {
         config.get(JobConfig.JOB_NAME()),
         config.get(JobConfig.JOB_ID(), "1"),
         opNameWithId);
-    StreamSpec streamSpec = runner.getStream(streamId);
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
 
     this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
     IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
index 5f01ca7..c2e7861 100644 (file)
@@ -34,10 +34,10 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   }
 
   @Override
-  public StreamSpec getStream(String streamId) {
+  public StreamSpec getStreamSpec(String streamId) {
     StreamConfig streamConfig = new StreamConfig(config);
     String physicalName = streamConfig.getPhysicalName(streamId);
-    return getStream(streamId, physicalName);
+    return getStreamSpec(streamId, physicalName);
   }
 
   /**
@@ -58,11 +58,11 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
    * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
    * @return              The {@link StreamSpec} instance.
    */
-  /*package private*/ StreamSpec getStream(String streamId, String physicalName) {
+  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName) {
     StreamConfig streamConfig = new StreamConfig(config);
     String system = streamConfig.getSystem(streamId);
 
-    return getStream(streamId, physicalName, system);
+    return getStreamSpec(streamId, physicalName, system);
   }
 
   /**
@@ -76,7 +76,7 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
    * @param system        The name of the System on which this stream will be used.
    * @return              The {@link StreamSpec} instance.
    */
-  /*package private*/ StreamSpec getStream(String streamId, String physicalName, String system) {
+  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName, String system) {
     StreamConfig streamConfig = new StreamConfig(config);
     Map<String, String> properties = streamConfig.getStreamProperties(streamId);
 
index 9f0b995..84427ea 100644 (file)
 package org.apache.samza.runtime;
 
 import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.job.JobRunner;
+import org.apache.samza.job.JobRunner$;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.Util;
 
@@ -34,20 +35,49 @@ import org.apache.samza.util.Util;
  */
 public class ApplicationRunnerMain {
   // TODO: have the app configs consolidated in one place
-  private static final String STREAM_APPLICATION_CLASS_CONFIG = "app.class";
+  public static final String STREAM_APPLICATION_CLASS_CONFIG = "app.class";
+
+  public static class ApplicationRunnerCommandLine extends CommandLine {
+    public OptionSpec operationOpt =
+        parser().accepts("operation", "The operation to perform; run, status, kill.")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("operation=run")
+            .defaultsTo("run");
+
+    public ApplicationRunnerOperation getOperation(OptionSet options) {
+      String rawOp = options.valueOf(operationOpt).toString();
+      return ApplicationRunnerOperation.fromString(rawOp);
+    }
+  }
 
   public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
+    ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(args);
     Config orgConfig = cmdLine.loadConfig(options);
     Config config = Util.rewriteConfig(orgConfig);
+    ApplicationRunnerOperation op = cmdLine.getOperation(options);
 
     if (config.containsKey(STREAM_APPLICATION_CLASS_CONFIG)) {
       ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-      StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
-      runner.run(app);
+      StreamApplication app =
+          (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
+      switch (op) {
+        case RUN:
+          runner.run(app);
+          break;
+        case KILL:
+          runner.kill(app);
+          break;
+        case STATUS:
+          System.out.println(runner.status(app));
+          break;
+        default:
+          throw new IllegalArgumentException("Unrecognized operation: " + op);
+      }
     } else {
-      new JobRunner(config).run(true);
+      JobRunner$.MODULE$.main(args);
     }
   }
 }
+
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerOperation.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerOperation.java
new file mode 100644 (file)
index 0000000..1fd60fc
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+/**
+ * Operation to perform in the {@link ApplicationRunnerMain}
+ */
+public enum ApplicationRunnerOperation {
+  RUN("run"), KILL("kill"), STATUS("status");
+
+  private final String str;
+
+  public static ApplicationRunnerOperation fromString(String string) {
+    return ApplicationRunnerOperation.valueOf(string.toUpperCase());
+  }
+
+  ApplicationRunnerOperation(String str) {
+    this.str = str;
+  }
+
+  @Override
+  public String toString() {
+    return str;
+  }
+}
index d05d9d0..f69e95e 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.runtime;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.job.ApplicationStatus;
 
 
 /**
@@ -41,4 +42,13 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     // 5. create the configuration for StreamProcessor
     // 6. start the StreamProcessor w/ optimized instance of StreamApplication
   }
+
+  @Override
+  public void kill(StreamApplication streamApp) {
+  }
+
+  @Override
+  public ApplicationStatus status(StreamApplication streamApp) {
+    return null;
+  }
 }
index 4bfad65..49c3228 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.JmxServer;
@@ -85,6 +86,17 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     }
   }
 
+  @Override
+  public void kill(StreamApplication streamApp) {
+    // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ApplicationStatus status(StreamApplication streamApp) {
+    // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted
+    throw new UnsupportedOperationException();
+  }
 
   public static void main(String[] args) throws Exception {
     setExceptionHandler(() -> {
index cb61a04..9fe493e 100644 (file)
 
 package org.apache.samza.runtime;
 
-import org.apache.samza.application.StreamApplication;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.ProcessorGraph;
+import org.apache.samza.execution.ProcessorNode;
+import org.apache.samza.execution.StreamManager;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,33 +45,101 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class);
 
+  private final StreamManager streamManager;
+
   public RemoteApplicationRunner(Config config) {
     super(config);
+    this.streamManager = new StreamManager(new JavaSystemConfig(config).getSystemAdmins());
   }
 
   /**
    * Run the {@link StreamApplication} on the remote cluster
    * @param app a StreamApplication
    */
-  @Override public void run(StreamApplication app) {
+  @Override
+  public void run(StreamApplication app) {
     try {
-      // 1. build stream graph
-      StreamGraph streamGraph = new StreamGraphImpl(this, config);
-      app.init(streamGraph, config);
+      // 1. initialize and plan
+      ProcessorGraph processorGraph = getExecutionPlan(app);
 
-      // 2. create the physical execution plan
-      ExecutionPlanner planner = new ExecutionPlanner(config);
-      ProcessorGraph processorGraph = planner.plan(streamGraph);
+      // 2. create the necessary streams
+      List<StreamSpec> streams = processorGraph.getIntermediateStreams().stream()
+          .map(streamEdge -> streamEdge.getStreamSpec())
+          .collect(Collectors.toList());
+      streamManager.createStreams(streams);
 
       // 3. submit jobs for remote execution
       processorGraph.getProcessorNodes().forEach(processor -> {
           Config processorConfig = processor.generateConfig();
-          log.info("Starting processor {} with config {}", processor.getId(), config);
+          log.info("Starting processor {} with config {}", processor.getId(), processorConfig);
           JobRunner runner = new JobRunner(processorConfig);
           runner.run(true);
         });
     } catch (Throwable t) {
-      throw new SamzaException("Fail to run application", t);
+      throw new SamzaException("Failed to run application", t);
+    }
+  }
+
+  @Override
+  public void kill(StreamApplication app) {
+    try {
+      ProcessorGraph processorGraph = getExecutionPlan(app);
+
+      processorGraph.getProcessorNodes().forEach(processor -> {
+          Config processorConfig = processor.generateConfig();
+          log.info("Killing processor {}", processor.getId());
+          JobRunner runner = new JobRunner(processorConfig);
+          runner.kill();
+        });
+    } catch (Throwable t) {
+      throw new SamzaException("Failed to kill application", t);
     }
   }
+
+  @Override
+  public ApplicationStatus status(StreamApplication app) {
+    try {
+      boolean finished = false;
+      boolean unsuccessfulFinish = false;
+
+      ProcessorGraph processorGraph = getExecutionPlan(app);
+      for (ProcessorNode processor : processorGraph.getProcessorNodes()) {
+        Config processorConfig = processor.generateConfig();
+        JobRunner runner = new JobRunner(processorConfig);
+        ApplicationStatus status = runner.status();
+        log.debug("Status is {} for processor {}", new Object[]{status, processor.getId()});
+
+        switch (status) {
+          case Running:
+            return ApplicationStatus.Running;
+          case UnsuccessfulFinish:
+            unsuccessfulFinish = true;
+          case SuccessfulFinish:
+            finished = true;
+            break;
+          default:
+            // Do nothing
+        }
+      }
+
+      if (unsuccessfulFinish) {
+        return ApplicationStatus.UnsuccessfulFinish;
+      } else if (finished) {
+        return ApplicationStatus.SuccessfulFinish;
+      }
+      return ApplicationStatus.New;
+    } catch (Throwable t) {
+      throw new SamzaException("Failed to get status for application", t);
+    }
+  }
+
+  private ProcessorGraph getExecutionPlan(StreamApplication app) throws Exception {
+    // build stream graph
+    StreamGraph streamGraph = new StreamGraphImpl(this, config);
+    app.init(streamGraph, config);
+
+    // create the physical execution plan
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    return planner.plan(streamGraph);
+  }
 }
index aeeb2aa..69fc383 100644 (file)
@@ -22,6 +22,9 @@ package org.apache.samza.config
 import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 
+/**
+  * Note: All new methods are being added to [[org.apache.samza.config.JavaSystemConfig]]
+  */
 object SystemConfig {
   // system config constants
   val SYSTEM_PREFIX = "systems.%s."
index 68ad02f..b2f5bd0 100644 (file)
 
 package org.apache.samza.job
 
+
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.ClassLoaderHelper
-import org.apache.samza.util.CommandLine
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
-import scala.collection.JavaConverters._
+import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
+import org.apache.samza.runtime.ApplicationRunnerOperation
+import org.apache.samza.util.{Logging, Util}
+
+import scala.collection.JavaConverters._
 
 
 object JobRunner extends Logging {
   val SOURCE = "job-runner"
 
   def main(args: Array[String]) {
-    val cmdline = new CommandLine
+    val cmdline = new ApplicationRunnerCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    new JobRunner(Util.rewriteConfig(config)).run()
+    val operation = cmdline.getOperation(options)
+
+    val runner = new JobRunner(Util.rewriteConfig(config))
+    doOperation(runner, operation)
+  }
+
+  def doOperation(runner: JobRunner, operation: ApplicationRunnerOperation): Unit = {
+    operation match {
+      case ApplicationRunnerOperation.RUN => runner.run()
+      case ApplicationRunnerOperation.KILL => runner.kill()
+      case ApplicationRunnerOperation.STATUS => println(runner.status())
+      case _ =>
+        throw new SamzaException("Invalid job runner operation: %s" format operation)
+    }
   }
 }
 
@@ -62,12 +76,7 @@ class JobRunner(config: Config) extends Logging {
    */
   def run(resetJobConfig: Boolean = true) = {
     debug("config: %s" format (config))
-    val jobFactoryClass = config.getStreamJobFactoryClass match {
-      case Some(factoryClass) => factoryClass
-      case _ => throw new SamzaException("no job factory class defined")
-    }
-    val jobFactory = ClassLoaderHelper.fromClassName[StreamJobFactory](jobFactoryClass)
-    info("job factory: %s" format (jobFactoryClass))
+    val jobFactory: StreamJobFactory = getJobFactory
     val factory = new CoordinatorStreamSystemFactory
     val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
@@ -118,4 +127,44 @@ class JobRunner(config: Config) extends Logging {
     info("exiting")
     job
   }
+
+  def kill(): Unit = {
+    val jobFactory: StreamJobFactory = getJobFactory
+
+    // Create the actual job, and kill it.
+    val job = jobFactory.getJob(config).kill()
+
+    info("waiting for job to terminate")
+
+    // Wait until the job has terminated, then exit.
+    Option(job.waitForStatus(SuccessfulFinish, 5000)) match {
+      case Some(appStatus) => {
+        if (SuccessfulFinish.equals(appStatus)) {
+          info("job terminated successfully - " + appStatus)
+        } else {
+          warn("unable to terminate job successfully. job has status %s" format (appStatus))
+        }
+      }
+      case _ => warn("unable to terminate job successfully.")
+    }
+
+    info("exiting")
+  }
+
+  def status(): ApplicationStatus = {
+    val jobFactory: StreamJobFactory = getJobFactory
+
+    // Create the actual job, and get its status.
+    jobFactory.getJob(config).getStatus
+  }
+
+  private def getJobFactory: StreamJobFactory = {
+    val jobFactoryClass = config.getStreamJobFactoryClass match {
+      case Some(factoryClass) => factoryClass
+      case _ => throw new SamzaException("no job factory class defined")
+    }
+    val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
+    info("job factory: %s" format (jobFactoryClass))
+    jobFactory
+  }
 }
index b69eec6..dc828d9 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -47,8 +48,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 public class TestExecutionPlanner {
@@ -65,6 +65,7 @@ public class TestExecutionPlanner {
   private StreamSpec output2;
 
   private Map<String, SystemAdmin> systemAdmins;
+  private StreamManager streamManager;
 
   private ApplicationRunner runner;
 
@@ -202,17 +203,28 @@ public class TestExecutionPlanner {
     systemAdmins = new HashMap<>();
     systemAdmins.put("system1", systemAdmin1);
     systemAdmins.put("system2", systemAdmin2);
+    streamManager = new StreamManager(systemAdmins);
 
     runner = new AbstractApplicationRunner(config) {
       @Override
       public void run(StreamApplication streamApp) {
       }
+
+      @Override
+      public void kill(StreamApplication streamApp) {
+
+      }
+
+      @Override
+      public ApplicationStatus status(StreamApplication streamApp) {
+        return null;
+      }
     };
   }
 
   @Test
   public void testCreateProcessorGraph() {
-    ExecutionPlanner planner = new ExecutionPlanner(config);
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
 
     ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
@@ -223,11 +235,11 @@ public class TestExecutionPlanner {
 
   @Test
   public void testFetchExistingStreamPartitions() {
-    ExecutionPlanner planner = new ExecutionPlanner(config);
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
     ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, systemAdmins);
+    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
     assertTrue(processorGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
     assertTrue(processorGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
     assertTrue(processorGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
@@ -241,11 +253,11 @@ public class TestExecutionPlanner {
 
   @Test
   public void testCalculateJoinInputPartitions() {
-    ExecutionPlanner planner = new ExecutionPlanner(config);
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
     ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, systemAdmins);
+    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
     ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
 
     // the partitions should be the same as input1
@@ -260,10 +272,10 @@ public class TestExecutionPlanner {
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
 
-    ExecutionPlanner planner = new ExecutionPlanner(cfg);
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
     ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+    planner.calculatePartitions(streamGraph, processorGraph);
 
     // the partitions should be the same as input1
     processorGraph.getIntermediateStreams().forEach(edge -> {
@@ -273,10 +285,10 @@ public class TestExecutionPlanner {
 
   @Test
   public void testCalculateIntStreamPartitions() {
-    ExecutionPlanner planner = new ExecutionPlanner(config);
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
     ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+    planner.calculatePartitions(streamGraph, processorGraph);
 
     // the partitions should be the same as input1
     processorGraph.getIntermediateStreams().forEach(edge -> {
index eeb783c..8e0d5fc 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
@@ -59,7 +60,7 @@ public class TestAbstractApplicationRunner {
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
   }
@@ -72,7 +73,7 @@ public class TestAbstractApplicationRunner {
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(STREAM_ID, spec.getPhysicalName());
   }
@@ -85,7 +86,7 @@ public class TestAbstractApplicationRunner {
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
@@ -98,7 +99,7 @@ public class TestAbstractApplicationRunner {
                                 JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
   }
@@ -112,7 +113,7 @@ public class TestAbstractApplicationRunner {
                                 JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
@@ -124,7 +125,7 @@ public class TestAbstractApplicationRunner {
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
@@ -140,7 +141,7 @@ public class TestAbstractApplicationRunner {
                                     "systemProperty3", "systemValue3");
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -154,7 +155,7 @@ public class TestAbstractApplicationRunner {
 
   // The samza properties (which are invalid for the underlying system) should be filtered out.
   @Test
-  public void testgetStreamSamzaPropertiesOmitted() {
+  public void testGetStreamSamzaPropertiesOmitted() {
     Config config = buildStreamConfig(STREAM_ID,
                               StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                     StreamConfig.SYSTEM(), TEST_SYSTEM,
@@ -163,7 +164,7 @@ public class TestAbstractApplicationRunner {
                                     "systemProperty3", "systemValue3");
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -173,15 +174,36 @@ public class TestAbstractApplicationRunner {
     assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
   }
 
+  @Test
+  public void testStreamConfigOverrides() {
+    final String sysStreamPrefix = String.format("systems.%s.streams.%s.", TEST_SYSTEM, TEST_PHYSICAL_NAME);
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3"),
+        sysStreamPrefix + "systemProperty4", "systemValue4",
+        sysStreamPrefix + "systemProperty2", "systemValue8");
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.getStreamSpec(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(4, properties.size());
+    assertEquals("systemValue4", properties.get("systemProperty4"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+  }
+
   // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
   @Test
-  public void testgetStreamPhysicalNameArgSimple() {
+  public void testGetStreamPhysicalNameArgSimple() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME);
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -190,37 +212,37 @@ public class TestAbstractApplicationRunner {
 
   // Special characters are allowed for the physical name
   @Test
-  public void testgetStreamPhysicalNameArgSpecialCharacters() {
+  public void testGetStreamPhysicalNameArgSpecialCharacters() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
     assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
   }
 
   // Null is allowed for the physical name
   @Test
-  public void testgetStreamPhysicalNameArgNull() {
+  public void testGetStreamPhysicalNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID, null);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID, null);
     assertNull(spec.getPhysicalName());
   }
 
   // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
   @Test
-  public void testgetStreamSystemNameArgValid() {
+  public void testGetStreamSystemNameArgValid() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+    StreamSpec spec = runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -229,65 +251,65 @@ public class TestAbstractApplicationRunner {
 
   // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamSystemNameArgInvalid() {
+  public void testGetStreamSystemNameArgInvalid() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+    runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
   }
 
   // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamSystemNameArgEmpty() {
+  public void testGetStreamSystemNameArgEmpty() {
     Config config = buildStreamConfig(STREAM_ID,
         StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
         StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, "");
+    runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME, "");
   }
 
   // Null is not allowed for system name.
   @Test(expected = NullPointerException.class)
-  public void testgetStreamSystemNameArgNull() {
+  public void testGetStreamSystemNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
                                       StreamConfig.SYSTEM(), TEST_SYSTEM2);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, null);
+    runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME, null);
   }
 
   // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamStreamIdInvalid() {
+  public void testGetStreamStreamIdInvalid() {
     Config config = buildStreamConfig(STREAM_ID_INVALID,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream(STREAM_ID_INVALID);
+    runner.getStreamSpec(STREAM_ID_INVALID);
   }
 
   // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
   @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamStreamIdEmpty() {
+  public void testGetStreamStreamIdEmpty() {
     Config config = buildStreamConfig("",
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream("");
+    runner.getStreamSpec("");
   }
 
   // Null is not allowed for streamId.
   @Test(expected = NullPointerException.class)
-  public void testgetStreamStreamIdNull() {
+  public void testGetStreamStreamIdNull() {
     Config config = buildStreamConfig(null,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
     AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStream(null);
+    runner.getStreamSpec(null);
   }
 
 
@@ -328,7 +350,18 @@ public class TestAbstractApplicationRunner {
 
     @Override
     public void run(StreamApplication streamApp) {
-      // do nothing
+      // do nothing. We're only testing the stream creation methods at this point.
+    }
+
+    @Override
+    public void kill(StreamApplication streamApp) {
+      // do nothing. We're only testing the stream creation methods at this point.
+    }
+
+    @Override
+    public ApplicationStatus status(StreamApplication streamApp) {
+      // do nothing. We're only testing the stream creation methods at this point.
+      return null;
     }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
new file mode 100644 (file)
index 0000000..05f3cc2
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.io.File;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.operators.StreamGraph;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestApplicationRunnerMain {
+
+  @Test
+  public void TestRunOperation() throws Exception {
+    assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
+    ApplicationRunnerMain.main(new String[]{
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        String.format("file://%s/src/test/resources/test.properties", new File(".").getCanonicalPath()),
+        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
+        "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts"
+    });
+
+    assertEquals(1, TestApplicationRunnerInvocationCounts.runCount);
+  }
+
+  @Test
+  public void TestKillOperation() throws Exception {
+    assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
+    ApplicationRunnerMain.main(new String[]{
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        String.format("file://%s/src/test/resources/test.properties", new File(".").getCanonicalPath()),
+        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
+        "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts",
+        "--operation=kill"
+    });
+
+    assertEquals(1, TestApplicationRunnerInvocationCounts.killCount);
+  }
+
+  @Test
+  public void TestStatusOperation() throws Exception {
+    assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
+    ApplicationRunnerMain.main(new String[]{
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        String.format("file://%s/src/test/resources/test.properties", new File(".").getCanonicalPath()),
+        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
+        "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts",
+        "--operation=status"
+    });
+
+    assertEquals(1, TestApplicationRunnerInvocationCounts.statusCount);
+  }
+
+  public static class TestApplicationRunnerInvocationCounts extends AbstractApplicationRunner {
+    protected static int runCount = 0;
+    protected static int killCount = 0;
+    protected static int statusCount = 0;
+
+    public TestApplicationRunnerInvocationCounts(Config config) {
+      super(config);
+    }
+
+    @Override
+    public void run(StreamApplication streamApp) {
+      runCount++;
+    }
+
+    @Override
+    public void kill(StreamApplication streamApp) {
+      killCount++;
+    }
+
+    @Override
+    public ApplicationStatus status(StreamApplication streamApp) {
+      statusCount++;
+      return ApplicationStatus.Running;
+    }
+  }
+
+  public static class TestStreamApplicationDummy implements StreamApplication {
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+
+    }
+  }
+}
index 17c5297..0a1314e 100644 (file)
@@ -29,6 +29,8 @@ import org.junit.Test
 
 object TestJobRunner {
   var processCount = 0
+  var killCount = 0
+  var getStatusCount = 0
 }
 
 class TestJobRunner {
@@ -42,6 +44,7 @@ class TestJobRunner {
   def testJobRunnerWorks {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
+    assertEquals(0, TestJobRunner.processCount)
     JobRunner.main(Array(
       "--config-factory",
       "org.apache.samza.config.factories.PropertiesConfigFactory",
@@ -49,16 +52,44 @@ class TestJobRunner {
       "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
     assertEquals(1, TestJobRunner.processCount)
   }
+
+  @Test
+  def testJobRunnerKillWorks {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    assertEquals(0, TestJobRunner.killCount)
+    JobRunner.main(Array(
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath,
+      "--operation=kill"))
+    assertEquals(1, TestJobRunner.killCount)
+  }
+
+  @Test
+  def testJobRunnerStatusWorks {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    assertEquals(0, TestJobRunner.getStatusCount)
+    JobRunner.main(Array(
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath,
+      "--operation=status"))
+    assertEquals(1, TestJobRunner.getStatusCount)
+  }
 }
 
 class MockJobFactory extends StreamJobFactory {
   def getJob(config: Config): StreamJob = {
     return new StreamJob {
       def submit() = { TestJobRunner.processCount += 1; this }
-      def kill() = this
+      def kill() = { TestJobRunner.killCount += 1; this }
       def waitForFinish(timeoutMs: Long) = ApplicationStatus.SuccessfulFinish
       def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = status
-      def getStatus() = ApplicationStatus.SuccessfulFinish
+      def getStatus() = { TestJobRunner.getStatusCount += 1; ApplicationStatus.SuccessfulFinish }
     }
   }
 }