SAMZA-975 - Initial Samza REST Implementation
authorJacob Maes <jacob.maes@gmail.com>
Tue, 23 Aug 2016 06:38:13 +0000 (23:38 -0700)
committerNavina Ramesh <nramesh@linkedin.com>
Tue, 23 Aug 2016 06:38:13 +0000 (23:38 -0700)
45 files changed:
LICENSE
build.gradle
gradle/dependency-versions.gradle
samza-rest/src/main/bash/run-samza-rest-service.sh [new file with mode: 0755]
samza-rest/src/main/config/samza-rest.properties [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/model/Job.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java [new file with mode: 0644]
samza-rest/src/main/resources/log4j.xml [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java [new file with mode: 0644]
samza-shell/src/main/bash/kill-yarn-job-by-name.sh [new file with mode: 0755]
settings.gradle

diff --git a/LICENSE b/LICENSE
index e1439fe..1d81cd3 100644 (file)
--- a/LICENSE
+++ b/LICENSE
@@ -252,3 +252,10 @@ SIL Open Font License (OFT) - http://scripts.sil.org/OFL/
 - Font-awesome font files v4.0.3 (http://fortawesome.github.io/Font-Awesome/)
 - Ropa Sans fonts v1.1 - Copyright (c) 2011, Botjo Nikoltchev, with Reserved Font Name "Ropa Sans"
 
+-----------------------------------------------------------------------
+ The CDDL License
+-----------------------------------------------------------------------
+
+The Apache Samza project bundles the following files under the CDDL License
+
+- Jersey v2.22.1 (https://jersey.java.net/license.html) - Copyright (c) 2010-2015 Oracle Corporation
\ No newline at end of file
index 1d4eb74..004c81e 100644 (file)
@@ -469,6 +469,64 @@ project(":samza-hdfs_$scalaVersion") {
     }
 }
 
+project(":samza-rest") {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(":samza-shell")
+    compile project(":samza-core_$scalaVersion")
+
+    runtime "org.slf4j:slf4j-log4j12:$slf4jVersion"
+    compile "com.google.guava:guava:$guavaVersion"
+    compile "org.glassfish.jersey.core:jersey-server:$jerseyVersion"
+    compile "org.glassfish.jersey.containers:jersey-container-servlet-core:$jerseyVersion"
+    compile "org.glassfish.jersey.containers:jersey-container-jetty-http:$jerseyVersion"
+    compile "org.glassfish.jersey.media:jersey-media-moxy:$jerseyVersion"
+    compile "org.eclipse.jetty.aggregate:jetty-all:$jettyVersion"
+    compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+      exclude group: 'com.sun.jersey'
+    }
+    runtime("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+      exclude group: 'com.sun.jersey'
+    }
+
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion"
+  }
+
+  tasks.create(name: "releaseRestServiceTar", type: Tar) {
+    description 'Build a tarball containing the samza-rest component and its dependencies'
+    compression = Compression.GZIP
+    from(file("$projectDir/src/main/config")) { into "config/" }
+    from(file("$projectDir/src/main/resources/log4j.xml")) { into "bin/" }
+    from(file("$projectDir/src/main/bash/run-samza-rest-service.sh")) { into "bin/" }
+    from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "bin/" }
+    from '../LICENSE'
+    from '../NOTICE'
+    from(configurations.runtime) { into("lib/") }
+    from(jar) { into("lib/") }
+  }
+
+  tasks.create(name: "restTarGz", type: Tar) {
+    description 'Build a tarball containing the samza-rest supplementary files'
+    compression = Compression.GZIP
+    from 'src/main/bash'
+    from 'src/main/resources'
+    from(project(':samza-shell').file("src/main/bash/run-class.sh"))
+  }
+
+  artifacts {
+    archives(restTarGz) {
+      name 'samza-rest-scripts'
+      classifier 'dist'
+    }
+  }
+}
+
 project(":samza-test_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'
index 47c71bf..8c757b9 100644 (file)
@@ -18,6 +18,7 @@
  */
  ext {
   elasticsearchVersion = "1.5.1"
+  jerseyVersion = "2.22.1"
   jodaTimeVersion = "2.2"
   joptSimpleVersion = "3.2"
   jacksonVersion = "1.9.13"
diff --git a/samza-rest/src/main/bash/run-samza-rest-service.sh b/samza-rest/src/main/bash/run-samza-rest-service.sh
new file mode 100755 (executable)
index 0000000..bd52afd
--- /dev/null
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j.xml"
+[[ -z "$SAMZA_LOG_DIR" ]] && export SAMZA_LOG_DIR="$PWD/logs"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.rest.SamzaRestService "$@"
diff --git a/samza-rest/src/main/config/samza-rest.properties b/samza-rest/src/main/config/samza-rest.properties
new file mode 100644 (file)
index 0000000..7be0b47
--- /dev/null
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Service port. Set to 0 for a dynamic port.
+services.rest.port=9139
+
+# JobsResource
+job.proxy.factory.class=org.apache.samza.rest.proxy.job.SimpleYarnJobProxyFactory
+job.installations.path=/export/content/samza/deploy/
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java
new file mode 100644 (file)
index 0000000..d69df5f
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+/**
+ * A Monitor is a class implementing some functionality that should be done every N milliseconds on a YARN RM or NM.
+ * Classes specified in the config will have their monitor() method called at a configurable interval.
+ * For example, one could implement a Monitor that checks for leaked containers and kills them, ensuring that
+ * no leaked container survives on a NodeManager host for more than N ms (where N is the monitor run interval.)
+ *
+ * Implementations can override .toString() for better logging.
+ */
+public interface Monitor {
+
+    /**
+     * Do the work of the monitor. Because this can be arbitrary behavior up to and including script execution,
+     * IPC-related IOExceptions and concurrency-related InterruptedExceptions are caught by the SamzaMonitorService.
+     * @throws Exception if there was any problem running the monitor.
+     */
+    void monitor()
+        throws Exception;
+
+}
\ No newline at end of file
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
new file mode 100644 (file)
index 0000000..75f3867
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import java.lang.reflect.Constructor;
+
+class MonitorLoader {
+
+    private MonitorLoader() {}
+
+    public static Monitor fromClassName(String monitorClassName)
+        throws InstantiationException {
+        Object monitorObject;
+        try {
+            Class<?> klass = Class.forName(monitorClassName);
+            Constructor<?> constructor = klass.getConstructor();
+            monitorObject = constructor.newInstance();
+        } catch (Exception e) {
+            throw (InstantiationException)
+                new InstantiationException("Unable to instantiate " + monitorClassName).initCause(e);
+        }
+        if (!(monitorObject instanceof Monitor)) {
+            throw new InstantiationException(monitorClassName + " is not an instance of Monitor");
+        }
+        return (Monitor) monitorObject;
+    }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
new file mode 100644 (file)
index 0000000..2f4d9dd
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.rest.SamzaRestConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The class responsible for handling long-running/scheduled monitors in Samza REST.
+ * Takes a SamzaRestConfig object in the constructor and handles instantiation of
+ * monitors and scheduling them to run based on the properties in the config.
+ */
+public class SamzaMonitorService {
+
+    private static final Logger log = LoggerFactory.getLogger(SamzaMonitorService.class);
+
+    private final SchedulingProvider scheduler;
+    private final SamzaRestConfig config;
+
+    public SamzaMonitorService(SamzaRestConfig config, SchedulingProvider schedulingProvider) {
+        this.scheduler = schedulingProvider;
+        this.config = config;
+    }
+
+    public void start() {
+        List<Monitor> monitors = getMonitorsFromConfig(config);
+        int monitorRunInterval = config.getConfigMonitorIntervalMs();
+        for (Monitor monitor : monitors) {
+            log.debug("Scheduling monitor {} to run every {}ms", monitor, monitorRunInterval);
+            this.scheduler.schedule(getRunnable(monitor), monitorRunInterval);
+        }
+    }
+
+    public void stop() {
+        this.scheduler.stop();
+    }
+
+    private Runnable getRunnable(final Monitor monitor) {
+        return new Runnable() {
+            public void run() {
+                try {
+                    monitor.monitor();
+                } catch (IOException e) {
+                    log.warn("Caught IOException during " + monitor.toString() + ".monitor()", e);
+                } catch (InterruptedException e) {
+                    log.warn("Caught InterruptedException during " + monitor.toString() + ".monitor()", e);
+                } catch (Exception e) {
+                    log.warn("Unexpected exception during {}.monitor()", monitor, e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Get all the registered monitors for the service.
+     * @return a list of Monitor objects ready to be scheduled.
+     */
+    private static List<Monitor> getMonitorsFromConfig(SamzaRestConfig config) {
+        List<String> classNames = config.getConfigMonitorClassList();
+        List<Monitor> monitors = new ArrayList<>();
+
+        for (String name: classNames) {
+            try {
+                Monitor monitor = MonitorLoader.fromClassName(name);
+                monitors.add(monitor);
+            } catch (InstantiationException e) {
+                log.warn("Unable to instantiate monitor " + name, e);
+            }
+        }
+        return monitors;
+    }
+
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
new file mode 100644 (file)
index 0000000..c0c448c
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class ScheduledExecutorSchedulingProvider implements SchedulingProvider {
+
+    private final ScheduledExecutorService scheduler;
+
+    public ScheduledExecutorSchedulingProvider(ScheduledExecutorService scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    public void schedule(Runnable runnable, int interval) {
+        this.scheduler.scheduleAtFixedRate(runnable, 0, interval, MILLISECONDS);
+    }
+
+    public void stop() {
+        this.scheduler.shutdownNow();
+    }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
new file mode 100644 (file)
index 0000000..aea1a92
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides scheduling functionality to the SamzaMonitorService.
+ */
+public interface SchedulingProvider {
+    /* Schedule a the given Runnable to run() every INTERVAL ms. */
+    void schedule(Runnable runnable, int intervalMs);
+
+    /* Stop any future executions of any scheduled tasks. */
+    void stop();
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
new file mode 100644 (file)
index 0000000..61f3c46
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.resources.DefaultResourceFactory;
+import org.apache.samza.rest.resources.ResourceFactory;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Samza REST implementation of the JAX-RS {@link javax.ws.rs.core.Application} model.
+ */
+public class SamzaRestApplication extends ResourceConfig {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaRestApplication.class);
+
+  public SamzaRestApplication(SamzaRestConfig config) {
+    register(JacksonJsonProvider.class);
+    registerConfiguredResources(config);
+  }
+
+  /**
+   * Registers resources specified in the config. If there are no factories
+   * or resources specified in the config, it uses the
+   * {@link org.apache.samza.rest.resources.DefaultResourceFactory}
+   *
+   * @param config  the config to pass to the factories.
+   */
+  private void registerConfiguredResources(SamzaRestConfig config) {
+    try {
+      // Use default if there were no configured resources or factories
+      if (config.getResourceFactoryClassNames().isEmpty() && config.getResourceClassNames().isEmpty()) {
+        log.info("No resource factories or classes configured. Using DefaultResourceFactory.");
+        registerInstances(new DefaultResourceFactory().getResourceInstances(config).toArray());
+        return;
+      }
+
+      for (String factoryClassName : config.getResourceFactoryClassNames()) {
+        log.info("Invoking factory {}", factoryClassName);
+        registerInstances(instantiateFactoryResources(factoryClassName, config).toArray());
+      }
+
+      for (String resourceClassName : config.getResourceClassNames()) {
+        log.info("Using resource class {}", resourceClassName);
+        register(Class.forName(resourceClassName));
+      }
+    } catch (Throwable t) {
+      throw new SamzaException(t);
+    }
+  }
+
+  /**
+   * Passes the specified config to the specified factory to instantiate its resources.
+   *
+   * @param factoryClassName  the name of a class that implements {@link ResourceFactory}
+   * @param config            the config to pass to the factory
+   * @return                  a collection of resources returned by the factory.
+   * @throws InstantiationException
+   */
+  private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
+      throws InstantiationException {
+    try {
+      Class factoryCls = Class.forName(factoryClassName);
+      ResourceFactory factory = (ResourceFactory) factoryCls.newInstance();
+      return factory.getResourceInstances(config);
+    } catch (Exception e) {
+      throw (InstantiationException)
+          new InstantiationException("Unable to instantiate " + factoryClassName).initCause(e);
+    }
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java
new file mode 100644 (file)
index 0000000..6f5c10a
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+
+/**
+ * The set of configurations required by the core components of the {@link org.apache.samza.rest.SamzaRestService}.
+ * Other configurations (e.g. from {@link org.apache.samza.config.JobConfig}) may also be used by some of the
+ * implementation classes.
+ */
+public class SamzaRestConfig extends MapConfig {
+  /**
+   * Specifies a comma-delimited list of class names that implement ResourceFactory.
+   * These factories will be used to create specific instances of resources, passing the server config.
+   */
+  public static final String CONFIG_REST_RESOURCE_FACTORIES = "rest.resource.factory.classes";
+
+  /**
+   * Specifies a comma-delimited list of class names of resources to register with the server.
+   */
+  public static final String CONFIG_REST_RESOURCE_CLASSES = "rest.resource.classes";
+
+  /**
+   * Specifies a comma-delimited list of class names corresponding to Monitor implementations.
+   * These will be instantiated and scheduled to run periodically at runtime.
+   * Note that you must include the ENTIRE package name (org.apache.samza...).
+   */
+  public static final String CONFIG_MONITOR_CLASSES = "monitor.classes";
+
+  /**
+   * Specifies the interval at which each registered Monitor's monitor method will be called.
+   */
+  public static final String CONFIG_MONITOR_INTERVAL_MS = "monitor.run.interval.ms";
+
+  /**
+   * Monitors run every 60s by default
+   */
+  private static final int DEFAULT_MONITOR_INTERVAL = 60000;
+
+  /**
+   * The port number to use for the HTTP server or 0 to dynamically choose a port.
+   */
+  public static final String CONFIG_SAMZA_REST_SERVICE_PORT = "services.rest.port";
+
+  public SamzaRestConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_SAMZA_REST_SERVICE_PORT
+   * @return  the port number to use for the HTTP server or 0 to dynamically choose a port.
+   */
+  public int getPort() {
+    return getInt(CONFIG_SAMZA_REST_SERVICE_PORT);
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_REST_RESOURCE_FACTORIES
+   * @return a list of class names as Strings corresponding to factories
+   *          that Samza REST should use to instantiate and register resources
+   *          or an empty list if none were configured.
+   */
+  public List<String> getResourceFactoryClassNames() {
+    return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_FACTORIES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_REST_RESOURCE_CLASSES
+   * @return a list of class names as Strings corresponding to resource classes
+   *          that Samza REST should register or an empty list if none were configured.
+   */
+  public List<String> getResourceClassNames() {
+    return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_CLASSES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_MONITOR_CLASSES
+   * @return a list of class names as Strings corresponding to Monitors that
+   *          Samza REST should schedule or an empty list if none were configured.
+   */
+  public List<String> getConfigMonitorClassList() {
+    return parseCommaDelimitedStrings(get(CONFIG_MONITOR_CLASSES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_MONITOR_INTERVAL_MS
+   * @return an integer number of milliseconds, the period at which to schedule monitor runs.
+   */
+  public int getConfigMonitorIntervalMs() {
+    return getInt(CONFIG_MONITOR_INTERVAL_MS, DEFAULT_MONITOR_INTERVAL);
+  }
+
+  /**
+   * Parses a string containing a set of comma-delimited strings. Whitespace is ignored.
+   * If the input string is null or empty, an empty list is returned.
+   *
+   * @param commaDelimitedStrings the string to parse.
+   * @return                      the list of strings parsed from the input or an empty list if none.
+   */
+  private static List<String> parseCommaDelimitedStrings(String commaDelimitedStrings) {
+    if (commaDelimitedStrings == null || commaDelimitedStrings.trim().isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Arrays.asList(commaDelimitedStrings.split("\\s*,\\s*"));
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
new file mode 100644 (file)
index 0000000..5b34da8
--- /dev/null
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.monitor.SamzaMonitorService;
+import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
+import org.apache.samza.util.CommandLine;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * The main Java class for the Samza REST API. It runs an embedded Jetty server so it can be deployed as a Jar file.
+ *
+ * This class can be started from the command line by providing the --config-path parameter to a Samza REST config file
+ * which will be used to configure the default resources exposed by the API.
+ *
+ * It can also be managed programmatically using the
+ * {@link org.apache.samza.rest.SamzaRestService#addServlet(javax.servlet.Servlet, String)},
+ * {@link #start()} and {@link #stop()} methods.
+ */
+public class SamzaRestService {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaRestService.class);
+
+  private final Server server;
+  private final ServletContextHandler context;
+
+
+  public SamzaRestService(SamzaRestConfig config) {
+    log.info("Creating new SamzaRestService with config: {}", config);
+    server = new Server(config.getPort());
+
+    context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+  }
+
+  /**
+   * Command line interface to run the server.
+   *
+   * @param args arguments supported by {@link org.apache.samza.util.CommandLine}.
+   *             In particular, --config-path and --config-factory are used to read the Samza REST config file.
+   * @throws Exception if the server could not be successfully started.
+   */
+  public static void main(String[] args)
+      throws Exception {
+    try {
+      SamzaRestConfig config = parseConfig(args);
+      SamzaRestService restService = new SamzaRestService(config);
+
+      // Add applications
+      SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config);
+      ServletContainer container = new ServletContainer(samzaRestApplication);
+      restService.addServlet(container, "/*");
+
+      // Schedule monitors to run
+      ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1);
+      ScheduledExecutorSchedulingProvider schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
+      SamzaMonitorService monitorService = new SamzaMonitorService(config, schedulingProvider);
+      monitorService.start();
+
+      restService.runBlocking();
+      monitorService.stop();
+    } catch (Throwable t) {
+      log.error("Exception in main.", t);
+    }
+  }
+
+  /**
+   * Reads a {@link org.apache.samza.config.Config} from command line parameters.
+   * @param args  the command line parameters supported by {@link org.apache.samza.util.CommandLine}.
+   * @return      the parsed {@link org.apache.samza.config.Config}.
+   */
+  private static SamzaRestConfig parseConfig(String[] args) {
+    CommandLine cmd = new CommandLine();
+    OptionSet options = cmd.parser().parse(args);
+    MapConfig cfg = cmd.loadConfig(options);
+    return new SamzaRestConfig(new MapConfig(cfg));
+  }
+
+  /**
+   * Adds the specified {@link javax.servlet.Servlet} to the server at the specified path.
+   * @param servlet the {@link javax.servlet.Servlet} to be added.
+   * @param path    the path for the servlet.
+   */
+  public void addServlet(Servlet servlet, String path) {
+    log.info("Adding servlet {} for path {}", servlet, path);
+    ServletHolder holder = new ServletHolder(servlet);
+    context.addServlet(holder, path);
+    holder.setInitOrder(0);
+  }
+
+  /**
+   * Runs the server and waits for it to finish.
+   *
+   * @throws Exception if the server could not be successfully started.
+   */
+  private void runBlocking()
+      throws Exception {
+    try {
+      start();
+      server.join();
+    } finally {
+      server.destroy();
+      log.info("Server terminated.");
+    }
+  }
+
+  /**
+   * Starts the server asynchronously. To stop the server, see {@link #stop()}.
+   *
+   * @throws Exception if the server could not be successfully started.
+   */
+  public void start()
+      throws Exception {
+    log.info("Starting server on port {}", server.getConnectors()[0].getPort());
+    server.start();
+    log.info("Server is running");
+  }
+
+  /**
+   * Stops the server.
+   *
+   * @throws Exception if the server could not be successfully stopped.
+   */
+  public void stop()
+      throws Exception {
+    log.info("Stopping server");
+    server.stop();
+    log.info("Server is stopped");
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java
new file mode 100644 (file)
index 0000000..e540635
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * The client view of a job. Includes the job name, id, and status.
+ *
+ * The minimum goal is to provide enough information to execute REST commands on that job, so the name and id are required.
+ */
+public class Job {
+  private String jobName;
+  private String jobId;
+  private JobStatus status = JobStatus.UNKNOWN; // started, stopped, or starting
+  private String statusDetail; // Detailed status e.g. for YARN it could be ACCEPTED, RUNNING, etc.
+
+  public Job(String jobName, String jobId) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+  }
+
+  public Job(@JsonProperty("jobName") String jobName, @JsonProperty("jobId")String jobId, @JsonProperty("status") JobStatus status, @JsonProperty("statusDetail") String statusDetail) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.status = status;
+    this.statusDetail = statusDetail;
+  }
+
+  public Job() {
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(JobStatus status) {
+    this.status = status;
+  }
+
+  public String getStatusDetail() {
+    return statusDetail;
+  }
+
+  public void setStatusDetail(String statusDetail) {
+    this.statusDetail = statusDetail;
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java b/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java
new file mode 100644 (file)
index 0000000..8c655d8
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+/**
+ * The abstract status of the job, irrespective of the status in any underlying cluster management system (e.g. YARN).
+ * This status is the client view of the job status.
+ */
+public enum JobStatus {
+
+    /** Job is in the process of starting but is not yet running. */
+    STARTING("starting"),
+
+    /** Job has been started. */
+    STARTED("started"),
+
+    /** Job has been stopped. */
+    STOPPED("stopped"),
+
+    /** Job status is unknown. */
+    UNKNOWN("unknown");
+
+  private final String stringVal;
+
+  JobStatus(final String stringVal) {
+    this.stringVal = stringVal;
+  }
+
+  @Override
+  public String toString() {
+    return stringVal;
+  }
+
+  public boolean hasBeenStarted() {
+    return !(this == STOPPED || this == UNKNOWN);
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java
new file mode 100644 (file)
index 0000000..1ac49ae
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import java.util.Map;
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Finds all the installed jobs. For example, one implementation may take an installation root directory
+ * and scan all subdirectories for job installations.
+ *
+ * Provides a map from a {@link JobInstance} to its {@link InstallationRecord} based on the structure within the
+ * installation directory. Implementations of this interface should encapsulate any custom installation
+ * structure such that the resulting {@link InstallationRecord} simply contains the locations of the files
+ * needed to control the job.
+ */
+public interface InstallationFinder {
+
+  /**
+   * @param jobInstance the job to check.
+   * @return            <code>true</code> if a job with the specified name and id is installed on the local host.
+   */
+  boolean isInstalled(JobInstance jobInstance);
+
+  /**
+   * @return  a map from each {@link JobInstance} to the corresponding {@link InstallationRecord}
+   *          for each Samza installation found in the installRoot.
+   */
+  Map<JobInstance, InstallationRecord> getAllInstalledJobs();
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java
new file mode 100644 (file)
index 0000000..27b1ab8
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Represents an installation of one Samza job instance on a file system.
+ *
+ * This class does not hard code any knowledge about the structure of a Samza job installation. Rather, it
+ * just points to the relevant paths within the installation. The structure is resolved by an implementation
+ * of the {@link InstallationFinder} interface.
+ */
+public class InstallationRecord extends JobInstance {
+
+  private final String rootPath;
+  private final String configFilePath;
+  private final String binPath;
+
+  public InstallationRecord(String jobName, String jobId, String rootPath, String configFilePath, String binPath) {
+    super(jobName, jobId);
+    this.rootPath = rootPath;
+    this.configFilePath = configFilePath;
+    this.binPath = binPath;
+  }
+
+  /**
+   * @return  the path of the config file for the job.
+   */
+  public String getConfigFilePath() {
+    return configFilePath;
+  }
+
+  /**
+   * @return  the path of the directory containing the scripts.
+   */
+  public String getScriptFilePath() {
+    return binPath;
+  }
+
+  /**
+   * @return  the root path of the installed Samza job on the file system. This path may be in common with
+   *          other job instances if, for example, there are multiple configs defining separate instances.
+   */
+  public String getRootPath() {
+    return rootPath;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Job %s installed at %s", super.toString(), getRootPath());
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java
new file mode 100644 (file)
index 0000000..0adad5b
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A simple default implementation of {@link InstallationFinder}.
+ *
+ * Assumes that one or more Samza jobs are contained in each sub directory of the provided installationsPath.
+ * Each sub directory is also expected to contian a bin directory and a config directory containing one or
+ * more job config files.
+ */
+public class SimpleInstallationFinder implements InstallationFinder {
+  private static final Logger log = LoggerFactory.getLogger(SimpleInstallationFinder.class);
+
+  protected static final String BIN_SUBPATH = "bin";
+  protected static final String CFG_SUBPATH = "config";
+
+  protected final String installationsPath;
+  protected final ConfigFactory jobConfigFactory;
+
+  /**
+   * Required constructor.
+   *
+   * @param installationsPath the root path where all Samza jobs are installed.
+   * @param jobConfigFactory  the {@link ConfigFactory} to use to read the job configs.
+   */
+  public SimpleInstallationFinder(String installationsPath, ConfigFactory jobConfigFactory) {
+    this.installationsPath = installationsPath;
+    this.jobConfigFactory = jobConfigFactory;
+  }
+
+  @Override
+  public boolean isInstalled(JobInstance jobInstance) {
+    return getAllInstalledJobs().containsKey(jobInstance);
+  }
+
+  @Override
+  public Map<JobInstance, InstallationRecord> getAllInstalledJobs() {
+    Map<JobInstance, InstallationRecord> installations = new HashMap<>();
+    for (File jobInstallPath : new File(installationsPath).listFiles()) {
+      if (!jobInstallPath.isDirectory()) {
+        continue;
+      }
+
+      findJobInstances(jobInstallPath, installations);
+    }
+    return installations;
+  }
+
+  /**
+   * Finds all the job instances in the specified path and adds a corresponding {@link JobInstance} and
+   * {@link InstallationRecord} for each instance.
+   *
+   * @param jobInstallPath  the path to search for job instances.
+   * @param jobs            the map to which the job instances will be added.
+   */
+  private void findJobInstances(final File jobInstallPath, final Map<JobInstance, InstallationRecord> jobs) {
+    try {
+      String jobInstallCanonPath = jobInstallPath.getCanonicalPath();
+      File configPath = Paths.get(jobInstallCanonPath, CFG_SUBPATH).toFile();
+      if (!(configPath.exists() && configPath.isDirectory())) {
+        log.debug("Config path not found: " + configPath);
+        return;
+      }
+
+      for (File configFile : configPath.listFiles()) {
+
+        if (configFile.isFile()) {
+
+          String configFilePath = configFile.getCanonicalPath();
+          Config config = jobConfigFactory.getConfig(new URI("file://" + configFilePath));
+
+          if (config.containsKey(JobConfig.JOB_NAME()) && config.containsKey(JobConfig.STREAM_JOB_FACTORY_CLASS())) {
+
+            String jobName = config.get(JobConfig.JOB_NAME());
+            String jobId = config.get(JobConfig.JOB_ID(), "1");
+            JobInstance jobInstance = new JobInstance(jobName, jobId);
+
+            if (jobs.containsKey(jobInstance)) {
+              throw new IllegalStateException(
+                  String.format("Found more than one job config with jobName:%s and jobId:%s", jobName, jobId));
+            }
+            InstallationRecord jobInstall =
+                new InstallationRecord(jobName, jobId, jobInstallCanonPath, configFilePath, getBinPath(jobInstallCanonPath));
+            jobs.put(jobInstance, jobInstall);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new SamzaException("Exception finding job instance in path: " + jobInstallPath, e);
+    }
+  }
+
+  /**
+   * @param jobInstallPath  the root path of the job installation.
+   * @return                the bin directory within the job installation.
+   */
+  private String getBinPath(String jobInstallPath) {
+    return Paths.get(jobInstallPath, BIN_SUBPATH).toString();
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
new file mode 100644 (file)
index 0000000..bcc88d0
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements a subset of the {@link JobProxy} interface with the default, cluster-agnostic,
+ * implementations. Subclasses are expected to override these default methods where necessary.
+ */
+public abstract class AbstractJobProxy implements JobProxy {
+  private static final Logger log = LoggerFactory.getLogger(AbstractJobProxy.class);
+
+  protected final JobsResourceConfig config;
+
+  /**
+   * Creates a new JobProxy instance from the factory class specified in the config.
+   *
+   * @param config  the config containing the job proxy factory property.
+   * @return        the JobProxy produced by the factory.
+   */
+  public static JobProxy fromFactory(JobsResourceConfig config) {
+    String jobProxyFactory = config.getJobProxyFactory();
+    if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
+      try {
+        Class factoryCls = Class.forName(jobProxyFactory);
+        JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance();
+        return factory.getJobProxy(config);
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    } else {
+      throw new SamzaException("Missing config: " + JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY);
+    }
+  }
+
+  /**
+   * Required constructor.
+   *
+   * @param config  the config containing the installations path.
+   */
+  public AbstractJobProxy(JobsResourceConfig config) {
+    this.config = config;
+  }
+
+  @Override
+  public List<Job> getAllJobStatuses()
+      throws IOException, InterruptedException {
+    List<Job> allJobs = new ArrayList<>();
+    Collection<JobInstance> jobInstances = getAllJobInstances();
+    for(JobInstance jobInstance : jobInstances) {
+        allJobs.add(new Job(jobInstance.getJobName(), jobInstance.getJobId()));
+    }
+    getJobStatusProvider().getJobStatuses(allJobs);
+
+    return allJobs;
+  }
+
+  /**
+   * Convenience method to get the Samza job status from the name and id.
+   *
+   * @param jobInstance           the instance of the job.
+   * @return                      the current Samza status for the job.
+   * @throws IOException          if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  protected JobStatus getJobSamzaStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    return getJobStatus(jobInstance).getStatus();
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    return getJobStatusProvider().getJobStatus(jobInstance);
+  }
+
+  @Override
+  public boolean jobExists(JobInstance jobInstance) {
+    return getAllJobInstances().contains(jobInstance);
+  }
+
+  /**
+   * @return the {@link ConfigFactory} to use to read job configuration files.
+   */
+  protected ConfigFactory getJobConfigFactory() {
+    String configFactoryClassName = config.get(JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY);
+    if (configFactoryClassName == null) {
+      configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName();
+      log.warn("{} not specified. Defaulting to {}", JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
+    }
+
+    try {
+      Class factoryCls = Class.forName(configFactoryClassName);
+      return (ConfigFactory) factoryCls.newInstance();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+  /**
+   * @return the {@link JobStatusProvider} to use in retrieving the job status.
+   */
+  protected abstract JobStatusProvider getJobStatusProvider();
+
+  /**
+   * @return all available job instances.
+   */
+  protected abstract Set<JobInstance> getAllJobInstances();
+
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java
new file mode 100644 (file)
index 0000000..97599c2
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Allows us to encapsulate the jobName,jobId tuple as one entity.
+ */
+public class JobInstance {
+
+  private final String jobName;
+  private final String jobId;
+
+  /**
+   * Required constructor.
+   *
+   * @param jobName the name of the job.
+   * @param jobId   the id of the job.
+   */
+  public JobInstance(String jobName, String jobId) {
+    this.jobName = Preconditions.checkNotNull(jobName);
+    this.jobId = Preconditions.checkNotNull(jobId);
+  }
+
+  /**
+   * @return  the name of the job.
+   */
+  public String getJobName() {
+    return jobName;
+  }
+
+  /**
+   * @return  the id of the job.
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 139;
+    int hash = prime * jobName.hashCode() + prime * jobId.hashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof JobInstance)) {
+      return false;
+    }
+
+    JobInstance otherJob = (JobInstance) other;
+    return this.jobName.equals(otherJob.jobName) && this.jobId.equals(otherJob.jobId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("jobName:%s jobId:%s", jobName, jobId);
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java
new file mode 100644 (file)
index 0000000..7e168d7
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+
+
+/**
+ * Job proxy is the primary abstraction used by the REST API to interact with jobs.
+ *
+ * Concrete implementations of this interface may vary in the APIs they use to discover, start, stop, and stat jobs,
+ * how to interact with the installed jobs on disk, etc.
+ */
+public interface JobProxy {
+  /**
+   * @param jobInstance the instance of the job
+   * @return            true if the job exists and can be started, stopped, etc.
+   */
+  boolean jobExists(JobInstance jobInstance);
+
+  /**
+   * @return                      a {@link Job} for each Samza job instance installed on this host.
+   * @throws IOException          if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  List<Job> getAllJobStatuses()
+      throws IOException, InterruptedException;
+
+  /**
+   * @param jobInstance           the instance of the job for which the status is needed.
+   * @return                      a {@link Job} containing
+   *                              the status for the job specified by jobName and jobId.
+   * @throws IOException          if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+
+  /**
+   * Starts the job instance specified by jobName and jobId. When this method returns, the status of the job
+   * should be {@link JobStatus#STARTING} or
+   * {@link JobStatus#STARTED} depending on the implementation.
+   *
+   * @param jobInstance the instance of the job to start.
+   * @throws Exception  if the job could not be successfully started.
+   */
+  void start(JobInstance jobInstance)
+      throws Exception;
+
+  /**
+   * Stops the job instance specified by jobName and jobId. When this method returns, the status of the job
+   * should be {@link JobStatus#STOPPED}.
+   *
+   * @param jobInstance the instance of the job to stop.
+   * @throws Exception  if the job could not be successfully stopped.
+   */
+  void stop(JobInstance jobInstance)
+      throws Exception;
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
new file mode 100644 (file)
index 0000000..067711a
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Simple factory interface to produce instances of {@link JobProxy},
+ * depending on the implementation.
+ *
+ * To use a custom {@link JobProxy}, create an implementation of that interface, an implementation
+ * of this interface which instantiates the custom proxy and finally reference the custom factory
+ * in the config {@link JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY}.
+ */
+public interface JobProxyFactory {
+
+  /**
+   * Creates a new {@link JobProxy} and initializes it with the specified config.
+   *
+   * @param config  the {@link org.apache.samza.rest.SamzaRestConfig} to pass to the proxy.
+   * @return        the created proxy.
+   */
+  JobProxy getJobProxy(JobsResourceConfig config);
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
new file mode 100644 (file)
index 0000000..23a7f73
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.samza.rest.model.Job;
+
+
+/**
+ * Interface for getting job status independent of the underlying cluster implementation.
+ */
+public interface JobStatusProvider {
+  /**
+   * Populates the status* fields of each {@link Job} in the provided Collection.
+   *
+   * @param jobs                  the collection of {@link Job} for which the status is needed.
+   * @throws IOException          if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException;
+
+  /**
+   * @param jobInstance           the instance of the job.
+   * @return                      a {@link Job} containing
+   *                              the status for the job specified by jobName and jobId.
+   * @throws IOException          if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
new file mode 100644 (file)
index 0000000..2d14366
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.Paths;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+
+/**
+ * Extends {@link AbstractJobProxy} with some script support functionality.
+ */
+public abstract class ScriptJobProxy extends AbstractJobProxy implements ScriptPathProvider {
+
+  protected final ScriptRunner scriptRunner = new ScriptRunner();
+
+  /**
+   * Required constructor.
+   *
+   * @param config  the config which specifies the path to the Samza framework installation.
+   */
+  public ScriptJobProxy(JobsResourceConfig config) {
+    super(config);
+  }
+
+  /**
+   * Constructs the path to the specified script within the job installation.
+   *
+   * @param jobInstance             the instance of the job.
+   * @param scriptName              the name of the script.
+   * @return                        the full path to the script.
+   * @throws FileNotFoundException  if the job installation path doesn't exist.
+   */
+  public String getScriptPath(JobInstance jobInstance, String scriptName)
+      throws FileNotFoundException {
+    String scriptPath;
+    InstallationRecord jobInstallation = getInstallationFinder().getAllInstalledJobs().get(jobInstance);
+    scriptPath = Paths.get(jobInstallation.getScriptFilePath(), scriptName).toString();
+
+    File scriptFile = new File(scriptPath);
+    if (!scriptFile.exists()) {
+      throw new FileNotFoundException("Script does not exist: " + scriptPath);
+    }
+    return scriptPath;
+  }
+
+  /**
+   * @return the {@link InstallationFinder} which will be used to find jobs installed on this machine.
+   */
+  protected abstract InstallationFinder getInstallationFinder();
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
new file mode 100644 (file)
index 0000000..a935c98
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Extends the {@link ScriptJobProxy} with methods specific to simple Samza deployments.
+ */
+public class SimpleYarnJobProxy extends ScriptJobProxy {
+  private static final Logger log = LoggerFactory.getLogger(SimpleYarnJobProxy.class);
+
+  private static final String START_SCRIPT_NAME = "run-job.sh";
+  private static final String STOP_SCRIPT_NAME = "kill-yarn-job-by-name.sh";
+
+  private static final String CONFIG_FACTORY_PARAM = "--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory";
+  private static final String CONFIG_PATH_PARAM_FORMAT = "--config-path=file://%s";
+
+  private final JobStatusProvider statusProvider = new YarnCliJobStatusProvider(this);
+
+  private final InstallationFinder installFinder;
+
+  public SimpleYarnJobProxy(JobsResourceConfig config) {
+    super(config);
+
+    installFinder = new SimpleInstallationFinder(config.getInstallationsPath(), getJobConfigFactory());
+  }
+
+  @Override
+  public void start(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be started because it is currently {}.", jobInstance, currentStatus.toString());
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, START_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, CONFIG_FACTORY_PARAM,
+        generateConfigPathParameter(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to start job. Result code: " + resultCode);
+    }
+  }
+
+  @Override
+  public void stop(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (!currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be stopped because it is currently {}.", jobInstance, currentStatus.toString());
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, STOP_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, YarnCliJobStatusProvider.getQualifiedJobName(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to stop job. Result code: " + resultCode);
+    }
+  }
+
+  /**
+   * Generates the command line argument which specifies the path to the config file for the job.
+   *
+   * @param jobInstance the instance of the job.
+   * @return            the --config-path command line argument.
+   */
+  private String generateConfigPathParameter(JobInstance jobInstance) {
+    InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
+    return String.format(CONFIG_PATH_PARAM_FORMAT, record.getConfigFilePath());
+  }
+
+  /**
+   * @return the {@link JobStatusProvider} to use for retrieving job status.
+   */
+  public JobStatusProvider getJobStatusProvider() {
+    return statusProvider;
+  }
+
+  @Override
+  protected Set<JobInstance> getAllJobInstances() {
+    return installFinder.getAllInstalledJobs().keySet();
+  }
+
+  @Override
+  protected InstallationFinder getInstallationFinder() {
+    return installFinder;
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
new file mode 100644 (file)
index 0000000..11d93d4
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Factory to produce SimpleJobProxy instances.
+ *
+ * See {@link AbstractJobProxy#fromFactory(org.apache.samza.rest.resources.JobsResourceConfig)}
+ */
+public class SimpleYarnJobProxyFactory implements JobProxyFactory {
+
+  @Override
+  public JobProxy getJobProxy(JobsResourceConfig config) {
+    return new SimpleYarnJobProxy(config);
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
new file mode 100644 (file)
index 0000000..d1f34e8
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.script.ScriptOutputHandler;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the {@link JobStatusProvider} that retrieves
+ * the job status from the YARN command line interface.
+ */
+public class YarnCliJobStatusProvider implements JobStatusProvider {
+  private static final Logger log = LoggerFactory.getLogger(YarnCliJobStatusProvider.class);
+  private static final String JOB_NAME_ID_FORMAT = "%s_%s";
+  private final ScriptPathProvider scriptPathProvider;
+
+  /**
+   * Constructs the job name used in YARN. This is the value shown in the "Name"
+   * column of the Resource Manager UI.
+   *
+   * @param jobInstance the instance of the job.
+   * @return            the job name to use for the job in YARN.
+   */
+  public static String getQualifiedJobName(JobInstance jobInstance) {
+    return String.format(JOB_NAME_ID_FORMAT, jobInstance.getJobName(), jobInstance.getJobId());
+  }
+
+  /**
+   * Default constructor.
+   *
+   * @param provider a delegate that provides the path to the Samza yarn scripts.
+   */
+  public YarnCliJobStatusProvider(ScriptPathProvider provider) {
+    scriptPathProvider = provider;
+  }
+
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+    if (jobs == null || jobs.isEmpty()) {
+      return;
+    }
+
+    // If the scripts are in the jobs, they will be in all job installations, so just pick one and get the script path.
+    Job anyJob = jobs.iterator().next();
+    String scriptPath = scriptPathProvider.getScriptPath(new JobInstance(anyJob.getJobName(), anyJob.getJobId()), "run-class.sh");
+
+    // We will identify jobs returned by the YARN application states by their qualified names, so build a map
+    // to translate back from that name to the JobInfo we wish to populate. This avoids parsing/delimiter issues.
+    final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
+    for(Job job : jobs) {
+      qualifiedJobToInfo.put(getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())), job);
+    }
+
+    // Run "application -list" command and get the YARN state for each application
+    ScriptRunner runner = new ScriptRunner();
+    int resultCode = runner.runScript(scriptPath, new ScriptOutputHandler() {
+      @Override
+      public void processScriptOutput(InputStream output)
+          throws IOException {
+        InputStreamReader isr = new InputStreamReader(output);
+        BufferedReader br = new BufferedReader(isr);
+        String line;
+        String APPLICATION_PREFIX = "application_";
+        log.debug("YARN status:");
+        while ((line = br.readLine()) != null) {
+          log.debug(line);
+          if (line.startsWith(APPLICATION_PREFIX)) {
+            String[] columns = line.split("\\s+");
+            String qualifiedName = columns[1];
+            String yarnState = columns[5];
+
+            JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnState.toUpperCase()));
+            Job job = qualifiedJobToInfo.get(qualifiedName);
+
+            // If job is null, it wasn't requested.  The default status is STOPPED because there could be many
+            // application attempts in that status. Only update the job status if it's not STOPPED.
+            if (job != null && (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED)) {
+              job.setStatusDetail(yarnState);
+              job.setStatus(samzaStatus);
+            }
+          }
+        }
+      }
+    }, "org.apache.hadoop.yarn.client.cli.ApplicationCLI", "application", "-list", "-appStates", "ALL");
+
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to get job status. Result code: " + resultCode);
+    }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    getJobStatuses(Collections.singletonList(info));
+    return info;
+  }
+
+  /**
+   * Translates the YARN application state to the more generic Samza job status.
+   *
+   * @param yarnState the YARN application state to translate.
+   * @return          the corresponding Samza job status.
+   */
+  private JobStatus yarnStateToSamzaStatus(YarnApplicationState yarnState) {
+    switch (yarnState) {
+      case RUNNING:
+        return JobStatus.STARTED;
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        return JobStatus.STARTING;
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+      default:
+        return JobStatus.STOPPED;
+    }
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
new file mode 100644 (file)
index 0000000..e0224c6
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.SamzaRestConfig;
+
+
+/**
+ * Instantiates all the resources that are shipped with the REST service.
+ */
+public class DefaultResourceFactory implements ResourceFactory {
+  @Override
+  public List<? extends Object> getResourceInstances(Config config) {
+    return Collections.singletonList(new JobsResource(new JobsResourceConfig(config)));
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
new file mode 100644 (file)
index 0000000..a566db5
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import javax.inject.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.proxy.job.AbstractJobProxy;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The REST resource for jobs. Handles all requests for the jobs collection
+ * or individual job instances.
+ */
+@Singleton
+@Path("/v1/jobs")
+public class JobsResource {
+  private static final Logger log = LoggerFactory.getLogger(JobsResource.class);
+
+  /** The primary interface for interacting with jobs. */
+  private final JobProxy jobProxy;
+
+  /**
+   * Initializes a JobResource with {@link JobProxy} from the
+   * {@link JobProxyFactory} class specified in the configuration.
+   *
+   * @param config  the configuration containing the {@link JobProxyFactory} class.
+   */
+  public JobsResource(JobsResourceConfig config) {
+    jobProxy = AbstractJobProxy.fromFactory(config);
+  }
+
+  /**
+   * Gets the {@link Job} for all the jobs installed on this host.
+   *
+   * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response} containing a list of
+   * {@link Job} for all the installed Samza jobs installed on this host.
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getInstalledJobs() {
+    try {
+      return Response.ok(jobProxy.getAllJobStatuses()).build();
+    } catch (Exception e) {
+      log.error("Error in getInstalledJobs.", e);
+      return errorResponse(e.getMessage());
+    }
+  }
+
+  /**
+   * Gets the {@link Job} for the job instance specified by jobName and jobId if
+   * it is installed on this host.
+   *
+   * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}.
+   * @param jobId   the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}.
+   * @return        a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response}
+   *                containing a {@link Job} for the Samza job if it is
+   *                installed on this host. {@link javax.ws.rs.core.Response.Status#NOT_FOUND} and
+   *                {@link javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for corresponding errors.
+   */
+  @GET
+  @Path("/{jobName}/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getJob(
+      @PathParam("jobName") final String jobName,
+      @PathParam("jobId") final String jobId) {
+    JobInstance jobInstance = new JobInstance(jobName, jobId);
+    try {
+      if (!jobProxy.jobExists(jobInstance)) {
+        return Response.status(Response.Status.NOT_FOUND).entity(Collections.singletonMap("message",
+            String.format("%s does not exist.", jobInstance))).build();
+      }
+
+      Job job = jobProxy.getJobStatus(jobInstance);
+      return Response.ok(job).build();
+    } catch (Exception e) {
+      log.error("Error in getJob.", e);
+      return errorResponse(e.getMessage());
+    }
+  }
+
+  /**
+   *
+   * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}.
+   * @param jobId   the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}.
+   * @param status   the {@link JobStatus} to which the job will transition.
+   * @return        a {@link javax.ws.rs.core.Response.Status#ACCEPTED} {@link javax.ws.rs.core.Response}
+   *                containing a {@link Job} for the Samza job if it is
+   *                installed on this host. {@link javax.ws.rs.core.Response.Status#NOT_FOUND}
+   *                {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} and
+   *                {@link javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for corresponding errors.
+   */
+  @PUT
+  @Path("/{jobName}/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateJobStatus(
+      @PathParam("jobName") final String jobName,
+      @PathParam("jobId") final String jobId,
+      @QueryParam("status") String status) {
+    JobInstance jobInstance = new JobInstance(jobName, jobId);
+    try {
+      if (!jobProxy.jobExists(jobInstance)) {
+        return Response.status(Response.Status.NOT_FOUND).entity(Collections
+            .singletonMap("message", String.format("Job %s instance %s is not installed on this host.", jobName, jobId))).build();
+      }
+
+      if (status == null) {
+        throw new IllegalArgumentException("Unrecognized status parameter: " + status);
+      }
+
+      JobStatus samzaStatus = JobStatus.valueOf(status.toUpperCase());
+      switch (samzaStatus) {
+        case STARTED:
+          log.info("Starting {}", jobInstance);
+          jobProxy.start(jobInstance);
+          Job infoStarted = jobProxy.getJobStatus(jobInstance);
+          return Response.accepted(infoStarted).build();
+        case STOPPED:
+          log.info("Stopping {}", jobInstance);
+          jobProxy.stop(jobInstance);
+          Job infoStopped = jobProxy.getJobStatus(jobInstance);
+          return Response.accepted(infoStopped).build();
+        default:
+          throw new IllegalArgumentException("Unsupported status: " + status);
+      }
+    } catch (IllegalArgumentException e) {
+      log.info(String.format("Illegal arguments updateJobStatus. JobName:%s JobId:%s Status=%s", jobName, jobId, status), e);
+      return Response.status(Response.Status.BAD_REQUEST).entity(
+          Collections.singletonMap("message", e.getMessage())).build();
+    } catch (Exception e) {
+      log.error("Error in updateJobStatus.", e);
+      return errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage()));
+    }
+  }
+
+  /**
+   * Constructs a consistent format for error responses. This method should be used for every error case.
+   *
+   * @param message the error message to report.
+   * @return        the {@link Response} containing the error message.
+   */
+  private Response errorResponse(String message) {
+    return Response.serverError().entity(Collections.singletonMap("message", message)).build();
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
new file mode 100644 (file)
index 0000000..527482d
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+
+
+/**
+ * Configurations for the {@link JobsResource} endpoint.
+ */
+public class JobsResourceConfig extends MapConfig {
+  /**
+   * Specifies the canonical name of the {@link JobProxyFactory} class to produce
+   * {@link JobProxy} instances.
+   *
+   * To use your own proxy, implement the factory and specify the class for this config.
+   */
+  public static final String CONFIG_JOB_PROXY_FACTORY = "job.proxy.factory.class";
+
+  /**
+   * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path
+   * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}.
+   */
+  public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path";
+
+  /**
+   * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs.
+   */
+  public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class";
+
+  public JobsResourceConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see JobsResourceConfig#CONFIG_JOB_CONFIG_FACTORY
+   * @return the canonical name of the {@link JobProxyFactory} class to produce {@link JobProxy} instances.
+   */
+  public String getJobProxyFactory() {
+    return get(CONFIG_JOB_PROXY_FACTORY);
+  }
+
+  /**
+   * @see JobsResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+   * @return the path where all the Samza jobs are installed (unzipped).
+   */
+  public String getInstallationsPath() {
+    return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
+  }
+
+  /**
+   * Ensures a usable file path when the user specifies a tilde for the home path.
+   *
+   * @param rawPath the original path.
+   * @return        the updated path with the tilde resolved to home.
+   */
+  private static String sanitizePath(String rawPath) {
+    if (rawPath == null) {
+      return null;
+    }
+    return rawPath.replaceFirst("^~", System.getProperty("user.home"));
+  }
+
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java
new file mode 100644 (file)
index 0000000..be83eb6
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.List;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Instantiates a resource using the provided config.
+ *
+ * This is used to instantiate and register a specific instance of the object rather than registering the class.
+ */
+public interface ResourceFactory {
+
+  /**
+   * Constructs and returns resource instances to register with the server.
+   *
+   * @param config  the server config used to initialize the objects.
+   * @return        a collection of instances to register with the server.
+   */
+  List<? extends Object> getResourceInstances(Config config);
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java
new file mode 100644 (file)
index 0000000..af3bf0a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * A script output handler processes the stream of output from the stdout and stderr channels of a script.
+ */
+public interface ScriptOutputHandler {
+
+  /**
+   * Processes the script output represented by the InputStream.
+   *
+   * Implementations must fully process the stream or the script may hang.
+   *
+   * @param output the stream of output from the script.
+   * @throws IOException if there are problems reading the output.
+   */
+  void processScriptOutput(InputStream output)
+      throws IOException;
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java
new file mode 100644 (file)
index 0000000..dbb9849
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.FileNotFoundException;
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Defines the protocol for getting script paths.
+ */
+public interface ScriptPathProvider {
+  /**
+   * @param jobInstance             the job instance which may be used to access the job installation for the script.
+   * @param scriptName              the name of the script file. Not the full path.
+   * @return                        the full path to the specified script.
+   * @throws FileNotFoundException  if the script does not exist.
+   */
+  String getScriptPath(JobInstance jobInstance, String scriptName)
+      throws FileNotFoundException;
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
new file mode 100644 (file)
index 0000000..b70a0f1
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runs a script process and returns the exit code.
+ *
+ * The script can be run with an output handler or with output redirected to console.
+ */
+public class ScriptRunner {
+  private static final Logger log = LoggerFactory.getLogger(ScriptRunner.class);
+  private static final int DEFAULT_SCRIPT_CMD_TIMEOUT_S = 30;
+  private int scriptTimeout = DEFAULT_SCRIPT_CMD_TIMEOUT_S;
+
+  protected long getScriptTimeoutS() {
+    return scriptTimeout;
+  }
+
+  /**
+   * Runs a script with IO inherited from the current Java process. Typically this redirects to console.
+   *
+   * @param scriptPath            the path to the script file.
+   * @param args                  the command line args to pass to the script.
+   * @return                      the exit code returned by the script.
+   * @throws IOException          if there was a problem running the process.
+   * @throws InterruptedException if the thread is interrupted while waiting for the process to finish.
+   */
+  public int runScript(String scriptPath, String... args)
+      throws IOException, InterruptedException {
+    ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args);
+    Process p = processBuilder.inheritIO().start();
+
+    return waitForExitValue(p);
+  }
+
+  /**
+   * @param scriptPath            the path to the script file.
+   * @param outputHandler         the handler for any stdout and stderr produced by the script.
+   * @param args                  the command line args to pass to the script.
+   * @return                      the exit code returned by the script.
+   * @throws IOException          if there was a problem running the process.
+   * @throws InterruptedException if the thread is interrupted while waiting for the process to finish.
+   */
+  public int runScript(String scriptPath, ScriptOutputHandler outputHandler, String... args)
+      throws IOException, InterruptedException {
+    ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args);
+    Process p = processBuilder.redirectErrorStream(true).start();
+
+    InputStream output = p.getInputStream();
+    outputHandler.processScriptOutput(output);
+
+    return waitForExitValue(p);
+  }
+
+  /**
+   * @param scriptPath  the path to the script file.
+   * @param args        the command line args to pass to the script.
+   * @return            a {@link java.lang.ProcessBuilder} for the script and args.
+   */
+  private ProcessBuilder getProcessBuilder(String scriptPath, String[] args) {
+    List<String> command = new ArrayList<>(args.length + 1);
+    command.add(scriptPath);
+    command.addAll(Arrays.asList(args));
+
+    log.debug("Building process with command {}", command);
+    return new ProcessBuilder(command);
+  }
+
+  /**
+   * Waits for a finite time interval for the script to complete.
+   *
+   * @param p                     the process on which this method will wait.
+   * @return                      the exit code returned by the process.
+   * @throws InterruptedException if the thread is interrupted while waiting for the process to finish.
+   */
+  private int waitForExitValue(final Process p)
+      throws InterruptedException {
+    log.debug("Waiting for the exit value for process {}", p);
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          p.waitFor();
+        } catch (InterruptedException ignore) {
+          return;
+        }
+      }
+    });
+
+    t.start();
+    try {
+      t.join(TimeUnit.MILLISECONDS.convert(getScriptTimeoutS(), TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new SamzaException("Timeout running shell command", e);
+    }
+
+    int exitVal = p.exitValue();
+    log.debug("Exit value {}", exitVal);
+    return exitVal;
+  }
+}
diff --git a/samza-rest/src/main/resources/log4j.xml b/samza-rest/src/main/resources/log4j.xml
new file mode 100644 (file)
index 0000000..c5c1556
--- /dev/null
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/samza-rest-service.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="20" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+
+  <logger name="org.apache.hadoop">
+    <level value="off" />
+  </logger>
+
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+  </root>
+</log4j:configuration>
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
new file mode 100644 (file)
index 0000000..1da3430
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.monitor.mock.ExceptionThrowingMonitor;
+import org.apache.samza.monitor.mock.InstantSchedulingProvider;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestMonitorService {
+
+    @Test
+    public void testGetMonitorsFromClassName() {
+        // Test that monitors are instantiated properly from config strings.
+        Monitor monitor = null;
+        try {
+            monitor = MonitorLoader.fromClassName("org.apache.samza.monitor.mock.DummyMonitor");
+        } catch (InstantiationException e) {
+            fail();
+        }
+
+        // Object should implement monitor().
+        try {
+            monitor.monitor();
+        } catch (Exception e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testMonitorExceptionIsolation() {
+        // Test that an exception from a monitor doesn't bubble up out of the scheduler.
+        Monitor monitor = new ExceptionThrowingMonitor();
+        InstantSchedulingProvider provider = new InstantSchedulingProvider();
+
+        // Initialize with a monitor that immediately throws an exception when run.
+        Map<String, String> map = new HashMap<>();
+        map.put(SamzaRestConfig.CONFIG_MONITOR_CLASSES, "org.apache.samza.monitor.mock.ExceptionThrowingMonitor");
+        map.put(SamzaRestConfig.CONFIG_MONITOR_INTERVAL_MS, "1");
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config, provider);
+
+        // This will throw if the exception isn't caught within the provider.
+        monitorService.start();
+        monitorService.stop();
+    }
+
+    @Test
+    public void testScheduledExecutorSchedulingProvider() {
+        // Test that the monitor is scheduled by the ScheduledExecutorSchedulingProvider
+        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+        ScheduledExecutorSchedulingProvider provider =
+                new ScheduledExecutorSchedulingProvider(executorService);
+
+        // notifyingMonitor.monitor() should be called repeatedly.
+        final CountDownLatch wasCalledLatch = new CountDownLatch(3);
+
+        final Monitor notifyingMonitor = new Monitor() {
+            @Override
+            public void monitor() {
+                wasCalledLatch.countDown();
+            }
+        };
+
+        Runnable runnableMonitor = new Runnable() {
+            public void run() {
+                try {
+                    notifyingMonitor.monitor();
+                } catch (Exception e) {
+                    // Must be caught because they are checked in monitor()
+                    fail();
+                }
+            }
+        };
+
+        // monitor should get called every 1ms, so if await() misses the first call, there will be more.
+        provider.schedule(runnableMonitor, 1);
+
+        try {
+            assertTrue(wasCalledLatch.await(5l, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            executorService.shutdownNow();
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java
new file mode 100644 (file)
index 0000000..8621db1
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.Monitor;
+
+public class DummyMonitor implements Monitor {
+
+    public void monitor() {
+        // Do nothing!
+    }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
new file mode 100644 (file)
index 0000000..c4f3f73
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.Monitor;
+
+import java.io.IOException;
+
+public class ExceptionThrowingMonitor implements Monitor {
+    public void monitor() throws IOException {
+        throw new IOException("I don't know what I was expecting.");
+    }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
new file mode 100644 (file)
index 0000000..6ae80e6
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.SchedulingProvider;
+
+/**
+ * Instead of scheduling a monitor to run, just runs it ASAP.
+ */
+public class InstantSchedulingProvider implements SchedulingProvider {
+
+    public void schedule(Runnable runnableMonitor, int interval) {
+        runnableMonitor.run();
+    }
+
+    // Nothing to stop because no deferred task was started
+    public void stop() {}
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
new file mode 100644 (file)
index 0000000..7db437b
--- /dev/null
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Form;
+import javax.ws.rs.core.Response;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.SamzaRestApplication;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobsResource extends JerseyTest {
+  ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
+
+  @Override
+  protected Application configure() {
+    Map<String, String> map = new HashMap<>();
+    map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, "org.apache.samza.rest.resources.mock.MockJobProxyFactory");
+    map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, ".");
+    SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+    return new SamzaRestApplication(config);
+  }
+
+  @Test
+   public void testGetJobs()
+      throws IOException {
+
+    Response resp = target("v1/jobs").request().get();
+    assertEquals(200, resp.getStatus());
+    final Job[] jobs = objectMapper.readValue(resp.readEntity(String.class), Job[].class);
+    assertEquals(4, jobs.length);
+
+    assertEquals(MockJobProxy.JOB_INSTANCE_1_NAME, jobs[0].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_1_ID, jobs[0].getJobId());
+    assertStatusNotDefault(jobs[0]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, jobs[1].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, jobs[1].getJobId());
+    assertStatusNotDefault(jobs[1]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_3_NAME, jobs[2].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_3_ID, jobs[2].getJobId());
+    assertStatusNotDefault(jobs[2]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_4_NAME, jobs[3].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_4_ID, jobs[3].getJobId());
+    assertStatusNotDefault(jobs[3]);
+    resp.close();
+  }
+
+  @Test
+   public void testPostJobs()
+      throws IOException {
+    Response resp = target("v1/jobs").request().post(Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testPutJobs()
+      throws IOException {
+    Response resp = target("v1/jobs").request().put(Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testGetJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request().get();
+    assertEquals(200, resp.getStatus());
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class);
+
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testPostJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request().post(
+        Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobNameNotFound()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", "BadJobName", MockJobProxy.JOB_INSTANCE_2_ID)).request().get();
+    assertEquals(404, resp.getStatus());
+
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("does not exist"));
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobIdNotFound()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, "BadJobId")).request().get();
+    assertEquals(404, resp.getStatus());
+
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("does not exist"));
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobNameWithoutId()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s", MockJobProxy.JOB_INSTANCE_2_NAME)).request().get();
+    assertEquals(404, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testStartJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "started").request().put(Entity.form(new Form()));
+    assertEquals(202, resp.getStatus());
+
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testStopJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "stopped").request().put(Entity.form(new Form()));
+    assertEquals(202, resp.getStatus());
+
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testPutBadJobStatus()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "BADSTATUS").request().put(Entity.form(new Form()));
+    assertEquals(400, resp.getStatus());
+
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message").contains("BADSTATUS"));
+    resp.close();
+  }
+
+  @Test
+  public void testPutMissingStatus()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request()
+        .put(Entity.form(new Form()));
+    assertEquals(400, resp.getStatus());
+
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message").contains("status"));
+    resp.close();
+  }
+
+  private void assertStatusNotDefault(Job job)  {
+    // Job status should be populated, not the defaults.
+    // We're not testing whether it value matches a specific value here,
+    // just that the value reflects whatever the JobStatusProvider returns.
+    assertFalse(JobStatus.UNKNOWN == job.getStatus());
+    assertNotNull(job.getStatusDetail());
+  }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java
new file mode 100644 (file)
index 0000000..96f8db5
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.samza.rest.proxy.job.AbstractJobProxy;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobStatusProvider;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+public class MockJobProxy extends AbstractJobProxy {
+
+  public static final String JOB_INSTANCE_1_NAME = "Job1";
+  public static final String JOB_INSTANCE_1_ID = "i001";
+
+  public static final String JOB_INSTANCE_2_NAME = "Job1";
+  public static final String JOB_INSTANCE_2_ID = "i002";
+
+  public static final String JOB_INSTANCE_3_NAME = "Job2";
+  public static final String JOB_INSTANCE_3_ID = "i001";
+
+  public static final String JOB_INSTANCE_4_NAME = "Job3";
+  public static final String JOB_INSTANCE_4_ID = "1";
+  /**
+   * Required constructor.
+   *
+
+   * @param config  the config containing the installations path.
+   */
+  public MockJobProxy(JobsResourceConfig config) {
+    super(config);
+  }
+
+  @Override
+  protected JobStatusProvider getJobStatusProvider() {
+    return new MockJobStatusProvider();
+  }
+
+  @Override
+  protected Set<JobInstance> getAllJobInstances() {
+    Set<JobInstance> validatedInstallations = new LinkedHashSet<>();
+
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_1_NAME, JOB_INSTANCE_1_ID));
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_2_NAME, JOB_INSTANCE_2_ID));
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_3_NAME, JOB_INSTANCE_3_ID));
+
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_4_NAME, JOB_INSTANCE_4_ID));
+
+    return validatedInstallations;
+  }
+
+  @Override
+  public void start(JobInstance jobInstance)
+      throws Exception {
+
+  }
+
+  @Override
+  public void stop(JobInstance jobInstance)
+      throws Exception {
+
+  }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java
new file mode 100644 (file)
index 0000000..03e95b1
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+public class MockJobProxyFactory implements JobProxyFactory{
+  @Override
+  public JobProxy getJobProxy(JobsResourceConfig config) {
+    return new MockJobProxy(config);
+  }
+}
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java
new file mode 100644 (file)
index 0000000..df0f18a
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobStatusProvider;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+
+
+public class MockJobStatusProvider implements JobStatusProvider {
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+     for (Job info : jobs) {
+       setStatusStarted(info);
+     }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    setStatusStarted(info);
+    return info;
+  }
+
+  private void setStatusStarted(Job info) {
+    info.setStatus(JobStatus.STARTED);
+    info.setStatusDetail("RUNNING");
+  }
+}
diff --git a/samza-shell/src/main/bash/kill-yarn-job-by-name.sh b/samza-shell/src/main/bash/kill-yarn-job-by-name.sh
new file mode 100755 (executable)
index 0000000..06eacc7
--- /dev/null
@@ -0,0 +1,38 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+# Get the id for the app with the specified name that is also ACCEPTED or RUNNING status
+APP_ID=$(exec "$(dirname $0)"/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep "[[:space:]]$1[[:space:]]" | grep "application_" | awk -F ' ' '{ print $1 }')
+echo "Job name ($1) matched app IDs: ($APP_ID)"
+
+# If the app id was not found, it either doesn't exist or was already stopped.
+if [ -z "$APP_ID" ];
+then
+  exit 0
+fi
+
+# Verify that only one application matches
+COUNT=$(echo "$APP_ID" | wc -l)
+if [ $COUNT -gt 1 ];
+then
+  exit 150
+fi
+
+# Kill the job and check the return code
+"$(dirname $0)"/kill-yarn-job.sh "$APP_ID"
index 4c1aa10..6ea62b4 100644 (file)
@@ -20,7 +20,8 @@ include \
   'samza-api',
   'samza-elasticsearch',
   'samza-log4j',
-  'samza-shell'
+  'samza-shell',
+  'samza-rest'
 
 def scalaModules = [
         'samza-core',