[SPARK-24691][SQL] Dispatch the type support check in FileFormat implementation
[spark.git] / sql / core / src / test / scala / org / apache / spark / sql / FileBasedDataSourceSuite.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql
19
20 import java.io.{File, FileNotFoundException}
21 import java.util.Locale
22
23 import org.apache.hadoop.fs.Path
24 import org.scalatest.BeforeAndAfterAll
25
26 import org.apache.spark.SparkException
27 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
28 import org.apache.spark.sql.functions._
29 import org.apache.spark.sql.internal.SQLConf
30 import org.apache.spark.sql.test.SharedSQLContext
31 import org.apache.spark.sql.types._
32
33
34 class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
35   import testImplicits._
36
37   override def beforeAll(): Unit = {
38     super.beforeAll()
39     spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
40   }
41
42   override def afterAll(): Unit = {
43     try {
44       spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
45     } finally {
46       super.afterAll()
47     }
48   }
49
50   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
51   private val nameWithSpecialChars = "sp&cial%c hars"
52
53   allFileBasedDataSources.foreach { format =>
54     test(s"Writing empty datasets should not fail - $format") {
55       withTempPath { dir =>
56         Seq("str").toDS().limit(0).write.format(format).save(dir.getCanonicalPath)
57       }
58     }
59   }
60
61   // `TEXT` data source always has a single column whose name is `value`.
62   allFileBasedDataSources.filterNot(_ == "text").foreach { format =>
63     test(s"SPARK-23072 Write and read back unicode column names - $format") {
64       withTempPath { path =>
65         val dir = path.getCanonicalPath
66
67         // scalastyle:off nonascii
68         val df = Seq("a").toDF("한글")
69         // scalastyle:on nonascii
70
71         df.write.format(format).option("header", "true").save(dir)
72         val answerDf = spark.read.format(format).option("header", "true").load(dir)
73
74         assert(df.schema.sameType(answerDf.schema))
75         checkAnswer(df, answerDf)
76       }
77     }
78   }
79
80   // Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
81   // `TEXT` data source always has a single column whose name is `value`.
82   Seq("orc", "parquet").foreach { format =>
83     test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
84       withTempPath { file =>
85         val path = file.getCanonicalPath
86         val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
87         emptyDf.write.format(format).save(path)
88
89         val df = spark.read.format(format).load(path)
90         assert(df.schema.sameType(emptyDf.schema))
91         checkAnswer(df, emptyDf)
92       }
93     }
94   }
95
96   Seq("orc", "parquet").foreach { format =>
97     test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
98       withTempPath { outputPath =>
99         val df = spark.emptyDataFrame.select(lit(1).as("i"))
100         df.write.format(format).save(outputPath.toString)
101         val partFiles = outputPath.listFiles()
102           .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
103         assert(partFiles.length === 1)
104
105         // Now read the file.
106         val df1 = spark.read.format(format).load(outputPath.toString)
107         checkAnswer(df1, Seq.empty[Row])
108         assert(df1.schema.equals(df.schema.asNullable))
109       }
110     }
111   }
112
113   allFileBasedDataSources.foreach { format =>
114     test(s"SPARK-23372 error while writing empty schema files using $format") {
115       withTempPath { outputPath =>
116         val errMsg = intercept[AnalysisException] {
117           spark.emptyDataFrame.write.format(format).save(outputPath.toString)
118         }
119         assert(errMsg.getMessage.contains(
120           "Datasource does not support writing empty or nested empty schemas"))
121       }
122
123       // Nested empty schema
124       withTempPath { outputPath =>
125         val schema = StructType(Seq(
126           StructField("a", IntegerType),
127           StructField("b", StructType(Nil)),
128           StructField("c", IntegerType)
129         ))
130         val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
131         val errMsg = intercept[AnalysisException] {
132           df.write.format(format).save(outputPath.toString)
133         }
134         assert(errMsg.getMessage.contains(
135           "Datasource does not support writing empty or nested empty schemas"))
136       }
137     }
138   }
139
140   allFileBasedDataSources.foreach { format =>
141     test(s"SPARK-22146 read files containing special characters using $format") {
142       withTempDir { dir =>
143         val tmpFile = s"$dir/$nameWithSpecialChars"
144         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
145         val fileContent = spark.read.format(format).load(tmpFile)
146         checkAnswer(fileContent, Seq(Row("a"), Row("b")))
147       }
148     }
149   }
150
151   // Separate test case for formats that support multiLine as an option.
152   Seq("json", "csv").foreach { format =>
153     test("SPARK-23148 read files containing special characters " +
154       s"using $format with multiline enabled") {
155       withTempDir { dir =>
156         val tmpFile = s"$dir/$nameWithSpecialChars"
157         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
158         val reader = spark.read.format(format).option("multiLine", true)
159         val fileContent = reader.load(tmpFile)
160         checkAnswer(fileContent, Seq(Row("a"), Row("b")))
161       }
162     }
163   }
164
165   allFileBasedDataSources.foreach { format =>
166     testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
167       def testIgnoreMissingFiles(): Unit = {
168         withTempDir { dir =>
169           val basePath = dir.getCanonicalPath
170
171           Seq("0").toDF("a").write.format(format).save(new Path(basePath, "first").toString)
172           Seq("1").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
173
174           val thirdPath = new Path(basePath, "third")
175           val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
176           Seq("2").toDF("a").write.format(format).save(thirdPath.toString)
177           val files = fs.listStatus(thirdPath).filter(_.isFile).map(_.getPath)
178
179           val df = spark.read.format(format).load(
180             new Path(basePath, "first").toString,
181             new Path(basePath, "second").toString,
182             new Path(basePath, "third").toString)
183
184           // Make sure all data files are deleted and can't be opened.
185           files.foreach(f => fs.delete(f, false))
186           assert(fs.delete(thirdPath, true))
187           for (f <- files) {
188             intercept[FileNotFoundException](fs.open(f))
189           }
190
191           checkAnswer(df, Seq(Row("0"), Row("1")))
192         }
193       }
194
195       withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
196         testIgnoreMissingFiles()
197       }
198
199       withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
200         val exception = intercept[SparkException] {
201           testIgnoreMissingFiles()
202         }
203         assert(exception.getMessage().contains("does not exist"))
204       }
205     }
206   }
207
208   // Text file format only supports string type
209   test("SPARK-24691 error handling for unsupported types - text") {
210     withTempDir { dir =>
211       // write path
212       val textDir = new File(dir, "text").getCanonicalPath
213       var msg = intercept[AnalysisException] {
214         Seq(1).toDF.write.text(textDir)
215       }.getMessage
216       assert(msg.contains("Text data source does not support int data type"))
217
218       msg = intercept[AnalysisException] {
219         Seq(1.2).toDF.write.text(textDir)
220       }.getMessage
221       assert(msg.contains("Text data source does not support double data type"))
222
223       msg = intercept[AnalysisException] {
224         Seq(true).toDF.write.text(textDir)
225       }.getMessage
226       assert(msg.contains("Text data source does not support boolean data type"))
227
228       msg = intercept[AnalysisException] {
229         Seq(1).toDF("a").selectExpr("struct(a)").write.text(textDir)
230       }.getMessage
231       assert(msg.contains("Text data source does not support struct<a:int> data type"))
232
233       msg = intercept[AnalysisException] {
234         Seq((Map("Tesla" -> 3))).toDF("cars").write.mode("overwrite").text(textDir)
235       }.getMessage
236       assert(msg.contains("Text data source does not support map<string,int> data type"))
237
238       msg = intercept[AnalysisException] {
239         Seq((Array("Tesla", "Chevy", "Ford"))).toDF("brands")
240           .write.mode("overwrite").text(textDir)
241       }.getMessage
242       assert(msg.contains("Text data source does not support array<string> data type"))
243
244       // read path
245       Seq("aaa").toDF.write.mode("overwrite").text(textDir)
246       msg = intercept[AnalysisException] {
247         val schema = StructType(StructField("a", IntegerType, true) :: Nil)
248         spark.read.schema(schema).text(textDir).collect()
249       }.getMessage
250       assert(msg.contains("Text data source does not support int data type"))
251
252       msg = intercept[AnalysisException] {
253         val schema = StructType(StructField("a", DoubleType, true) :: Nil)
254         spark.read.schema(schema).text(textDir).collect()
255       }.getMessage
256       assert(msg.contains("Text data source does not support double data type"))
257
258       msg = intercept[AnalysisException] {
259         val schema = StructType(StructField("a", BooleanType, true) :: Nil)
260         spark.read.schema(schema).text(textDir).collect()
261       }.getMessage
262       assert(msg.contains("Text data source does not support boolean data type"))
263     }
264   }
265
266   // Unsupported data types of csv, json, orc, and parquet are as follows;
267   //  csv -> R/W: Null, Array, Map, Struct
268   //  json -> R/W: Interval
269   //  orc -> R/W: Interval, W: Null
270   //  parquet -> R/W: Interval, Null
271   test("SPARK-24204 error handling for unsupported Array/Map/Struct types - csv") {
272     withTempDir { dir =>
273       val csvDir = new File(dir, "csv").getCanonicalPath
274       var msg = intercept[AnalysisException] {
275         Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
276       }.getMessage
277       assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
278
279       msg = intercept[AnalysisException] {
280         val schema = StructType.fromDDL("a struct<b: Int>")
281         spark.range(1).write.mode("overwrite").csv(csvDir)
282         spark.read.schema(schema).csv(csvDir).collect()
283       }.getMessage
284       assert(msg.contains("CSV data source does not support struct<b:int> data type"))
285
286       msg = intercept[AnalysisException] {
287         Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.mode("overwrite").csv(csvDir)
288       }.getMessage
289       assert(msg.contains("CSV data source does not support map<string,int> data type"))
290
291       msg = intercept[AnalysisException] {
292         val schema = StructType.fromDDL("a map<int, int>")
293         spark.range(1).write.mode("overwrite").csv(csvDir)
294         spark.read.schema(schema).csv(csvDir).collect()
295       }.getMessage
296       assert(msg.contains("CSV data source does not support map<int,int> data type"))
297
298       msg = intercept[AnalysisException] {
299         Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
300           .write.mode("overwrite").csv(csvDir)
301       }.getMessage
302       assert(msg.contains("CSV data source does not support array<string> data type"))
303
304       msg = intercept[AnalysisException] {
305          val schema = StructType.fromDDL("a array<int>")
306          spark.range(1).write.mode("overwrite").csv(csvDir)
307          spark.read.schema(schema).csv(csvDir).collect()
308        }.getMessage
309       assert(msg.contains("CSV data source does not support array<int> data type"))
310
311       msg = intercept[AnalysisException] {
312         Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors")
313           .write.mode("overwrite").csv(csvDir)
314       }.getMessage
315       assert(msg.contains("CSV data source does not support mydensevector data type"))
316
317       msg = intercept[AnalysisException] {
318         val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
319         spark.range(1).write.mode("overwrite").csv(csvDir)
320         spark.read.schema(schema).csv(csvDir).collect()
321       }.getMessage
322       assert(msg.contains("CSV data source does not support mydensevector data type."))
323     }
324   }
325
326   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
327     withTempDir { dir =>
328       val tempDir = new File(dir, "files").getCanonicalPath
329
330       // write path
331       Seq("csv", "json", "parquet", "orc").foreach { format =>
332         var msg = intercept[AnalysisException] {
333           sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
334         }.getMessage
335         assert(msg.contains("Cannot save interval data type into external storage."))
336
337         msg = intercept[AnalysisException] {
338           spark.udf.register("testType", () => new IntervalData())
339           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
340         }.getMessage
341         assert(msg.toLowerCase(Locale.ROOT)
342           .contains(s"$format data source does not support interval data type."))
343       }
344
345       // read path
346       Seq("parquet", "csv").foreach { format =>
347         var msg = intercept[AnalysisException] {
348           val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
349           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
350           spark.read.schema(schema).format(format).load(tempDir).collect()
351         }.getMessage
352         assert(msg.toLowerCase(Locale.ROOT)
353           .contains(s"$format data source does not support calendarinterval data type."))
354
355         msg = intercept[AnalysisException] {
356           val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
357           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
358           spark.read.schema(schema).format(format).load(tempDir).collect()
359         }.getMessage
360         assert(msg.toLowerCase(Locale.ROOT)
361           .contains(s"$format data source does not support interval data type."))
362       }
363     }
364   }
365
366   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
367     withTempDir { dir =>
368       val tempDir = new File(dir, "files").getCanonicalPath
369
370       Seq("orc").foreach { format =>
371         // write path
372         var msg = intercept[AnalysisException] {
373           sql("select null").write.format(format).mode("overwrite").save(tempDir)
374         }.getMessage
375         assert(msg.toLowerCase(Locale.ROOT)
376           .contains(s"$format data source does not support null data type."))
377
378         msg = intercept[AnalysisException] {
379           spark.udf.register("testType", () => new NullData())
380           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
381         }.getMessage
382         assert(msg.toLowerCase(Locale.ROOT)
383           .contains(s"$format data source does not support null data type."))
384
385         // read path
386         // We expect the types below should be passed for backward-compatibility
387
388         // Null type
389         var schema = StructType(StructField("a", NullType, true) :: Nil)
390         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
391         spark.read.schema(schema).format(format).load(tempDir).collect()
392
393         // UDT having null data
394         schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
395         spark.range(1).write.format(format).mode("overwrite").save(tempDir)
396         spark.read.schema(schema).format(format).load(tempDir).collect()
397       }
398
399       Seq("parquet", "csv").foreach { format =>
400         // write path
401         var msg = intercept[AnalysisException] {
402           sql("select null").write.format(format).mode("overwrite").save(tempDir)
403         }.getMessage
404         assert(msg.toLowerCase(Locale.ROOT)
405           .contains(s"$format data source does not support null data type."))
406
407         msg = intercept[AnalysisException] {
408           spark.udf.register("testType", () => new NullData())
409           sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
410         }.getMessage
411         assert(msg.toLowerCase(Locale.ROOT)
412           .contains(s"$format data source does not support null data type."))
413
414         // read path
415         msg = intercept[AnalysisException] {
416           val schema = StructType(StructField("a", NullType, true) :: Nil)
417           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
418           spark.read.schema(schema).format(format).load(tempDir).collect()
419         }.getMessage
420         assert(msg.toLowerCase(Locale.ROOT)
421           .contains(s"$format data source does not support null data type."))
422
423         msg = intercept[AnalysisException] {
424           val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
425           spark.range(1).write.format(format).mode("overwrite").save(tempDir)
426           spark.read.schema(schema).format(format).load(tempDir).collect()
427         }.getMessage
428         assert(msg.toLowerCase(Locale.ROOT)
429           .contains(s"$format data source does not support null data type."))
430       }
431     }
432   }
433 }
434
435 object TestingUDT {
436
437   @SQLUserDefinedType(udt = classOf[IntervalUDT])
438   class IntervalData extends Serializable
439
440   class IntervalUDT extends UserDefinedType[IntervalData] {
441
442     override def sqlType: DataType = CalendarIntervalType
443     override def serialize(obj: IntervalData): Any =
444       throw new NotImplementedError("Not implemented")
445     override def deserialize(datum: Any): IntervalData =
446       throw new NotImplementedError("Not implemented")
447     override def userClass: Class[IntervalData] = classOf[IntervalData]
448   }
449
450   @SQLUserDefinedType(udt = classOf[NullUDT])
451   private[sql] class NullData extends Serializable
452
453   private[sql] class NullUDT extends UserDefinedType[NullData] {
454
455     override def sqlType: DataType = NullType
456     override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented")
457     override def deserialize(datum: Any): NullData =
458       throw new NotImplementedError("Not implemented")
459     override def userClass: Class[NullData] = classOf[NullData]
460   }
461 }