[SPARK-25295][K8S] Fix executor names collision
authorStavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Thu, 13 Sep 2018 05:02:59 +0000 (22:02 -0700)
committerYinan Li <ynli@google.com>
Thu, 13 Sep 2018 05:02:59 +0000 (22:02 -0700)
## What changes were proposed in this pull request?
Fixes the collision issue with spark executor names in client mode, see SPARK-25295 for the details.
It follows the cluster name convention as app-name will be used as the prefix and if that is not defined we use "spark" as the default prefix. Eg. `spark-pi-1536781360723-exec-1` where spark-pi is the name of the app passed at the config side or transformed if it contains illegal characters.

Also fixes the issue with spark app name having spaces in cluster mode.
If you run the Spark Pi test in client mode it passes.
The tricky part is the user may set the app name:
https://github.com/apache/spark/blob/3030b82c89d3e45a2e361c469fbc667a1e43b854/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L30
If i do:

```
./bin/spark-submit
...
 --deploy-mode cluster --name "spark pi"
...
```
it will fail as the app name is used for the prefix of driver's pod name and it cannot have spaces (according to k8s conventions).

## How was this patch tested?

Manually by running spark job in client mode.
To reproduce do:
```
kubectl create -f service.yaml
kubectl create -f pod.yaml
```
 service.yaml :
```
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 10000
    targetPort: 10000
```
pod.yaml:

```
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  containers:
  - name: spark-test
    image: skonto/spark:k8s-client-fix
    imagePullPolicy: Always
    command:
      - 'sh'
      - '-c'
      -  "/opt/spark/bin/spark-submit
              --verbose
              --master k8s://https://kubernetes.default.svc
              --deploy-mode client
              --class org.apache.spark.examples.SparkPi
              --conf spark.app.name=spark
              --conf spark.executor.instances=1
              --conf spark.kubernetes.container.image=skonto/spark:k8s-client-fix
              --conf spark.kubernetes.container.image.pullPolicy=Always
              --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
              --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
              --conf spark.executor.memory=500m
              --conf spark.executor.cores=1
              --conf spark.executor.instances=1
              --conf spark.driver.host=spark-test-app-1-svc.default.svc
              --conf spark.driver.port=7077
              --conf spark.driver.blockManager.port=10000
              local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 1000000"
```

Closes #22405 from skonto/fix-k8s-client-mode-executor-names.

Authored-by: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Signed-off-by: Yinan Li <ynli@google.com>
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

index 3aa35d4..cae6e7d 100644 (file)
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
 
 
@@ -220,10 +221,20 @@ private[spark] object KubernetesConf {
     val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
       sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
+    // If no prefix is defined then we are in pure client mode
+    // (not the one used by cluster mode inside the container)
+    val appResourceNamePrefix = {
+      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
+        getResourceNamePrefix(getAppName(sparkConf))
+      } else {
+        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+      }
+    }
+
     KubernetesConf(
       sparkConf.clone(),
       KubernetesExecutorSpecificConf(executorId, driverPod),
-      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appResourceNamePrefix,
       appId,
       executorLabels,
       executorAnnotations,
index 986c950..edeaa38 100644 (file)
@@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
-    val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val kubernetesResourceNamePrefix = {
-      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
-    }
+    val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
     sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
@@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     }
   }
 }
+
+private[spark] object KubernetesClientApplication {
+
+  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
+
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
+  }
+}
index e847f85..0e617b0 100644 (file)
@@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
             executorSpecificConf.executorId,
             TEST_SPARK_APP_ID,
             Some(driverPod))
-          k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+
+          // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX
+          // has not be set for the tests and thus KubernetesConf will use a random
+          // string for the prefix, based on the app name, and this comparison here will fail.
+          val k8sConfCopy = k8sConf
+            .copy(appResourceNamePrefix = "")
+            .copy(sparkConf = conf)
+          val expectedK8sConfCopy = expectedK8sConf
+            .copy(appResourceNamePrefix = "")
+            .copy(sparkConf = conf)
+
+            k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
             // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
             // deep equality comparison for the SparkConf object and use object equality
             // comparison on all other fields.
-            k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+            k8sConfCopy == expectedK8sConfCopy
         }
       }
     })
-
 }