[SPARK-22938][SQL][FOLLOWUP] Assert that SQLConf.get is accessed only on the driver
[spark.git] / sql / core / src / test / scala / org / apache / spark / sql / execution / datasources / json / JsonSuite.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.execution.datasources.json
19
20 import java.io.{File, FileOutputStream, StringWriter}
21 import java.nio.charset.{StandardCharsets, UnsupportedCharsetException}
22 import java.nio.file.{Files, Paths, StandardOpenOption}
23 import java.sql.{Date, Timestamp}
24 import java.util.Locale
25
26 import com.fasterxml.jackson.core.JsonFactory
27 import org.apache.hadoop.fs.{Path, PathFilter}
28 import org.apache.hadoop.io.SequenceFile.CompressionType
29 import org.apache.hadoop.io.compress.GzipCodec
30
31 import org.apache.spark.{SparkException, TestUtils}
32 import org.apache.spark.rdd.RDD
33 import org.apache.spark.sql.{functions => F, _}
34 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
35 import org.apache.spark.sql.catalyst.util.DateTimeUtils
36 import org.apache.spark.sql.execution.ExternalRDD
37 import org.apache.spark.sql.execution.datasources.DataSource
38 import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
39 import org.apache.spark.sql.internal.SQLConf
40 import org.apache.spark.sql.test.SharedSQLContext
41 import org.apache.spark.sql.types._
42 import org.apache.spark.util.Utils
43
44 class TestFileFilter extends PathFilter {
45   override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
46 }
47
48 class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
49   import testImplicits._
50
51   def testFile(fileName: String): String = {
52     Thread.currentThread().getContextClassLoader.getResource(fileName).toString
53   }
54
55   test("Type promotion") {
56     def checkTypePromotion(expected: Any, actual: Any) {
57       assert(expected.getClass == actual.getClass,
58         s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
59       assert(expected == actual,
60         s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
61           s"${expected}(${expected.getClass}).")
62     }
63
64     val factory = new JsonFactory()
65     def enforceCorrectType(value: Any, dataType: DataType): Any = {
66       val writer = new StringWriter()
67       Utils.tryWithResource(factory.createGenerator(writer)) { generator =>
68         generator.writeObject(value)
69         generator.flush()
70       }
71
72       val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
73       val dummySchema = StructType(Seq.empty)
74       val parser = new JacksonParser(dummySchema, dummyOption)
75
76       Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
77         jsonParser.nextToken()
78         val converter = parser.makeConverter(dataType)
79         converter.apply(jsonParser)
80       }
81     }
82
83     val intNumber: Int = 2147483647
84     checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
85     checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType))
86     checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType))
87     checkTypePromotion(
88       Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT))
89
90     val longNumber: Long = 9223372036854775807L
91     checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
92     checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType))
93     checkTypePromotion(
94       Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT))
95
96     val doubleNumber: Double = 1.7976931348623157E308d
97     checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
98
99     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber * 1000L)),
100         enforceCorrectType(intNumber, TimestampType))
101     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
102         enforceCorrectType(intNumber.toLong, TimestampType))
103     val strTime = "2014-09-30 12:34:56"
104     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
105         enforceCorrectType(strTime, TimestampType))
106
107     val strDate = "2014-10-15"
108     checkTypePromotion(
109       DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
110
111     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
112     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
113     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
114         enforceCorrectType(ISO8601Time1, TimestampType))
115     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
116         enforceCorrectType(ISO8601Time2, TimestampType))
117
118     val ISO8601Date = "1970-01-01"
119     checkTypePromotion(DateTimeUtils.millisToDays(32400000),
120       enforceCorrectType(ISO8601Date, DateType))
121   }
122
123   test("Get compatible type") {
124     def checkDataType(t1: DataType, t2: DataType, expected: DataType) {
125       var actual = compatibleType(t1, t2, conf.caseSensitiveAnalysis)
126       assert(actual == expected,
127         s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
128       actual = compatibleType(t2, t1, conf.caseSensitiveAnalysis)
129       assert(actual == expected,
130         s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
131     }
132
133     // NullType
134     checkDataType(NullType, BooleanType, BooleanType)
135     checkDataType(NullType, IntegerType, IntegerType)
136     checkDataType(NullType, LongType, LongType)
137     checkDataType(NullType, DoubleType, DoubleType)
138     checkDataType(NullType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
139     checkDataType(NullType, StringType, StringType)
140     checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType))
141     checkDataType(NullType, StructType(Nil), StructType(Nil))
142     checkDataType(NullType, NullType, NullType)
143
144     // BooleanType
145     checkDataType(BooleanType, BooleanType, BooleanType)
146     checkDataType(BooleanType, IntegerType, StringType)
147     checkDataType(BooleanType, LongType, StringType)
148     checkDataType(BooleanType, DoubleType, StringType)
149     checkDataType(BooleanType, DecimalType.SYSTEM_DEFAULT, StringType)
150     checkDataType(BooleanType, StringType, StringType)
151     checkDataType(BooleanType, ArrayType(IntegerType), StringType)
152     checkDataType(BooleanType, StructType(Nil), StringType)
153
154     // IntegerType
155     checkDataType(IntegerType, IntegerType, IntegerType)
156     checkDataType(IntegerType, LongType, LongType)
157     checkDataType(IntegerType, DoubleType, DoubleType)
158     checkDataType(IntegerType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
159     checkDataType(IntegerType, StringType, StringType)
160     checkDataType(IntegerType, ArrayType(IntegerType), StringType)
161     checkDataType(IntegerType, StructType(Nil), StringType)
162
163     // LongType
164     checkDataType(LongType, LongType, LongType)
165     checkDataType(LongType, DoubleType, DoubleType)
166     checkDataType(LongType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
167     checkDataType(LongType, StringType, StringType)
168     checkDataType(LongType, ArrayType(IntegerType), StringType)
169     checkDataType(LongType, StructType(Nil), StringType)
170
171     // DoubleType
172     checkDataType(DoubleType, DoubleType, DoubleType)
173     checkDataType(DoubleType, DecimalType.SYSTEM_DEFAULT, DoubleType)
174     checkDataType(DoubleType, StringType, StringType)
175     checkDataType(DoubleType, ArrayType(IntegerType), StringType)
176     checkDataType(DoubleType, StructType(Nil), StringType)
177
178     // DecimalType
179     checkDataType(DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT,
180       DecimalType.SYSTEM_DEFAULT)
181     checkDataType(DecimalType.SYSTEM_DEFAULT, StringType, StringType)
182     checkDataType(DecimalType.SYSTEM_DEFAULT, ArrayType(IntegerType), StringType)
183     checkDataType(DecimalType.SYSTEM_DEFAULT, StructType(Nil), StringType)
184
185     // StringType
186     checkDataType(StringType, StringType, StringType)
187     checkDataType(StringType, ArrayType(IntegerType), StringType)
188     checkDataType(StringType, StructType(Nil), StringType)
189
190     // ArrayType
191     checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType))
192     checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType))
193     checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType))
194     checkDataType(ArrayType(IntegerType), StructType(Nil), StringType)
195     checkDataType(
196       ArrayType(IntegerType, true), ArrayType(IntegerType), ArrayType(IntegerType, true))
197     checkDataType(
198       ArrayType(IntegerType, true), ArrayType(IntegerType, false), ArrayType(IntegerType, true))
199     checkDataType(
200       ArrayType(IntegerType, true), ArrayType(IntegerType, true), ArrayType(IntegerType, true))
201     checkDataType(
202       ArrayType(IntegerType, false), ArrayType(IntegerType), ArrayType(IntegerType, true))
203     checkDataType(
204       ArrayType(IntegerType, false), ArrayType(IntegerType, false), ArrayType(IntegerType, false))
205     checkDataType(
206       ArrayType(IntegerType, false), ArrayType(IntegerType, true), ArrayType(IntegerType, true))
207
208     // StructType
209     checkDataType(StructType(Nil), StructType(Nil), StructType(Nil))
210     checkDataType(
211       StructType(StructField("f1", IntegerType, true) :: Nil),
212       StructType(StructField("f1", IntegerType, true) :: Nil),
213       StructType(StructField("f1", IntegerType, true) :: Nil))
214     checkDataType(
215       StructType(StructField("f1", IntegerType, true) :: Nil),
216       StructType(Nil),
217       StructType(StructField("f1", IntegerType, true) :: Nil))
218     checkDataType(
219       StructType(
220         StructField("f1", IntegerType, true) ::
221         StructField("f2", IntegerType, true) :: Nil),
222       StructType(StructField("f1", LongType, true) :: Nil),
223       StructType(
224         StructField("f1", LongType, true) ::
225         StructField("f2", IntegerType, true) :: Nil))
226     checkDataType(
227       StructType(
228         StructField("f1", IntegerType, true) :: Nil),
229       StructType(
230         StructField("f2", IntegerType, true) :: Nil),
231       StructType(
232         StructField("f1", IntegerType, true) ::
233         StructField("f2", IntegerType, true) :: Nil))
234     checkDataType(
235       StructType(
236         StructField("f1", IntegerType, true) :: Nil),
237       DecimalType.SYSTEM_DEFAULT,
238       StringType)
239   }
240
241   test("Complex field and type inferring with null in sampling") {
242     val jsonDF = spark.read.json(jsonNullStruct)
243     val expectedSchema = StructType(
244       StructField("headers", StructType(
245         StructField("Charset", StringType, true) ::
246           StructField("Host", StringType, true) :: Nil)
247         , true) ::
248         StructField("ip", StringType, true) ::
249         StructField("nullstr", StringType, true):: Nil)
250
251     assert(expectedSchema === jsonDF.schema)
252     jsonDF.createOrReplaceTempView("jsonTable")
253
254     checkAnswer(
255       sql("select nullstr, headers.Host from jsonTable"),
256       Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
257     )
258   }
259
260   test("Primitive field and type inferring") {
261     val jsonDF = spark.read.json(primitiveFieldAndType)
262
263     val expectedSchema = StructType(
264       StructField("bigInteger", DecimalType(20, 0), true) ::
265       StructField("boolean", BooleanType, true) ::
266       StructField("double", DoubleType, true) ::
267       StructField("integer", LongType, true) ::
268       StructField("long", LongType, true) ::
269       StructField("null", StringType, true) ::
270       StructField("string", StringType, true) :: Nil)
271
272     assert(expectedSchema === jsonDF.schema)
273
274     jsonDF.createOrReplaceTempView("jsonTable")
275
276     checkAnswer(
277       sql("select * from jsonTable"),
278       Row(new java.math.BigDecimal("92233720368547758070"),
279         true,
280         1.7976931348623157E308,
281         10,
282         21474836470L,
283         null,
284         "this is a simple string.")
285     )
286   }
287
288   test("Complex field and type inferring") {
289     val jsonDF = spark.read.json(complexFieldAndType1)
290
291     val expectedSchema = StructType(
292       StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
293       StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) ::
294       StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), true) ::
295       StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) ::
296       StructField("arrayOfDouble", ArrayType(DoubleType, true), true) ::
297       StructField("arrayOfInteger", ArrayType(LongType, true), true) ::
298       StructField("arrayOfLong", ArrayType(LongType, true), true) ::
299       StructField("arrayOfNull", ArrayType(StringType, true), true) ::
300       StructField("arrayOfString", ArrayType(StringType, true), true) ::
301       StructField("arrayOfStruct", ArrayType(
302         StructType(
303           StructField("field1", BooleanType, true) ::
304           StructField("field2", StringType, true) ::
305           StructField("field3", StringType, true) :: Nil), true), true) ::
306       StructField("struct", StructType(
307         StructField("field1", BooleanType, true) ::
308         StructField("field2", DecimalType(20, 0), true) :: Nil), true) ::
309       StructField("structWithArrayFields", StructType(
310         StructField("field1", ArrayType(LongType, true), true) ::
311         StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil)
312
313     assert(expectedSchema === jsonDF.schema)
314
315     jsonDF.createOrReplaceTempView("jsonTable")
316
317     // Access elements of a primitive array.
318     checkAnswer(
319       sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
320       Row("str1", "str2", null)
321     )
322
323     // Access an array of null values.
324     checkAnswer(
325       sql("select arrayOfNull from jsonTable"),
326       Row(Seq(null, null, null, null))
327     )
328
329     // Access elements of a BigInteger array (we use DecimalType internally).
330     checkAnswer(
331       sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
332       Row(new java.math.BigDecimal("922337203685477580700"),
333         new java.math.BigDecimal("-922337203685477580800"), null)
334     )
335
336     // Access elements of an array of arrays.
337     checkAnswer(
338       sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
339       Row(Seq("1", "2", "3"), Seq("str1", "str2"))
340     )
341
342     // Access elements of an array of arrays.
343     checkAnswer(
344       sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
345       Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1))
346     )
347
348     // Access elements of an array inside a filed with the type of ArrayType(ArrayType).
349     checkAnswer(
350       sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
351       Row("str2", 2.1)
352     )
353
354     // Access elements of an array of structs.
355     checkAnswer(
356       sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " +
357         "from jsonTable"),
358       Row(
359         Row(true, "str1", null),
360         Row(false, null, null),
361         Row(null, null, null),
362         null)
363     )
364
365     // Access a struct and fields inside of it.
366     checkAnswer(
367       sql("select struct, struct.field1, struct.field2 from jsonTable"),
368       Row(
369         Row(true, new java.math.BigDecimal("92233720368547758070")),
370         true,
371         new java.math.BigDecimal("92233720368547758070")) :: Nil
372     )
373
374     // Access an array field of a struct.
375     checkAnswer(
376       sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
377       Row(Seq(4, 5, 6), Seq("str1", "str2"))
378     )
379
380     // Access elements of an array field of a struct.
381     checkAnswer(
382       sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
383       Row(5, null)
384     )
385   }
386
387   test("GetField operation on complex data type") {
388     val jsonDF = spark.read.json(complexFieldAndType1)
389     jsonDF.createOrReplaceTempView("jsonTable")
390
391     checkAnswer(
392       sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
393       Row(true, "str1")
394     )
395
396     // Getting all values of a specific field from an array of structs.
397     checkAnswer(
398       sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
399       Row(Seq(true, false, null), Seq("str1", null, null))
400     )
401   }
402
403   test("Type conflict in primitive field values") {
404     val jsonDF = spark.read.json(primitiveFieldValueTypeConflict)
405
406     val expectedSchema = StructType(
407       StructField("num_bool", StringType, true) ::
408       StructField("num_num_1", LongType, true) ::
409       StructField("num_num_2", DoubleType, true) ::
410       StructField("num_num_3", DoubleType, true) ::
411       StructField("num_str", StringType, true) ::
412       StructField("str_bool", StringType, true) :: Nil)
413
414     assert(expectedSchema === jsonDF.schema)
415
416     jsonDF.createOrReplaceTempView("jsonTable")
417
418     checkAnswer(
419       sql("select * from jsonTable"),
420       Row("true", 11L, null, 1.1, "13.1", "str1") ::
421         Row("12", null, 21474836470.9, null, null, "true") ::
422         Row("false", 21474836470L, 92233720368547758070d, 100, "str1", "false") ::
423         Row(null, 21474836570L, 1.1, 21474836470L, "92233720368547758070", null) :: Nil
424     )
425
426     // Number and Boolean conflict: resolve the type as number in this query.
427     checkAnswer(
428       sql("select num_bool - 10 from jsonTable where num_bool > 11"),
429       Row(2)
430     )
431
432     // Widening to LongType
433     checkAnswer(
434       sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"),
435       Row(21474836370L) :: Row(21474836470L) :: Nil
436     )
437
438     checkAnswer(
439       sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"),
440       Row(-89) :: Row(21474836370L) :: Row(21474836470L) :: Nil
441     )
442
443     // Widening to DecimalType
444     checkAnswer(
445       sql("select num_num_2 + 1.3 from jsonTable where num_num_2 > 1.1"),
446       Row(21474836472.2) ::
447         Row(92233720368547758071.3) :: Nil
448     )
449
450     // Widening to Double
451     checkAnswer(
452       sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"),
453       Row(101.2) :: Row(21474836471.2) :: Nil
454     )
455
456     // Number and String conflict: resolve the type as number in this query.
457     checkAnswer(
458       sql("select num_str + 1.2 from jsonTable where num_str > 14d"),
459       Row(92233720368547758071.2)
460     )
461
462     // Number and String conflict: resolve the type as number in this query.
463     checkAnswer(
464       sql("select num_str + 1.2 from jsonTable where num_str >= 92233720368547758060"),
465       Row(new java.math.BigDecimal("92233720368547758071.2").doubleValue)
466     )
467
468     // String and Boolean conflict: resolve the type as string.
469     checkAnswer(
470       sql("select * from jsonTable where str_bool = 'str1'"),
471       Row("true", 11L, null, 1.1, "13.1", "str1")
472     )
473   }
474
475   ignore("Type conflict in primitive field values (Ignored)") {
476     val jsonDF = spark.read.json(primitiveFieldValueTypeConflict)
477     jsonDF.createOrReplaceTempView("jsonTable")
478
479     // Right now, the analyzer does not promote strings in a boolean expression.
480     // Number and Boolean conflict: resolve the type as boolean in this query.
481     checkAnswer(
482       sql("select num_bool from jsonTable where NOT num_bool"),
483       Row(false)
484     )
485
486     checkAnswer(
487       sql("select str_bool from jsonTable where NOT str_bool"),
488       Row(false)
489     )
490
491     // Right now, the analyzer does not know that num_bool should be treated as a boolean.
492     // Number and Boolean conflict: resolve the type as boolean in this query.
493     checkAnswer(
494       sql("select num_bool from jsonTable where num_bool"),
495       Row(true)
496     )
497
498     checkAnswer(
499       sql("select str_bool from jsonTable where str_bool"),
500       Row(false)
501     )
502
503     // The plan of the following DSL is
504     // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78]
505     //  Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060)
506     //    ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66]
507     // We should directly cast num_str to DecimalType and also need to do the right type promotion
508     // in the Project.
509     checkAnswer(
510       jsonDF.
511         where('num_str >= BigDecimal("92233720368547758060")).
512         select(('num_str + 1.2).as("num")),
513       Row(new java.math.BigDecimal("92233720368547758071.2").doubleValue())
514     )
515
516     // The following test will fail. The type of num_str is StringType.
517     // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type.
518     // In our test data, one value of num_str is 13.1.
519     // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999,
520     // which is not 14.3.
521     // Number and String conflict: resolve the type as number in this query.
522     checkAnswer(
523       sql("select num_str + 1.2 from jsonTable where num_str > 13"),
524       Row(BigDecimal("14.3")) :: Row(BigDecimal("92233720368547758071.2")) :: Nil
525     )
526   }
527
528   test("Type conflict in complex field values") {
529     val jsonDF = spark.read.json(complexFieldValueTypeConflict)
530
531     val expectedSchema = StructType(
532       StructField("array", ArrayType(LongType, true), true) ::
533       StructField("num_struct", StringType, true) ::
534       StructField("str_array", StringType, true) ::
535       StructField("struct", StructType(
536         StructField("field", StringType, true) :: Nil), true) ::
537       StructField("struct_array", StringType, true) :: Nil)
538
539     assert(expectedSchema === jsonDF.schema)
540
541     jsonDF.createOrReplaceTempView("jsonTable")
542
543     checkAnswer(
544       sql("select * from jsonTable"),
545       Row(Seq(), "11", "[1,2,3]", Row(null), "[]") ::
546         Row(null, """{"field":false}""", null, null, "{}") ::
547         Row(Seq(4, 5, 6), null, "str", Row(null), "[7,8,9]") ::
548         Row(Seq(7), "{}", """["str1","str2",33]""", Row("str"), """{"field":true}""") :: Nil
549     )
550   }
551
552   test("Type conflict in array elements") {
553     val jsonDF = spark.read.json(arrayElementTypeConflict)
554
555     val expectedSchema = StructType(
556       StructField("array1", ArrayType(StringType, true), true) ::
557       StructField("array2", ArrayType(StructType(
558         StructField("field", LongType, true) :: Nil), true), true) ::
559       StructField("array3", ArrayType(StringType, true), true) :: Nil)
560
561     assert(expectedSchema === jsonDF.schema)
562
563     jsonDF.createOrReplaceTempView("jsonTable")
564
565     checkAnswer(
566       sql("select * from jsonTable"),
567       Row(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]",
568         """{"field":"str"}"""), Seq(Row(214748364700L), Row(1)), null) ::
569       Row(null, null, Seq("""{"field":"str"}""", """{"field":1}""")) ::
570       Row(null, null, Seq("1", "2", "3")) :: Nil
571     )
572
573     // Treat an element as a number.
574     checkAnswer(
575       sql("select array1[0] + 1 from jsonTable where array1 is not null"),
576       Row(2)
577     )
578   }
579
580   test("Handling missing fields") {
581     val jsonDF = spark.read.json(missingFields)
582
583     val expectedSchema = StructType(
584       StructField("a", BooleanType, true) ::
585       StructField("b", LongType, true) ::
586       StructField("c", ArrayType(LongType, true), true) ::
587       StructField("d", StructType(
588         StructField("field", BooleanType, true) :: Nil), true) ::
589       StructField("e", StringType, true) :: Nil)
590
591     assert(expectedSchema === jsonDF.schema)
592
593     jsonDF.createOrReplaceTempView("jsonTable")
594   }
595
596   test("Loading a JSON dataset from a text file") {
597     val dir = Utils.createTempDir()
598     dir.delete()
599     val path = dir.getCanonicalPath
600     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
601     val jsonDF = spark.read.json(path)
602
603     val expectedSchema = StructType(
604       StructField("bigInteger", DecimalType(20, 0), true) ::
605       StructField("boolean", BooleanType, true) ::
606       StructField("double", DoubleType, true) ::
607       StructField("integer", LongType, true) ::
608       StructField("long", LongType, true) ::
609       StructField("null", StringType, true) ::
610       StructField("string", StringType, true) :: Nil)
611
612     assert(expectedSchema === jsonDF.schema)
613
614     jsonDF.createOrReplaceTempView("jsonTable")
615
616     checkAnswer(
617       sql("select * from jsonTable"),
618       Row(new java.math.BigDecimal("92233720368547758070"),
619       true,
620       1.7976931348623157E308,
621       10,
622       21474836470L,
623       null,
624       "this is a simple string.")
625     )
626   }
627
628   test("Loading a JSON dataset primitivesAsString returns schema with primitive types as strings") {
629     val dir = Utils.createTempDir()
630     dir.delete()
631     val path = dir.getCanonicalPath
632     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
633     val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
634
635     val expectedSchema = StructType(
636       StructField("bigInteger", StringType, true) ::
637       StructField("boolean", StringType, true) ::
638       StructField("double", StringType, true) ::
639       StructField("integer", StringType, true) ::
640       StructField("long", StringType, true) ::
641       StructField("null", StringType, true) ::
642       StructField("string", StringType, true) :: Nil)
643
644     assert(expectedSchema === jsonDF.schema)
645
646     jsonDF.createOrReplaceTempView("jsonTable")
647
648     checkAnswer(
649       sql("select * from jsonTable"),
650       Row("92233720368547758070",
651       "true",
652       "1.7976931348623157E308",
653       "10",
654       "21474836470",
655       null,
656       "this is a simple string.")
657     )
658   }
659
660   test("Loading a JSON dataset primitivesAsString returns complex fields as strings") {
661     val jsonDF = spark.read.option("primitivesAsString", "true").json(complexFieldAndType1)
662
663     val expectedSchema = StructType(
664       StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
665       StructField("arrayOfArray2", ArrayType(ArrayType(StringType, true), true), true) ::
666       StructField("arrayOfBigInteger", ArrayType(StringType, true), true) ::
667       StructField("arrayOfBoolean", ArrayType(StringType, true), true) ::
668       StructField("arrayOfDouble", ArrayType(StringType, true), true) ::
669       StructField("arrayOfInteger", ArrayType(StringType, true), true) ::
670       StructField("arrayOfLong", ArrayType(StringType, true), true) ::
671       StructField("arrayOfNull", ArrayType(StringType, true), true) ::
672       StructField("arrayOfString", ArrayType(StringType, true), true) ::
673       StructField("arrayOfStruct", ArrayType(
674         StructType(
675           StructField("field1", StringType, true) ::
676           StructField("field2", StringType, true) ::
677           StructField("field3", StringType, true) :: Nil), true), true) ::
678       StructField("struct", StructType(
679         StructField("field1", StringType, true) ::
680         StructField("field2", StringType, true) :: Nil), true) ::
681       StructField("structWithArrayFields", StructType(
682         StructField("field1", ArrayType(StringType, true), true) ::
683         StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil)
684
685     assert(expectedSchema === jsonDF.schema)
686
687     jsonDF.createOrReplaceTempView("jsonTable")
688
689     // Access elements of a primitive array.
690     checkAnswer(
691       sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
692       Row("str1", "str2", null)
693     )
694
695     // Access an array of null values.
696     checkAnswer(
697       sql("select arrayOfNull from jsonTable"),
698       Row(Seq(null, null, null, null))
699     )
700
701     // Access elements of a BigInteger array (we use DecimalType internally).
702     checkAnswer(
703       sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
704       Row("922337203685477580700", "-922337203685477580800", null)
705     )
706
707     // Access elements of an array of arrays.
708     checkAnswer(
709       sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
710       Row(Seq("1", "2", "3"), Seq("str1", "str2"))
711     )
712
713     // Access elements of an array of arrays.
714     checkAnswer(
715       sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
716       Row(Seq("1", "2", "3"), Seq("1.1", "2.1", "3.1"))
717     )
718
719     // Access elements of an array inside a filed with the type of ArrayType(ArrayType).
720     checkAnswer(
721       sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
722       Row("str2", "2.1")
723     )
724
725     // Access elements of an array of structs.
726     checkAnswer(
727       sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " +
728         "from jsonTable"),
729       Row(
730         Row("true", "str1", null),
731         Row("false", null, null),
732         Row(null, null, null),
733         null)
734     )
735
736     // Access a struct and fields inside of it.
737     checkAnswer(
738       sql("select struct, struct.field1, struct.field2 from jsonTable"),
739       Row(
740         Row("true", "92233720368547758070"),
741         "true",
742         "92233720368547758070") :: Nil
743     )
744
745     // Access an array field of a struct.
746     checkAnswer(
747       sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
748       Row(Seq("4", "5", "6"), Seq("str1", "str2"))
749     )
750
751     // Access elements of an array field of a struct.
752     checkAnswer(
753       sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
754       Row("5", null)
755     )
756   }
757
758   test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
759     val jsonDF = spark.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
760
761     val expectedSchema = StructType(
762       StructField("bigInteger", DecimalType(20, 0), true) ::
763         StructField("boolean", BooleanType, true) ::
764         StructField("double", DecimalType(17, -292), true) ::
765         StructField("integer", LongType, true) ::
766         StructField("long", LongType, true) ::
767         StructField("null", StringType, true) ::
768         StructField("string", StringType, true) :: Nil)
769
770     assert(expectedSchema === jsonDF.schema)
771
772     jsonDF.createOrReplaceTempView("jsonTable")
773
774     checkAnswer(
775       sql("select * from jsonTable"),
776       Row(BigDecimal("92233720368547758070"),
777         true,
778         BigDecimal("1.7976931348623157E308"),
779         10,
780         21474836470L,
781         null,
782         "this is a simple string.")
783     )
784   }
785
786   test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") {
787     val mixedIntegerAndDoubleRecords = Seq(
788       """{"a": 3, "b": 1.1}""",
789       s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS()
790     val jsonDF = spark.read
791       .option("prefersDecimal", "true")
792       .json(mixedIntegerAndDoubleRecords)
793
794     // The values in `a` field will be decimals as they fit in decimal. For `b` field,
795     // they will be doubles as `1.0E-39D` does not fit.
796     val expectedSchema = StructType(
797       StructField("a", DecimalType(21, 1), true) ::
798       StructField("b", DoubleType, true) :: Nil)
799
800     assert(expectedSchema === jsonDF.schema)
801     checkAnswer(
802       jsonDF,
803       Row(BigDecimal("3"), 1.1D) ::
804       Row(BigDecimal("3.1"), 1.0E-39D) :: Nil
805     )
806   }
807
808   test("Infer big integers correctly even when it does not fit in decimal") {
809     val jsonDF = spark.read
810       .json(bigIntegerRecords)
811
812     // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
813     // it will be a decimal as `92233720368547758070`.
814     val expectedSchema = StructType(
815       StructField("a", DoubleType, true) ::
816       StructField("b", DecimalType(20, 0), true) :: Nil)
817
818     assert(expectedSchema === jsonDF.schema)
819     checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
820   }
821
822   test("Infer floating-point values correctly even when it does not fit in decimal") {
823     val jsonDF = spark.read
824       .option("prefersDecimal", "true")
825       .json(floatingValueRecords)
826
827     // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
828     // it will be a decimal as `0.01` by having a precision equal to the scale.
829     val expectedSchema = StructType(
830       StructField("a", DoubleType, true) ::
831       StructField("b", DecimalType(2, 2), true):: Nil)
832
833     assert(expectedSchema === jsonDF.schema)
834     checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal("0.01")))
835
836     val mergedJsonDF = spark.read
837       .option("prefersDecimal", "true")
838       .json(floatingValueRecords.union(bigIntegerRecords))
839
840     val expectedMergedSchema = StructType(
841       StructField("a", DoubleType, true) ::
842       StructField("b", DecimalType(22, 2), true):: Nil)
843
844     assert(expectedMergedSchema === mergedJsonDF.schema)
845     checkAnswer(
846       mergedJsonDF,
847       Row(1.0E-39D, BigDecimal("0.01")) ::
848       Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
849     )
850   }
851
852   test("Loading a JSON dataset from a text file with SQL") {
853     val dir = Utils.createTempDir()
854     dir.delete()
855     val path = dir.toURI.toString
856     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
857
858     sql(
859       s"""
860         |CREATE TEMPORARY VIEW jsonTableSQL
861         |USING org.apache.spark.sql.json
862         |OPTIONS (
863         |  path '$path'
864         |)
865       """.stripMargin)
866
867     checkAnswer(
868       sql("select * from jsonTableSQL"),
869       Row(new java.math.BigDecimal("92233720368547758070"),
870         true,
871         1.7976931348623157E308,
872         10,
873         21474836470L,
874         null,
875         "this is a simple string.")
876     )
877   }
878
879   test("Applying schemas") {
880     val dir = Utils.createTempDir()
881     dir.delete()
882     val path = dir.getCanonicalPath
883     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
884
885     val schema = StructType(
886       StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
887       StructField("boolean", BooleanType, true) ::
888       StructField("double", DoubleType, true) ::
889       StructField("integer", IntegerType, true) ::
890       StructField("long", LongType, true) ::
891       StructField("null", StringType, true) ::
892       StructField("string", StringType, true) :: Nil)
893
894     val jsonDF1 = spark.read.schema(schema).json(path)
895
896     assert(schema === jsonDF1.schema)
897
898     jsonDF1.createOrReplaceTempView("jsonTable1")
899
900     checkAnswer(
901       sql("select * from jsonTable1"),
902       Row(new java.math.BigDecimal("92233720368547758070"),
903       true,
904       1.7976931348623157E308,
905       10,
906       21474836470L,
907       null,
908       "this is a simple string.")
909     )
910
911     val jsonDF2 = spark.read.schema(schema).json(primitiveFieldAndType)
912
913     assert(schema === jsonDF2.schema)
914
915     jsonDF2.createOrReplaceTempView("jsonTable2")
916
917     checkAnswer(
918       sql("select * from jsonTable2"),
919       Row(new java.math.BigDecimal("92233720368547758070"),
920       true,
921       1.7976931348623157E308,
922       10,
923       21474836470L,
924       null,
925       "this is a simple string.")
926     )
927   }
928
929   test("Applying schemas with MapType") {
930     val schemaWithSimpleMap = StructType(
931       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
932     val jsonWithSimpleMap = spark.read.schema(schemaWithSimpleMap).json(mapType1)
933
934     jsonWithSimpleMap.createOrReplaceTempView("jsonWithSimpleMap")
935
936     checkAnswer(
937       sql("select `map` from jsonWithSimpleMap"),
938       Row(Map("a" -> 1)) ::
939       Row(Map("b" -> 2)) ::
940       Row(Map("c" -> 3)) ::
941       Row(Map("c" -> 1, "d" -> 4)) ::
942       Row(Map("e" -> null)) :: Nil
943     )
944
945     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
946       checkAnswer(
947         sql("select `map`['c'] from jsonWithSimpleMap"),
948         Row(null) ::
949         Row(null) ::
950         Row(3) ::
951         Row(1) ::
952         Row(null) :: Nil
953       )
954     }
955
956     val innerStruct = StructType(
957       StructField("field1", ArrayType(IntegerType, true), true) ::
958       StructField("field2", IntegerType, true) :: Nil)
959     val schemaWithComplexMap = StructType(
960       StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
961
962     val jsonWithComplexMap = spark.read.schema(schemaWithComplexMap).json(mapType2)
963
964     jsonWithComplexMap.createOrReplaceTempView("jsonWithComplexMap")
965
966     checkAnswer(
967       sql("select `map` from jsonWithComplexMap"),
968       Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) ::
969       Row(Map("b" -> Row(null, 2))) ::
970       Row(Map("c" -> Row(Seq(), 4))) ::
971       Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) ::
972       Row(Map("e" -> null)) ::
973       Row(Map("f" -> Row(null, null))) :: Nil
974     )
975
976     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
977       checkAnswer(
978         sql("select `map`['a'].field1, `map`['c'].field2 from jsonWithComplexMap"),
979         Row(Seq(1, 2, 3, null), null) ::
980         Row(null, null) ::
981         Row(null, 4) ::
982         Row(null, 3) ::
983         Row(null, null) ::
984         Row(null, null) :: Nil
985       )
986     }
987   }
988
989   test("SPARK-2096 Correctly parse dot notations") {
990     val jsonDF = spark.read.json(complexFieldAndType2)
991     jsonDF.createOrReplaceTempView("jsonTable")
992
993     checkAnswer(
994       sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
995       Row(true, "str1")
996     )
997     checkAnswer(
998       sql(
999         """
1000           |select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1]
1001           |from jsonTable
1002         """.stripMargin),
1003       Row("str2", 6)
1004     )
1005   }
1006
1007   test("SPARK-3390 Complex arrays") {
1008     val jsonDF = spark.read.json(complexFieldAndType2)
1009     jsonDF.createOrReplaceTempView("jsonTable")
1010
1011     checkAnswer(
1012       sql(
1013         """
1014           |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0]
1015           |from jsonTable
1016         """.stripMargin),
1017       Row(5, 7, 8)
1018     )
1019     checkAnswer(
1020       sql(
1021         """
1022           |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0],
1023           |arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4
1024           |from jsonTable
1025         """.stripMargin),
1026       Row("str1", Nil, "str4", 2)
1027     )
1028   }
1029
1030   test("SPARK-3308 Read top level JSON arrays") {
1031     val jsonDF = spark.read.json(jsonArray)
1032     jsonDF.createOrReplaceTempView("jsonTable")
1033
1034     checkAnswer(
1035       sql(
1036         """
1037           |select a, b, c
1038           |from jsonTable
1039         """.stripMargin),
1040       Row("str_a_1", null, null) ::
1041         Row("str_a_2", null, null) ::
1042         Row(null, "str_b_3", null) ::
1043         Row("str_a_4", "str_b_4", "str_c_4") :: Nil
1044     )
1045   }
1046
1047   test("Corrupt records: FAILFAST mode") {
1048     // `FAILFAST` mode should throw an exception for corrupt records.
1049     val exceptionOne = intercept[SparkException] {
1050       spark.read
1051         .option("mode", "FAILFAST")
1052         .json(corruptRecords)
1053     }.getMessage
1054     assert(exceptionOne.contains(
1055       "Malformed records are detected in schema inference. Parse Mode: FAILFAST."))
1056
1057     val exceptionTwo = intercept[SparkException] {
1058       spark.read
1059         .option("mode", "FAILFAST")
1060         .schema("a string")
1061         .json(corruptRecords)
1062         .collect()
1063     }.getMessage
1064     assert(exceptionTwo.contains(
1065       "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
1066   }
1067
1068   test("Corrupt records: DROPMALFORMED mode") {
1069     val schemaOne = StructType(
1070       StructField("a", StringType, true) ::
1071         StructField("b", StringType, true) ::
1072         StructField("c", StringType, true) :: Nil)
1073     val schemaTwo = StructType(
1074       StructField("a", StringType, true) :: Nil)
1075     // `DROPMALFORMED` mode should skip corrupt records
1076     val jsonDFOne = spark.read
1077       .option("mode", "DROPMALFORMED")
1078       .json(corruptRecords)
1079     checkAnswer(
1080       jsonDFOne,
1081       Row("str_a_4", "str_b_4", "str_c_4") :: Nil
1082     )
1083     assert(jsonDFOne.schema === schemaOne)
1084
1085     val jsonDFTwo = spark.read
1086       .option("mode", "DROPMALFORMED")
1087       .schema(schemaTwo)
1088       .json(corruptRecords)
1089     checkAnswer(
1090       jsonDFTwo,
1091       Row("str_a_4") :: Nil)
1092     assert(jsonDFTwo.schema === schemaTwo)
1093   }
1094
1095   test("SPARK-19641: Additional corrupt records: DROPMALFORMED mode") {
1096     val schema = new StructType().add("dummy", StringType)
1097     // `DROPMALFORMED` mode should skip corrupt records
1098     val jsonDF = spark.read
1099       .option("mode", "DROPMALFORMED")
1100       .json(additionalCorruptRecords)
1101     checkAnswer(
1102       jsonDF,
1103       Row("test"))
1104     assert(jsonDF.schema === schema)
1105   }
1106
1107   test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
1108     val schema = StructType(
1109       StructField("a", StringType, true) ::
1110         StructField("b", StringType, true) ::
1111         StructField("c", StringType, true) :: Nil)
1112
1113     val jsonDF = spark.read.schema(schema).json(corruptRecords)
1114
1115     checkAnswer(
1116       jsonDF.select($"a", $"b", $"c"),
1117       Seq(
1118         // Corrupted records are replaced with null
1119         Row(null, null, null),
1120         Row(null, null, null),
1121         Row(null, null, null),
1122         Row("str_a_4", "str_b_4", "str_c_4"),
1123         Row(null, null, null))
1124     )
1125   }
1126
1127   test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
1128     // Test if we can query corrupt records.
1129     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
1130       val jsonDF = spark.read.json(corruptRecords)
1131       val schema = StructType(
1132         StructField("_unparsed", StringType, true) ::
1133           StructField("a", StringType, true) ::
1134           StructField("b", StringType, true) ::
1135           StructField("c", StringType, true) :: Nil)
1136
1137       assert(schema === jsonDF.schema)
1138
1139       // In HiveContext, backticks should be used to access columns starting with a underscore.
1140       checkAnswer(
1141         jsonDF.select($"a", $"b", $"c", $"_unparsed"),
1142         Row(null, null, null, "{") ::
1143           Row(null, null, null, """{"a":1, b:2}""") ::
1144           Row(null, null, null, """{"a":{, b:3}""") ::
1145           Row("str_a_4", "str_b_4", "str_c_4", null) ::
1146           Row(null, null, null, "]") :: Nil
1147       )
1148
1149       checkAnswer(
1150         jsonDF.filter($"_unparsed".isNull).select($"a", $"b", $"c"),
1151         Row("str_a_4", "str_b_4", "str_c_4")
1152       )
1153
1154       checkAnswer(
1155         jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
1156         Row("{") ::
1157           Row("""{"a":1, b:2}""") ::
1158           Row("""{"a":{, b:3}""") ::
1159           Row("]") :: Nil
1160       )
1161     }
1162   }
1163
1164   test("SPARK-13953 Rename the corrupt record field via option") {
1165     val jsonDF = spark.read
1166       .option("columnNameOfCorruptRecord", "_malformed")
1167       .json(corruptRecords)
1168     val schema = StructType(
1169       StructField("_malformed", StringType, true) ::
1170         StructField("a", StringType, true) ::
1171         StructField("b", StringType, true) ::
1172         StructField("c", StringType, true) :: Nil)
1173
1174     assert(schema === jsonDF.schema)
1175     checkAnswer(
1176       jsonDF.selectExpr("a", "b", "c", "_malformed"),
1177       Row(null, null, null, "{") ::
1178         Row(null, null, null, """{"a":1, b:2}""") ::
1179         Row(null, null, null, """{"a":{, b:3}""") ::
1180         Row("str_a_4", "str_b_4", "str_c_4", null) ::
1181         Row(null, null, null, "]") :: Nil
1182     )
1183   }
1184
1185   test("SPARK-4068: nulls in arrays") {
1186     val jsonDF = spark.read.json(nullsInArrays)
1187     jsonDF.createOrReplaceTempView("jsonTable")
1188
1189     val schema = StructType(
1190       StructField("field1",
1191         ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) ::
1192       StructField("field2",
1193         ArrayType(ArrayType(
1194           StructType(StructField("Test", LongType, true) :: Nil), true), true), true) ::
1195       StructField("field3",
1196         ArrayType(ArrayType(
1197           StructType(StructField("Test", StringType, true) :: Nil), true), true), true) ::
1198       StructField("field4",
1199         ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil)
1200
1201     assert(schema === jsonDF.schema)
1202
1203     checkAnswer(
1204       sql(
1205         """
1206           |SELECT field1, field2, field3, field4
1207           |FROM jsonTable
1208         """.stripMargin),
1209       Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
1210         Row(null, Seq(null, Seq(Row(1))), null, null) ::
1211         Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) ::
1212         Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
1213     )
1214   }
1215
1216   test("SPARK-4228 DataFrame to JSON") {
1217     val schema1 = StructType(
1218       StructField("f1", IntegerType, false) ::
1219       StructField("f2", StringType, false) ::
1220       StructField("f3", BooleanType, false) ::
1221       StructField("f4", ArrayType(StringType), nullable = true) ::
1222       StructField("f5", IntegerType, true) :: Nil)
1223
1224     val rowRDD1 = unparsedStrings.map { r =>
1225       val values = r.split(",").map(_.trim)
1226       val v5 = try values(3).toInt catch {
1227         case _: NumberFormatException => null
1228       }
1229       Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5)
1230     }
1231
1232     val df1 = spark.createDataFrame(rowRDD1, schema1)
1233     df1.createOrReplaceTempView("applySchema1")
1234     val df2 = df1.toDF
1235     val result = df2.toJSON.collect()
1236     // scalastyle:off
1237     assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
1238     assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
1239     // scalastyle:on
1240
1241     val schema2 = StructType(
1242       StructField("f1", StructType(
1243         StructField("f11", IntegerType, false) ::
1244         StructField("f12", BooleanType, false) :: Nil), false) ::
1245       StructField("f2", MapType(StringType, IntegerType, true), false) :: Nil)
1246
1247     val rowRDD2 = unparsedStrings.map { r =>
1248       val values = r.split(",").map(_.trim)
1249       val v4 = try values(3).toInt catch {
1250         case _: NumberFormatException => null
1251       }
1252       Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
1253     }
1254
1255     val df3 = spark.createDataFrame(rowRDD2, schema2)
1256     df3.createOrReplaceTempView("applySchema2")
1257     val df4 = df3.toDF
1258     val result2 = df4.toJSON.collect()
1259
1260     assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
1261     assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
1262
1263     val jsonDF = spark.read.json(primitiveFieldAndType)
1264     val primTable = spark.read.json(jsonDF.toJSON)
1265     primTable.createOrReplaceTempView("primitiveTable")
1266     checkAnswer(
1267         sql("select * from primitiveTable"),
1268       Row(new java.math.BigDecimal("92233720368547758070"),
1269         true,
1270         1.7976931348623157E308,
1271         10,
1272         21474836470L,
1273         "this is a simple string.")
1274       )
1275
1276     val complexJsonDF = spark.read.json(complexFieldAndType1)
1277     val compTable = spark.read.json(complexJsonDF.toJSON)
1278     compTable.createOrReplaceTempView("complexTable")
1279     // Access elements of a primitive array.
1280     checkAnswer(
1281       sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from complexTable"),
1282       Row("str1", "str2", null)
1283     )
1284
1285     // Access an array of null values.
1286     checkAnswer(
1287       sql("select arrayOfNull from complexTable"),
1288       Row(Seq(null, null, null, null))
1289     )
1290
1291     // Access elements of a BigInteger array (we use DecimalType internally).
1292     checkAnswer(
1293       sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] " +
1294         " from complexTable"),
1295       Row(new java.math.BigDecimal("922337203685477580700"),
1296         new java.math.BigDecimal("-922337203685477580800"), null)
1297     )
1298
1299     // Access elements of an array of arrays.
1300     checkAnswer(
1301       sql("select arrayOfArray1[0], arrayOfArray1[1] from complexTable"),
1302       Row(Seq("1", "2", "3"), Seq("str1", "str2"))
1303     )
1304
1305     // Access elements of an array of arrays.
1306     checkAnswer(
1307       sql("select arrayOfArray2[0], arrayOfArray2[1] from complexTable"),
1308       Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1))
1309     )
1310
1311     // Access elements of an array inside a filed with the type of ArrayType(ArrayType).
1312     checkAnswer(
1313       sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from complexTable"),
1314       Row("str2", 2.1)
1315     )
1316
1317     // Access a struct and fields inside of it.
1318     checkAnswer(
1319       sql("select struct, struct.field1, struct.field2 from complexTable"),
1320       Row(
1321         Row(true, new java.math.BigDecimal("92233720368547758070")),
1322         true,
1323         new java.math.BigDecimal("92233720368547758070")) :: Nil
1324     )
1325
1326     // Access an array field of a struct.
1327     checkAnswer(
1328       sql("select structWithArrayFields.field1, structWithArrayFields.field2 from complexTable"),
1329       Row(Seq(4, 5, 6), Seq("str1", "str2"))
1330     )
1331
1332     // Access elements of an array field of a struct.
1333     checkAnswer(
1334       sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] " +
1335         "from complexTable"),
1336       Row(5, null)
1337     )
1338   }
1339
1340   test("Dataset toJSON doesn't construct rdd") {
1341     val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.find {
1342       case ExternalRDD(_, _) => true
1343       case _ => false
1344     }
1345
1346     assert(containsRDD.isEmpty, "Expected logical plan of toJSON to not contain an RDD")
1347   }
1348
1349   test("JSONRelation equality test") {
1350     withTempPath(dir => {
1351       val path = dir.getCanonicalFile.toURI.toString
1352       sparkContext.parallelize(1 to 100)
1353         .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
1354
1355       val d1 = DataSource(
1356         spark,
1357         userSpecifiedSchema = None,
1358         partitionColumns = Array.empty[String],
1359         bucketSpec = None,
1360         className = classOf[JsonFileFormat].getCanonicalName,
1361         options = Map("path" -> path)).resolveRelation()
1362
1363       val d2 = DataSource(
1364         spark,
1365         userSpecifiedSchema = None,
1366         partitionColumns = Array.empty[String],
1367         bucketSpec = None,
1368         className = classOf[JsonFileFormat].getCanonicalName,
1369         options = Map("path" -> path)).resolveRelation()
1370       assert(d1 === d2)
1371     })
1372   }
1373
1374   test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
1375     // This is really a test that it doesn't throw an exception
1376     val emptySchema = JsonInferSchema.infer(
1377       empty.rdd,
1378       new JSONOptions(Map.empty[String, String], "GMT"),
1379       CreateJacksonParser.string)
1380     assert(StructType(Seq()) === emptySchema)
1381   }
1382
1383   test("SPARK-7565 MapType in JsonRDD") {
1384     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
1385       withTempDir { dir =>
1386         val schemaWithSimpleMap = StructType(
1387           StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
1388         val df = spark.read.schema(schemaWithSimpleMap).json(mapType1)
1389
1390         val path = dir.getAbsolutePath
1391         df.write.mode("overwrite").parquet(path)
1392         // order of MapType is not defined
1393         assert(spark.read.parquet(path).count() == 5)
1394
1395         val df2 = spark.read.json(corruptRecords)
1396         df2.write.mode("overwrite").parquet(path)
1397         checkAnswer(spark.read.parquet(path), df2.collect())
1398       }
1399     }
1400   }
1401
1402   test("SPARK-8093 Erase empty structs") {
1403     val emptySchema = JsonInferSchema.infer(
1404       emptyRecords.rdd,
1405       new JSONOptions(Map.empty[String, String], "GMT"),
1406       CreateJacksonParser.string)
1407     assert(StructType(Seq()) === emptySchema)
1408   }
1409
1410   test("JSON with Partition") {
1411     def makePartition(rdd: RDD[String], parent: File, partName: String, partValue: Any): File = {
1412       val p = new File(parent, s"$partName=${partValue.toString}")
1413       rdd.saveAsTextFile(p.getCanonicalPath)
1414       p
1415     }
1416
1417     withTempPath(root => {
1418       val d1 = new File(root, "d1=1")
1419       // root/dt=1/col1=abc
1420       val p1_col1 = makePartition(
1421         sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""),
1422         d1,
1423         "col1",
1424         "abc")
1425
1426       // root/dt=1/col1=abd
1427       val p2 = makePartition(
1428         sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""),
1429         d1,
1430         "col1",
1431         "abd")
1432
1433         spark.read.json(root.getAbsolutePath).createOrReplaceTempView("test_myjson_with_part")
1434         checkAnswer(sql(
1435           "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4))
1436         checkAnswer(sql(
1437           "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abd'"), Row(5))
1438         checkAnswer(sql(
1439           "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
1440     })
1441   }
1442
1443   test("backward compatibility") {
1444     // This test we make sure our JSON support can read JSON data generated by previous version
1445     // of Spark generated through toJSON method and JSON data source.
1446     // The data is generated by the following program.
1447     // Here are a few notes:
1448     //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
1449     //      in the JSON object.
1450     //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
1451     //      JSON objects generated by those Spark versions (col17).
1452     //  - If the type is NullType, we do not write data out.
1453
1454     // Create the schema.
1455     val struct =
1456       StructType(
1457         StructField("f1", FloatType, true) ::
1458           StructField("f2", ArrayType(BooleanType), true) :: Nil)
1459
1460     val dataTypes =
1461       Seq(
1462         StringType, BinaryType, NullType, BooleanType,
1463         ByteType, ShortType, IntegerType, LongType,
1464         FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
1465         DateType, TimestampType,
1466         ArrayType(IntegerType), MapType(StringType, LongType), struct,
1467         new UDT.MyDenseVectorUDT())
1468     val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
1469       StructField(s"col$index", dataType, nullable = true)
1470     }
1471     val schema = StructType(fields)
1472
1473     val constantValues =
1474       Seq(
1475         "a string in binary".getBytes(StandardCharsets.UTF_8),
1476         null,
1477         true,
1478         1.toByte,
1479         2.toShort,
1480         3,
1481         Long.MaxValue,
1482         0.25.toFloat,
1483         0.75,
1484         new java.math.BigDecimal(s"1234.23456"),
1485         new java.math.BigDecimal(s"1.23456"),
1486         java.sql.Date.valueOf("2015-01-01"),
1487         java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
1488         Seq(2, 3, 4),
1489         Map("a string" -> 2000L),
1490         Row(4.75.toFloat, Seq(false, true)),
1491         new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
1492     val data =
1493       Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
1494
1495     // Data generated by previous versions.
1496     // scalastyle:off
1497     val existingJSONData =
1498       """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1499       """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1500       """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1501       """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1502       """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1503       """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
1504       """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
1505     // scalastyle:on
1506
1507     // Generate data for the current version.
1508     val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
1509     withTempPath { path =>
1510       df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
1511
1512       // df.toJSON will convert internal rows to external rows first and then generate
1513       // JSON objects. While, df.write.format("json") will write internal rows directly.
1514       val allJSON =
1515         existingJSONData ++
1516           df.toJSON.collect() ++
1517           sparkContext.textFile(path.getCanonicalPath).collect()
1518
1519       Utils.deleteRecursively(path)
1520       sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
1521
1522       // Read data back with the schema specified.
1523       val col0Values =
1524         Seq(
1525           "Spark 1.2.2",
1526           "Spark 1.3.1",
1527           "Spark 1.3.1",
1528           "Spark 1.4.1",
1529           "Spark 1.4.1",
1530           "Spark 1.5.0",
1531           "Spark 1.5.0",
1532           "Spark " + spark.sparkContext.version,
1533           "Spark " + spark.sparkContext.version)
1534       val expectedResult = col0Values.map { v =>
1535         Row.fromSeq(Seq(v) ++ constantValues)
1536       }
1537       checkAnswer(
1538         spark.read.format("json").schema(schema).load(path.getCanonicalPath),
1539         expectedResult
1540       )
1541     }
1542   }
1543
1544   test("SPARK-11544 test pathfilter") {
1545     withTempPath { dir =>
1546       val path = dir.getCanonicalPath
1547
1548       val df = spark.range(2)
1549       df.write.json(path + "/p=1")
1550       df.write.json(path + "/p=2")
1551       assert(spark.read.json(path).count() === 4)
1552
1553       val extraOptions = Map(
1554         "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
1555         "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
1556       )
1557       assert(spark.read.options(extraOptions).json(path).count() === 2)
1558     }
1559   }
1560
1561   test("SPARK-12057 additional corrupt records do not throw exceptions") {
1562     // Test if we can query corrupt records.
1563     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
1564       withTempView("jsonTable") {
1565         val schema = StructType(
1566           StructField("_unparsed", StringType, true) ::
1567             StructField("dummy", StringType, true) :: Nil)
1568
1569         {
1570           // We need to make sure we can infer the schema.
1571           val jsonDF = spark.read.json(additionalCorruptRecords)
1572           assert(jsonDF.schema === schema)
1573         }
1574
1575         {
1576           val jsonDF = spark.read.schema(schema).json(additionalCorruptRecords)
1577           jsonDF.createOrReplaceTempView("jsonTable")
1578
1579           // In HiveContext, backticks should be used to access columns starting with a underscore.
1580           checkAnswer(
1581             sql(
1582               """
1583                 |SELECT dummy, _unparsed
1584                 |FROM jsonTable
1585               """.stripMargin),
1586             Row("test", null) ::
1587               Row(null, """[1,2,3]""") ::
1588               Row(null, """":"test", "a":1}""") ::
1589               Row(null, """42""") ::
1590               Row(null, """     ","ian":"test"}""") :: Nil
1591           )
1592         }
1593       }
1594     }
1595   }
1596
1597   test("Parse JSON rows having an array type and a struct type in the same field.") {
1598     withTempDir { dir =>
1599       val dir = Utils.createTempDir()
1600       dir.delete()
1601       val path = dir.getCanonicalPath
1602       arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path)
1603
1604       val schema =
1605         StructType(
1606           StructField("a", StructType(
1607             StructField("b", StringType) :: Nil
1608           )) :: Nil)
1609       val jsonDF = spark.read.schema(schema).json(path)
1610       assert(jsonDF.count() == 2)
1611     }
1612   }
1613
1614   test("SPARK-12872 Support to specify the option for compression codec") {
1615     withTempDir { dir =>
1616       val dir = Utils.createTempDir()
1617       dir.delete()
1618       val path = dir.getCanonicalPath
1619       primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
1620
1621       val jsonDF = spark.read.json(path)
1622       val jsonDir = new File(dir, "json").getCanonicalPath
1623       jsonDF.coalesce(1).write
1624         .format("json")
1625         .option("compression", "gZiP")
1626         .save(jsonDir)
1627
1628       val compressedFiles = new File(jsonDir).listFiles()
1629       assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
1630
1631       val jsonCopy = spark.read
1632         .format("json")
1633         .load(jsonDir)
1634
1635       assert(jsonCopy.count == jsonDF.count)
1636       val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
1637       val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
1638       checkAnswer(jsonCopySome, jsonDFSome)
1639     }
1640   }
1641
1642   test("SPARK-13543 Write the output as uncompressed via option()") {
1643     val extraOptions = Map[String, String](
1644       "mapreduce.output.fileoutputformat.compress" -> "true",
1645       "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString,
1646       "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName,
1647       "mapreduce.map.output.compress" -> "true",
1648       "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName
1649     )
1650     withTempDir { dir =>
1651       val dir = Utils.createTempDir()
1652       dir.delete()
1653
1654       val path = dir.getCanonicalPath
1655       primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
1656
1657       val jsonDF = spark.read.json(path)
1658       val jsonDir = new File(dir, "json").getCanonicalPath
1659       jsonDF.coalesce(1).write
1660         .format("json")
1661         .option("compression", "none")
1662         .options(extraOptions)
1663         .save(jsonDir)
1664
1665       val compressedFiles = new File(jsonDir).listFiles()
1666       assert(compressedFiles.exists(!_.getName.endsWith(".json.gz")))
1667
1668       val jsonCopy = spark.read
1669         .format("json")
1670         .options(extraOptions)
1671         .load(jsonDir)
1672
1673       assert(jsonCopy.count == jsonDF.count)
1674       val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
1675       val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
1676       checkAnswer(jsonCopySome, jsonDFSome)
1677     }
1678   }
1679
1680   test("Casting long as timestamp") {
1681     withTempView("jsonTable") {
1682       val schema = (new StructType).add("ts", TimestampType)
1683       val jsonDF = spark.read.schema(schema).json(timestampAsLong)
1684
1685       jsonDF.createOrReplaceTempView("jsonTable")
1686
1687       checkAnswer(
1688         sql("select ts from jsonTable"),
1689         Row(java.sql.Timestamp.valueOf("2016-01-02 03:04:05"))
1690       )
1691     }
1692   }
1693
1694   test("wide nested json table") {
1695     val nested = (1 to 100).map { i =>
1696       s"""
1697          |"c$i": $i
1698        """.stripMargin
1699     }.mkString(", ")
1700     val json = s"""
1701        |{"a": [{$nested}], "b": [{$nested}]}
1702      """.stripMargin
1703     val df = spark.read.json(Seq(json).toDS())
1704     assert(df.schema.size === 2)
1705     df.collect()
1706   }
1707
1708   test("Write dates correctly with dateFormat option") {
1709     val customSchema = new StructType(Array(StructField("date", DateType, true)))
1710     withTempDir { dir =>
1711       // With dateFormat option.
1712       val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
1713       val datesWithFormat = spark.read
1714         .schema(customSchema)
1715         .option("dateFormat", "dd/MM/yyyy HH:mm")
1716         .json(datesRecords)
1717
1718       datesWithFormat.write
1719         .format("json")
1720         .option("dateFormat", "yyyy/MM/dd")
1721         .save(datesWithFormatPath)
1722
1723       // This will load back the dates as string.
1724       val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
1725       val stringDatesWithFormat = spark.read
1726         .schema(stringSchema)
1727         .json(datesWithFormatPath)
1728       val expectedStringDatesWithFormat = Seq(
1729         Row("2015/08/26"),
1730         Row("2014/10/27"),
1731         Row("2016/01/28"))
1732
1733       checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
1734     }
1735   }
1736
1737   test("Write timestamps correctly with timestampFormat option") {
1738     val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
1739     withTempDir { dir =>
1740       // With dateFormat option.
1741       val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
1742       val timestampsWithFormat = spark.read
1743         .schema(customSchema)
1744         .option("timestampFormat", "dd/MM/yyyy HH:mm")
1745         .json(datesRecords)
1746       timestampsWithFormat.write
1747         .format("json")
1748         .option("timestampFormat", "yyyy/MM/dd HH:mm")
1749         .save(timestampsWithFormatPath)
1750
1751       // This will load back the timestamps as string.
1752       val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
1753       val stringTimestampsWithFormat = spark.read
1754         .schema(stringSchema)
1755         .json(timestampsWithFormatPath)
1756       val expectedStringDatesWithFormat = Seq(
1757         Row("2015/08/26 18:00"),
1758         Row("2014/10/27 18:30"),
1759         Row("2016/01/28 20:00"))
1760
1761       checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
1762     }
1763   }
1764
1765   test("Write timestamps correctly with timestampFormat option and timeZone option") {
1766     val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
1767     withTempDir { dir =>
1768       // With dateFormat option and timeZone option.
1769       val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
1770       val timestampsWithFormat = spark.read
1771         .schema(customSchema)
1772         .option("timestampFormat", "dd/MM/yyyy HH:mm")
1773         .json(datesRecords)
1774       timestampsWithFormat.write
1775         .format("json")
1776         .option("timestampFormat", "yyyy/MM/dd HH:mm")
1777         .option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
1778         .save(timestampsWithFormatPath)
1779
1780       // This will load back the timestamps as string.
1781       val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
1782       val stringTimestampsWithFormat = spark.read
1783         .schema(stringSchema)
1784         .json(timestampsWithFormatPath)
1785       val expectedStringDatesWithFormat = Seq(
1786         Row("2015/08/27 01:00"),
1787         Row("2014/10/28 01:30"),
1788         Row("2016/01/29 04:00"))
1789
1790       checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
1791
1792       val readBack = spark.read
1793         .schema(customSchema)
1794         .option("timestampFormat", "yyyy/MM/dd HH:mm")
1795         .option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
1796         .json(timestampsWithFormatPath)
1797
1798       checkAnswer(readBack, timestampsWithFormat)
1799     }
1800   }
1801
1802   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
1803     val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS()
1804
1805     val schema = StructType(
1806       StructField("a", DecimalType(21, 1), true) ::
1807       StructField("b", DecimalType(7, 6), true) :: Nil)
1808
1809     val df1 = spark.read.option("prefersDecimal", "true").json(records)
1810     assert(df1.schema == schema)
1811     val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
1812     assert(df2.schema == schema)
1813   }
1814
1815   test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
1816     withTempPath { dir =>
1817       val path = dir.getCanonicalPath
1818       primitiveFieldAndType
1819         .toDF("value")
1820         .write
1821         .option("compression", "GzIp")
1822         .text(path)
1823
1824       assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
1825
1826       val jsonDF = spark.read.option("multiLine", true).json(path)
1827       val jsonDir = new File(dir, "json").getCanonicalPath
1828       jsonDF.coalesce(1).write
1829         .option("compression", "gZiP")
1830         .json(jsonDir)
1831
1832       assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
1833
1834       val originalData = spark.read.json(primitiveFieldAndType)
1835       checkAnswer(jsonDF, originalData)
1836       checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
1837     }
1838   }
1839
1840   test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
1841     withTempPath { dir =>
1842       val path = dir.getCanonicalPath
1843       primitiveFieldAndType
1844         .toDF("value")
1845         .write
1846         .text(path)
1847
1848       val jsonDF = spark.read.option("multiLine", true).json(path)
1849       val jsonDir = new File(dir, "json").getCanonicalPath
1850       jsonDF.coalesce(1).write.json(jsonDir)
1851
1852       val compressedFiles = new File(jsonDir).listFiles()
1853       assert(compressedFiles.exists(_.getName.endsWith(".json")))
1854
1855       val originalData = spark.read.json(primitiveFieldAndType)
1856       checkAnswer(jsonDF, originalData)
1857       checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
1858     }
1859   }
1860
1861   test("SPARK-18352: Expect one JSON document per file") {
1862     // the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
1863     // this might not be the optimal behavior but this test verifies that only the first value
1864     // is parsed and the rest are discarded.
1865
1866     // alternatively the parser could continue parsing following objects, which may further reduce
1867     // allocations by skipping the line reader entirely
1868
1869     withTempPath { dir =>
1870       val path = dir.getCanonicalPath
1871       spark
1872         .createDataFrame(Seq(Tuple1("{}{invalid}")))
1873         .coalesce(1)
1874         .write
1875         .text(path)
1876
1877       val jsonDF = spark.read.option("multiLine", true).json(path)
1878       // no corrupt record column should be created
1879       assert(jsonDF.schema === StructType(Seq()))
1880       // only the first object should be read
1881       assert(jsonDF.count() === 1)
1882     }
1883   }
1884
1885   test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
1886     withTempPath { dir =>
1887       val path = dir.getCanonicalPath
1888       val corruptRecordCount = additionalCorruptRecords.count().toInt
1889       assert(corruptRecordCount === 5)
1890
1891       additionalCorruptRecords
1892         .toDF("value")
1893         // this is the minimum partition count that avoids hash collisions
1894         .repartition(corruptRecordCount * 4, F.hash($"value"))
1895         .write
1896         .text(path)
1897
1898       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
1899       assert(jsonDF.count() === corruptRecordCount)
1900       assert(jsonDF.schema === new StructType()
1901         .add("_corrupt_record", StringType)
1902         .add("dummy", StringType))
1903       val counts = jsonDF
1904         .join(
1905           additionalCorruptRecords.toDF("value"),
1906           F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"),
1907           "outer")
1908         .agg(
1909           F.count($"dummy").as("valid"),
1910           F.count($"_corrupt_record").as("corrupt"),
1911           F.count("*").as("count"))
1912       checkAnswer(counts, Row(1, 4, 6))
1913     }
1914   }
1915
1916   test("SPARK-19641: Handle multi-line corrupt documents (DROPMALFORMED)") {
1917     withTempPath { dir =>
1918       val path = dir.getCanonicalPath
1919       val corruptRecordCount = additionalCorruptRecords.count().toInt
1920       assert(corruptRecordCount === 5)
1921
1922       additionalCorruptRecords
1923         .toDF("value")
1924         // this is the minimum partition count that avoids hash collisions
1925         .repartition(corruptRecordCount * 4, F.hash($"value"))
1926         .write
1927         .text(path)
1928
1929       val jsonDF = spark.read.option("multiLine", true).option("mode", "DROPMALFORMED").json(path)
1930       checkAnswer(jsonDF, Seq(Row("test")))
1931     }
1932   }
1933
1934   test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") {
1935     withTempPath { dir =>
1936       val path = dir.getCanonicalPath
1937       val corruptRecordCount = additionalCorruptRecords.count().toInt
1938       assert(corruptRecordCount === 5)
1939
1940       additionalCorruptRecords
1941         .toDF("value")
1942         // this is the minimum partition count that avoids hash collisions
1943         .repartition(corruptRecordCount * 4, F.hash($"value"))
1944         .write
1945         .text(path)
1946
1947       val schema = new StructType().add("dummy", StringType)
1948
1949       // `FAILFAST` mode should throw an exception for corrupt records.
1950       val exceptionOne = intercept[SparkException] {
1951         spark.read
1952           .option("multiLine", true)
1953           .option("mode", "FAILFAST")
1954           .json(path)
1955       }
1956       assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " +
1957         "inference. Parse Mode: FAILFAST."))
1958
1959       val exceptionTwo = intercept[SparkException] {
1960         spark.read
1961           .option("multiLine", true)
1962           .option("mode", "FAILFAST")
1963           .schema(schema)
1964           .json(path)
1965           .collect()
1966       }
1967       assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " +
1968         "parsing. Parse Mode: FAILFAST."))
1969     }
1970   }
1971
1972   test("Throw an exception if a `columnNameOfCorruptRecord` field violates requirements") {
1973     val columnNameOfCorruptRecord = "_unparsed"
1974     val schema = StructType(
1975       StructField(columnNameOfCorruptRecord, IntegerType, true) ::
1976         StructField("a", StringType, true) ::
1977         StructField("b", StringType, true) ::
1978         StructField("c", StringType, true) :: Nil)
1979     val errMsg = intercept[AnalysisException] {
1980       spark.read
1981         .option("mode", "Permissive")
1982         .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
1983         .schema(schema)
1984         .json(corruptRecords)
1985     }.getMessage
1986     assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
1987
1988     // We use `PERMISSIVE` mode by default if invalid string is given.
1989     withTempPath { dir =>
1990       val path = dir.getCanonicalPath
1991       corruptRecords.toDF("value").write.text(path)
1992       val errMsg = intercept[AnalysisException] {
1993         spark.read
1994           .option("mode", "permm")
1995           .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
1996           .schema(schema)
1997           .json(path)
1998           .collect
1999       }.getMessage
2000       assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
2001     }
2002   }
2003
2004   test("SPARK-18772: Parse special floats correctly") {
2005     val jsons = Seq(
2006       """{"a": "NaN"}""",
2007       """{"a": "Infinity"}""",
2008       """{"a": "-Infinity"}""")
2009
2010     // positive cases
2011     val checks: Seq[Double => Boolean] = Seq(
2012       _.isNaN,
2013       _.isPosInfinity,
2014       _.isNegInfinity)
2015
2016     Seq(FloatType, DoubleType).foreach { dt =>
2017       jsons.zip(checks).foreach { case (json, check) =>
2018         val ds = spark.read
2019           .schema(StructType(Seq(StructField("a", dt))))
2020           .json(Seq(json).toDS())
2021           .select($"a".cast(DoubleType)).as[Double]
2022         assert(check(ds.first()))
2023       }
2024     }
2025
2026     // negative cases
2027     Seq(FloatType, DoubleType).foreach { dt =>
2028       val lowerCasedJsons = jsons.map(_.toLowerCase(Locale.ROOT))
2029       // The special floats are case-sensitive so these cases below throw exceptions.
2030       lowerCasedJsons.foreach { lowerCasedJson =>
2031         val e = intercept[SparkException] {
2032           spark.read
2033             .option("mode", "FAILFAST")
2034             .schema(StructType(Seq(StructField("a", dt))))
2035             .json(Seq(lowerCasedJson).toDS())
2036             .collect()
2037         }
2038         assert(e.getMessage.contains("Cannot parse"))
2039       }
2040     }
2041   }
2042
2043   test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
2044     "from a file") {
2045     withTempPath { dir =>
2046       val path = dir.getCanonicalPath
2047       val data =
2048         """{"field": 1}
2049           |{"field": 2}
2050           |{"field": "3"}""".stripMargin
2051       Seq(data).toDF().repartition(1).write.text(path)
2052       val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
2053       // negative cases
2054       val msg = intercept[AnalysisException] {
2055         spark.read.schema(schema).json(path).select("_corrupt_record").collect()
2056       }.getMessage
2057       assert(msg.contains("only include the internal corrupt record column"))
2058       intercept[catalyst.errors.TreeNodeException[_]] {
2059         spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
2060       }
2061       // workaround
2062       val df = spark.read.schema(schema).json(path).cache()
2063       assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
2064       assert(df.filter($"_corrupt_record".isNull).count() == 2)
2065       checkAnswer(
2066         df.select("_corrupt_record"),
2067         Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
2068       )
2069     }
2070   }
2071
2072   def testLineSeparator(lineSep: String): Unit = {
2073     test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
2074       // Read
2075       val data =
2076         s"""
2077           |  {"f":
2078           |"a", "f0": 1}$lineSep{"f":
2079           |
2080           |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
2081         """.stripMargin
2082       val dataWithTrailingLineSep = s"$data$lineSep"
2083
2084       Seq(data, dataWithTrailingLineSep).foreach { lines =>
2085         withTempPath { path =>
2086           Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
2087           val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
2088           val expectedSchema =
2089             StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
2090           checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
2091           assert(df.schema === expectedSchema)
2092         }
2093       }
2094
2095       // Write
2096       withTempPath { path =>
2097         Seq("a", "b", "c").toDF("value").coalesce(1)
2098           .write.option("lineSep", lineSep).json(path.getAbsolutePath)
2099         val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
2100         val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
2101         assert(
2102           readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""")
2103       }
2104
2105       // Roundtrip
2106       withTempPath { path =>
2107         val df = Seq("a", "b", "c").toDF()
2108         df.write.option("lineSep", lineSep).json(path.getAbsolutePath)
2109         val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
2110         checkAnswer(df, readBack)
2111       }
2112     }
2113   }
2114
2115   // scalastyle:off nonascii
2116   Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep =>
2117     testLineSeparator(lineSep)
2118   }
2119   // scalastyle:on nonascii
2120
2121   test("""SPARK-21289: Support line separator - default value \r, \r\n and \n""") {
2122     val data =
2123       "{\"f\": \"a\", \"f0\": 1}\r{\"f\": \"c\",  \"f0\": 2}\r\n{\"f\": \"d\",  \"f0\": 3}\n"
2124
2125     withTempPath { path =>
2126       Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
2127       val df = spark.read.json(path.getAbsolutePath)
2128       val expectedSchema =
2129         StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
2130       checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
2131       assert(df.schema === expectedSchema)
2132     }
2133   }
2134
2135   test("SPARK-23849: schema inferring touches less data if samplingRatio < 1.0") {
2136     // Set default values for the DataSource parameters to make sure
2137     // that whole test file is mapped to only one partition. This will guarantee
2138     // reliable sampling of the input file.
2139     withSQLConf(
2140       "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
2141       "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
2142     )(withTempPath { path =>
2143       val ds = sampledTestData.coalesce(1)
2144       ds.write.text(path.getAbsolutePath)
2145       val readback = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath)
2146
2147       assert(readback.schema == new StructType().add("f1", LongType))
2148     })
2149   }
2150
2151   test("SPARK-23849: usage of samplingRatio while parsing a dataset of strings") {
2152     val ds = sampledTestData.coalesce(1)
2153     val readback = spark.read.option("samplingRatio", 0.1).json(ds)
2154
2155     assert(readback.schema == new StructType().add("f1", LongType))
2156   }
2157
2158   test("SPARK-23849: samplingRatio is out of the range (0, 1.0]") {
2159     val ds = spark.range(0, 100, 1, 1).map(_.toString)
2160
2161     val errorMsg0 = intercept[IllegalArgumentException] {
2162       spark.read.option("samplingRatio", -1).json(ds)
2163     }.getMessage
2164     assert(errorMsg0.contains("samplingRatio (-1.0) should be greater than 0"))
2165
2166     val errorMsg1 = intercept[IllegalArgumentException] {
2167       spark.read.option("samplingRatio", 0).json(ds)
2168     }.getMessage
2169     assert(errorMsg1.contains("samplingRatio (0.0) should be greater than 0"))
2170
2171     val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
2172     assert(sampled.count() == ds.count())
2173   }
2174
2175   test("SPARK-23723: json in UTF-16 with BOM") {
2176     val fileName = "test-data/utf16WithBOM.json"
2177     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
2178     val jsonDF = spark.read.schema(schema)
2179       .option("multiline", "true")
2180       .option("encoding", "UTF-16")
2181       .json(testFile(fileName))
2182
2183     checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
2184   }
2185
2186   test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
2187     val fileName = "test-data/utf32BEWithBOM.json"
2188     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
2189     val jsonDF = spark.read.schema(schema)
2190       .option("multiline", "true")
2191       .json(testFile(fileName))
2192
2193     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
2194   }
2195
2196   test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") {
2197     val fileName = "test-data/utf16LE.json"
2198     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
2199     val jsonDF = spark.read.schema(schema)
2200       .option("multiline", "true")
2201       .options(Map("encoding" -> "UTF-16LE"))
2202       .json(testFile(fileName))
2203
2204     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
2205   }
2206
2207   test("SPARK-23723: Unsupported encoding name") {
2208     val invalidCharset = "UTF-128"
2209     val exception = intercept[UnsupportedCharsetException] {
2210       spark.read
2211         .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
2212         .json(testFile("test-data/utf16LE.json"))
2213         .count()
2214     }
2215
2216     assert(exception.getMessage.contains(invalidCharset))
2217   }
2218
2219   test("SPARK-23723: checking that the encoding option is case agnostic") {
2220     val fileName = "test-data/utf16LE.json"
2221     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
2222     val jsonDF = spark.read.schema(schema)
2223       .option("multiline", "true")
2224       .options(Map("encoding" -> "uTf-16lE"))
2225       .json(testFile(fileName))
2226
2227     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
2228   }
2229
2230
2231   test("SPARK-23723: specified encoding is not matched to actual encoding") {
2232     val fileName = "test-data/utf16LE.json"
2233     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
2234     val exception = intercept[SparkException] {
2235       spark.read.schema(schema)
2236         .option("mode", "FAILFAST")
2237         .option("multiline", "true")
2238         .options(Map("encoding" -> "UTF-16BE"))
2239         .json(testFile(fileName))
2240         .count()
2241     }
2242     val errMsg = exception.getMessage
2243
2244     assert(errMsg.contains("Malformed records are detected in record parsing"))
2245   }
2246
2247   def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
2248       expectedContent: String): Unit = {
2249     val jsonFiles = new File(pathToJsonFiles)
2250       .listFiles()
2251       .filter(_.isFile)
2252       .filter(_.getName.endsWith("json"))
2253     val actualContent = jsonFiles.map { file =>
2254       new String(Files.readAllBytes(file.toPath), expectedEncoding)
2255     }.mkString.trim
2256
2257     assert(actualContent == expectedContent)
2258   }
2259
2260   test("SPARK-23723: save json in UTF-32BE") {
2261     val encoding = "UTF-32BE"
2262     withTempPath { path =>
2263       val df = spark.createDataset(Seq(("Dog", 42)))
2264       df.write
2265         .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
2266         .json(path.getCanonicalPath)
2267
2268       checkEncoding(
2269         expectedEncoding = encoding,
2270         pathToJsonFiles = path.getCanonicalPath,
2271         expectedContent = """{"_1":"Dog","_2":42}""")
2272     }
2273   }
2274
2275   test("SPARK-23723: save json in default encoding - UTF-8") {
2276     withTempPath { path =>
2277       val df = spark.createDataset(Seq(("Dog", 42)))
2278       df.write.json(path.getCanonicalPath)
2279
2280       checkEncoding(
2281         expectedEncoding = "UTF-8",
2282         pathToJsonFiles = path.getCanonicalPath,
2283         expectedContent = """{"_1":"Dog","_2":42}""")
2284     }
2285   }
2286
2287   test("SPARK-23723: wrong output encoding") {
2288     val encoding = "UTF-128"
2289     val exception = intercept[UnsupportedCharsetException] {
2290       withTempPath { path =>
2291         val df = spark.createDataset(Seq((0)))
2292         df.write
2293           .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
2294           .json(path.getCanonicalPath)
2295       }
2296     }
2297
2298     assert(exception.getMessage == encoding)
2299   }
2300
2301   test("SPARK-23723: read back json in UTF-16LE") {
2302     val options = Map("encoding" -> "UTF-16LE", "lineSep" -> "\n")
2303     withTempPath { path =>
2304       val ds = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3))).repartition(2)
2305       ds.write.options(options).json(path.getCanonicalPath)
2306
2307       val readBack = spark
2308         .read
2309         .options(options)
2310         .json(path.getCanonicalPath)
2311
2312       checkAnswer(readBack.toDF(), ds.toDF())
2313     }
2314   }
2315
2316   test("SPARK-23723: write json in UTF-16/32 with multiline off") {
2317     Seq("UTF-16", "UTF-32").foreach { encoding =>
2318       withTempPath { path =>
2319         val ds = spark.createDataset(Seq(
2320           ("a", 1), ("b", 2), ("c", 3))
2321         ).repartition(2)
2322         val e = intercept[IllegalArgumentException] {
2323           ds.write
2324             .option("encoding", encoding)
2325             .option("multiline", "false")
2326             .format("json").mode("overwrite")
2327             .save(path.getCanonicalPath)
2328         }.getMessage
2329         assert(e.contains(
2330           s"$encoding encoding in the blacklist is not allowed when multiLine is disabled"))
2331       }
2332     }
2333   }
2334
2335   def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = {
2336     test(s"SPARK-23724: checks reading json in ${encoding} #${id}") {
2337       val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
2338       withTempPath { path =>
2339         val records = List(("a", 1), ("b", 2))
2340         val data = records
2341           .map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding))
2342           .reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2)
2343         val os = new FileOutputStream(path)
2344         os.write(data)
2345         os.close()
2346         val reader = if (inferSchema) {
2347           spark.read
2348         } else {
2349           spark.read.schema(schema)
2350         }
2351         val readBack = reader
2352           .option("encoding", encoding)
2353           .option("lineSep", lineSep)
2354           .json(path.getCanonicalPath)
2355         checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
2356       }
2357     }
2358   }
2359
2360   // scalastyle:off nonascii
2361   List(
2362     (0, "|", "UTF-8", false),
2363     (1, "^", "UTF-16BE", true),
2364     (2, "::", "ISO-8859-1", true),
2365     (3, "!!!@3", "UTF-32LE", false),
2366     (4, 0x1E.toChar.toString, "UTF-8", true),
2367     (5, "아", "UTF-32BE", false),
2368     (6, "куку", "CP1251", true),
2369     (7, "sep", "utf-8", false),
2370     (8, "\r\n", "UTF-16LE", false),
2371     (9, "\r\n", "utf-16be", true),
2372     (10, "\u000d\u000a", "UTF-32BE", false),
2373     (11, "\u000a\u000d", "UTF-8", true),
2374     (12, "===", "US-ASCII", false),
2375     (13, "$^+", "utf-32le", true)
2376   ).foreach {
2377     case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum)
2378   }
2379   // scalastyle:on nonascii
2380
2381   test("SPARK-23724: lineSep should be set if encoding if different from UTF-8") {
2382     val encoding = "UTF-16LE"
2383     val exception = intercept[IllegalArgumentException] {
2384       spark.read
2385         .options(Map("encoding" -> encoding))
2386         .json(testFile("test-data/utf16LE.json"))
2387         .count()
2388     }
2389
2390     assert(exception.getMessage.contains(
2391       s"""The lineSep option must be specified for the $encoding encoding"""))
2392   }
2393
2394   private val badJson = "\u0000\u0000\u0000A\u0001AAA"
2395
2396   test("SPARK-23094: permissively read JSON file with leading nulls when multiLine is enabled") {
2397     withTempPath { tempDir =>
2398       val path = tempDir.getAbsolutePath
2399       Seq(badJson + """{"a":1}""").toDS().write.text(path)
2400       val expected = s"""${badJson}{"a":1}\n"""
2401       val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
2402       val df = spark.read.format("json")
2403         .option("mode", "PERMISSIVE")
2404         .option("multiLine", true)
2405         .option("encoding", "UTF-8")
2406         .schema(schema).load(path)
2407       checkAnswer(df, Row(null, expected))
2408     }
2409   }
2410
2411   test("SPARK-23094: permissively read JSON file with leading nulls when multiLine is disabled") {
2412     withTempPath { tempDir =>
2413       val path = tempDir.getAbsolutePath
2414       Seq(badJson, """{"a":1}""").toDS().write.text(path)
2415       val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
2416       val df = spark.read.format("json")
2417         .option("mode", "PERMISSIVE")
2418         .option("multiLine", false)
2419         .option("encoding", "UTF-8")
2420         .schema(schema).load(path)
2421       checkAnswer(df, Seq(Row(1, null), Row(null, badJson)))
2422     }
2423   }
2424
2425   test("SPARK-23094: permissively parse a dataset contains JSON with leading nulls") {
2426     checkAnswer(
2427       spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()),
2428       Row(badJson))
2429   }
2430 }