[SPARK-25069][CORE] Using UnsafeAlignedOffset to make the entire record of 8 byte...
author10129659 <chen.yanshan@zte.com.cn>
Mon, 13 Aug 2018 01:09:25 +0000 (09:09 +0800)
committerWenchen Fan <wenchen@databricks.com>
Mon, 13 Aug 2018 01:09:25 +0000 (09:09 +0800)
## What changes were proposed in this pull request?

The class of UnsafeExternalSorter used UnsafeAlignedOffset to make the entire record of 8 byte Items aligned, but ShuffleExternalSorter not.
The SPARC platform requires this because using a 4 byte Int for record lengths causes the entire record of 8 byte Items to become misaligned by 4 bytes. Using a 8 byte long for record length keeps things 8 byte aligned.

## How was this patch tested?
Existing Test.

Closes #22053 from eatoncys/UnsafeAlignedOffset.

Authored-by: 10129659 <chen.yanshan@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

index c3a07b2..c7d2db4 100644 (file)
@@ -43,6 +43,7 @@ import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.FileSegment;
 import org.apache.spark.storage.TempShuffleBlockId;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
@@ -184,6 +185,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
 
     int currentPartition = -1;
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     while (sortedRecords.hasNext()) {
       sortedRecords.loadNext();
       final int partition = sortedRecords.packedRecordPointer.getPartitionId();
@@ -200,8 +202,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
       final Object recordPage = taskMemoryManager.getPage(recordPointer);
       final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
-      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
-      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
+      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
+      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
         Platform.copyMemory(
@@ -389,15 +391,16 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     }
 
     growPointerArrayIfNecessary();
-    // Need 4 bytes to store the record length.
-    final int required = length + 4;
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    // Need 4 or 8 bytes to store the record length.
+    final int required = length + uaoSize;
     acquireNewPageIfNecessary(required);
 
     assert(currentPage != null);
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
-    Platform.putInt(base, pageCursor, length);
-    pageCursor += 4;
+    UnsafeAlignedOffset.putSize(base, pageCursor, length);
+    pageCursor += uaoSize;
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
     inMemSorter.insertRecord(recordAddress, partitionId);
index 4fc19b1..399251b 100644 (file)
@@ -402,7 +402,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
     growPointerArrayIfNecessary();
     int uaoSize = UnsafeAlignedOffset.getUaoSize();
-    // Need 4 bytes to store the record length.
+    // Need 4 or 8 bytes to store the record length.
     final int required = length + uaoSize;
     acquireNewPageIfNecessary(required);