[SPARK-25426][SQL] Remove the duplicate fallback logic in UnsafeProjection
authorTakeshi Yamamuro <yamamuro@apache.org>
Sat, 15 Sep 2018 23:20:45 +0000 (16:20 -0700)
committergatorsmile <gatorsmile@gmail.com>
Sat, 15 Sep 2018 23:20:45 +0000 (16:20 -0700)
## What changes were proposed in this pull request?
This pr removed the duplicate fallback logic in `UnsafeProjection`.

This pr comes from #22355.

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.

Closes #22417 from maropu/SPARK-25426.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

index 226a4dd..5f24170 100644 (file)
 
 package org.apache.spark.sql.catalyst.expressions
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
@@ -117,7 +116,7 @@ object UnsafeProjection
     extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
 
   override protected def createCodeGeneratedObject(in: Seq[Expression]): UnsafeProjection = {
-    GenerateUnsafeProjection.generate(in)
+    GenerateUnsafeProjection.generate(in, SQLConf.get.subexpressionEliminationEnabled)
   }
 
   override protected def createInterpretedObject(in: Seq[Expression]): UnsafeProjection = {
@@ -168,26 +167,6 @@ object UnsafeProjection
   def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): UnsafeProjection = {
     create(toBoundExprs(exprs, inputSchema))
   }
-
-  /**
-   * Same as other create()'s but allowing enabling/disabling subexpression elimination.
-   * The param `subexpressionEliminationEnabled` doesn't guarantee to work. For example,
-   * when fallbacking to interpreted execution, it is not supported.
-   */
-  def create(
-      exprs: Seq[Expression],
-      inputSchema: Seq[Attribute],
-      subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
-    val unsafeExprs = toUnsafeExprs(toBoundExprs(exprs, inputSchema))
-    try {
-      GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
-    } catch {
-      case NonFatal(_) =>
-        // We should have already seen the error message in `CodeGenerator`
-        logWarning("Expr codegen error and falling back to interpreter mode")
-        InterpretedUnsafeProjection.createProjection(unsafeExprs)
-    }
-  }
 }
 
 /**
index 9434ceb..222a1b8 100644 (file)
@@ -68,8 +68,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 
   protected override def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
-      val project = UnsafeProjection.create(projectList, child.output,
-        subexpressionEliminationEnabled)
+      val project = UnsafeProjection.create(projectList, child.output)
       project.initialize(index)
       iter.map(project)
     }