TEZ-3954. Reduce Tez Shuffle Handler Memory needs for holding TezIndexRecords (Jonath...
author Kuhu Shukla <kshukla@yahoo-inc.com>
Fri, 6 Jul 2018 14:16:44 +0000 (09:16 -0500)
committer Kuhu Shukla <kshukla@yahoo-inc.com>
Fri, 6 Jul 2018 14:16:44 +0000 (09:16 -0500)
tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java

index e22928e..24a821f 100644 (file)
@@ -1146,7 +1146,7 @@ public class ShuffleHandler extends AuxiliaryService {
         try {
           MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(reduceContext.dagId, mapId,
+            info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
                 reduceContext.getJobId(),
                 reduceContext.getUser());
           }
@@ -1204,7 +1204,7 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                             String jobId,
+                                             Range reduceRange, String jobId,
                                              String user) throws IOException {
       AttemptPathInfo pathInfo;
       try {
@@ -1233,8 +1233,13 @@ public class ShuffleHandler extends AuxiliaryService {
             pathInfo.indexPath);
       }
 
+      MapOutputInfo outputInfo;
+      if (reduceRange.first == reduceRange.last) {
+        outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord.getIndex(reduceRange.first), reduceRange);
+      } else {
 
-      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord);
+        outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord, reduceRange);
+      }
       return outputInfo;
     }
 
@@ -1262,12 +1267,12 @@ public class ShuffleHandler extends AuxiliaryService {
       int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1);
       for (String mapId : mapIds) {
         contentLength += reduceCountVSize;
-        MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user);
+        MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, reduceRange, jobId, user);
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
         }
         for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
-          TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce);
+          TezIndexRecord indexRecord = outputInfo.getIndex(reduce);
           ShuffleHeader header =
               new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce);
 
@@ -1295,12 +1300,37 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     class MapOutputInfo {
-      final Path mapOutputFileName;
-      final TezSpillRecord spillRecord;
+      private final Path mapOutputFileName;
+      private TezSpillRecord spillRecord;
+      private TezIndexRecord indexRecord;
+      private final Range reduceRange;
+
+      MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord, Range reduceRange) {
+        this.mapOutputFileName = mapOutputFileName;
+        this.indexRecord = indexRecord;
+        this.reduceRange = reduceRange;
+      }
 
-      MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord) {
+      MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord, Range reduceRange) {
         this.mapOutputFileName = mapOutputFileName;
         this.spillRecord = spillRecord;
+        this.reduceRange = reduceRange;
+      }
+
+      TezIndexRecord getIndex(int index) {
+        if (index < reduceRange.first || index > reduceRange.last) {
+          throw new IllegalArgumentException("Reduce Index: " + index + " out of range for " + mapOutputFileName);
+        }
+        if (spillRecord != null) {
+          return spillRecord.getIndex(index);
+        } else {
+          return indexRecord;
+        }
+      }
+
+      public void finish() {
+        spillRecord = null;
+        indexRecord = null;
       }
     }
 
@@ -1356,7 +1386,7 @@ public class ShuffleHandler extends AuxiliaryService {
       WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
       ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
       for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
-        TezIndexRecord index = outputInfo.spillRecord.getIndex(reduce);
+        TezIndexRecord index = outputInfo.getIndex(reduce);
         // Records are only valid if they have a non-zero part length
         if (index.getPartLength() != 0) {
           if (firstIndex == null) {
@@ -1368,6 +1398,8 @@ public class ShuffleHandler extends AuxiliaryService {
         ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce);
         DataOutputBuffer dob = new DataOutputBuffer();
         header.write(dob);
+        // Free the memory needed to store the spill and index records
+        outputInfo.finish();
         ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
       }
 
index 11c92fb..7d53abc 100644 (file)
@@ -111,7 +111,7 @@ public class TestShuffleHandler {
         }
         @Override
         protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                 String jobId,
+                                                 Range reduceRange, String jobId,
                                                  String user)
             throws IOException {
           // Do nothing.
@@ -236,7 +236,7 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   String jobId,
+                                                   Range reduceRange, String jobId,
                                                    String user)
               throws IOException {
             return null;
@@ -346,7 +346,7 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   String jobId, String user)
+                                                   Range reduceRange, String jobId, String user)
               throws IOException {
             return null;
           }
@@ -568,7 +568,7 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   String jobId,
+                                                   Range reduceRange, String jobId,
                                                    String user)
               throws IOException {
             // Do nothing.