[SPARK-23984][K8S][TEST] Added Integration Tests for PySpark on Kubernetes
author: Ilan Filonenko <if56@cornell.edu>
Sat, 14 Jul 2018 00:19:28 +0000 (17:19 -0700)
committer: mcheah <mcheah@palantir.com>
Sat, 14 Jul 2018 00:19:28 +0000 (17:19 -0700)
## What changes were proposed in this pull request?

I added integration tests for PySpark (plus a check of driver extraJavaOptions and a SparkRemoteFileTest) which weren't properly merged in the initial integration test PR.

## How was this patch tested?

I tested this with integration tests using:

`dev/dev-run-integration-tests.sh --spark-tgz spark-2.4.0-SNAPSHOT-bin-2.7.3.tgz`

Author: Ilan Filonenko <if56@cornell.edu>

Closes #21583 from ifilonenko/master.

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala

index 6e334c8..774c393 100644 (file)
@@ -22,7 +22,7 @@ import java.util.UUID
 import java.util.regex.Pattern
 
 import com.google.common.io.PatternFilenameFilter
-import io.fabric8.kubernetes.api.model.{Container, Pod}
+import io.fabric8.kubernetes.api.model.Pod
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
@@ -43,6 +43,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkAppConf: SparkAppConf = _
   private var image: String = _
+  private var pyImage: String = _
   private var containerLocalSparkDistroExamplesJar: String = _
   private var appLocator: String = _
   private var driverPodName: String = _
@@ -65,6 +66,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     val imageTag = getTestImageTag
     val imageRepo = getTestImageRepo
     image = s"$imageRepo/spark:$imageTag"
+    pyImage = s"$imageRepo/spark-py:$imageTag"
 
     val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
       .toFile
@@ -156,22 +158,77 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       })
   }
 
-  // TODO(ssuchter): Enable the below after debugging
-  // test("Run PageRank using remote data file") {
-  //   sparkAppConf
-  //     .set("spark.kubernetes.mountDependencies.filesDownloadDir",
-  //       CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
-  //     .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
-  //   runSparkPageRankAndVerifyCompletion(
-  //     appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
-  // }
+  test("Run extraJVMOptions check on driver") {
+    sparkAppConf
+      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+    runSparkJVMCheckAndVerifyCompletion(
+      expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
+  }
+
+  test("Run SparkRemoteFileTest using a remote data file") {
+    sparkAppConf
+      .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+    runSparkRemoteCheckAndVerifyCompletion(
+      appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
+  }
+
+  test("Run PySpark on simple pi.py example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_PI,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("Pi is roughly 3"),
+      appArgs = Array("5"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+
+  test("Run PySpark with Python2 to test a pyfiles example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "2")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
+
+  test("Run PySpark with Python3 to test a pyfiles example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "3")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python3"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
 
   private def runSparkPiAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String] = Array.empty[String],
-      appLocator: String = appLocator): Unit = {
+      appLocator: String = appLocator,
+      isJVM: Boolean = true ): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
@@ -179,10 +236,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      isJVM)
   }
 
-  private def runSparkPageRankAndVerifyCompletion(
+  private def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -190,12 +248,50 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       appLocator: String = appLocator): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
-      SPARK_PAGE_RANK_MAIN_CLASS,
-      Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+      SPARK_REMOTE_MAIN_CLASS,
+      Seq(s"Mounting of ${appArgs.head} was true"),
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      true)
+  }
+
+  private def runSparkJVMCheckAndVerifyCompletion(
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      mainClass: String = SPARK_DRIVER_MAIN_CLASS,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      appArgs: Array[String] = Array("5"),
+      expectedJVMValue: Seq[String]): Unit = {
+    val appArguments = SparkAppArguments(
+      mainAppResource = appResource,
+      mainClass = mainClass,
+      appArgs = appArgs)
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      true)
+
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+    doBasicDriverPodCheck(driverPod)
+
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      expectedJVMValue.foreach { e =>
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
+    }
   }
 
   private def runSparkApplicationAndVerifyCompletion(
@@ -205,12 +301,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       appArgs: Array[String],
       driverPodChecker: Pod => Unit,
       executorPodChecker: Pod => Unit,
-      appLocator: String): Unit = {
+      appLocator: String,
+      isJVM: Boolean,
+      pyFiles: Option[String] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      isJVM,
+      pyFiles)
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
@@ -248,11 +352,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
   }
 
+  private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
   private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
+  private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
   private def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -287,14 +402,22 @@ private[spark] object KubernetesSuite {
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
+  val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+  val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
+  val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
+  val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
+  val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
 
-  // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+  val TEST_SECRET_NAME_PREFIX = "test-secret-"
+  val TEST_SECRET_KEY = "test-key"
+  val TEST_SECRET_VALUE = "test-data"
+  val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
 
-  // val REMOTE_PAGE_RANK_DATA_FILE =
-  //   "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
-  // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
-  //   s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+  val REMOTE_PAGE_RANK_DATA_FILE =
+    "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+  val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
 
-  // case object ShuffleNotReadyException extends Exception
+  case object ShuffleNotReadyException extends Exception
 }
index b2471e5..a9b49a8 100644 (file)
@@ -97,21 +97,33 @@ private[spark] case class SparkAppArguments(
     appArgs: Array[String])
 
 private[spark] object SparkAppLauncher extends Logging {
-
   def launch(
       appArguments: SparkAppArguments,
       appConf: SparkAppConf,
       timeoutSecs: Int,
-      sparkHomeDir: Path): Unit = {
+      sparkHomeDir: Path,
+      isJVM: Boolean,
+      pyFiles: Option[String] = None): Unit = {
     val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
     logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
-    val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
+    val preCommandLine = if (isJVM) {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
       "--deploy-mode", "cluster",
       "--class", appArguments.mainClass,
-      "--master", appConf.get("spark.master")
-    ) ++ appConf.toStringArray :+
-      appArguments.mainAppResource) ++
-      appArguments.appArgs
-    ProcessUtils.executeProcess(commandLine, timeoutSecs)
+      "--master", appConf.get("spark.master"))
+    } else {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
+        "--deploy-mode", "cluster",
+        "--master", appConf.get("spark.master"))
+    }
+    val commandLine =
+      pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
+        appConf.toStringArray :+ appArguments.mainAppResource
+
+    if (appArguments.appArgs.nonEmpty) {
+      commandLine += appArguments.appArgs.mkString(" ")
+    }
+    logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
+    ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
   }
 }