[SPARK-21501] Change CacheLoader to limit entries based on memory footprint
authorSanket Chintapalli <schintap@yahoo-inc.com>
Wed, 23 Aug 2017 16:51:11 +0000 (11:51 -0500)
committerTom Graves <tgraves@yahoo-inc.com>
Wed, 23 Aug 2017 16:51:11 +0000 (11:51 -0500)
Right now the spark shuffle service has a cache for index files. It is based on a # of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers because the size of each entry can fluctuate based on the # of reducers.
We saw an issues with a job that had 170000 reducers and it caused NM with spark shuffle service to use 700-800MB or memory in NM by itself.
We should change this cache to be memory based and only allow a certain memory size used. When I say memory based I mean the cache should have a limit of say 100MB.

https://issues.apache.org/jira/browse/SPARK-21501

Manual Testing with 170000 reducers has been performed with cache loaded up to max 100MB default limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit and the objects will be ready for garbage collection there by avoiding NM to crash. No notable difference in runtime has been observed.

Author: Sanket Chintapalli <schintap@yahoo-inc.com>

Closes #18940 from redsanket/SPARK-21501.

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
core/src/main/scala/org/apache/spark/SparkConf.scala
docs/configuration.md

index 88256b8..fa2ff42 100644 (file)
@@ -67,6 +67,10 @@ public class TransportConf {
     return conf.getInt(name, defaultValue);
   }
 
+  public String get(String name, String defaultValue) {
+    return conf.get(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }
index d7ec0e2..e639989 100644 (file)
@@ -33,6 +33,7 @@ import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -104,7 +105,7 @@ public class ExternalShuffleBlockResolver {
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
-    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
@@ -112,7 +113,13 @@ public class ExternalShuffleBlockResolver {
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-                                    .maximumSize(indexCacheEntries).build(indexCacheLoader);
+      .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+      .weigher(new Weigher<File, ShuffleIndexInformation>() {
+        public int weigh(File file, ShuffleIndexInformation indexInfo) {
+          return indexInfo.getSize();
+        }
+      })
+      .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
       executors = reloadRegisteredExecutors(db);
index 39ca9ba..386738e 100644 (file)
@@ -31,9 +31,10 @@ import java.nio.file.Files;
 public class ShuffleIndexInformation {
   /** offsets as long buffer */
   private final LongBuffer offsets;
+  private int size;
 
   public ShuffleIndexInformation(File indexFile) throws IOException {
-    int size = (int)indexFile.length();
+    size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
@@ -48,6 +49,14 @@ public class ShuffleIndexInformation {
   }
 
   /**
+   * Size of the index file
+   * @return size
+   */
+  public int getSize() {
+    return size;
+  }
+
+  /**
    * Get index offset for a particular reducer.
    */
   public ShuffleIndexRecord getIndex(int reduceId) {
index 715cfdc..e61f943 100644 (file)
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
         "Please use the new blacklisting options, spark.blacklist.*"),
       DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
+        "Not used any more. Please use spark.shuffle.service.index.cache.size")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
index e7c0306..6e9fe59 100644 (file)
@@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.service.index.cache.entries</code></td>
-  <td>1024</td>
+  <td><code>spark.shuffle.service.index.cache.size</code></td>
+  <td>100m</td>
   <td>
-    Max number of entries to keep in the index cache of the shuffle service.
+    Cache entries limited to the specified memory footprint.
   </td>
 </tr>
 <tr>