86f9647b4ac4cb60e7626d33d85cfc480396bb04
[spark.git] / sql / core / src / test / scala / org / apache / spark / sql / FileBasedDataSourceSuite.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql
19
20 import java.io.{File, FileNotFoundException}
21 import java.util.Locale
22
23 import org.apache.hadoop.fs.Path
24 import org.scalatest.BeforeAndAfterAll
25
26 import org.apache.spark.SparkException
27 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
28 import org.apache.spark.sql.functions._
29 import org.apache.spark.sql.internal.SQLConf
30 import org.apache.spark.sql.test.SharedSQLContext
31 import org.apache.spark.sql.types._
32
33
34 class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
35   import testImplicits._
36
37   override def beforeAll(): Unit = {
38     super.beforeAll()
39     spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
40   }
41
42   override def afterAll(): Unit = {
43     try {
44       spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
45     } finally {
46       super.afterAll()
47     }
48   }
49
50   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
51   private val nameWithSpecialChars = "sp&cial%c hars"
52
53   allFileBasedDataSources.foreach { format =>
54     test(s"Writing empty datasets should not fail - $format") {
55       withTempPath { dir =>
56         Seq("str").toDS().limit(0).write.format(format).save(dir.getCanonicalPath)
57       }
58     }
59   }
60
61   // `TEXT` data source always has a single column whose name is `value`.
62   allFileBasedDataSources.filterNot(_ == "text").foreach { format =>
63     test(s"SPARK-23072 Write and read back unicode column names - $format") {
64       withTempPath { path =>
65         val dir = path.getCanonicalPath
66
67         // scalastyle:off nonascii
68         val df = Seq("a").toDF("한글")
69         // scalastyle:on nonascii
70
71         df.write.format(format).option("header", "true").save(dir)
72         val answerDf = spark.read.format(format).option("header", "true").load(dir)
73
74         assert(df.schema.sameType(answerDf.schema))
75         checkAnswer(df, answerDf)
76       }
77     }
78   }
79
80   // Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
81   // `TEXT` data source always has a single column whose name is `value`.
82   Seq("orc", "parquet").foreach { format =>
83     test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
84       withTempPath { file =>
85         val path = file.getCanonicalPath
86         val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
87         emptyDf.write.format(format).save(path)
88
89         val df = spark.read.format(format).load(path)
90         assert(df.schema.sameType(emptyDf.schema))
91         checkAnswer(df, emptyDf)
92       }
93     }
94   }
95
96   Seq("orc", "parquet").foreach { format =>
97     test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
98       withTempPath { outputPath =>
99         val df = spark.emptyDataFrame.select(lit(1).as("i"))
100         df.write.format(format).save(outputPath.toString)
101         val partFiles = outputPath.listFiles()
102           .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
103         assert(partFiles.length === 1)
104
105         // Now read the file.
106         val df1 = spark.read.format(format).load(outputPath.toString)
107         checkAnswer(df1, Seq.empty[Row])
108         assert(df1.schema.equals(df.schema.asNullable))
109       }
110     }
111   }
112
113   allFileBasedDataSources.foreach { format =>
114     test(s"SPARK-23372 error while writing empty schema files using $format") {
115       withTempPath { outputPath =>
116         val errMsg = intercept[AnalysisException] {
117           spark.emptyDataFrame.write.format(format).save(outputPath.toString)
118         }
119         assert(errMsg.getMessage.contains(
120           "Datasource does not support writing empty or nested empty schemas"))
121       }
122
123       // Nested empty schema
124       withTempPath { outputPath =>
125         val schema = StructType(Seq(
126           StructField("a", IntegerType),
127           StructField("b", StructType(Nil)),
128           StructField("c", IntegerType)
129         ))
130         val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
131         val errMsg = intercept[AnalysisException] {
132           df.write.format(format).save(outputPath.toString)
133         }
134         assert(errMsg.getMessage.contains(
135           "Datasource does not support writing empty or nested empty schemas"))
136       }
137     }
138   }
139
140   allFileBasedDataSources.foreach { format =>
141     test(s"SPARK-22146 read files containing special characters using $format") {
142       withTempDir { dir =>
143         val tmpFile = s"$dir/$nameWithSpecialChars"
144         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
145         val fileContent = spark.read.format(format).load(tmpFile)
146         checkAnswer(fileContent, Seq(Row("a"), Row("b")))
147       }
148     }
149   }
150
151   // Separate test case for formats that support multiLine as an option.
152   Seq("json", "csv").foreach { format =>
153     test("SPARK-23148 read files containing special characters " +
154       s"using $format with multiline enabled") {
155       withTempDir { dir =>
156         val tmpFile = s"$dir/$nameWithSpecialChars"
157         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
158         val reader = spark.read.format(format).option("multiLine", true)
159         val fileContent = reader.load(tmpFile)
160         checkAnswer(fileContent, Seq(Row("a"), Row("b")))
161       }
162     }
163   }
164
165   allFileBasedDataSources.foreach { format =>
166     testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
167       def testIgnoreMissingFiles(): Unit = {
168         withTempDir { dir =>
169           val basePath = dir.getCanonicalPath
170
171           Seq("0").toDF("a").write.format(format).save(new Path(basePath, "first").toString)
172           Seq("1").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
173
174           val thirdPath = new Path(basePath, "third")
175           val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
176           Seq("2").toDF("a").write.format(format).save(thirdPath.toString)
177           val files = fs.listStatus(thirdPath).filter(_.isFile).map(_.getPath)
178
179           val df = spark.read.format(format).load(
180             new Path(basePath, "first").toString,
181             new Path(basePath, "second").toString,
182             new Path(basePath, "third").toString)
183
184           // Make sure all data files are deleted and can't be opened.
185           files.foreach(f => fs.delete(f, false))
186           assert(fs.delete(thirdPath, true))
187           for (f <- files) {
188             intercept[FileNotFoundException](fs.open(f))
189           }
190
191           checkAnswer(df, Seq(Row("0"), Row("1")))
192         }
193       }
194
195       withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
196         testIgnoreMissingFiles()
197       }
198
199       withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
200         val exception = intercept[SparkException] {
201           testIgnoreMissingFiles()
202         }
203         assert(exception.getMessage().contains("does not exist"))
204       }
205     }
206   }
207
208   // Unsupported data types of csv, json, orc, and parquet are as follows;
209   //  csv -> R/W: Interval, Null, Array, Map, Struct
210   //  json -> W: Interval
211   //  orc -> W: Interval, Null
212   //  parquet -> R/W: Interval, Null
213   test("SPARK-24204 error handling for unsupported Array/Map/Struct types - csv") {
214     withTempDir { dir =>
215       val csvDir = new File(dir, "csv").getCanonicalPath
216       var msg = intercept[UnsupportedOperationException] {
217         Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
218       }.getMessage
219       assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
220
221       msg = intercept[UnsupportedOperationException] {
222         val schema = StructType.fromDDL("a struct<b: Int>")
223         spark.range(1).write.mode("overwrite").csv(csvDir)
224         spark.read.schema(schema).csv(csvDir).collect()
225       }.getMessage
226       assert(msg.contains("CSV data source does not support struct<b:int> data type"))
227
228       msg = intercept[UnsupportedOperationException] {
229         Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.mode("overwrite").csv(csvDir)
230       }.getMessage
231       assert(msg.contains("CSV data source does not support map<string,int> data type"))
232
233       msg = intercept[UnsupportedOperationException] {
234         val schema = StructType.fromDDL("a map<int, int>")
235         spark.range(1).write.mode("overwrite").csv(csvDir)
236         spark.read.schema(schema).csv(csvDir).collect()
237       }.getMessage
238       assert(msg.contains("CSV data source does not support map<int,int> data type"))
239
240       msg = intercept[UnsupportedOperationException] {
241         Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
242           .write.mode("overwrite").csv(csvDir)
243       }.getMessage
244       assert(msg.contains("CSV data source does not support array<string> data type"))
245
246       msg = intercept[UnsupportedOperationException] {
247          val schema = StructType.fromDDL("a array<int>")
248          spark.range(1).write.mode("overwrite").csv(csvDir)
249          spark.read.schema(schema).csv(csvDir).collect()
250        }.getMessage
251       assert(msg.contains("CSV data source does not support array<int> data type"))
252
253       msg = intercept[UnsupportedOperationException] {
254         Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors")
255           .write.mode("overwrite").csv(csvDir)
256       }.getMessage
257       assert(msg.contains("CSV data source does not support array<double> data type"))
258
259       msg = intercept[UnsupportedOperationException] {
260         val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
261         spark.range(1).write.mode("overwrite").csv(csvDir)
262         spark.read.schema(schema).csv(csvDir).collect()
263       }.getMessage
264       assert(msg.contains("CSV data source does not support array<double> data type."))
265     }
266   }
267
268   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
269     withTempDir { dir =>
270       val tempDir = new File(dir, "files").getCanonicalPath
271
272       // write path
273       Seq("csv", "json", "parquet", "orc").foreach { format =>
274         var msg = intercept[AnalysisException] {
275           sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
276         }.getMessage
277         assert(msg.contains("Cannot save interval data type into external storage."))
278
279         msg = intercept[UnsupportedOperationException] {
280           spark.udf.register("testType", () => new IntervalData())
281           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
282         }.getMessage
283         assert(msg.toLowerCase(Locale.ROOT)
284           .contains(s"$format data source does not support calendarinterval data type."))
285       }
286
287       // read path
288       Seq("parquet", "csv").foreach { format =>
289         var msg = intercept[UnsupportedOperationException] {
290           val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
291           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
292           spark.read.schema(schema).format(format).load(tempDir).collect()
293         }.getMessage
294         assert(msg.toLowerCase(Locale.ROOT)
295           .contains(s"$format data source does not support calendarinterval data type."))
296
297         msg = intercept[UnsupportedOperationException] {
298           val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
299           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
300           spark.read.schema(schema).format(format).load(tempDir).collect()
301         }.getMessage
302         assert(msg.toLowerCase(Locale.ROOT)
303           .contains(s"$format data source does not support calendarinterval data type."))
304       }
305
306       // We expect the types below should be passed for backward-compatibility
307       Seq("orc", "json").foreach { format =>
308         // Interval type
309         var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
310         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
311         spark.read.schema(schema).format(format).load(tempDir).collect()
312
313         // UDT having interval data
314         schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
315         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
316         spark.read.schema(schema).format(format).load(tempDir).collect()
317       }
318     }
319   }
320
321   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
322     withTempDir { dir =>
323       val tempDir = new File(dir, "files").getCanonicalPath
324
325       Seq("orc").foreach { format =>
326         // write path
327         var msg = intercept[UnsupportedOperationException] {
328           sql("select null").write.format(format).mode("overwrite").save(tempDir)
329         }.getMessage
330         assert(msg.toLowerCase(Locale.ROOT)
331           .contains(s"$format data source does not support null data type."))
332
333         msg = intercept[UnsupportedOperationException] {
334           spark.udf.register("testType", () => new NullData())
335           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
336         }.getMessage
337         assert(msg.toLowerCase(Locale.ROOT)
338           .contains(s"$format data source does not support null data type."))
339
340         // read path
341         // We expect the types below should be passed for backward-compatibility
342
343         // Null type
344         var schema = StructType(StructField("a", NullType, true) :: Nil)
345         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
346         spark.read.schema(schema).format(format).load(tempDir).collect()
347
348         // UDT having null data
349         schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
350         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
351         spark.read.schema(schema).format(format).load(tempDir).collect()
352       }
353
354       Seq("parquet", "csv").foreach { format =>
355         // write path
356         var msg = intercept[UnsupportedOperationException] {
357           sql("select null").write.format(format).mode("overwrite").save(tempDir)
358         }.getMessage
359         assert(msg.toLowerCase(Locale.ROOT)
360           .contains(s"$format data source does not support null data type."))
361
362         msg = intercept[UnsupportedOperationException] {
363           spark.udf.register("testType", () => new NullData())
364           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
365         }.getMessage
366         assert(msg.toLowerCase(Locale.ROOT)
367           .contains(s"$format data source does not support null data type."))
368
369         // read path
370         msg = intercept[UnsupportedOperationException] {
371           val schema = StructType(StructField("a", NullType, true) :: Nil)
372           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
373           spark.read.schema(schema).format(format).load(tempDir).collect()
374         }.getMessage
375         assert(msg.toLowerCase(Locale.ROOT)
376           .contains(s"$format data source does not support null data type."))
377
378         msg = intercept[UnsupportedOperationException] {
379           val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
380           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
381           spark.read.schema(schema).format(format).load(tempDir).collect()
382         }.getMessage
383         assert(msg.toLowerCase(Locale.ROOT)
384           .contains(s"$format data source does not support null data type."))
385       }
386     }
387   }
388 }
389
390 object TestingUDT {
391
392   @SQLUserDefinedType(udt = classOf[IntervalUDT])
393   class IntervalData extends Serializable
394
395   class IntervalUDT extends UserDefinedType[IntervalData] {
396
397     override def sqlType: DataType = CalendarIntervalType
398     override def serialize(obj: IntervalData): Any =
399       throw new NotImplementedError("Not implemented")
400     override def deserialize(datum: Any): IntervalData =
401       throw new NotImplementedError("Not implemented")
402     override def userClass: Class[IntervalData] = classOf[IntervalData]
403   }
404
405   @SQLUserDefinedType(udt = classOf[NullUDT])
406   private[sql] class NullData extends Serializable
407
408   private[sql] class NullUDT extends UserDefinedType[NullData] {
409
410     override def sqlType: DataType = NullType
411     override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented")
412     override def deserialize(datum: Any): NullData =
413       throw new NotImplementedError("Not implemented")
414     override def userClass: Class[NullData] = classOf[NullData]
415   }
416 }