[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released...
authorShixiong Zhu <zsxwing@gmail.com>
Fri, 10 Aug 2018 17:53:44 +0000 (10:53 -0700)
committerShixiong Zhu <zsxwing@gmail.com>
Fri, 10 Aug 2018 17:53:44 +0000 (10:53 -0700)
## What changes were proposed in this pull request?

This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I  have seen:

- JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address)
- java.lang.IllegalArgumentException: Comparison method violates its general contract!
- java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632

This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.

## How was this patch tested?

The new unit test will make JVM crash without the fix.

Closes #22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala [new file with mode: 0644]

index 8f49859..4b48599 100644 (file)
@@ -65,7 +65,7 @@ final class ShuffleInMemorySorter {
    */
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
     this.consumer = consumer;
@@ -94,12 +94,20 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to  `null` to avoid accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
+      // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+      // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
new file mode 100644 (file)
index 0000000..b9f0e87
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
+    val taskMetrics = new TaskMetrics
+    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    val sorter = new ShuffleExternalSorter(
+      taskMemoryManager,
+      sc.env.blockManager,
+      taskContext,
+      100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
+      1, // numPartitions
+      conf,
+      new ShuffleWriteMetrics)
+    val inMemSorter = {
+      val field = sorter.getClass.getDeclaredField("inMemSorter")
+      field.setAccessible(true)
+      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+    }
+    // Allocate memory to make the next "insertRecord" call triggers a spill.
+    val bytes = new Array[Byte](1)
+    while (inMemSorter.hasSpaceForAnotherRecord) {
+      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
+    }
+
+    // This flag will make the mocked TaskMemoryManager acquire free memory released by spill to
+    // trigger a nested spill.
+    shouldAllocate = true
+
+    // Should throw `SparkOutOfMemoryError` as there is no enough memory: `ShuffleInMemorySorter`
+    // will try to acquire 800 bytes but there are only 400 bytes available.
+    //
+    // Before the fix, a nested spill may use a released page and this causes two tasks access the
+    // same memory page. When a task reads memory written by another task, many types of failures
+    // may happen. Here are some examples we have seen:
+    //
+    // - JVM crash. (This is easy to reproduce in the unit test as we fill newly allocated and
+    //   deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory
+    //   address)
+    // - java.lang.IllegalArgumentException: Comparison method violates its general contract!
+    // - java.lang.NullPointerException
+    //     at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
+    // - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912
+    //     because the size after growing exceeds size limitation 2147483632
+    intercept[SparkOutOfMemoryError] {
+      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
+    }
+  }
+}