[SPARK-24248][K8S] Use level triggering and state reconciliation in scheduling and...
authormcheah <mcheah@palantir.com>
Thu, 14 Jun 2018 22:56:21 +0000 (15:56 -0700)
committermcheah <mcheah@palantir.com>
Thu, 14 Jun 2018 22:56:21 +0000 (15:56 -0700)
## What changes were proposed in this pull request?

Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables.

We can do better here by:

1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to follow a level-triggered mechanism. This means that the controller will continuously monitor the API server via watches and polling, and on periodic passes, the controller will reconcile the current state of the cluster with the desired state. We implement this by introducing the concept of a pod snapshot, which is a given state of the executors in the Kubernetes cluster. We operate periodically on snapshots. To prevent overloading the API server with polling requests to get the state of the cluster (particularly for executor allocation where we want to be checking frequently to get executors to launch without unbearably bad latency), we use watches to populate snapshots by applying observed events to a previous snapshot to get a new snapshot. Whenever we do poll the cluster, the polled state replaces any existing snapshot - this ensures eventual consistency and mirroring of the cluster, as is desired in a level triggered architecture.

2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the snapshots.

## How was this patch tested?

Integration tests should test there's no regressions end to end. Unit tests to be updated, in particular focusing on different orderings of events, particularly accounting for when events come in unexpected ordering.

Author: mcheah <mcheah@palantir.com>

Closes #21366 from mccheah/event-queue-driven-scheduling.

27 files changed:
LICENSE
core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
licenses/LICENSE-jmock.txt [new file with mode: 0644]
pom.xml
resource-managers/kubernetes/core/pom.xml
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala [new file with mode: 0644]
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

