[SPARK-24610] fix reading small files via wholeTextFiles
authorDhruve Ashar <dhruveashar@gmail.com>
Thu, 12 Jul 2018 20:36:02 +0000 (15:36 -0500)
committerThomas Graves <tgraves@apache.org>
Thu, 12 Jul 2018 20:36:02 +0000 (15:36 -0500)
## What changes were proposed in this pull request?
The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file/s being read using the `wholeTextFiles` method. While this works well for large files, for smaller files where the maxSplitSize is smaller than the defaults being used with configs like hive-site.xml or explicitly passed in the form of `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack` , it just throws up an exception.

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
... 48 elided
`

This change checks the maxSplitSize against the minSplitSizePerNode and minSplitSizePerRack and set them if `maxSplitSize < minSplitSizePerNode/Rack`

## How was this patch tested?
Test manually setting the conf while launching the job and added unit test.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #21601 from dhruve/bug/SPARK-24610.

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala [new file with mode: 0644]

index f47cd38..04c5c4b 100644 (file)
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
new file mode 100644 (file)
index 0000000..817dc08
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.{DataOutputStream, File, FileOutputStream}
+
+import scala.collection.immutable.IndexedSeq
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
+ * directory containing files is created as fake input which is deleted in the end.
+ */
+class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+  }
+
+  override def afterAll() {
+    try {
+      sc.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+                               compress: Boolean) = {
+    val path = s"${inputDir.toString}/$fileName"
+    val out = new DataOutputStream(new FileOutputStream(path))
+    out.write(contents, 0, contents.length)
+    out.close()
+  }
+
+  test("for small files minimum split size per node and per rack should be less than or equal to " +
+    "maximum split size.") {
+    var dir : File = null;
+    try {
+      dir = Utils.createTempDir()
+      logInfo(s"Local disk address is ${dir.toString}.")
+
+      // Set the minsize per node and rack to be larger than the size of the input file.
+      sc.hadoopConfiguration.setLong(
+        "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
+      sc.hadoopConfiguration.setLong(
+        "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)
+
+      WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) =>
+        createNativeFile(dir, filename, contents, false)
+      }
+      // ensure spark job runs successfully without exceptions from the CombineFileInputFormat
+      assert(sc.wholeTextFiles(dir.toString).count == 3)
+    } finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
+}
+
+/**
+ * Files to be tested are defined here.
+ */
+object WholeTextFileInputFormatSuite {
+  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
+
+  private val fileNames = Array("part-00000", "part-00001", "part-00002")
+  private val fileLengths = Array(10, 100, 1000)
+
+  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
+    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+  }.toMap
+}