SAMZA-562: Make Samza minimum supported Yarn version 2.6
author: Aleksandar Pejakovic <a.pejakovic@levi9.com>
Fri, 25 Sep 2015 17:26:32 +0000 (10:26 -0700)
committer: Yi Pan (Data Infrastructure) <yipan@linkedin.com>
Fri, 25 Sep 2015 17:28:10 +0000 (10:28 -0700)
Squashed commit of the following:

commit ccfca5ec369317fa8dab8deac84f3e9a32f5cb58
Author: Yi Pan (Data Infrastructure) <yipan@linkedin.com>
Date:   Wed Sep 23 12:42:52 2015 -0700

    SAMZA-563: Remove Yarn 2.4 and 2.5 in bin/check-all.sh

commit c409a103d3e764550a6da452ca53042cbd5e3b6c
Author: Yi Pan (Data Infrastructure) <yipan@linkedin.com>
Date:   Tue Sep 22 18:39:20 2015 -0700

    SAMZA-563: Make Samza's minimum supported YARN to 2.6

README.md
bin/check-all.sh
docs/learn/tutorials/versioned/run-in-multi-node-yarn.md
gradle/dependency-versions.gradle
samza-test/src/main/python/configs/downloads.json
samza-test/src/main/python/configs/yarn.json
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala

index c736669..ce111a2 100644 (file)
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ After the bootstrap script has completed, the regular gradlew instructions below
 
 #### Scala and YARN
 
