SAMZA-1089: Enable YarnJob and ClientHelper to kill a job by name rather than YARN...
authorJacob Maes <jmaes@linkedin.com>
Fri, 7 Apr 2017 21:32:02 +0000 (14:32 -0700)
committerJacob Maes <jmaes@linkedin.com>
Fri, 7 Apr 2017 21:32:02 +0000 (14:32 -0700)
Missed a couple files in the previous commit to enable YarnJob to kill and get status of a Job based on the job name rather than the YARN ApplicationName.

These changes have been manually verified in a Yarn cluster at LI.

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes #114 from jmakes/samza-1089-3

samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala

index c7b1b6d..dc1ead3 100644 (file)
@@ -220,6 +220,35 @@ class ClientHelper(conf: Configuration) extends Logging {
     appId
   }
 
+  /**
+    * Gets the list of Yarn [[org.apache.hadoop.yarn.api.records.ApplicationId]]
+    * corresponding to the specified appName and are "active".
+    * <p>
+    * In this context, "active" means that the application is starting or running
+    * and is not in any terminated state.
+    * <p>
+    * In Samza, an appName should be unique and there should only be one active
+    * applicationId for a given appName, but this can be violated in unusual cases
+    * like while troubleshooting a new application. So, this method returns as many
+    * active application ids as it finds.
+    *
+    * @param appName the app name as found in the Name column in the Yarn application list.
+    * @return        the active application ids.
+    */
+  def getActiveApplicationIds(appName: String): List[ApplicationId] = {
+    val getAppsRsp = yarnClient.getApplications
+
+    getAppsRsp
+      .asScala
+        .filter(appRep => ((
+            Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
+            || New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
+            )
+          && appName.equals(appRep.getName)))
+        .map(appRep => appRep.getApplicationId)
+        .toList
+  }
+
   def status(appId: ApplicationId): Option[ApplicationStatus] = {
     val statusResponse = yarnClient.getApplicationReport(appId)
     convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
@@ -251,7 +280,7 @@ class ClientHelper(conf: Configuration) extends Logging {
   private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
     (state, status) match {
       case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
-      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
+      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
       case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
       case _ => Some(Running)
     }
@@ -337,6 +366,8 @@ class ClientHelper(conf: Configuration) extends Logging {
     * Cleanup application staging directory.
     */
   def cleanupStagingDir(): Unit = {
-    YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
+    if (jobContext != null) {
+      YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
+    }
   }
 }
index 46dc4d1..030f914 100644 (file)
@@ -134,16 +134,22 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
   }
 
   def getStatus: ApplicationStatus = {
-    appId match {
-      case Some(appId) => client.status(appId).getOrElse(null)
-      case None => null
+    getAppId match {
+      case Some(appId) =>
+        logger.info("Getting status for applicationId %s" format appId)
+        client.status(appId).getOrElse(null)
+      case None =>
+        logger.info("Unable to report status because no applicationId could be found.")
+        null
     }
   }
 
   def kill: YarnJob = {
-    appId match {
+    // getAppId only returns one appID. Run multiple times to kill dupes (erroneous case)
+    getAppId match {
       case Some(appId) =>
         try {
+          logger.info("Killing applicationId {}", appId)
           client.kill(appId)
         } finally {
           client.cleanupStagingDir
@@ -152,4 +158,28 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
     }
     this
   }
+
+  private def getAppId: Option[ApplicationId] = {
+    appId match {
+      case Some(applicationId) =>
+       appId
+      case None =>
+        // Get by name
+        config.getName match {
+          case Some(jobName) =>
+            val applicationName = "%s_%s" format(jobName, config.getJobId.getOrElse(1))
+            logger.info("Fetching status from YARN for application name %s" format applicationName)
+            val applicationIds = client.getActiveApplicationIds(applicationName)
+
+            applicationIds.foreach(applicationId =>  {
+              logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
+            })
+
+            // Only return one, because there should only be one.
+            applicationIds.headOption
+          case None =>
+            None
+        }
+    }
+  }
 }