SAMZA-1246: ApplicationRunner.status() should include exception in case of failure
author Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 3 May 2017 19:17:22 +0000 (12:17 -0700)
committer Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 3 May 2017 19:17:22 +0000 (12:17 -0700)
Currently ApplicationRunner.status() only returns the enum representing the status. It also needs to include the exception if the status is a failure.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apache.org>

Closes #154 from xinyuiscool/SAMZA-1246

checkstyle/checkstyle-suppressions.xml
samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala

index 428ac93..a88b341 100644 (file)
@@ -27,5 +27,8 @@
            files="TestZkProcessorLatch.java"
            lines="91-275"/>
            -->
+  <suppress checks="ConstantName"
+            files="ApplicationStatus.java"
+            lines="26-29"/>
 </suppressions>
 
index c41430a..baf095b 100644 (file)
@@ -22,16 +22,63 @@ package org.apache.samza.job;
 /**
  * Status of a {@link org.apache.samza.job.StreamJob} during and after its run.
  */
-public enum ApplicationStatus {
-  Running("Running"), SuccessfulFinish("SuccessfulFinish"), UnsuccessfulFinish("UnsuccessfulFinish"), New("New");
+public class ApplicationStatus {
+  public static final ApplicationStatus New = new ApplicationStatus(StatusCode.New, null);
+  public static final ApplicationStatus Running = new ApplicationStatus(StatusCode.Running, null);
+  public static final ApplicationStatus SuccessfulFinish = new ApplicationStatus(StatusCode.SuccessfulFinish, null);
+  public static final ApplicationStatus UnsuccessfulFinish = new ApplicationStatus(StatusCode.UnsuccessfulFinish, null);
 
-  private final String str;
+  public enum StatusCode {
+    New,
+    Running,
+    SuccessfulFinish,
+    UnsuccessfulFinish
+  }
+
+  private final StatusCode statusCode;
+  private final Throwable throwable;
+
+  private ApplicationStatus(StatusCode code, Throwable t) {
+    this.statusCode = code;
+    this.throwable = t;
+  }
 
-  private ApplicationStatus(String str) {
-    this.str = str;
+  public StatusCode getStatusCode() {
+    return statusCode;
   }
 
+  public Throwable getThrowable() {
+    return throwable;
+  }
+
+  @Override
   public String toString() {
-    return str;
+    return statusCode.name();
+  }
+
+
+  public static ApplicationStatus unsuccessfulFinish(Throwable t) {
+    return new ApplicationStatus(StatusCode.UnsuccessfulFinish, t);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj == this) {
+      return true;
+    }
+    if (obj.getClass() != getClass()) {
+      return false;
+    }
+
+    ApplicationStatus rhs = (ApplicationStatus) obj;
+    return statusCode.equals(rhs.statusCode);
+  }
+
+  @Override
+  public int hashCode() {
+    return statusCode.hashCode();
   }
 }
index b761d86..8fb991a 100644 (file)
@@ -96,7 +96,7 @@ public abstract class ApplicationRunner {
 
   /**
    * Get the collective status of the Samza jobs represented by {@link StreamApplication}.
-   * Returns {@link ApplicationStatus#Running} if any of the jobs are running.
+   * Returns {@link ApplicationStatus#Running} if all jobs are running.
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
    * @return the status of the application
index bff0f1c..3efbdc1 100644 (file)
@@ -133,7 +133,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       awaitComplete();
 
     } catch (Throwable t) {
-      appStatus = ApplicationStatus.UnsuccessfulFinish;
+      appStatus = ApplicationStatus.unsuccessfulFinish(t);
       throw new SamzaException("Failed to run application", t);
     } finally {
       if (coordination != null) {
@@ -247,7 +247,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     latch.await();
 
     if (throwable.get() != null) {
-      appStatus = ApplicationStatus.UnsuccessfulFinish;
       throw throwable.get();
     } else {
       appStatus = ApplicationStatus.SuccessfulFinish;
index d5f6e21..5daecda 100644 (file)
@@ -84,8 +84,9 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public ApplicationStatus status(StreamApplication app) {
     try {
-      boolean finished = false;
-      boolean unsuccessfulFinish = false;
+      boolean hasNewJobs = false;
+      boolean hasRunningJobs = false;
+      ApplicationStatus unsuccessfulFinishStatus = null;
 
       ExecutionPlan plan = getExecutionPlan(app);
       for (JobConfig jobConfig : plan.getJobConfigs()) {
@@ -93,25 +94,36 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
         ApplicationStatus status = runner.status();
         log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
 
-        switch (status) {
+        switch (status.getStatusCode()) {
+          case New:
+            hasNewJobs = true;
+            break;
           case Running:
-            return ApplicationStatus.Running;
+            hasRunningJobs = true;
+            break;
           case UnsuccessfulFinish:
-            unsuccessfulFinish = true;
+            unsuccessfulFinishStatus = status;
+            break;
           case SuccessfulFinish:
-            finished = true;
             break;
           default:
             // Do nothing
         }
       }
 
-      if (unsuccessfulFinish) {
-        return ApplicationStatus.UnsuccessfulFinish;
-      } else if (finished) {
+      if (hasNewJobs) {
+        // There are jobs not started, report as New
+        return ApplicationStatus.New;
+      } else if (hasRunningJobs) {
+        // All jobs are started, some are running
+        return ApplicationStatus.Running;
+      } else if (unsuccessfulFinishStatus != null) {
+        // All jobs are finished, some are not successful
+        return unsuccessfulFinishStatus;
+      } else {
+        // All jobs are finished successfully
         return ApplicationStatus.SuccessfulFinish;
       }
-      return ApplicationStatus.New;
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
     }
index f068773..bc2d74b 100644 (file)
@@ -70,7 +70,7 @@ class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager
 
   def kill: StreamJob = {
     process.destroyForcibly
-    jobStatus = Some(UnsuccessfulFinish);
+    jobStatus = Some(UnsuccessfulFinish)
     ProcessJob.this
   }
 
index f4fc757..e6c0896 100644 (file)
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.yarn
 
+
+import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.samza.config.{Config, JobConfig, YarnConfig}
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
@@ -260,7 +262,7 @@ class ClientHelper(conf: Configuration) extends Logging {
   def status(appId: ApplicationId): Option[ApplicationStatus] = {
     val statusResponse = yarnClient.getApplicationReport(appId)
     info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus))
-    toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
+    toAppStatus(statusResponse)
   }
 
   def kill(appId: ApplicationId) {
@@ -280,21 +282,29 @@ class ClientHelper(conf: Configuration) extends Logging {
     status match {
       case Some(status) => getAppsRsp
         .asScala
-        .filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+        .filter(appRep => status.equals(toAppStatus(appRep).get))
         .toList
       case None => getAppsRsp.asScala.toList
     }
   }
 
   private def isActiveApplication(applicationReport: ApplicationReport): Boolean = {
-    (Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)
-    || New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get))
+    (Running.equals(toAppStatus(applicationReport).get)
+    || New.equals(toAppStatus(applicationReport).get))
   }
 
-  private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+  def toAppStatus(applicationReport: ApplicationReport): Option[ApplicationStatus] = {
+    val state = applicationReport.getYarnApplicationState
+    val status = applicationReport.getFinalApplicationStatus
     (state, status) match {
       case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish)
-      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
+      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) =>
+        val diagnostics = applicationReport.getDiagnostics
+        if (StringUtils.isEmpty(diagnostics)) {
+          Some(UnsuccessfulFinish)
+        } else {
+          Some(ApplicationStatus.unsuccessfulFinish(new SamzaException(diagnostics)))
+        }
       case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
       case _ => Some(Running)
     }
index ad8337b..ee947ae 100644 (file)
@@ -22,9 +22,15 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
 import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig}
+import org.apache.samza.job.ApplicationStatus
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
 import org.mockito.Mockito._
 import org.mockito.Matchers.any
 import org.scalatest.FunSuite
@@ -88,4 +94,31 @@ class TestClientHelper extends FunSuite {
     assert(ret.size == 1)
     assert(ret.contains("some.keytab"))
   }
+
+  test("test toAppStatus") {
+    val appReport = mock[ApplicationReport]
+    when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FAILED)
+    when(appReport.getDiagnostics).thenReturn("some yarn diagnostics")
+
+    var appStatus = clientHelper.toAppStatus(appReport).get
+    assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
+    assertNotNull(appStatus.getThrowable)
+
+    when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.NEW)
+    appStatus = clientHelper.toAppStatus(appReport).get
+    assertEquals(appStatus, ApplicationStatus.New)
+
+    when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+    when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.FAILED)
+    appStatus = clientHelper.toAppStatus(appReport).get
+    assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
+
+    when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.KILLED)
+    appStatus = clientHelper.toAppStatus(appReport).get
+    assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
+
+    when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+    appStatus = clientHelper.toAppStatus(appReport).get
+    assertEquals(appStatus, ApplicationStatus.SuccessfulFinish)
+  }
 }