SAMZA-1250: JobRunner.kill doesn't terminate cleanly with YarnJob.
authorJacob Maes <jmaes@linkedin.com>
Mon, 1 May 2017 20:44:54 +0000 (13:44 -0700)
committerJacob Maes <jmaes@linkedin.com>
Mon, 1 May 2017 20:44:54 +0000 (13:44 -0700)
1. The ClientHelper now checks inactive application IDs so it can get status for terminated jobs in addition to running jobs
2. JobRunner.kill() waits for any finish, not just successful finish.
3. A killed job is now considered successful.

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #152 from jmakes/samza-1250

samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala

index b2f5bd0..f34db99 100644 (file)
@@ -137,7 +137,7 @@ class JobRunner(config: Config) extends Logging {
     info("waiting for job to terminate")
 
     // Wait until the job has terminated, then exit.
-    Option(job.waitForStatus(SuccessfulFinish, 5000)) match {
+    Option(job.waitForFinish(5000)) match {
       case Some(appStatus) => {
         if (SuccessfulFinish.equals(appStatus)) {
           info("job terminated successfully - " + appStatus)
index dc1ead3..f4fc757 100644 (file)
@@ -236,22 +236,31 @@ class ClientHelper(conf: Configuration) extends Logging {
     * @return        the active application ids.
     */
   def getActiveApplicationIds(appName: String): List[ApplicationId] = {
-    val getAppsRsp = yarnClient.getApplications
+    val applicationReports = yarnClient.getApplications
 
-    getAppsRsp
+    applicationReports
       .asScala
-        .filter(appRep => ((
-            Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
-            || New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
-            )
-          && appName.equals(appRep.getName)))
-        .map(appRep => appRep.getApplicationId)
+        .filter(applicationReport => isActiveApplication(applicationReport)
+          && appName.equals(applicationReport.getName))
+        .map(applicationReport => applicationReport.getApplicationId)
         .toList
   }
 
+  def getPreviousApplicationIds(appName: String): List[ApplicationId] = {
+    val applicationReports = yarnClient.getApplications
+
+    applicationReports
+      .asScala
+      .filter(applicationReport => (!(isActiveApplication(applicationReport))
+        && appName.equals(applicationReport.getName)))
+      .map(applicationReport => applicationReport.getApplicationId)
+      .toList
+  }
+
   def status(appId: ApplicationId): Option[ApplicationStatus] = {
     val statusResponse = yarnClient.getApplicationReport(appId)
-    convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
+    info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus))
+    toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
   }
 
   def kill(appId: ApplicationId) {
@@ -271,15 +280,20 @@ class ClientHelper(conf: Configuration) extends Logging {
     status match {
       case Some(status) => getAppsRsp
         .asScala
-        .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+        .filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
         .toList
       case None => getAppsRsp.asScala.toList
     }
   }
 
-  private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+  private def isActiveApplication(applicationReport: ApplicationReport): Boolean = {
+    (Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)
+    || New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get))
+  }
+
+  private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
     (state, status) match {
-      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
+      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish)
       case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
       case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
       case _ => Some(Running)
index 030f914..5230b0f 100644 (file)
@@ -140,7 +140,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
         client.status(appId).getOrElse(null)
       case None =>
         logger.info("Unable to report status because no applicationId could be found.")
-        null
+        ApplicationStatus.SuccessfulFinish
     }
   }
 
@@ -171,12 +171,17 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
             logger.info("Fetching status from YARN for application name %s" format applicationName)
             val applicationIds = client.getActiveApplicationIds(applicationName)
 
-            applicationIds.foreach(applicationId =>  {
-              logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
-            })
+            if (applicationIds.nonEmpty) {
+              // Only return latest one, because there should only be one.
+              logger.info("Matching active ids: " + applicationIds.sorted.reverse.toString())
+              applicationIds.sorted.reverse.headOption
+            } else {
+              // Couldn't find an active applicationID. Use one the latest finished ID.
+              val pastApplicationIds = client.getPreviousApplicationIds(applicationName)
+              // Don't log because there could be many, many previous app IDs for an application.
+              pastApplicationIds.sorted.reverse.headOption  // Get latest
+            }
 
-            // Only return one, because there should only be one.
-            applicationIds.headOption
           case None =>
             None
         }