diff --git a/LICENSE b/LICENSE
index 820f14d..cc1f580 100644 (file)
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 
      (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
      (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
+     (BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
      (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
      (BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
      (BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
index 165a15c..0f08a2b 100644 (file)
@@ -19,13 +19,12 @@ package org.apache.spark.util
 
 import java.util.concurrent._
 
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal
 
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-
 import org.apache.spark.SparkException
 
 private[spark] object ThreadUtils {
@@ -104,6 +103,22 @@ private[spark] object ThreadUtils {
   }
 
   /**
+   * Wrapper over ScheduledThreadPoolExecutor.
+   */
+  def newDaemonThreadPoolScheduledExecutor(threadNamePrefix: String, numThreads: Int)
+      : ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat(s"$threadNamePrefix-%d")
+      .build()
+    val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its delay
+    // elapses. We have to enable it manually.
+    executor.setRemoveOnCancelPolicy(true)
+    executor
+  }
+
+  /**
    * Run a piece of code in a new thread and return the result. Exception in the new thread is
    * thrown in the caller thread with an adjusted stack trace that removes references to this
    * method for clarity. The exception stack traces will be like the following
@@ -229,4 +244,14 @@ private[spark] object ThreadUtils {
     }
   }
   // scalastyle:on awaitready
+
+  def shutdown(
+      executor: ExecutorService,
+      gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
+    if (!executor.isShutdown) {
+      executor.shutdownNow()
+    }
+  }
 }
diff --git a/licenses/LICENSE-jmock.txt b/licenses/LICENSE-jmock.txt
new file mode 100644 (file)
index 0000000..ed7964f
--- /dev/null
@@ -0,0 +1,28 @@
+Copyright (c) 2000-2017, jMock.org
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer. Redistributions
+in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the name of jMock nor the names of its contributors may be
+used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pom.xml b/pom.xml
index 23bbd3b..4b4e6c1 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>org.jmock</groupId>
+        <artifactId>jmock-junit4</artifactId>
+        <scope>test</scope>
+        <version>2.8.4</version>
+      </dependency>
+      <dependency>
         <groupId>org.scalacheck</groupId>
         <artifactId>scalacheck_${scala.binary.version}</artifactId>
         <version>1.13.5</version>
index a62f271..a6dd47a 100644 (file)
     <!-- End of shaded deps. -->
 
     <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>3.8.1</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
 
     <dependency>
-      <groupId>com.squareup.okhttp3</groupId>
-      <artifactId>okhttp</artifactId>
-      <version>3.8.1</version>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
     </dependency>
 
   </dependencies>
index 590deaa..bf33179 100644 (file)
@@ -176,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
+      .doc("Interval between polls against the Kubernetes API server to inspect the " +
+        "state of executors.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"API server polling interval must be a" +
+        " positive time value.")
+      .createWithDefaultString("30s")
+
+  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")
+      .doc("Interval between successive inspection of executor events sent from the" +
+        " Kubernetes API.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"Event processing interval must be a positive" +
+        " time value.")
+      .createWithDefaultString("1s")
+
   val MEMORY_OVERHEAD_FACTOR =
     ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
       .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
@@ -193,7 +211,6 @@ private[spark] object Config extends Logging {
         "Ensure that major Python version is either Python2 or Python3")
       .createWithDefault("2")
 
-
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala
new file mode 100644 (file)
index 0000000..83daddf
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+sealed trait ExecutorPodState {
+  def pod: Pod
+}
+
+case class PodRunning(pod: Pod) extends ExecutorPodState
+
+case class PodPending(pod: Pod) extends ExecutorPodState
+
+sealed trait FinalPodState extends ExecutorPodState
+
+case class PodSucceeded(pod: Pod) extends FinalPodState
+
+case class PodFailed(pod: Pod) extends FinalPodState
+
+case class PodDeleted(pod: Pod) extends FinalPodState
+
+case class PodUnknown(pod: Pod) extends ExecutorPodState
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
new file mode 100644 (file)
index 0000000..5a143ad
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+    snapshotsStore.addSubscriber(podAllocationDelay) {
+      onNewSnapshots(applicationId, _)
+    }
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+    // For all executors we've created against the API but have not seen in a snapshot
+    // yet - check the current time. If the current time has exceeded some threshold,
+    // assume that the pod was either never created (the API server never properly
+    // handled the creation request), or the API server created the pod but we missed
+    // both the creation and deletion events. In either case, delete the missing pod
+    // if possible, and mark such a pod to be rescheduled below.
+    newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+      val currentTime = clock.getTimeMillis()
+      if (currentTime - timeCreated > podCreationTimeout) {
+        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+          " previous allocation attempt tried to create it. The executor may have been" +
+          " deleted but the application missed the deletion event.")
+        Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+            .delete()
+        }
+        newlyCreatedExecutors -= execId
+      } else {
+        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
+          s" was created ${currentTime - timeCreated} milliseconds ago.")
+      }
+    }
+
+    if (snapshots.nonEmpty) {
+      // Only need to examine the cluster as of the latest snapshot, the "current" state, to see if
+      // we need to allocate more executors or not.
+      val latestSnapshot = snapshots.last
+      val currentRunningExecutors = latestSnapshot.executorPods.values.count {
+        case PodRunning(_) => true
+        case _ => false
+      }
+      val currentPendingExecutors = latestSnapshot.executorPods.values.count {
+        case PodPending(_) => true
+        case _ => false
+      }
+      val currentTotalExpectedExecutors = totalExpectedExecutors.get
+      logDebug(s"Currently have $currentRunningExecutors running executors and" +
+        s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
+        s" have been requested but are pending appearance in the cluster.")
+      if (newlyCreatedExecutors.isEmpty
+        && currentPendingExecutors == 0
+        && currentRunningExecutors < currentTotalExpectedExecutors) {
+        val numExecutorsToAllocate = math.min(
+          currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
+        logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
+        for ( _ <- 0 until numExecutorsToAllocate) {
+          val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+          val executorConf = KubernetesConf.createExecutorConf(
+            conf,
+            newExecutorId.toString,
+            applicationId,
+            driverPod)
+          val executorPod = executorBuilder.buildFromFeatures(executorConf)
+          val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+            .editOrNewSpec()
+            .addToContainers(executorPod.container)
+            .endSpec()
+            .build()
+          kubernetesClient.pods().create(podWithAttachedContainer)
+          newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
+          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+        }
+      } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
+        // TODO handle edge cases if we end up with more running executors than expected.
+        logDebug("Current number of running executors is equal to the number of requested" +
+          " executors. Not scaling up further.")
+      } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
+        logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
+          s" executors to begin running before requesting for more executors. # of executors in" +
+          s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
+          s" created but we have not observed as being present in the cluster yet:" +
+          s" ${newlyCreatedExecutors.size}.")
+      }
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
new file mode 100644 (file)
index 0000000..b28d939
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    // Use a best-effort to track which executors have been removed already. It's not generally
+    // job-breaking if we remove executors more than once but it's ideal if we make an attempt
+    // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+    // bounds.
+    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends Logging {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    snapshotsStore.addSubscriber(eventProcessingInterval) {
+      onNewSnapshots(schedulerBackend, _)
+    }
+  }
+
+  private def onNewSnapshots(
+      schedulerBackend: KubernetesClusterSchedulerBackend,
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+    val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+    snapshots.foreach { snapshot =>
+      snapshot.executorPods.foreach { case (execId, state) =>
+        state match {
+          case deleted@PodDeleted(_) =>
+            logDebug(s"Snapshot reported deleted executor with id $execId," +
+              s" pod name ${state.pod.getMetadata.getName}")
+            removeExecutorFromSpark(schedulerBackend, deleted, execId)
+            execIdsRemovedInThisRound += execId
+          case failed@PodFailed(_) =>
+            logDebug(s"Snapshot reported failed executor with id $execId," +
+              s" pod name ${state.pod.getMetadata.getName}")
+            onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
+          case succeeded@PodSucceeded(_) =>
+            logDebug(s"Snapshot reported succeeded executor with id $execId," +
+              s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
+              s" unusual unless Spark specifically informed the executor to exit.")
+            onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
+          case _ =>
+        }
+      }
+    }
+
+    // Reconcile the case where Spark claims to know about an executor but the corresponding pod
+    // is missing from the cluster. This would occur if we miss a deletion event and the pod
+    // transitions immediately from running io absent. We only need to check against the latest
+    // snapshot for this, and we don't do this for executors in the deleted executors cache or
+    // that we just removed in this round.
+    if (snapshots.nonEmpty) {
+      val latestSnapshot = snapshots.last
+      (schedulerBackend.getExecutorIds().map(_.toLong).toSet
+        -- latestSnapshot.executorPods.keySet
+        -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
+        if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
+          val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
+            s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
+            s" executor may have been deleted but the driver missed the deletion event."
+          logDebug(exitReasonMessage)
+          val exitReason = ExecutorExited(
+            UNKNOWN_EXIT_CODE,
+            exitCausedByApp = false,
+            exitReasonMessage)
+          schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
+          execIdsRemovedInThisRound += missingExecutorId
+        }
+      }
+    }
+    logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
+      s" from Spark that were either found to be deleted or non-existent in the cluster.")
+  }
+
+  private def onFinalNonDeletedState(
+      podState: FinalPodState,
+      execId: Long,
+      schedulerBackend: KubernetesClusterSchedulerBackend,
+      execIdsRemovedInRound: mutable.Set[Long]): Unit = {
+    removeExecutorFromK8s(podState.pod)
+    removeExecutorFromSpark(schedulerBackend, podState, execId)
+    execIdsRemovedInRound += execId
+  }
+
+  private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
+    // If deletion failed on a previous try, we can try again if resync informs us the pod
+    // is still around.
+    // Delete as best attempt - duplicate deletes will throw an exception but the end state
+    // of getting rid of the pod is what matters.
+    Utils.tryLogNonFatalError {
+      kubernetesClient
+        .pods()
+        .withName(updatedPod.getMetadata.getName)
+        .delete()
+    }
+  }
+
+  private def removeExecutorFromSpark(
+      schedulerBackend: KubernetesClusterSchedulerBackend,
+      podState: FinalPodState,
+      execId: Long): Unit = {
+    if (removedExecutorsCache.getIfPresent(execId) == null) {
+      removedExecutorsCache.put(execId, execId)
+      val exitReason = findExitReason(podState, execId)
+      schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
+    }
+  }
+
+  private def findExitReason(podState: FinalPodState, execId: Long): ExecutorExited = {
+    val exitCode = findExitCode(podState)
+    val (exitCausedByApp, exitMessage) = podState match {
+      case PodDeleted(_) =>
+        (false, s"The executor with id $execId was deleted by a user or the framework.")
+      case _ =>
+        val msg = exitReasonMessage(podState, execId, exitCode)
+        (true, msg)
+    }
+    ExecutorExited(exitCode, exitCausedByApp, exitMessage)
+  }
+
+  private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = {
+    val pod = podState.pod
+    s"""
+       |The executor with id $execId exited with exit code $exitCode.
+       |The API gave the following brief reason: ${pod.getStatus.getReason}
+       |The API gave the following message: ${pod.getStatus.getMessage}
+       |The API gave the following container statuses:
+       |
+       |${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+      """.stripMargin
+  }
+
+  private def findExitCode(podState: FinalPodState): Int = {
+    podState.pod.getStatus.getContainerStatuses.asScala.find { containerStatus =>
+      containerStatus.getState.getTerminated != null
+    }.map { terminatedContainer =>
+      terminatedContainer.getState.getTerminated.getExitCode.toInt
+    }.getOrElse(UNKNOWN_EXIT_CODE)
+  }
+}
+
+private object ExecutorPodsLifecycleManager {
+  val UNKNOWN_EXIT_CODE = -1
+}
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
new file mode 100644 (file)
index 0000000..e77e604
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) extends Logging {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
+      snapshotsStore.replaceSnapshot(kubernetesClient
+        .pods()
+        .withLabel(SPARK_APP_ID_LABEL, applicationId)
+        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .list()
+        .getItems
+        .asScala)
+    }
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
new file mode 100644 (file)
index 0000000..26be918
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+
+/**
+ * An immutable view of the current executor pods that are running in the cluster.
+ */
+private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
+
+  import ExecutorPodsSnapshot._
+
+  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
+    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
+    new ExecutorPodsSnapshot(newExecutorPods)
+  }
+}
+
+object ExecutorPodsSnapshot extends Logging {
+
+  def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
+    ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
+  }
+
+  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState])
+
+  private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
+    executorPods.map { pod =>
+      (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
+    }.toMap
+  }
+
+  private def toState(pod: Pod): ExecutorPodState = {
+    if (isDeleted(pod)) {
+      PodDeleted(pod)
+    } else {
+      val phase = pod.getStatus.getPhase.toLowerCase
+      phase match {
+        case "pending" =>
+          PodPending(pod)
+        case "running" =>
+          PodRunning(pod)
+        case "failed" =>
+          PodFailed(pod)
+        case "succeeded" =>
+          PodSucceeded(pod)
+        case _ =>
+          logWarning(s"Received unknown phase $phase for executor pod with name" +
+            s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
+          PodUnknown(pod)
+      }
+    }
+  }
+
+  private def isDeleted(pod: Pod): Boolean = pod.getMetadata.getDeletionTimestamp != null
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala
new file mode 100644 (file)
index 0000000..dd26433
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+private[spark] trait ExecutorPodsSnapshotsStore {
+
+  def addSubscriber
+      (processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
+
+  def stop(): Unit
+
+  def updatePod(updatedPod: Pod): Unit
+
+  def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
new file mode 100644 (file)
index 0000000..5583b46
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Controls the propagation of the Spark application's executor pods state to subscribers that
+ * react to that state.
+ * <br>
+ * Roughly follows a producer-consumer model. Producers report states of executor pods, and these
+ * states are then published to consumers that can perform any actions in response to these states.
+ * <br>
+ * Producers push updates in one of two ways. An incremental update sent by updatePod() represents
+ * a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
+ * the passed pods are all of the most up to date states of all executor pods for the application.
+ * The combination of the states of all executor pods for the application is collectively known as
+ * a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
+ * most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
+ * or by replacing the snapshot entirely on a full sync.
+ * <br>
+ * Consumers, or subscribers, register that they want to be informed about all snapshots of the
+ * executor pods. Every time the store replaces its most up to date snapshot from either an
+ * incremental update or a full sync, the most recent snapshot after the update is posted to the
+ * subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
+ * time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
+ * time intervals.
+ */
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
+  private val pollingTasks = mutable.Buffer.empty[Future[_]]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+      processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+    val newSubscriber = SnapshotsSubscriber(
+        new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    SNAPSHOT_LOCK.synchronized {
+      newSubscriber.snapshotsBuffer.add(currentSnapshot)
+    }
+    subscribers += newSubscriber
+    pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+      toRunnable(() => callSubscriber(newSubscriber)),
+      0L,
+      processBatchIntervalMillis,
+      TimeUnit.MILLISECONDS)
+  }
+
+  override def stop(): Unit = {
+    pollingTasks.foreach(_.cancel(true))
+    ThreadUtils.shutdown(subscribersExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
+    currentSnapshot = currentSnapshot.withUpdate(updatedPod)
+    addCurrentSnapshotToSubscribers()
+  }
+
+  override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = SNAPSHOT_LOCK.synchronized {
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    addCurrentSnapshotToSubscribers()
+  }
+
+  private def addCurrentSnapshotToSubscribers(): Unit = {
+    subscribers.foreach { subscriber =>
+      subscriber.snapshotsBuffer.add(currentSnapshot)
+    }
+  }
+
+  private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
+    Utils.tryLogNonFatalError {
+      val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
+      subscriber.snapshotsBuffer.drainTo(currentSnapshots)
+      subscriber.onNewSnapshots(currentSnapshots.asScala)
+    }
+  }
+
+  private def toRunnable[T](runnable: () => Unit): Runnable = new Runnable {
+    override def run(): Unit = runnable()
+  }
+
+  private case class SnapshotsSubscriber(
+      snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
+      onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
new file mode 100644 (file)
index 0000000..a6749a6
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsWatchSnapshotSource(
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    kubernetesClient: KubernetesClient) extends Logging {
+
+  private var watchConnection: Closeable = _
+
+  def start(applicationId: String): Unit = {
+    require(watchConnection == null, "Cannot start the watcher twice.")
+    logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
+      s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
+    watchConnection = kubernetesClient.pods()
+      .withLabel(SPARK_APP_ID_LABEL, applicationId)
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+      .watch(new ExecutorPodsWatcher())
+  }
+
+  def stop(): Unit = {
+    if (watchConnection != null) {
+      Utils.tryLogNonFatalError {
+        watchConnection.close()
+      }
+      watchConnection = null
+    }
+  }
+
+  private class ExecutorPodsWatcher extends Watcher[Pod] {
+    override def eventReceived(action: Action, pod: Pod): Unit = {
+      val podName = pod.getMetadata.getName
+      logDebug(s"Received executor pod update for pod named $podName, action $action")
+      snapshotsStore.updatePod(pod)
+    }
+
+    override def onClose(e: KubernetesClientException): Unit = {
+      logWarning("Kubernetes client has been closed (this is expected if the application is" +
+        " shutting down.)", e)
+    }
+  }
+
+}
index 0ea80df..c6e931a 100644 (file)
@@ -17,7 +17,9 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
+import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
@@ -26,7 +28,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{SystemClock, ThreadUtils}
 
 private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
 
@@ -56,17 +58,45 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val allocatorExecutor = ThreadUtils
-      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
+
+    val subscribersExecutor = ThreadUtils
+      .newDaemonThreadPoolScheduledExecutor(
+        "kubernetes-executor-snapshots-subscribers", 2)
+    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
+    val removedExecutorsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(3, TimeUnit.MINUTES)
+      .build[java.lang.Long, java.lang.Long]()
+    val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
+      sc.conf,
+      new KubernetesExecutorBuilder(),
+      kubernetesClient,
+      snapshotsStore,
+      removedExecutorsCache)
+
+    val executorPodsAllocator = new ExecutorPodsAllocator(
+      sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+
+    val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
+      snapshotsStore,
+      kubernetesClient)
+
+    val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      "kubernetes-executor-pod-polling-sync")
+    val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
+      sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)
+
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
       sc.env.rpcEnv,
-      new KubernetesExecutorBuilder,
       kubernetesClient,
-      allocatorExecutor,
-      requestExecutorsService)
+      requestExecutorsService,
+      snapshotsStore,
+      executorPodsAllocator,
+      executorPodsLifecycleEventHandler,
+      podsWatchEventSource,
+      podsPollingEventSource)
   }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
