[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution
authorEfim Poberezkin <efim@poberezkin.ru>
Fri, 18 May 2018 23:54:39 +0000 (16:54 -0700)
committerTathagata Das <tathagata.das1565@gmail.com>
Fri, 18 May 2018 23:54:39 +0000 (16:54 -0700)
## What changes were proposed in this pull request?

Made changes to EpochCoordinator so that it enforces a commit order. In case a message for epoch n is lost and epoch (n + 1) is ready for commit before epoch n is, epoch (n + 1) will wait for epoch n to be committed first.

## How was this patch tested?

Existing tests in ContinuousSuite and EpochCoordinatorSuite.

Author: Efim Poberezkin <efim@poberezkin.ru>

Closes #20936 from efimpoberezkin/pr/sequence-commited-epochs.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala

index cc68080..8877ebe 100644 (file)
@@ -137,30 +137,71 @@ private[continuous] class EpochCoordinator(
   private val partitionOffsets =
     mutable.Map[(Long, Int), PartitionOffset]()
 
+  private var lastCommittedEpoch = startEpoch - 1
+  // Remembers epochs that have to wait for previous epochs to be committed first.
+  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
+
   private def resolveCommitsAtEpoch(epoch: Long) = {
-    val thisEpochCommits =
-      partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+    val thisEpochCommits = findPartitionCommitsForEpoch(epoch)
     val nextEpochOffsets =
       partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
 
     if (thisEpochCommits.size == numWriterPartitions &&
       nextEpochOffsets.size == numReaderPartitions) {
-      logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
-      // Sequencing is important here. We must commit to the writer before recording the commit
-      // in the query, or we will end up dropping the commit if we restart in the middle.
-      writer.commit(epoch, thisEpochCommits.toArray)
-      query.commit(epoch)
-
-      // Cleanup state from before this epoch, now that we know all partitions are forever past it.
-      for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
-        partitionCommits.remove(k)
-      }
-      for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
-        partitionOffsets.remove(k)
+
+      // Check that last committed epoch is the previous one for sequencing of committed epochs.
+      // If not, add the epoch being currently processed to epochs waiting to be committed,
+      // otherwise commit it.
+      if (lastCommittedEpoch != epoch - 1) {
+        logDebug(s"Epoch $epoch has received commits from all partitions " +
+          s"and is waiting for epoch ${epoch - 1} to be committed first.")
+        epochsWaitingToBeCommitted.add(epoch)
+      } else {
+        commitEpoch(epoch, thisEpochCommits)
+        lastCommittedEpoch = epoch
+
+        // Commit subsequent epochs that are waiting to be committed.
+        var nextEpoch = lastCommittedEpoch + 1
+        while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
+          val nextEpochCommits = findPartitionCommitsForEpoch(nextEpoch)
+          commitEpoch(nextEpoch, nextEpochCommits)
+
+          epochsWaitingToBeCommitted.remove(nextEpoch)
+          lastCommittedEpoch = nextEpoch
+          nextEpoch += 1
+        }
+
+        // Cleanup state from before last committed epoch,
+        // now that we know all partitions are forever past it.
+        for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
+          partitionCommits.remove(k)
+        }
+        for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
+          partitionOffsets.remove(k)
+        }
       }
     }
   }
 
+  /**
+   * Collect per-partition commits for an epoch.
+   */
+  private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {
+    partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+  }
+
+  /**
+   * Commit epoch to the offset log.
+   */
+  private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = {
+    logDebug(s"Epoch $epoch has received commits from all partitions " +
+      s"and is ready to be committed. Committing epoch $epoch.")
+    // Sequencing is important here. We must commit to the writer before recording the commit
+    // in the query, or we will end up dropping the commit if we restart in the middle.
+    writer.commit(epoch, messages.toArray)
+    query.commit(epoch)
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     // If we just drop these messages, we won't do any writes to the query. The lame duck tasks
     // won't shed errors or anything.
index 99e3056..82836dc 100644 (file)
@@ -120,7 +120,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2))
   }
 
-  ignore("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
+  test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
     setWriterPartitions(2)
     setReaderPartitions(2)
 
@@ -141,7 +141,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2))
   }
 
-  ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
+  test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
     setWriterPartitions(1)
     setReaderPartitions(1)
 
@@ -162,7 +162,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2, 3, 4))
   }
 
-  ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
+  test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
     setWriterPartitions(1)
     setReaderPartitions(1)