SAMZA-1769: Remote app runner status
authorBoris Shkolnik <boryas@apache.org>
Fri, 3 Aug 2018 18:21:43 +0000 (11:21 -0700)
committerBoris S <boryas@apache.org>
Fri, 3 Aug 2018 18:21:43 +0000 (11:21 -0700)
Call for 'status' or 'kill' does not require Execution plan calculation.

Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Xinyu Liu <xinyuiscool@github.com>

Closes #597 from sborya/RemoteAppRunnerStatus and squashes the following commits:

7e1feea0 [Boris S] retry
1c0b3e4f [Boris S] checkstyle
e8d8d517 [Boris S] skipp graph planner for app status command
88f85595 [Boris S] Merge branch 'master' of https://github.com/apache/samza
0edf343b [Boris S] Merge branch 'master' of https://github.com/apache/samza
67e611ee [Boris S] Merge branch 'master' of https://github.com/apache/samza
dd39d089 [Boris S] Merge branch 'master' of https://github.com/apache/samza
1ad58d43 [Boris S] Merge branch 'master' of https://github.com/apache/samza
06b1ac36 [Boris Shkolnik] Merge branch 'master' of https://github.com/sborya/samza
5e6f5fb5 [Boris Shkolnik] Merge branch 'master' of https://github.com/apache/samza
010fa168 [Boris S] Merge branch 'master' of https://github.com/apache/samza
bbffb79b [Boris S] Merge branch 'master' of https://github.com/apache/samza
d4620d66 [Boris S] Merge branch 'master' of https://github.com/apache/samza
410ce78b [Boris S] Merge branch 'master' of https://github.com/apache/samza
a31a7aa2 [Boris Shkolnik] reduce debugging from info to debug in KafkaCheckpointManager.java

samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java

index 99fdc51..0ecb35e 100644 (file)
@@ -96,74 +96,29 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void kill(StreamApplication app) {
-    StreamManager streamManager = null;
-    try {
-      streamManager = buildAndStartStreamManager();
-      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
-      plan.getJobConfigs().forEach(jobConfig -> {
-          LOG.info("Killing job {}", jobConfig.getName());
-          JobRunner runner = new JobRunner(jobConfig);
-          runner.kill();
-        });
+    // since currently we only support single actual remote job, we can get its status without
+    // building the execution plan.
+    try {
+      JobConfig jc = new JobConfig(config);
+      LOG.info("Killing job {}", jc.getName());
+      JobRunner runner = new JobRunner(jc);
+      runner.kill();
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 
   @Override
   public ApplicationStatus status(StreamApplication app) {
-    StreamManager streamManager = null;
-    try {
-      boolean hasNewJobs = false;
-      boolean hasRunningJobs = false;
-      ApplicationStatus unsuccessfulFinishStatus = null;
-
-      streamManager = buildAndStartStreamManager();
-      ExecutionPlan plan = getExecutionPlan(app, streamManager);
-      for (JobConfig jobConfig : plan.getJobConfigs()) {
-        ApplicationStatus status = getApplicationStatus(jobConfig);
-
-        switch (status.getStatusCode()) {
-          case New:
-            hasNewJobs = true;
-            break;
-          case Running:
-            hasRunningJobs = true;
-            break;
-          case UnsuccessfulFinish:
-            unsuccessfulFinishStatus = status;
-            break;
-          case SuccessfulFinish:
-            break;
-          default:
-            // Do nothing
-        }
-      }
 
-      if (hasNewJobs) {
-        // There are jobs not started, report as New
-        return New;
-      } else if (hasRunningJobs) {
-        // All jobs are started, some are running
-        return Running;
-      } else if (unsuccessfulFinishStatus != null) {
-        // All jobs are finished, some are not successful
-        return unsuccessfulFinishStatus;
-      } else {
-        // All jobs are finished successfully
-        return SuccessfulFinish;
-      }
+    // since currently we only support single actual remote job, we can get its status without
+    // building the execution plan
+    try {
+      JobConfig jc = new JobConfig(config);
+      return getApplicationStatus(jc);
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 
index 2ef2b33..2734d56 100644 (file)
 package org.apache.samza.runtime;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.job.StreamJob;
+import org.apache.samza.job.StreamJobFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -50,4 +56,68 @@ public class TestRemoteApplicationRunner {
     boolean finished = runner.waitForFinish(Duration.ofMillis(1000));
     assertFalse("Application finished before the timeout.", finished);
   }
+
+  @Test
+  public void testGetStatus() {
+    Map m = new HashMap<String, String>();
+    m.put(JobConfig.JOB_NAME(), "jobName");
+    m.put(JobConfig.STREAM_JOB_FACTORY_CLASS(), MockStreamJobFactory.class.getName());
+
+    m.put(JobConfig.JOB_ID(), "newJob");
+    RemoteApplicationRunner runner = new RemoteApplicationRunner(new MapConfig());
+    Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+
+    m.put(JobConfig.JOB_ID(), "runningJob");
+    runner = new RemoteApplicationRunner(new JobConfig(new MapConfig(m)));
+    Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+  }
+
+  static public class MockStreamJobFactory implements StreamJobFactory {
+
+    public MockStreamJobFactory() {
+    }
+
+    @Override
+    public StreamJob getJob(final Config config) {
+
+      StreamJob streamJob = new StreamJob() {
+        JobConfig c = (JobConfig) config;
+
+        @Override
+        public StreamJob submit() {
+          return null;
+        }
+
+        @Override
+        public StreamJob kill() {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus waitForFinish(long timeoutMs) {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus getStatus() {
+          String jobId = c.getJobId().get();
+          switch (jobId) {
+            case "newJob":
+              return ApplicationStatus.New;
+            case "runningJob":
+              return ApplicationStatus.Running;
+            default:
+              return ApplicationStatus.UnsuccessfulFinish;
+          }
+        }
+      };
+
+      return streamJob;
+    }
+  }
 }