index d86664c..fa6dc2c 100644 (file)
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import java.io.Closeable
-import java.net.InetAddress
-import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
-import javax.annotation.concurrent.GuardedBy
+import java.util.concurrent.ExecutorService
 
-import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     rpcEnv: RpcEnv,
-    executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
-    allocatorExecutor: ScheduledExecutorService,
-    requestExecutorsService: ExecutorService)
+    requestExecutorsService: ExecutorService,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    podAllocator: ExecutorPodsAllocator,
+    lifecycleEventHandler: ExecutorPodsLifecycleManager,
+    watchEvents: ExecutorPodsWatchSnapshotSource,
+    pollEvents: ExecutorPodsPollingSnapshotSource)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  import KubernetesClusterSchedulerBackend._
-
-  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
-  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
-  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
-  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
-  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
-  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
-
-  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
-
-  private val kubernetesDriverPodName = conf
-    .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(throw new SparkException("Must specify the driver pod name"))
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     requestExecutorsService)
 
-  private val driverPod = kubernetesClient.pods()
-    .inNamespace(kubernetesNamespace)
-    .withName(kubernetesDriverPodName)
-    .get()
-
   protected override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
       0.8
@@ -77,372 +49,93 @@ private[spark] class KubernetesClusterSchedulerBackend(
       super.minRegisteredRatio
     }
 