-Samza builds with [Scala](http://www.scala-lang.org/) 2.10 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.4.0, by default. Use the -PscalaVersion switches to change Scala versions. Samza supports building Scala with 2.10.
+Samza builds with [Scala](http://www.scala-lang.org/) 2.10 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.6.0, by default. Use the -PscalaVersion switches to change Scala versions. Samza supports building Scala with 2.10.
 
     ./gradlew -PscalaVersion=2.10 clean build
 
index 67bf776..b1f65e7 100755 (executable)
@@ -23,7 +23,7 @@ set -e
 
 SCALAs=( "2.10" )
 JDKs=( "JAVA7_HOME" "JAVA8_HOME" )
-YARNs=( "2.4.0" "2.5.0" )
+YARNs=( "2.6.0" "2.7.1" )
 
 # get base directory
 home_dir=`pwd`
index 7a9fdf4..b286226 100644 (file)
@@ -27,12 +27,12 @@ If you already have a multi-node YARN cluster (such as CDH5 cluster), you can sk
 
 ### Basic YARN Setting
 
-1\. Download [YARN 2.4](http://mirror.symnds.com/software/Apache/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz) to /tmp and untar it.
+1\. Download [YARN 2.6](http://mirror.symnds.com/software/Apache/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz) to /tmp and untar it.
 
 {% highlight bash %}
 cd /tmp
-tar -xvf hadoop-2.4.0.tar.gz
-cd hadoop-2.4.0
+tar -xvf hadoop-2.6.0.tar.gz
+cd hadoop-2.6.0
 {% endhighlight %}
 
 2\. Set up environment variables.
@@ -111,7 +111,7 @@ Add the following code:
 7\. Basically, you copy the hadoop file in your host machine to slave machines. (172.21.100.35, in my case):
 
 {% highlight bash %}
-scp -r . 172.21.100.35:/tmp/hadoop-2.4.0
+scp -r . 172.21.100.35:/tmp/hadoop-2.6.0
 echo 172.21.100.35 > conf/slaves
 sbin/start-yarn.sh
 {% endhighlight %}
index 36d564b..9c179e4 100644 (file)
@@ -30,7 +30,7 @@
   kafkaVersion = "0.8.2.1"
   commonsHttpClientVersion = "3.1"
   rocksdbVersion = "3.10.1"
-  yarnVersion = "2.4.0"
+  yarnVersion = "2.6.0"
   slf4jVersion = "1.6.2"
   log4jVersion = "1.2.17"
   guavaVersion = "17.0"
index a75756f..c890e70 100644 (file)
@@ -1,5 +1,5 @@
 {
   "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz",
   "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz",
-  "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz"
+  "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz"
 }
index 9b0143d..b1492bf 100644 (file)
@@ -3,34 +3,34 @@
   "yarn_rm_hosts": {
     "yarn_rm_instance_0": "localhost"
   },
-  "yarn_rm_start_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh start resourcemanager",
-  "yarn_rm_stop_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh stop resourcemanager",
+  "yarn_rm_start_cmd": "hadoop-2.6.0/sbin/yarn-daemon.sh start resourcemanager",
+  "yarn_rm_stop_cmd": "hadoop-2.6.0/sbin/yarn-daemon.sh stop resourcemanager",
   "yarn_rm_install_path": "deploy/yarn_rm",
   "yarn_rm_post_install_cmds": [
-    "sed -i.bak 's/<configuration>/<configuration><property><name>yarn.nodemanager.vmem-pmem-ratio<\\/name><value>10<\\/value><\\/property>/' hadoop-2.4.0/etc/hadoop/yarn-site.xml",
-    "mkdir -p hadoop-2.4.0/conf",
-    "chmod 755 hadoop-2.4.0/conf",
-    "cp hadoop-2.4.0/etc/hadoop/yarn-site.xml hadoop-2.4.0/conf/yarn-site.xml"
+    "sed -i.bak 's/<configuration>/<configuration><property><name>yarn.nodemanager.vmem-pmem-ratio<\\/name><value>10<\\/value><\\/property>/' hadoop-2.6.0/etc/hadoop/yarn-site.xml",
+    "mkdir -p hadoop-2.6.0/conf",
+    "chmod 755 hadoop-2.6.0/conf",
+    "cp hadoop-2.6.0/etc/hadoop/yarn-site.xml hadoop-2.6.0/conf/yarn-site.xml"
   ],
-  "yarn_rm_executable": "hadoop-2.4.0.tar.gz",
+  "yarn_rm_executable": "hadoop-2.6.0.tar.gz",
   "yarn_rm_logs": [
-    "hadoop-2.4.0/logs"
+    "hadoop-2.6.0/logs"
   ],
   "yarn_nm_hosts": {
     "yarn_nm_instance_0": "localhost"
   },
-  "yarn_nm_start_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh start nodemanager",
-  "yarn_nm_stop_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh stop nodemanager",
+  "yarn_nm_start_cmd": "hadoop-2.6.0/sbin/yarn-daemon.sh start nodemanager",
+  "yarn_nm_stop_cmd": "hadoop-2.6.0/sbin/yarn-daemon.sh stop nodemanager",
   "yarn_nm_install_path": "deploy/yarn_nm",
   "yarn_nm_post_install_cmds": [
-    "sed -i.bak 's/<configuration>/<configuration><property><name>yarn.nodemanager.vmem-pmem-ratio<\\/name><value>10<\\/value><\\/property>/' hadoop-2.4.0/etc/hadoop/yarn-site.xml",
-    "mkdir -p hadoop-2.4.0/conf",
-    "chmod 755 hadoop-2.4.0/conf",
-    "cp hadoop-2.4.0/etc/hadoop/yarn-site.xml hadoop-2.4.0/conf/yarn-site.xml"
+    "sed -i.bak 's/<configuration>/<configuration><property><name>yarn.nodemanager.vmem-pmem-ratio<\\/name><value>10<\\/value><\\/property>/' hadoop-2.6.0/etc/hadoop/yarn-site.xml",
+    "mkdir -p hadoop-2.6.0/conf",
+    "chmod 755 hadoop-2.6.0/conf",
+    "cp hadoop-2.6.0/etc/hadoop/yarn-site.xml hadoop-2.6.0/conf/yarn-site.xml"
   ],
-  "yarn_nm_executable": "hadoop-2.4.0.tar.gz",
+  "yarn_nm_executable": "hadoop-2.6.0.tar.gz",
   "yarn_nm_logs": [
-    "hadoop-2.4.0/logs"
+    "hadoop-2.6.0/logs"
   ],
   "yarn_driver_configs": {
     "yarn.resourcemanager.hostname": "localhost"
index df5992e..881ffd7 100644 (file)
 
 package org.apache.samza.job.yarn
 
+import java.net.URL
 import java.nio.ByteBuffer
+import java.util
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.SamzaException
-import org.mockito.Mockito
+import org.apache.samza.coordinator.JobCoordinator
 import org.junit.Assert._
 import org.junit.Test
-import scala.annotation.elidable
-import scala.annotation.elidable.ASSERTION
-import java.net.URL
-import org.apache.samza.coordinator.JobCoordinator
+import org.mockito.Mockito
 
 class TestSamzaAppMasterLifecycle {
   val coordinator = new JobCoordinator(null, null, null)
@@ -64,6 +65,9 @@ class TestSamzaAppMasterLifecycle {
         override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = Unit
         override def setNMTokensFromPreviousAttempts(nmTokens: java.util.List[NMToken]): Unit = Unit
         override def setQueue(queue: String): Unit = Unit
+
+        override def setSchedulerResourceTypes(types: util.EnumSet[SchedulerResourceTypes]): Unit = {}
+        override def getSchedulerResourceTypes: util.EnumSet[SchedulerResourceTypes] = null
       }
     }
     override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
index 2eec65f..444d9a2 100644 (file)
 
 package org.apache.samza.job.yarn
 
+import java.net.URL
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.protocolrecords.{AllocateResponse, RegisterApplicationMasterResponse}
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.Partition
-import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.YarnConfig.Config2Yarn
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{Config, MapConfig}
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.job.yarn.TestSamzaAppMasterTaskManager._
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemAdmin, SystemFactory, SystemStreamMetadata, SystemStreamPartition}
 import org.junit.Test
+
 import scala.collection.JavaConversions._
-import TestSamzaAppMasterTaskManager._
-import java.net.URL
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.coordinator.JobCoordinator
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.container.TaskName
-import org.apache.samza.job.model.TaskModel
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -100,7 +95,10 @@ object TestSamzaAppMasterTaskManager {
     def getRelease = release
     def resetRelease = release.clear
     override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
-    override def allocate(progressIndicator: Float): AllocateResponse = response
+    override def allocate(progressIndicator: Float): AllocateResponse = {
+      response.getAMCommand
+      response
+    }
     override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = ()
     override def addContainerRequest(req: ContainerRequest) { requests ::= req }
     override def removeContainerRequest(req: ContainerRequest) {}
@@ -135,10 +133,13 @@ object TestSamzaAppMasterTaskManager {
       override def setIncreasedContainers(increase: java.util.List[ContainerResourceIncrease]): Unit = Unit
 
       override def getAMCommand = if (reboot) {
-        AMCommand.AM_RESYNC
+        throw new ApplicationAttemptNotFoundException("Test - out of sync")
       } else {
         null
       }
+
+      override def getAMRMToken: Token = null
+      override def setAMRMToken(amRMToken: Token): Unit = {}
     }
 }