[SPARK-25357][SQL] Add metadata to SparkPlanInfo to dump more information like file...
author    LantaoJin <jinlantao@gmail.com>
Thu, 13 Sep 2018 01:57:34 +0000 (09:57 +0800)
committer Wenchen Fan <wenchen@databricks.com>
Thu, 13 Sep 2018 01:57:34 +0000 (09:57 +0800)
## What changes were proposed in this pull request?

The metadata field was removed from SparkPlanInfo in #18600. Correspondingly, a lot of metadata was also dropped from the SparkListenerSQLExecutionStart event in the Spark event log. As a result, analyzing an event log to collect all input paths is no longer possible. The simpleString in the SparkPlanInfo JSON is no substitute, since it displays only 100 characters of each value.

Before 2.3, the SparkListenerSQLExecutionStart fragment in the event log looked like the snippet below (it contains the metadata field with the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, the metadata field is gone:
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So this change adds the field back to the SparkPlanInfo class, which restores the metadata in the event log. Intact information in the event log is very useful for offline job analysis.
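For offline analysis, the restored field can then be read straight out of the event-log JSON. Below is a minimal sketch using json4s (the JSON library Spark itself bundles); the `inputLocations` helper and its line-at-a-time contract are illustrative assumptions, not part of this patch:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Illustrative helper (not part of this patch): given one line of a Spark
// event log, collect every "Location" value found anywhere under a
// SparkListenerSQLExecutionStart event, e.g. the InMemoryFileIndex paths.
def inputLocations(eventLogLine: String): Seq[String] = {
  val json = parse(eventLogLine)
  json \ "Event" match {
    case JString(event) if event.endsWith("SparkListenerSQLExecutionStart") =>
      // json4s' \\ returns a single match directly, or a JObject of all matches
      json \\ "Location" match {
        case JString(loc)    => Seq(loc)
        case JObject(fields) => fields.collect { case (_, JString(loc)) => loc }
        case _               => Nil
      }
    case _ => Nil
  }
}
```

Feeding each line of an event-log file through such a helper yields the intact input paths that simpleString would have truncated at 100 characters.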

## How was this patch tested?
Unit tests in SQLJsonProtocolSuite and SparkPlanSuite.

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 2a23158..59ffd16 100644
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -28,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetricInfo
  * Stores information about a SQL SparkPlan.
  */
 @DeveloperApi
-@JsonIgnoreProperties(Array("metadata")) // The metadata field was removed in Spark 2.3.
 class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
+    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
@@ -59,6 +57,12 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
+    // dump the file scan metadata (e.g. file path) to the event log
+    val metadata = plan match {
+      case fileScan: FileSourceScanExec => fileScan.metadata
+      case _ => Map[String, String]()
+    }
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      metadata, metrics)
   }
 }
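With the metadata restored on SparkPlanInfo, any listener that receives SparkListenerSQLExecutionStart can also see it at query start, not just readers of the event log. A minimal sketch; the listener class and its println reporting are illustrative, not part of this patch:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Illustrative listener (not part of this patch): walks the plan-info tree
// of every SQL execution and prints the restored file-scan metadata.
class FileScanMetadataListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => printMetadata(e.sparkPlanInfo)
    case _ => // ignore non-SQL events
  }

  private def printMetadata(info: SparkPlanInfo): Unit = {
    info.metadata.foreach { case (key, value) =>
      println(s"${info.nodeName} $key: $value")
    }
    info.children.foreach(printMetadata)
  }
}
```

Registered with `spark.sparkContext.addSparkListener(new FileScanMetadataListener)`, this prints one line per metadata entry (e.g. Location, ReadSchema) for each file scan in the plan.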
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index c2e62b9..08e40e2 100644
@@ -46,7 +46,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
       """.stripMargin
     val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
     val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
-      new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
+      new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
     assert(reconstructedEvent == expectedEvent)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 34dc6f3..47ff372 100644
@@ -50,4 +50,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") {
+    withTempPath { path =>
+      spark.range(5).write.parquet(path.getAbsolutePath)
+      val f = spark.read.parquet(path.getAbsolutePath)
+      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
+    }
+  }
 }