-  private val executorWatchResource = new AtomicReference[Closeable]
-  private val totalExpectedExecutors = new AtomicInteger(0)
-
-  private val driverUrl = RpcEndpointAddress(
-    conf.get("spark.driver.host"),
-    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
-    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
-  private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
-
-  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
-
-  private val executorLostReasonCheckMaxAttempts = conf.get(
-    KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
-
-  private val allocatorRunnable = new Runnable {
-
-    // Maintains a map of executor id to count of checks performed to learn the loss reason
-    // for an executor.
-    private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
-
-    override def run(): Unit = {
-      handleDisconnectedExecutors()
-
-      val executorsToAllocate = mutable.Map[String, Pod]()
-      val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
-      val currentTotalExpectedExecutors = totalExpectedExecutors.get
-      val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
-          logDebug("Waiting for pending executors before scaling")
-        } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
-          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-        } else {
-          for (_ <- 0 until math.min(
-            currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
-            val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-            val executorConf = KubernetesConf.createExecutorConf(
-              conf,
-              executorId,
-              applicationId(),
-              driverPod)
-            val executorPod = executorBuilder.buildFromFeatures(executorConf)
-            val podWithAttachedContainer = new PodBuilder(executorPod.pod)
-              .editOrNewSpec()
-                .addToContainers(executorPod.container)
-                .endSpec()
-              .build()
-
-            executorsToAllocate(executorId) = podWithAttachedContainer
-            logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
-          }
-        }
-      }
-
-      val allocatedExecutors = executorsToAllocate.mapValues { pod =>
-        Utils.tryLog {
-          kubernetesClient.pods().create(pod)
-        }
-      }
-
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        allocatedExecutors.map {
-          case (executorId, attemptedAllocatedExecutor) =>
-            attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
-              runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
-            }
-        }
-      }
-    }
-
-    def handleDisconnectedExecutors(): Unit = {
-      // For each disconnected executor, synchronize with the loss reasons that may have been found
-      // by the executor pod watcher. If the loss reason was discovered by the watcher,
-      // inform the parent class with removeExecutor.
-      disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
-        case (executorId, executorPod) =>
-          val knownExitReason = Option(podsWithKnownExitReasons.remove(
-            executorPod.getMetadata.getName))
-          knownExitReason.fold {
-            removeExecutorOrIncrementLossReasonCheckCount(executorId)
-          } { executorExited =>
-            logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
-            removeExecutor(executorId, executorExited)
-            // We don't delete the pod running the executor that has an exit condition caused by
-            // the application from the Kubernetes API server. This allows users to debug later on
-            // through commands such as "kubectl logs <pod name>" and
-            // "kubectl describe pod <pod name>". Note that exited containers have terminated and
-            // therefore won't take CPU and memory resources.
-            // Otherwise, the executor pod is marked to be deleted from the API server.
-            if (executorExited.exitCausedByApp) {
-              logInfo(s"Executor $executorId exited because of the application.")
-              deleteExecutorFromDataStructures(executorId)
-            } else {
-              logInfo(s"Executor $executorId failed because of a framework error.")
-              deleteExecutorFromClusterAndDataStructures(executorId)
-            }
-          }
-      }
-    }
-
-    def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
-      val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
-      if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
-        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
-        deleteExecutorFromClusterAndDataStructures(executorId)
-      } else {
-        executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
-      }
-    }
-
-    def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
-      deleteExecutorFromDataStructures(executorId).foreach { pod =>
-        kubernetesClient.pods().delete(pod)
-      }
-    }
-
-    def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
-      disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
-      executorReasonCheckAttemptCounts -= executorId
-      podsWithKnownExitReasons.remove(executorId)
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.remove(executorId).orElse {
-          logWarning(s"Unable to remove pod for unknown executor $executorId")
-          None
-        }
-      }
-    }
-  }
-
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
+  // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
+  private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+    removeExecutor(executorId, reason)
   }
 
   override def start(): Unit = {
     super.start()
-    executorWatchResource.set(
-      kubernetesClient
-        .pods()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .watch(new ExecutorPodsWatcher()))
-
-    allocatorExecutor.scheduleWithFixedDelay(
-      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.MILLISECONDS)
-
     if (!Utils.isDynamicAllocationEnabled(conf)) {
-      doRequestTotalExecutors(initialExecutors)
+      podAllocator.setTotalExpectedExecutors(initialExecutors)
     }
+    lifecycleEventHandler.start(this)
+    podAllocator.start(applicationId())
+    watchEvents.start(applicationId())
+    pollEvents.start(applicationId())
   }
 
   override def stop(): Unit = {
-    // stop allocation of new resources and caches.
-    allocatorExecutor.shutdown()
-    allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)
-
-    // send stop message to executors so they shut down cleanly
     super.stop()
 
-    try {
-      val resource = executorWatchResource.getAndSet(null)
-      if (resource != null) {
-        resource.close()
-      }
-    } catch {
-      case e: Throwable => logWarning("Failed to close the executor pod watcher", e)
+    Utils.tryLogNonFatalError {
+      snapshotsStore.stop()
     }
 
-    // then delete the executor pods
     Utils.tryLogNonFatalError {
-      deleteExecutorPodsOnStop()
-      executorPodsByIPs.clear()
+      watchEvents.stop()
     }
+
     Utils.tryLogNonFatalError {
-      logInfo("Closing kubernetes client")
-      kubernetesClient.close()
+      pollEvents.stop()
     }
-  }
 
-  /**
-   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
-   *         locality if an executor launches on the cluster node.
-   */
-  private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
-    val nodeToLocalTaskCount = synchronized {
-      mutable.Map[String, Int]() ++ hostToLocalTaskCount
+    Utils.tryLogNonFatalError {
+      kubernetesClient.pods()
+        .withLabel(SPARK_APP_ID_LABEL, applicationId())
+        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .delete()
     }
 
-    for (pod <- executorPodsByIPs.values().asScala) {
-      // Remove cluster nodes that are running our executors already.
-      // TODO: This prefers spreading out executors across nodes. In case users want
-      // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
-      // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
-      nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
-        nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
-        nodeToLocalTaskCount.remove(
-          InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
+    Utils.tryLogNonFatalError {
+      ThreadUtils.shutdown(requestExecutorsService)
+    }
+
+    Utils.tryLogNonFatalError {
+      kubernetesClient.close()
     }
-    nodeToLocalTaskCount.toMap[String, Int]
   }
 
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
-    totalExpectedExecutors.set(requestedTotal)
+    // TODO when we support dynamic allocation, the pod allocator should be told to process the
+    // current snapshot in order to decrease/increase the number of executors accordingly.
+    podAllocator.setTotalExpectedExecutors(requestedTotal)
     true
   }
 
-  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
-    val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-      executorIds.flatMap { executorId =>
-        runningExecutorsToPods.remove(executorId) match {
-          case Some(pod) =>
-            disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
-            Some(pod)
-
-          case None =>
-            logWarning(s"Unable to remove pod for unknown executor $executorId")
-            None
-        }
-      }
-    }
-
-    kubernetesClient.pods().delete(podsToDelete: _*)
-    true
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
   }
 
-  private def deleteExecutorPodsOnStop(): Unit = {
-    val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-      val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
-      runningExecutorsToPods.clear()
-      runningExecutorPodsCopy
-    }
-    kubernetesClient.pods().delete(executorPodsToDelete: _*)
+  override def getExecutorIds(): Seq[String] = synchronized {
+    super.getExecutorIds()
   }
 
