[SPARK-24242][SQL] RangeExec should have correct outputOrdering and outputPartitioning
author  Liang-Chi Hsieh <viirya@gmail.com>
Mon, 21 May 2018 07:39:35 +0000 (15:39 +0800)
committer       hyukjinkwon <gurwls223@apache.org>
Mon, 21 May 2018 07:39:35 +0000 (15:39 +0800)
## What changes were proposed in this pull request?

The logical `Range` node recently gained an `outputOrdering`, which is used to eliminate redundant `Sort` operators during optimization. However, this `outputOrdering` was not propagated to the physical `RangeExec` node. This patch propagates it.

This patch also adds the correct `outputPartitioning` to the `RangeExec` node: `SinglePartition` when the range has a single slice, `RangePartitioning` over the output ordering when it has multiple slices, and `UnknownPartitioning(0)` when the range produces no elements.
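
As a minimal illustrative sketch (assuming a running `SparkSession` named `spark`; the partition counts below are arbitrary), the propagated properties can be observed directly on the physical plan, mirroring the check added to `PlannerSuite`:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, SinglePartition}
import org.apache.spark.sql.execution.RangeExec

// Multi-slice range: RangeExec should report the ascending ordering on its
// output attribute and a RangePartitioning over that ordering (4 partitions here).
val multi = spark.range(0, 100, 1, 4).queryExecution.executedPlan
  .collect { case r: RangeExec => r }.head
assert(multi.outputOrdering.nonEmpty)
assert(multi.outputPartitioning.isInstanceOf[RangePartitioning])

// Single slice: the whole range lives in one partition, so SinglePartition.
val single = spark.range(0, 10, 1, 1).queryExecution.executedPlan
  .collect { case r: RangeExec => r }.head
assert(single.outputPartitioning == SinglePartition)
```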

## How was this patch tested?

Added a test to `PlannerSuite`, and adjusted existing tests that relied on the previous partitioning of `Range`.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21291 from viirya/SPARK-24242.

python/pyspark/sql/tests.py
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

index a1b6db7..c7bd8f0 100644 (file)
@@ -5239,8 +5239,8 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         expected2 = df.groupby().agg(sum(df.v))
 
         # groupby one column and one sql expression
-        result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-        expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+        result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
+        expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)).orderBy(df.id, df.v % 2)
 
         # groupby one python UDF
         result4 = df.groupby(plus_one(df.id)).agg(sum_udf(df.v))
index 1edfdc8..2df81d0 100644 (file)
@@ -345,6 +345,20 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+    if (numElements > 0) {
+      if (numSlices == 1) {
+        SinglePartition
+      } else {
+        RangePartitioning(outputOrdering, numSlices)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
index 949505e..276496b 100644 (file)
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      val data = spark.range(0, n, 1, 1).sort('id.desc)
+      // Range has range partitioning in its output now. To have a range shuffle, we
+      // need to run a repartition first.
+      val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()
 
       // Compute histogram for the number of records per partition post sort
index a375f88..b2aba8e 100644 (file)
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -633,6 +633,31 @@ class PlannerSuite extends SharedSQLContext {
       requiredOrdering = Seq(orderingA, orderingB),
       shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering and partitioning") {
+    val df = spark.range(10)
+    val rangeExec = df.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    val range = df.queryExecution.optimizedPlan.collect {
+      case r: Range => r
+    }
+    assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
+    assert(rangeExec.head.outputPartitioning ==
+      RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions))
+
+    val rangeInOnePartition = spark.range(1, 10, 1, 1)
+    val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition)
+
+    val rangeInZeroPartition = spark.range(-10, -9, -20, 1)
+    val rangeExecInZeroPartition = rangeInZeroPartition.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    assert(rangeExecInZeroPartition.head.outputPartitioning == UnknownPartitioning(0))
+  }
 }
 
 // Used for unit-testing EnsureRequirements
index 9180a22..b714dcd 100644 (file)
@@ -51,12 +51,12 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
   }
 
   test("Aggregate with grouping keys should be included in WholeStageCodegen") {
-    val df = spark.range(3).groupBy("id").count().orderBy("id")
+    val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 2)
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
         p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
-    assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
+    assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
   }
 
   test("BroadcastHashJoin should be included in WholeStageCodegen") {
index adcaf2d..8251ff1 100644 (file)
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.debug
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.TestData
 
@@ -33,14 +34,16 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("debugCodegen") {
-    val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+    val res = codegenString(spark.range(10).groupBy(col("id") * 2).count()
+      .queryExecution.executedPlan)
     assert(res.contains("Subtree 1 / 2"))
     assert(res.contains("Subtree 2 / 2"))
     assert(res.contains("Object[]"))
   }
 
   test("debugCodegenStringSeq") {
-    val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+    val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
+      .queryExecution.executedPlan)
     assert(res.length == 2)
     assert(res.forall{ case (subtree, code) =>
       subtree.contains("Range") && code.contains("Object[]")})