[SPARK-24478][SQL] Move projection and filter push down to physical conversion
author Ryan Blue <blue@apache.org>
Fri, 15 Jun 2018 03:59:42 +0000 (20:59 -0700)
committer Wenchen Fan <wenchen@databricks.com>
Fri, 15 Jun 2018 03:59:42 +0000 (20:59 -0700)
## What changes were proposed in this pull request?

This removes the v2 push-down optimizer rule (`PushDownOperatorsToDataSource`) and instead pushes filters and required columns when converting to a physical plan, as suggested by marmbrus. This makes the v2 relation cleaner because its output and filters no longer change in the logical plan.
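
For orientation, here is a condensed sketch of the new planning path, based on `DataSourceV2Strategy` in the diff below; the column-pruning bookkeeping and the streaming case are elided, and the object name is illustrative:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}

object SketchedV2Strategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
      // Configure a fresh reader at physical planning time; the logical
      // DataSourceV2Relation keeps a stable output and carries no filter state.
      val reader = relation.newReader
      // Push the full relation schema (the real strategy narrows this to the
      // columns referenced by the project list and the filters).
      val output = DataSourceV2Relation.pushRequiredColumns(relation, reader, relation.schema)
      // postScanFilters must still be evaluated by Spark; pushedFilters were
      // accepted by the source and are reported by the scan.
      val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)

      val scan = DataSourceV2ScanExec(
        output, relation.source, relation.options, pushedFilters, reader)
      val withFilter = postScanFilters.reduceLeftOption(And)
        .map(FilterExec(_, scan)).getOrElse(scan)
      // The real strategy skips this projection when the scan output already matches.
      ProjectExec(project, withFilter) :: Nil

    case _ => Nil
  }
}
```

The real strategy additionally prunes the pushed columns to `projectSet ++ filterSet`; see `DataSourceV2Strategy.scala` in the diff below.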

A side effect of this change is that the stats from the optimized logical plan no longer reflect pushed filters and projections. This is a temporary state until the planner gathers stats from the physical plan instead. An alternative to this approach is https://github.com/rdblue/spark/commit/9d3a11e68bca6c5a56a2be47fb09395350362ac5.
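
To make the statistics side effect concrete, below is a hedged sketch of a hypothetical reader (the class name and the `estimateBytes` helper are illustrative, not part of this patch). A source that bases its `SupportsReportStatistics` estimate on pushed filters still reports unfiltered statistics to the optimizer, because `computeStats()` now asks a freshly created, unconfigured reader:

```scala
import java.util.OptionalLong

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsPushDownFilters, SupportsReportStatistics}

// Hypothetical reader; only the filter and statistics mix-ins are sketched.
abstract class FilterAwareReader extends DataSourceReader
    with SupportsReportStatistics
    with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    Array.empty // pretend the source can evaluate every filter exactly
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def getStatistics(): Statistics = new Statistics {
    // With this change, `pushed` is still empty when the optimizer asks for stats,
    // so the estimate cannot reflect filter selectivity yet.
    override def sizeInBytes(): OptionalLong = OptionalLong.of(estimateBytes(pushed))
    override def numRows(): OptionalLong = OptionalLong.empty()
  }

  // Illustrative helper: a real source would consult its own metadata here.
  protected def estimateBytes(filters: Array[Filter]): Long
}
```

Once the planner gathers stats from the physical plan, where the configured reader is available, estimates like this can account for pushed filters.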

The first commit was proposed in #21262. This PR replaces #21262.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21503 from rdblue/SPARK-24478-move-push-down-to-physical-conversion.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala [deleted file]
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index 11bb13f..a79080a 100644
@@ -22,6 +22,11 @@ import org.apache.spark.annotation.InterfaceStability;
 /**
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report statistics to Spark.
+ *
+ * Statistics are reported to the optimizer before a projection or any filters are pushed to the
+ * DataSourceReader. Implementations that return more accurate statistics based on projection and
+ * filters will not improve query performance until the planner can push operators before getting
+ * stats.
  */
 @InterfaceStability.Evolving
 public interface SupportsReportStatistics extends DataSourceReader {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 1c8e405..00ff4c8 100644
@@ -21,7 +21,6 @@ import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.v2.PushDownOperatorsToDataSource
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 
 class SparkOptimizer(
@@ -32,8 +31,7 @@ class SparkOptimizer(
   override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
-    Batch("Push down operators to data source scan", Once, PushDownOperatorsToDataSource)) ++
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
     postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 90fb5a1..e08af21 100644
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
@@ -32,69 +31,27 @@ import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
     source: DataSourceV2,
+    output: Seq[AttributeReference],
     options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
     userSpecifiedSchema: Option[StructType] = None)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
-
-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
-
-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+  override def pushedFilters: Seq[Expression] = Seq.empty
 
-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
-  lazy val (
-      reader: DataSourceReader,
-      postScanFilters: Seq[Expression],
-      pushedFilters: Seq[Expression]) = {
-    val newReader = userSpecifiedSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
-    }
-
-    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
-
-    val (postScanFilters, pushedFilters) = filters match {
-      case Some(filterSeq) =>
-        DataSourceV2Relation.pushFilters(newReader, filterSeq)
-      case _ =>
-        (Nil, Nil)
-    }
-    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
-
-    (newReader, postScanFilters, pushedFilters)
-  }
-
-  override def doCanonicalize(): LogicalPlan = {
-    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+  override def simpleString: String = "RelationV2 " + metadataString
 
-    // override output with canonicalized output to avoid attempting to configure a reader
-    val canonicalOutput: Seq[AttributeReference] = this.output
-        .map(a => QueryPlan.normalizeExprId(a, projection))
+  lazy val v2Options: DataSourceOptions = makeV2Options(options)
 
-    new DataSourceV2Relation(c.source, c.options, c.projection) {
-      override lazy val output: Seq[AttributeReference] = canonicalOutput
-    }
+  def newReader: DataSourceReader = userSpecifiedSchema match {
+    case Some(userSchema) =>
+      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
+    case None =>
+      source.asReadSupport.createReader(v2Options)
   }
 
-  override def computeStats(): Statistics = reader match {
+  override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
@@ -102,9 +59,7 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-    // projection is used to maintain id assignment.
-    // if projection is not set, use output so the copy is not equal to the original
-    copy(projection = projection.map(_.newInstance()))
+    copy(output = output.map(_.newInstance()))
   }
 }
 
@@ -206,21 +161,27 @@ object DataSourceV2Relation {
   def create(
       source: DataSourceV2,
       options: Map[String, String],
-      filters: Option[Seq[Expression]] = None,
       userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
-    DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema)
+    val output = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
+    DataSourceV2Relation(source, output, options, userSpecifiedSchema)
   }
 
-  private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
+  def pushRequiredColumns(
+      relation: DataSourceV2Relation,
+      reader: DataSourceReader,
+      struct: StructType): Seq[AttributeReference] = {
     reader match {
       case projectionSupport: SupportsPushDownRequiredColumns =>
         projectionSupport.pruneColumns(struct)
+        // return the output columns from the relation that were projected
+        val attrMap = relation.output.map(a => a.name -> a).toMap
+        projectionSupport.readSchema().map(f => attrMap(f.name))
       case _ =>
+        relation.output
     }
   }
 
-  private def pushFilters(
+  def pushFilters(
       reader: DataSourceReader,
       filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
     reader match {
@@ -248,7 +209,7 @@ object DataSourceV2Relation {
         // the data source cannot guarantee the rows returned can pass these filters.
         // As a result we must return it so Spark can plan an extra filter operator.
         val postScanFilters =
-          r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
+        r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
         // The filters which are marked as pushed to this data source
         val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1b7c639..8bf858c 100644
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
+
+      val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
+        projection.asInstanceOf[Seq[AttributeReference]].toStructType)
+
+      val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)
+
+      logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
+      logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
+
+      val scan = DataSourceV2ScanExec(
+        output, relation.source, relation.options, pushedFilters, reader)
+
+      val filter = postScanFilters.reduceLeftOption(And)
+      val withFilter = filter.map(execution.FilterExec(_, scan)).getOrElse(scan)
+
+      val withProjection = if (withFilter.output != project) {
+        execution.ProjectExec(project, withFilter)
+      } else {
+        withFilter
+      }
+
+      withProjection :: Nil
 
     case r: StreamingDataSourceV2Relation =>
       DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
deleted file mode 100644
index e894f8a..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
-    // PhysicalOperation guarantees that filters are deterministic; no need to check
-    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
-
-      val projectAttrs = project.map(_.toAttribute)
-      val projectSet = AttributeSet(project.flatMap(_.references))
-      val filterSet = AttributeSet(filters.flatMap(_.references))
-
-      val projection = if (filterSet.subsetOf(projectSet) &&
-          AttributeSet(projectAttrs) == projectSet) {
-        // When the required projection contains all of the filter columns and column pruning alone
-        // can produce the required projection, push the required projection.
-        // A final projection may still be needed if the data source produces a different column
-        // order or if it cannot prune all of the nested columns.
-        projectAttrs
-      } else {
-        // When there are filter columns not already in the required projection or when the required
-        // projection is more complicated than column pruning, base column pruning on the set of
-        // all columns needed by both.
-        (projectSet ++ filterSet).toSeq
-      }
-
-      val newRelation = relation.copy(
-        projection = projection.asInstanceOf[Seq[AttributeReference]],
-        filters = Some(filters))
-
-      // Add a Filter for any filters that need to be evaluated after scan.
-      val postScanFilterCond = newRelation.postScanFilters.reduceLeftOption(And)
-      val filtered = postScanFilterCond.map(Filter(_, newRelation)).getOrElse(newRelation)
-
-      // Add a Project to ensure the output matches the required projection
-      if (newRelation.output != projectAttrs) {
-        Project(project, filtered)
-      } else {
-        filtered
-      }
-
-    case other => other.mapChildren(apply)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 505a3f3..e96cd45 100644
@@ -323,21 +323,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-23315: get output from canonicalized data source v2 related plans") {
-    def checkCanonicalizedOutput(df: DataFrame, numOutput: Int): Unit = {
+    def checkCanonicalizedOutput(
+        df: DataFrame, logicalNumOutput: Int, physicalNumOutput: Int): Unit = {
       val logical = df.queryExecution.optimizedPlan.collect {
         case d: DataSourceV2Relation => d
       }.head
-      assert(logical.canonicalized.output.length == numOutput)
+      assert(logical.canonicalized.output.length == logicalNumOutput)
 
       val physical = df.queryExecution.executedPlan.collect {
         case d: DataSourceV2ScanExec => d
       }.head
-      assert(physical.canonicalized.output.length == numOutput)
+      assert(physical.canonicalized.output.length == physicalNumOutput)
     }
 
     val df = spark.read.format(classOf[AdvancedDataSourceV2].getName).load()
-    checkCanonicalizedOutput(df, 2)
-    checkCanonicalizedOutput(df.select('i), 1)
+    checkCanonicalizedOutput(df, 2, 2)
+    checkCanonicalizedOutput(df.select('i), 2, 1)
   }
 }