-  private class ExecutorPodsWatcher extends Watcher[Pod] {
-
-    private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
-
-    override def eventReceived(action: Action, pod: Pod): Unit = {
-      val podName = pod.getMetadata.getName
-      val podIP = pod.getStatus.getPodIP
-
-      action match {
-        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
-            && pod.getMetadata.getDeletionTimestamp == null) =>
-          val clusterNodeName = pod.getSpec.getNodeName
-          logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
-          executorPodsByIPs.put(podIP, pod)
-
-        case Action.DELETED | Action.ERROR =>
-          val executorId = getExecutorId(pod)
-          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-          if (podIP != null) {
-            executorPodsByIPs.remove(podIP)
-          }
-
-          val executorExitReason = if (action == Action.ERROR) {
-            logWarning(s"Received error event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnError(pod)
-          } else if (action == Action.DELETED) {
-            logWarning(s"Received delete event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnDelete(pod)
-          } else {
-            throw new IllegalStateException(
-              s"Unknown action that should only be DELETED or ERROR: $action")
-          }
-          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
-
-          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
-              s"watch received an event of type $action for this executor. The executor may " +
-              "have failed to start in the first place and never registered with the driver.")
-          }
-          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
-
-        case _ => logDebug(s"Received event of executor pod $podName: " + action)
-      }
-    }
-
-    override def onClose(cause: KubernetesClientException): Unit = {
-      logDebug("Executor pod watch closed.", cause)
-    }
-
-    private def getExecutorExitStatus(pod: Pod): Int = {
-      val containerStatuses = pod.getStatus.getContainerStatuses
-      if (!containerStatuses.isEmpty) {
-        // we assume the first container represents the pod status. This assumption may not hold
-        // true in the future. Revisit this if side-car containers start running inside executor
-        // pods.
-        getExecutorExitStatus(containerStatuses.get(0))
-      } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
-    }
-
-    private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
-      Option(containerStatus.getState).map { containerState =>
-        Option(containerState.getTerminated).map { containerStateTerminated =>
-          containerStateTerminated.getExitCode.intValue()
-        }.getOrElse(UNKNOWN_EXIT_CODE)
-      }.getOrElse(UNKNOWN_EXIT_CODE)
-    }
-
-    private def isPodAlreadyReleased(pod: Pod): Boolean = {
-      val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        !runningExecutorsToPods.contains(executorId)
-      }
-    }
-
-    private def executorExitReasonOnError(pod: Pod): ExecutorExited = {
-      val containerExitStatus = getExecutorExitStatus(pod)
-      // container was probably actively killed by the driver.
-      if (isPodAlreadyReleased(pod)) {
-        ExecutorExited(containerExitStatus, exitCausedByApp = false,
-          s"Container in pod ${pod.getMetadata.getName} exited from explicit termination " +
-            "request.")
-      } else {
-        val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " +
-          s"exited with exit status code $containerExitStatus."
-        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
-      }
-    }
-
-    private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
-      val exitMessage = if (isPodAlreadyReleased(pod)) {
-        s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
-      } else {
-        s"Pod ${pod.getMetadata.getName} deleted or lost."
-      }
-      ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
-    }
-
-    private def getExecutorId(pod: Pod): String = {
-      val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
-      require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
-        s"to have label $SPARK_EXECUTOR_ID_LABEL.")
-      executorId
-    }
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
+    kubernetesClient.pods()
+      .withLabel(SPARK_APP_ID_LABEL, applicationId())
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+      .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
+      .delete()
+    // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
     new KubernetesDriverEndpoint(rpcEnv, properties)
   }
 
-  private class KubernetesDriverEndpoint(
-      rpcEnv: RpcEnv,
-      sparkProperties: Seq[(String, String)])
+  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
-      addressToExecutorId.get(rpcAddress).foreach { executorId =>
-        if (disableExecutor(executorId)) {
-          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-            runningExecutorsToPods.get(executorId).foreach { pod =>
-              disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
-            }
-          }
-        }
-      }
+      // Don't do anything besides disabling the executor - allow the Kubernetes API events to
+      // drive the rest of the lifecycle decisions
+      // TODO what if we disconnect from a networking issue? Probably want to mark the executor
+      // to be deleted eventually.
+      addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
     }
   }
-}
 
-private object KubernetesClusterSchedulerBackend {
-  private val UNKNOWN_EXIT_CODE = -1
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
new file mode 100644 (file)
index 0000000..527fc6b
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, HasMetadata, Pod, PodList}
+import io.fabric8.kubernetes.client.{Watch, Watcher}
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
+
+object Fabric8Aliases {
+  type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+  type LABELED_PODS = FilterWatchListDeletable[
+    Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+  type SINGLE_POD = PodResource[Pod, DoneablePod]
+  type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
+    HasMetadata, Boolean]
+}
index a8a8218..d045d9a 100644 (file)
@@ -27,6 +27,7 @@ import org.scalatest.mockito.MockitoSugar._
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -103,15 +104,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       .build()
   }
 
-  private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
-      HasMetadata, Boolean]
-  private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-
   @Mock
   private var kubernetesClient: KubernetesClient = _
 
   @Mock
-  private var podOperations: Pods = _
+  private var podOperations: PODS = _
 
   @Mock
   private var namedPods: PodResource[Pod, DoneablePod] = _
@@ -123,7 +120,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private var driverBuilder: KubernetesDriverBuilder = _
 
   @Mock
-  private var resourceList: ResourceList = _
+  private var resourceList: RESOURCE_LIST = _
 
   private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
