[SPARK-24149][YARN] Retrieve all federated namespaces tokens
author    Marco Gaido <marcogaido91@gmail.com>
          Fri, 18 May 2018 20:04:00 +0000 (13:04 -0700)
committer Marcelo Vanzin <vanzin@cloudera.com>
          Fri, 18 May 2018 20:04:00 +0000 (13:04 -0700)
## What changes were proposed in this pull request?

Hadoop 3 introduces HDFS federation, which allows multiple namespaces on the same HDFS cluster. In Spark, we need to request a delegation token from the namenode of each namespace; otherwise, accessing any namespace other than the default one (for which we already fetch a delegation token) fails.

This PR adds automatic discovery of the namenodes for all the namespaces declared in the Hadoop configuration (hdfs-site.xml).
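The discovery logic can be sketched independently of the Hadoop API. The sketch below models hdfs-site.xml as a plain `Map` instead of a Hadoop `Configuration`, and `FederatedNamenodeDiscovery` is a name invented for illustration; the actual change lives in `YarnSparkHadoopUtil.hadoopFSsToAccess` (see the diff below), which returns `FileSystem` instances rather than URIs.

```scala
// Hypothetical, self-contained sketch of the namenode discovery added by this
// PR. Given the hdfs-site.xml properties (modeled as a plain Map), collect one
// filesystem URI per nameservice, handling both the non-HA case
// (dfs.namenode.rpc-address.<ns>) and the HA case (dfs.ha.namenodes.<ns>).
object FederatedNamenodeDiscovery {
  def namenodeUris(conf: Map[String, String]): Set[String] = {
    // dfs.nameservices is a comma-separated list of namespace names.
    val nameservices = conf.get("dfs.nameservices")
      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
      .getOrElse(Seq.empty)

    // Non-HA nameservices expose a single RPC address per namespace.
    val withoutHA = nameservices.flatMap { ns =>
      conf.get(s"dfs.namenode.rpc-address.$ns").map(addr => s"hdfs://$addr")
    }
    // HA nameservices are addressed by their logical name; the HDFS client
    // resolves the active namenode via the failover proxy provider.
    val withHA = nameservices.flatMap { ns =>
      conf.get(s"dfs.ha.namenodes.$ns").map(_ => s"hdfs://$ns")
    }
    (withoutHA ++ withHA).toSet
  }
}
```

A nameservice missing both keys contributes nothing, which is why an incomplete federation config degrades gracefully instead of throwing (the "invalid config" case exercised in the test suite below).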

## How was this patch tested?

Manual tests in a dockerized environment.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21216 from mgaido91/SPARK-24149.

docs/running-on-yarn.md
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

index c9e68c3..4dbcbea 100644 (file)
@@ -424,9 +424,12 @@ To use a custom metrics.properties for the application master and executors, upd
 
 Standard Kerberos support in Spark is covered in the [Security](security.html#kerberos) page.
 
-In YARN mode, when accessing Hadoop file systems, aside from the service hosting the user's home
-directory, Spark will also automatically obtain delegation tokens for the service hosting the
-staging directory of the Spark application.
+In YARN mode, when accessing Hadoop filesystems, Spark will automatically obtain delegation tokens
+for:
+
+- the filesystem hosting the staging directory of the Spark application (which is the default
+  filesystem if `spark.yarn.stagingDir` is not set);
+- if Hadoop federation is enabled, all the federated filesystems in the configuration.
 
 If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
 explicitly provided to Spark at launch time. This is done by listing them in the
index 8eda6cb..7250e58 100644 (file)
@@ -200,7 +200,29 @@ object YarnSparkHadoopUtil {
       .map(new Path(_).getFileSystem(hadoopConf))
       .getOrElse(FileSystem.get(hadoopConf))
 
-    filesystemsToAccess + stagingFS
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
+          new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
+        }
+      }
+      // Retrieving the filesystem for the nameservices where HA is enabled
+      val filesystemsWithHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
+          new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
+        }
+      }
+      (filesystemsWithoutHA ++ filesystemsWithHA).toSet
+    }
+
+    filesystemsToAccess ++ hadoopFilesystems + stagingFS
   }
 
 }
index f21353a..61c0c43 100644 (file)
@@ -21,7 +21,8 @@ import java.io.{File, IOException}
 import java.nio.charset.StandardCharsets
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
@@ -141,4 +142,66 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
 
   }
 
+  test("SPARK-24149: retrieve all namenodes from HDFS") {
+    val sparkConf = new SparkConf()
+    val basicFederationConf = new Configuration()
+    basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    basicFederationConf.set("dfs.nameservices", "ns1,ns2")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
+    val basicFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
+      new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
+    val basicFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, basicFederationConf)
+    basicFederationResult should be (basicFederationExpected)
+
+    // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them
+    val viewFsConf = new Configuration()
+    viewFsConf.addResource(basicFederationConf)
+    viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
+    viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
+    val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
+    YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)
+
+    // invalid config should not throw NullPointerException
+    val invalidFederationConf = new Configuration()
+    invalidFederationConf.addResource(basicFederationConf)
+    invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
+    val invalidFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
+    val invalidFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, invalidFederationConf)
+    invalidFederationResult should be (invalidFederationExpected)
+
+    // no namespaces defined, ie. old case
+    val noFederationConf = new Configuration()
+    noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    val noFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
+    val noFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, noFederationConf)
+    noFederationResult should be (noFederationExpected)
+
+    // federation and HA enabled
+    val federationAndHAConf = new Configuration()
+    federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
+    federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+
+    val federationAndHAExpected = Set(
+      new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
+      new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
+    val federationAndHAResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, federationAndHAConf)
+    federationAndHAResult should be (federationAndHAExpected)
+  }
 }