[SPARK-25352][SQL][FOLLOWUP] Add helper method and address style issue
authorLiang-Chi Hsieh <viirya@gmail.com>
Thu, 13 Sep 2018 12:21:00 +0000 (14:21 +0200)
committerHerman van Hovell <hvanhovell@databricks.com>
Thu, 13 Sep 2018 12:21:00 +0000 (14:21 +0200)
## What changes were proposed in this pull request?

This follow-up patch addresses [the review comment](https://github.com/apache/spark/pull/22344/files#r217070658) by adding a helper method to simplify code and fixing style issue.

## How was this patch tested?

Existing unit tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #22409 from viirya/SPARK-25352-followup.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

index 7c8ce31..89442a7 100644 (file)
@@ -66,44 +66,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Plans special cases of limit operators.
    */
   object SpecialLimits extends Strategy {
-    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
-          if (limit < conf.topKSortFallbackThreshold) {
+    private def decideTopRankNode(limit: Int, child: LogicalPlan): Seq[SparkPlan] = {
+      if (limit < conf.topKSortFallbackThreshold) {
+        child match {
+          case Sort(order, true, child) =>
             TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-          } else {
-            GlobalLimitExec(limit,
-              LocalLimitExec(limit, planLater(s)),
-              orderedLimit = true) :: Nil
-          }
-        case Limit(IntegerLiteral(limit), p@Project(projectList, Sort(order, true, child))) =>
-          if (limit < conf.topKSortFallbackThreshold) {
+          case Project(projectList, Sort(order, true, child)) =>
             TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
-          } else {
-            GlobalLimitExec(limit,
-              LocalLimitExec(limit, planLater(p)),
-              orderedLimit = true) :: Nil
-          }
+        }
+      } else {
+        GlobalLimitExec(limit,
+          LocalLimitExec(limit, planLater(child)),
+          orderedLimit = true) :: Nil
+      }
+    }
+
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case ReturnAnswer(rootPlan) => rootPlan match {
+        case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) =>
+          decideTopRankNode(limit, s)
+        case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) =>
+          decideTopRankNode(limit, p)
         case Limit(IntegerLiteral(limit), child) =>
           CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
-      case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
-        if (limit < conf.topKSortFallbackThreshold) {
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        } else {
-          GlobalLimitExec(limit,
-            LocalLimitExec(limit, planLater(s)),
-            orderedLimit = true) :: Nil
-        }
-      case Limit(IntegerLiteral(limit), p@Project(projectList, Sort(order, true, child))) =>
-        if (limit < conf.topKSortFallbackThreshold) {
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
-        } else {
-          GlobalLimitExec(limit,
-            LocalLimitExec(limit, planLater(p)),
-            orderedLimit = true) :: Nil
-        }
+      case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) =>
+        decideTopRankNode(limit, s)
+      case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) =>
+        decideTopRankNode(limit, p)
       case _ => Nil
     }
   }