new file mode 100644 (file)
index 0000000..f7721e6
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import scala.collection.mutable
+
+class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
+
+  private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
+  private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
+
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber
+      (processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+    subscribers += onNewSnapshots
+  }
+
+  override def stop(): Unit = {}
+
+  def notifySubscribers(): Unit = {
+    subscribers.foreach(_(snapshotsBuffer))
+    snapshotsBuffer.clear()
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = {
+    currentSnapshot = currentSnapshot.withUpdate(updatedPod)
+    snapshotsBuffer += currentSnapshot
+  }
+
+  override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = {
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    snapshotsBuffer += currentSnapshot
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
new file mode 100644 (file)
index 0000000..c6b667e
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkPod
+
+object ExecutorLifecycleTestUtils {
+
+  val TEST_SPARK_APP_ID = "spark-app-id"
+
+  def failedExecutorWithoutDeletion(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewStatus()
+        .withPhase("failed")
+        .addNewContainerStatus()
+          .withName("spark-executor")
+          .withImage("k8s-spark")
+          .withNewState()
+            .withNewTerminated()
+              .withMessage("Failed")
+              .withExitCode(1)
+              .endTerminated()
+            .endState()
+          .endContainerStatus()
+        .addNewContainerStatus()
+          .withName("spark-executor-sidecar")
+          .withImage("k8s-spark-sidecar")
+          .withNewState()
+            .withNewTerminated()
+              .withMessage("Failed")
+              .withExitCode(1)
+              .endTerminated()
+            .endState()
+          .endContainerStatus()
+        .withMessage("Executor failed.")
+        .withReason("Executor failed because of a thrown error.")
+        .endStatus()
+      .build()
+  }
+
+  def pendingExecutor(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewStatus()
+        .withPhase("pending")
+        .endStatus()
+      .build()
+  }
+
+  def runningExecutor(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewStatus()
+        .withPhase("running")
+        .endStatus()
+      .build()
+  }
+
+  def succeededExecutor(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewStatus()
+        .withPhase("succeeded")
+        .endStatus()
+      .build()
+  }
+
+  def deletedExecutor(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewMetadata()
+        .withNewDeletionTimestamp("523012521")
+        .endMetadata()
+      .build()
+  }
+
+  def unknownExecutor(executorId: Long): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId))
+      .editOrNewStatus()
+        .withPhase("unknown")
+        .endStatus()
+      .build()
+  }
+
+  def podWithAttachedContainerForId(executorId: Long): Pod = {
+    val sparkPod = executorPodWithId(executorId)
+    val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
+      .editOrNewSpec()
+        .addToContainers(sparkPod.container)
+        .endSpec()
+      .build()
+    podWithAttachedContainer
+  }
+
+  def executorPodWithId(executorId: Long): SparkPod = {
+    val pod = new PodBuilder()
+      .withNewMetadata()
+        .withName(s"spark-executor-$executorId")
+        .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+        .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+        .endMetadata()
+      .build()
+    val container = new ContainerBuilder()
+      .withName("spark-executor")
+      .withImage("k8s-spark")
+      .build()
+    SparkPod(pod, container)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
new file mode 100644 (file)
index 0000000..0c19f59
--- /dev/null
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+import org.apache.spark.util.ManualClock
+
+class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val driverPodName = "driver"
+
+  private val driverPod = new PodBuilder()
+    .withNewMetadata()
+      .withName(driverPodName)
+      .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+      .withUid("driver-pod-uid")
+      .endMetadata()
+    .build()
+
+  private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
+
+  private var waitForExecutorPodsClock: ManualClock = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var labeledPods: LABELED_PODS = _
+
+  @Mock
+  private var driverPodOperations: PodResource[Pod, DoneablePod] = _
+
+  @Mock
+  private var executorBuilder: KubernetesExecutorBuilder = _
+
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+  private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(driverPodOperations.get).thenReturn(driverPod)
+    when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields()))
+      .thenAnswer(executorPodAnswer())
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+    waitForExecutorPodsClock = new ManualClock(0L)
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
+  }
+
+  test("Initially request executors in batches. Do not request another batch if the" +
+    " first has not finished.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (nextId <- 1 to podAllocationSize) {
+      verify(podOperations).create(podWithAttachedContainerForId(nextId))
+    }
+    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+  }
+
+  test("Request executors in batches. Allow another batch to be requested if" +
+    " all pending executors start running.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (execId <- 1 until podAllocationSize) {
+      snapshotsStore.updatePod(runningExecutor(execId))
+    }
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+  }
+
+  test("When a current batch reaches error states immediately, re-request" +
+    " them on the next batch.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (execId <- 1 until podAllocationSize) {
+      snapshotsStore.updatePod(runningExecutor(execId))
+    }
+    val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+  }
+
+  test("When an executor is requested but the API does not report it in a reasonable time, retry" +
+    " requesting that executor.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
+    when(podOperations.withLabel(SPARK_EXECUTOR_ID_LABEL, "1")).thenReturn(labeledPods)
+    snapshotsStore.notifySubscribers()
+    verify(labeledPods).delete()
+    verify(podOperations).create(podWithAttachedContainerForId(2))
+  }
+
+  private def executorPodAnswer(): Answer[SparkPod] = {
+    new Answer[SparkPod] {
+      override def answer(invocation: InvocationOnMock): SparkPod = {
+        val k8sConf = invocation.getArgumentAt(
+          0, classOf[KubernetesConf[KubernetesExecutorSpecificConf]])
+        executorPodWithId(k8sConf.roleSpecificConf.executorId.toInt)
+      }
+    }
+  }
+
+  private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
+    Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
+      override def matches(argument: scala.Any): Boolean = {
+        if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
+          false
+        } else {
+          val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
+          val executorSpecificConf = k8sConf.roleSpecificConf
+          val expectedK8sConf = KubernetesConf.createExecutorConf(
+            conf,
+            executorSpecificConf.executorId,
+            TEST_SPARK_APP_ID,
+            driverPod)
+          k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+            // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
+            // deep equality comparison for the SparkConf object and use object equality
+            // comparison on all other fields.
+            k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+        }
+      }
+    })
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
new file mode 100644 (file)
index 0000000..562ace9
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.CacheBuilder
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var executorBuilder: KubernetesExecutorBuilder = _
+
+  @Mock
+  private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+  private var eventHandlerUnderTest: ExecutorPodsLifecycleManager = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+    namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
+    when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
+    eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
+      new SparkConf(),
+      executorBuilder,
+      kubernetesClient,
+      snapshotsStore,
+      removedExecutorsCache)
+    eventHandlerUnderTest.start(schedulerBackend)
+  }
+
+  test("When an executor reaches error states immediately, remove from the scheduler backend.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+    verify(namedExecutorPods(failedPod.getMetadata.getName)).delete()
+  }
+
+  test("Don't remove executors twice from Spark but remove from K8s repeatedly.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
+    verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
+  }
+
+  test("When the scheduler backend lists executor ids that aren't present in the cluster," +
+    " remove those executors from Spark.") {
+    when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
+    val msg = s"The executor with ID 1 was not found in the cluster but we didn't" +
+      s" get a reason why. Marking the executor as failed. The executor may have been" +
+      s" deleted but the driver missed the deletion event."
+    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+  }
+
+  private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
+    s"""
+       |The executor with id $failedExecutorId exited with exit code 1.
+       |The API gave the following brief reason: ${failedPod.getStatus.getReason}
+       |The API gave the following message: ${failedPod.getStatus.getMessage}
+       |The API gave the following container statuses:
+       |
+       |${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+      """.stripMargin
+  }
+
+  private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = {
+    new Answer[PodResource[Pod, DoneablePod]] {
+      override def answer(invocation: InvocationOnMock): PodResource[Pod, DoneablePod] = {
+        val podName = invocation.getArgumentAt(0, classOf[String])
+        namedExecutorPods.getOrElseUpdate(
+          podName, mock(classOf[PodResource[Pod, DoneablePod]]))
+      }
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
new file mode 100644 (file)
index 0000000..1b26d6a
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import io.fabric8.kubernetes.api.model.PodListBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val sparkConf = new SparkConf
+
+  private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var appIdLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var executorRoleLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+  private var pollingExecutor: DeterministicScheduler = _
+  private var pollingSourceUnderTest: ExecutorPodsPollingSnapshotSource = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    pollingExecutor = new DeterministicScheduler()
+    pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
+      sparkConf,
+      kubernetesClient,
+      eventQueue,
+      pollingExecutor)
+    pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(appIdLabeledPods)
+    when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(executorRoleLabeledPods)
+  }
+
+  test("Items returned by the API should be pushed to the event queue") {
+    when(executorRoleLabeledPods.list())
+      .thenReturn(new PodListBuilder()
+        .addToItems(
+          runningExecutor(1),
+          runningExecutor(2))
+        .build())
+    pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
+    verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
+
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
new file mode 100644 (file)
index 0000000..70e19c9
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsSnapshotSuite extends SparkFunSuite {
+
+  test("States are interpreted correctly from pod metadata.") {
+    val pods = Seq(
+      pendingExecutor(0),
+      runningExecutor(1),
+      succeededExecutor(2),
+      failedExecutorWithoutDeletion(3),
+      deletedExecutor(4),
+      unknownExecutor(5))
+    val snapshot = ExecutorPodsSnapshot(pods)
+    assert(snapshot.executorPods ===
+      Map(
+        0L -> PodPending(pods(0)),
+        1L -> PodRunning(pods(1)),
+        2L -> PodSucceeded(pods(2)),
+        3L -> PodFailed(pods(3)),
+        4L -> PodDeleted(pods(4)),
+        5L -> PodUnknown(pods(5))))
+  }
+
+  test("Updates add new pods for non-matching ids and edit existing pods for matching ids") {
+    val originalPods = Seq(
+      pendingExecutor(0),
+      runningExecutor(1))
+    val originalSnapshot = ExecutorPodsSnapshot(originalPods)
+    val snapshotWithUpdatedPod = originalSnapshot.withUpdate(succeededExecutor(1))
+    assert(snapshotWithUpdatedPod.executorPods ===
+      Map(
+        0L -> PodPending(originalPods(0)),
+        1L -> PodSucceeded(succeededExecutor(1))))
+    val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2))
+    assert(snapshotWithNewPod.executorPods ===
+      Map(
+        0L -> PodPending(originalPods(0)),
+        1L -> PodSucceeded(succeededExecutor(1)),
+        2L -> PodPending(pendingExecutor(2))))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
new file mode 100644 (file)
index 0000000..cf54b3c
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.scalatest.BeforeAndAfter
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+
+class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var eventBufferScheduler: DeterministicScheduler = _
+  private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
+
+  before {
+    eventBufferScheduler = new DeterministicScheduler()
+    eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
+  }
+
+  test("Subscribers get notified of events periodically.") {
+    val receivedSnapshots1 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    val receivedSnapshots2 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedSnapshots1 ++= _
+    }
+    eventQueueUnderTest.addSubscriber(2000) {
+      receivedSnapshots2 ++= _
+    }
+
+    eventBufferScheduler.runUntilIdle()
+    assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
+    assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+    pushPodWithIndex(1)
+    // Force time to move forward so that the buffer is emitted, scheduling the
+    // processing task on the subscription executor...
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    // ... then actually execute the subscribers.
+
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+    // Don't repeat snapshots
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    assert(receivedSnapshots2 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    pushPodWithIndex(2)
+    pushPodWithIndex(3)
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+    assert(receivedSnapshots2 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+    assert(receivedSnapshots1 === receivedSnapshots2)
+  }
+
+  test("Even without sending events, initially receive an empty buffer.") {
+    val receivedInitialSnapshot = new AtomicReference[Seq[ExecutorPodsSnapshot]](null)
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedInitialSnapshot.set
+    }
+    assert(receivedInitialSnapshot.get == null)
+    eventBufferScheduler.runUntilIdle()
+    assert(receivedInitialSnapshot.get === Seq(ExecutorPodsSnapshot()))
+  }
+
+  test("Replacing the snapshot passes the new snapshot to subscribers.") {
+    val receivedSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedSnapshots ++= _
+    }
+    eventQueueUnderTest.updatePod(podWithIndex(1))
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
+  }
+
+  private def pushPodWithIndex(index: Int): Unit =
+    eventQueueUnderTest.updatePod(podWithIndex(index))
+
+  private def podWithIndex(index: Int): Pod =
+    new PodBuilder()
+      .editOrNewMetadata()
+        .withName(s"pod-$index")
+        .addToLabels(SPARK_EXECUTOR_ID_LABEL, index.toString)
+        .endMetadata()
+      .editOrNewStatus()
+        .withPhase("running")
+        .endStatus()
+      .build()
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
new file mode 100644 (file)
index 0000000..ac1968b
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  @Mock
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var appIdLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var executorRoleLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var watchConnection: Watch = _
+
+  private var watch: ArgumentCaptor[Watcher[Pod]] = _
+
+  private var watchSourceUnderTest: ExecutorPodsWatchSnapshotSource = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(appIdLabeledPods)
+    when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(executorRoleLabeledPods)
+    when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
+    watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
+      eventQueue, kubernetesClient)
+    watchSourceUnderTest.start(TEST_SPARK_APP_ID)
+  }
+
+  test("Watch events should be pushed to the snapshots store as snapshot updates.") {
+    watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
+    watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
+    verify(eventQueue).updatePod(runningExecutor(1))
+    verify(eventQueue).updatePod(runningExecutor(2))
+  }
+}
index 96065e8..52e7a12 100644 (file)
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
-import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.hamcrest.{BaseMatcher, Description, Matcher}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
-import org.mockito.Matchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{eq => mockitoEq}
+import org.mockito.Mockito.{never, verify, when}
 import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 
 class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
 
