[SPARK-24543][SQL] Support any type as DDL string for from_json's schema
authorMaxim Gekk <maxim.gekk@databricks.com>
Thu, 14 Jun 2018 20:27:27 +0000 (13:27 -0700)
committerWenchen Fan <wenchen@databricks.com>
Thu, 14 Jun 2018 20:27:27 +0000 (13:27 -0700)
## What changes were proposed in this pull request?

In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```

## How was this patch tested?

Added a couple sql tests and modified existing tests for Python and Scala. The former tests were modified because it is not imported for them in which format schema for `from_json` is provided.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21550 from MaxGekk/from_json-ddl-schema.

python/pyspark/sql/functions.py
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
sql/core/src/main/scala/org/apache/spark/sql/functions.scala
sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

index a5e3384..e634669 100644 (file)
@@ -2168,8 +2168,7 @@ def from_json(col, schema, options={}):
     [Row(json=Row(a=1))]
     >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
     [Row(json=Row(a=1))]
-    >>> schema = MapType(StringType(), IntegerType())
-    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    >>> df.select(from_json(df.value, "MAP<STRING,INT>").alias("json")).collect()
     [Row(json={u'a': 1})]
     >>> data = [(1, '''[{"a": 1}]''')]
     >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
index 04a4eb0..f6d74f5 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -747,8 +746,8 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) => DataType.fromDDL(s.toString)
     case e => throw new AnalysisException(s"Expected a string literal instead of $e")
   }
 
index 0bef116..fd40741 100644 (file)
@@ -19,6 +19,8 @@ package org.apache.spark.sql.types
 
 import java.util.Locale
 
+import scala.util.control.NonFatal
+
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -26,6 +28,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -110,6 +113,14 @@ abstract class DataType extends AbstractDataType {
 @InterfaceStability.Stable
 object DataType {
 
+  def fromDDL(ddl: String): DataType = {
+    try {
+      CatalystSqlParser.parseDataType(ddl)
+    } catch {
+      case NonFatal(_) => CatalystSqlParser.parseTableSchema(ddl)
+    }
+  }
+
   def fromJson(json: String): DataType = parseDataType(parse(json))
 
   private val nonDecimalNameToType = {
index 87bd7b3..8551058 100644 (file)
@@ -3369,7 +3369,7 @@ object functions {
     val dataType = try {
       DataType.fromJson(schema)
     } catch {
-      case NonFatal(_) => StructType.fromDDL(schema)
+      case NonFatal(_) => DataType.fromDDL(schema)
     }
     from_json(e, dataType, options)
   }
index fea069e..dc15d13 100644 (file)
@@ -31,3 +31,7 @@ CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1,
 SELECT json_tuple(jsonField, 'b', CAST(NULL AS STRING), a) FROM jsonTable;
 -- Clean up
 DROP VIEW IF EXISTS jsonTable;
+
+-- from_json - complex types
+select from_json('{"a":1, "b":2}', 'map<string, int>');
+select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
index 14a6912..2b3288d 100644 (file)
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 26
+-- Number of queries: 28
 
 
 -- !query 0
@@ -258,3 +258,19 @@ DROP VIEW IF EXISTS jsonTable
 struct<>
 -- !query 25 output
 
+
+
+-- !query 26
+select from_json('{"a":1, "b":2}', 'map<string, int>')
+-- !query 26 schema
+struct<entries:map<string,int>>
+-- !query 26 output
+{"a":1,"b":2}
+
+
+-- !query 27
+select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>')
+-- !query 27 schema
+struct<jsontostructs({"a":1, "b":"2"}):struct<a:int,b:string>>
+-- !query 27 output
+{"a":1,"b":"2"}
index 055e1fc..7bf17cb 100644 (file)
@@ -354,8 +354,8 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-24027: from_json - map<string, map<string, int>>") {
     val in = Seq("""{"a": {"b": 1}}""").toDS()
-    val schema = MapType(StringType, MapType(StringType, IntegerType))
-    val out = in.select(from_json($"value", schema))
+    val schema = "map<string, map<string, int>>"
+    val out = in.select(from_json($"value", schema, Map.empty[String, String]))
 
     checkAnswer(out, Row(Map("a" -> Map("b" -> 1))))
   }