[SPARK-25425][SQL] Extra options should override session options in DataSource V2
authorMaxim Gekk <maxim.gekk@databricks.com>
Sun, 16 Sep 2018 00:24:11 +0000 (17:24 -0700)
committerDongjoon Hyun <dongjoon@apache.org>
Sun, 16 Sep 2018 00:24:11 +0000 (17:24 -0700)
## What changes were proposed in this pull request?

In the PR, I propose overriding session options by extra options in DataSource V2. Extra options are more specific and set via `.option()`, and should overwrite more generic session options. Entries from seconds map overwrites entries with the same key from the first map, for example:
```Scala
scala> Map("option" -> false) ++ Map("option" -> true)
res0: scala.collection.immutable.Map[String,Boolean] = Map(option -> true)
```

## How was this patch tested?

Added a test for checking which option is propagated to a data source in `load()`.

Closes #22413 from MaxGekk/session-options.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

index e6c2cba..fe69f25 100644 (file)
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
           DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
         }
         Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-          ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+          ds, sessionOptions ++ extraOptions.toMap + pathsOption,
           userSpecifiedSchema = userSpecifiedSchema))
       } else {
         loadV1Source(paths: _*)
index dfb8c47..188fce7 100644 (file)
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
       source match {
         case provider: BatchWriteSupportProvider =>
-          val options = extraOptions ++
-              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            source,
+            df.sparkSession.sessionState.conf)
+          val options = sessionOptions ++ extraOptions
 
-          val relation = DataSourceV2Relation.create(source, options.toMap)
+          val relation = DataSourceV2Relation.create(source, options)
           if (mode == SaveMode.Append) {
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
index f6c3e0c..7cc8abc 100644 (file)
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
+
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
@@ -317,6 +319,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2, 2)
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
 
 
@@ -385,7 +419,6 @@ class SimpleDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
   }
 }
 
-
 class AdvancedDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
 
   class ReadSupport extends SimpleReadSupport {
index 952241b..a0f4404 100644 (file)
@@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration
  * Each job moves files from `target/_temporary/queryId/` to `target`.
  */
 class SimpleWritableDataSource extends DataSourceV2
-  with BatchReadSupportProvider with BatchWriteSupportProvider {
+  with BatchReadSupportProvider
+  with BatchWriteSupportProvider
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
     override def fullSchema(): StructType = schema