-  private val APP_ID = "test-spark-app"
-  private val DRIVER_POD_NAME = "spark-driver-pod"
-  private val NAMESPACE = "test-namespace"
-  private val SPARK_DRIVER_HOST = "localhost"
-  private val SPARK_DRIVER_PORT = 7077
-  private val POD_ALLOCATION_INTERVAL = "1m"
-  private val FIRST_EXECUTOR_POD = new PodBuilder()
-    .withNewMetadata()
-      .withName("pod1")
-      .endMetadata()
-    .withNewSpec()
-      .withNodeName("node1")
-      .endSpec()
-    .withNewStatus()
-      .withHostIP("192.168.99.100")
-      .endStatus()
-    .build()
-  private val SECOND_EXECUTOR_POD = new PodBuilder()
-    .withNewMetadata()
-      .withName("pod2")
-      .endMetadata()
-    .withNewSpec()
-      .withNodeName("node2")
-      .endSpec()
-    .withNewStatus()
-      .withHostIP("192.168.99.101")
-      .endStatus()
-    .build()
-
-  private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-  private type LABELED_PODS = FilterWatchListDeletable[
-    Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
-  private type IN_NAMESPACE_PODS = NonNamespaceOperation[
-    Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-
-  @Mock
-  private var sparkContext: SparkContext = _
-
-  @Mock
-  private var listenerBus: LiveListenerBus = _
-
-  @Mock
-  private var taskSchedulerImpl: TaskSchedulerImpl = _
+  private val requestExecutorsService = new DeterministicScheduler()
+  private val sparkConf = new SparkConf(false)
+    .set("spark.executor.instances", "3")
 
   @Mock
-  private var allocatorExecutor: ScheduledExecutorService = _
+  private var sc: SparkContext = _
 
   @Mock
-  private var requestExecutorsService: ExecutorService = _
+  private var rpcEnv: RpcEnv = _
 
   @Mock
-  private var executorBuilder: KubernetesExecutorBuilder = _
+  private var driverEndpointRef: RpcEndpointRef = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -103,347 +54,97 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var podOperations: PODS = _
 
   @Mock
-  private var podsWithLabelOperations: LABELED_PODS = _
+  private var labeledPods: LABELED_PODS = _
 
   @Mock
-  private var podsInNamespace: IN_NAMESPACE_PODS = _
+  private var taskScheduler: TaskSchedulerImpl = _
 
   @Mock
-  private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
 
   @Mock
-  private var rpcEnv: RpcEnv = _
+  private var podAllocator: ExecutorPodsAllocator = _
 
   @Mock
-  private var driverEndpointRef: RpcEndpointRef = _
+  private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _
 
   @Mock
-  private var executorPodsWatch: Watch = _
+  private var watchEvents: ExecutorPodsWatchSnapshotSource = _
 
   @Mock
-  private var successFuture: Future[Boolean] = _
+  private var pollEvents: ExecutorPodsPollingSnapshotSource = _
 
-  private var sparkConf: SparkConf = _
-  private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
-  private var allocatorRunnable: ArgumentCaptor[Runnable] = _
-  private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
-
-  private val driverPod = new PodBuilder()
-    .withNewMetadata()
-      .withName(DRIVER_POD_NAME)
-      .addToLabels(SPARK_APP_ID_LABEL, APP_ID)
-      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
-      .endMetadata()
-    .build()
+  private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
   before {
     MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
-      .set(KUBERNETES_NAMESPACE, NAMESPACE)
-      .set("spark.driver.host", SPARK_DRIVER_HOST)
-      .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
-      .set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL)
-    executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
-    allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
-    requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+    when(taskScheduler.sc).thenReturn(sc)
+    when(sc.conf).thenReturn(sparkConf)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
-    when(sparkContext.conf).thenReturn(sparkConf)
-    when(sparkContext.listenerBus).thenReturn(listenerBus)
-    when(taskSchedulerImpl.sc).thenReturn(sparkContext)
-    when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
-    when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
-      .thenReturn(executorPodsWatch)
-    when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
-    when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
-    when(podsWithDriverName.get()).thenReturn(driverPod)
-    when(allocatorExecutor.scheduleWithFixedDelay(
-      allocatorRunnable.capture(),
-      mockitoEq(0L),
-      mockitoEq(TimeUnit.MINUTES.toMillis(1)),
-      mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null)
-    // Creating Futures in Scala backed by a Java executor service resolves to running
-    // ExecutorService#execute (as opposed to submit)
-    doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
-
-    // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
-    when(driverEndpointRef.ask[Boolean]
-      (any(classOf[Any]))
-      (any())).thenReturn(successFuture)
-    when(successFuture.failed).thenReturn(Future[Throwable] {
-      // emulate behavior of the Future.failed method.
-      throw new NoSuchElementException()
-    }(ThreadUtils.sameThread))
-  }
-
-  test("Basic lifecycle expectations when starting and stopping the scheduler.") {
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    assert(executorPodsWatcherArgument.getValue != null)
-    assert(allocatorRunnable.getValue != null)
-    scheduler.stop()
-    verify(executorPodsWatch).close()
-  }
-
-  test("Static allocation should request executors upon first allocator run.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(firstResolvedPod)
-    verify(podOperations).create(secondResolvedPod)
-  }
-
-  test("Killing executors deletes the executor pods") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    allocatorRunnable.getValue.run()
-    scheduler.doKillExecutors(Seq("2"))
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations).delete(secondResolvedPod)
-    verify(podOperations, never()).delete(firstResolvedPod)
-  }
-
-  test("Executors should be requested in batches.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(firstResolvedPod)
-    verify(podOperations, never()).create(secondResolvedPod)
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(secondResolvedPod)
-  }
-
-  test("Scaled down executors should be cleaned up") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-
-    // The scheduler backend spins up one executor pod.
-    requestExecutorRunnable.getValue.run()
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    val resolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-
-    // Request that there are 0 executors and trigger deletion from driver.
-    scheduler.doRequestTotalExecutors(0)
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    scheduler.doKillExecutors(Seq("1"))
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations, times(1)).delete(resolvedPod)
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-
-    val exitedPod = exitPod(resolvedPod, 0)
-    executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
-    allocatorRunnable.getValue.run()
-
-    // No more deletion attempts of the executors.
-    // This is graceful termination and should not be detected as a failure.
-    verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).send(
-      RemoveExecutor("1", ExecutorExited(
-        0,
-        exitCausedByApp = false,
-        s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
-          s" explicit termination request.")))
-  }
-
-  test("Executors that fail should not be deleted.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-    executorPodsWatcherArgument.getValue.eventReceived(
-      Action.ERROR, exitPod(firstResolvedPod, 1))
-
-    // A replacement executor should be created but the error pod should persist.
-    val replacementPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    scheduler.doRequestTotalExecutors(1)
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).send(
-      RemoveExecutor("1", ExecutorExited(
-        1,
-        exitCausedByApp = true,
-        s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
-          " exit status code 1.")))
-  }
-
-  test("Executors disconnected due to unknown reasons are deleted and replaced.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val executorLostReasonCheckMaxAttempts = sparkConf.get(
-      KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
-
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-    1 to executorLostReasonCheckMaxAttempts foreach { _ =>
-      allocatorRunnable.getValue.run()
-      verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
+      taskScheduler,
+      rpcEnv,
+      kubernetesClient,
+      requestExecutorsService,
+      eventQueue,
+      podAllocator,
+      lifecycleEventHandler,
+      watchEvents,
+      pollEvents) {
+      override def applicationId(): String = TEST_SPARK_APP_ID
     }
-
-    val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).send(
-      RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
-  test("Executors that fail to start on the Kubernetes API call rebuild in the next batch.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(firstResolvedPod))
-      .thenThrow(new RuntimeException("test"))
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    verify(podOperations, times(1)).create(firstResolvedPod)
-    val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(recreatedResolvedPod)
+  test("Start all components") {
+    schedulerBackendUnderTest.start()
+    verify(podAllocator).setTotalExpectedExecutors(3)
+    verify(podAllocator).start(TEST_SPARK_APP_ID)
+    verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
+    verify(watchEvents).start(TEST_SPARK_APP_ID)
+    verify(pollEvents).start(TEST_SPARK_APP_ID)
   }
 
-  test("Executors that are initially created but the watch notices them fail are rebuilt" +
-    " in the next batch.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(FIRST_EXECUTOR_POD)).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    verify(podOperations, times(1)).create(firstResolvedPod)
-    executorPodsWatcherArgument.getValue.eventReceived(Action.ERROR, firstResolvedPod)
-    val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(recreatedResolvedPod)
+  test("Stop all components") {
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    schedulerBackendUnderTest.stop()
+    verify(eventQueue).stop()
+    verify(watchEvents).stop()
+    verify(pollEvents).stop()
+    verify(labeledPods).delete()
+    verify(kubernetesClient).close()
   }
 
-  private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
-    new KubernetesClusterSchedulerBackend(
-      taskSchedulerImpl,
-      rpcEnv,
-      executorBuilder,
-      kubernetesClient,
-      allocatorExecutor,
-      requestExecutorsService) {
-
-      override def applicationId(): String = APP_ID
-    }
+  test("Remove executor") {
+    schedulerBackendUnderTest.start()
+    schedulerBackendUnderTest.doRemoveExecutor(
+      "1", ExecutorKilled)
+    verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
   }
 
-  private def exitPod(basePod: Pod, exitCode: Int): Pod = {
-    new PodBuilder(basePod)
-      .editStatus()
-        .addNewContainerStatus()
-          .withNewState()
-            .withNewTerminated()
-              .withExitCode(exitCode)
-              .endTerminated()
-            .endState()
-          .endContainerStatus()
-        .endStatus()
-      .build()
+  test("Kill executors") {
+    schedulerBackendUnderTest.start()
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
+    schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
+    verify(labeledPods, never()).delete()
+    requestExecutorsService.runNextPendingCommand()
+    verify(labeledPods).delete()
   }
 
-  private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Pod = {
-    val resolvedPod = new PodBuilder(expectedPod)
-      .editMetadata()
-        .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
-        .endMetadata()
-      .build()
-    val resolvedContainer = new ContainerBuilder().build()
-    when(executorBuilder.buildFromFeatures(Matchers.argThat(
-      new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
-        override def matches(argument: scala.Any)
-          : Boolean = {
-          argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
-            argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
-              .roleSpecificConf.executorId == executorId.toString
-        }
-
-        override def describeTo(description: Description): Unit = {}
-      }))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
-    new PodBuilder(resolvedPod)
-      .editSpec()
-        .addToContainers(resolvedContainer)
-        .endSpec()
-      .build()
+  test("Request total executors") {
+    schedulerBackendUnderTest.start()
+    schedulerBackendUnderTest.doRequestTotalExecutors(5)
+    verify(podAllocator).setTotalExpectedExecutors(3)
+    verify(podAllocator, never()).setTotalExpectedExecutors(5)
+    requestExecutorsService.runNextPendingCommand()
+    verify(podAllocator).setTotalExpectedExecutors(5)
   }
+
 }