[SPARK-17091][SQL] Add rule to convert IN predicate to equivalent Parquet filter
authorYuming Wang <yumwang@ebay.com>
Sat, 14 Jul 2018 09:50:54 +0000 (17:50 +0800)
committerhyukjinkwon <gurwls223@apache.org>
Sat, 14 Jul 2018 09:50:54 +0000 (17:50 +0800)
## What changes were proposed in this pull request?

The original pr is: https://github.com/apache/spark/pull/18424

Add a new optimizer rule to convert an IN predicate to an equivalent Parquet filter and add `spark.sql.parquet.pushdown.inFilterThreshold` to control limit thresholds. Different data types have different limit thresholds, this is a copy of data for reference:

Type | limit threshold
-- | --
string | 370
int | 210
long | 285
double | 270
float | 220
decimal | Won't provide better performance before [SPARK-24549](https://issues.apache.org/jira/browse/SPARK-24549)

## How was this patch tested?
unit tests and manual tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21603 from wangyum/SPARK-17091.

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
sql/core/benchmarks/FilterPushdownBenchmark-results.txt
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

index 14dd528..699e939 100644 (file)
@@ -386,6 +386,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
+    buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
+      .doc("The maximum number of values to filter push-down optimization for IN predicate. " +
+        "Large threshold won't necessarily provide much better performance. " +
+        "The experiment argued that 300 is the limit threshold. " +
+        "By setting this value to 0 this feature can be disabled. " +
+        "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+      .internal()
+      .intConf
+      .checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
+      .createWithDefault(10)
+
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1485,6 +1497,9 @@ class SQLConf extends Serializable with Logging {
   def parquetFilterPushDownStringStartWith: Boolean =
     getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
 
+  def parquetFilterPushDownInFilterThreshold: Int =
+    getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
index 110669b..c44908b 100644 (file)
@@ -417,120 +417,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7477 / 7587          2.1         475.4       1.0X
-Parquet Vectorized (Pushdown)                 7862 / 8346          2.0         499.9       1.0X
-Native ORC Vectorized                         6447 / 7021          2.4         409.9       1.2X
-Native ORC Vectorized (Pushdown)               983 / 1003         16.0          62.5       7.6X
+Parquet Vectorized                            7993 / 8104          2.0         508.2       1.0X
+Parquet Vectorized (Pushdown)                  507 /  532         31.0          32.2      15.8X
+Native ORC Vectorized                         6922 / 7163          2.3         440.1       1.2X
+Native ORC Vectorized (Pushdown)              1017 / 1058         15.5          64.6       7.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7107 / 7290          2.2         451.9       1.0X
-Parquet Vectorized (Pushdown)                 7196 / 7258          2.2         457.5       1.0X
-Native ORC Vectorized                         6102 / 6222          2.6         388.0       1.2X
-Native ORC Vectorized (Pushdown)               926 /  958         17.0          58.9       7.7X
+Parquet Vectorized                            7855 / 7963          2.0         499.4       1.0X
+Parquet Vectorized (Pushdown)                  503 /  516         31.3          32.0      15.6X
+Native ORC Vectorized                         6825 / 6954          2.3         433.9       1.2X
+Native ORC Vectorized (Pushdown)              1019 / 1044         15.4          64.8       7.7X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7374 / 7692          2.1         468.8       1.0X
-Parquet Vectorized (Pushdown)                 7771 / 7848          2.0         494.1       0.9X
-Native ORC Vectorized                         6184 / 6356          2.5         393.2       1.2X
-Native ORC Vectorized (Pushdown)               920 /  963         17.1          58.5       8.0X
+Parquet Vectorized                            7858 / 7928          2.0         499.6       1.0X
+Parquet Vectorized (Pushdown)                  490 /  519         32.1          31.1      16.0X
+Native ORC Vectorized                         7079 / 7966          2.2         450.1       1.1X
+Native ORC Vectorized (Pushdown)              1276 / 1673         12.3          81.1       6.2X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7073 / 7326          2.2         449.7       1.0X
-Parquet Vectorized (Pushdown)                 7304 / 7647          2.2         464.4       1.0X
-Native ORC Vectorized                         6222 / 6579          2.5         395.6       1.1X
-Native ORC Vectorized (Pushdown)               958 /  994         16.4          60.9       7.4X
+Parquet Vectorized                           8007 / 11155          2.0         509.0       1.0X
+Parquet Vectorized (Pushdown)                  519 /  540         30.3          33.0      15.4X
+Native ORC Vectorized                         6848 / 7072          2.3         435.4       1.2X
+Native ORC Vectorized (Pushdown)              1026 / 1050         15.3          65.2       7.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7121 / 7501          2.2         452.7       1.0X
-Parquet Vectorized (Pushdown)                 7751 / 8334          2.0         492.8       0.9X
-Native ORC Vectorized                         6225 / 6680          2.5         395.8       1.1X
-Native ORC Vectorized (Pushdown)               998 / 1020         15.8          63.5       7.1X
+Parquet Vectorized                            7876 / 7956          2.0         500.7       1.0X
+Parquet Vectorized (Pushdown)                  521 /  535         30.2          33.1      15.1X
+Native ORC Vectorized                         7051 / 7368          2.2         448.3       1.1X
+Native ORC Vectorized (Pushdown)              1014 / 1035         15.5          64.5       7.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7157 / 7399          2.2         455.1       1.0X
-Parquet Vectorized (Pushdown)                 7806 / 7911          2.0         496.3       0.9X
-Native ORC Vectorized                         6548 / 6720          2.4         416.3       1.1X
-Native ORC Vectorized (Pushdown)              1016 / 1050         15.5          64.6       7.0X
+Parquet Vectorized                            7897 / 8229          2.0         502.1       1.0X
+Parquet Vectorized (Pushdown)                  513 /  530         30.7          32.6      15.4X
+Native ORC Vectorized                         6730 / 6990          2.3         427.9       1.2X
+Native ORC Vectorized (Pushdown)              1003 / 1036         15.7          63.8       7.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7662 / 7805          2.1         487.1       1.0X
-Parquet Vectorized (Pushdown)                 7590 / 7861          2.1         482.5       1.0X
-Native ORC Vectorized                         6840 / 8073          2.3         434.9       1.1X
-Native ORC Vectorized (Pushdown)              1041 / 1075         15.1          66.2       7.4X
+Parquet Vectorized                            7967 / 8175          2.0         506.5       1.0X
+Parquet Vectorized (Pushdown)                 8155 / 8434          1.9         518.5       1.0X
+Native ORC Vectorized                         7002 / 7107          2.2         445.2       1.1X
+Native ORC Vectorized (Pushdown)              1092 / 1139         14.4          69.4       7.3X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            8230 / 9266          1.9         523.2       1.0X
-Parquet Vectorized (Pushdown)                 7735 / 7960          2.0         491.8       1.1X
-Native ORC Vectorized                         6945 / 7109          2.3         441.6       1.2X
-Native ORC Vectorized (Pushdown)              1123 / 1144         14.0          71.4       7.3X
+Parquet Vectorized                            8032 / 8122          2.0         510.7       1.0X
+Parquet Vectorized (Pushdown)                 8141 / 8908          1.9         517.6       1.0X
+Native ORC Vectorized                         7140 / 7387          2.2         454.0       1.1X
+Native ORC Vectorized (Pushdown)              1156 / 1220         13.6          73.5       6.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7656 / 8058          2.1         486.7       1.0X
-Parquet Vectorized (Pushdown)                 7860 / 8247          2.0         499.7       1.0X
-Native ORC Vectorized                         6684 / 7003          2.4         424.9       1.1X
-Native ORC Vectorized (Pushdown)              1085 / 1172         14.5          69.0       7.1X
+Parquet Vectorized                            8088 / 8350          1.9         514.2       1.0X
+Parquet Vectorized (Pushdown)                 8629 / 8702          1.8         548.6       0.9X
+Native ORC Vectorized                         7480 / 7886          2.1         475.6       1.1X
+Native ORC Vectorized (Pushdown)              1106 / 1145         14.2          70.3       7.3X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7594 / 8128          2.1         482.8       1.0X
-Parquet Vectorized (Pushdown)                 7845 / 7923          2.0         498.8       1.0X
-Native ORC Vectorized                         5859 / 6421          2.7         372.5       1.3X
-Native ORC Vectorized (Pushdown)              1037 / 1054         15.2          66.0       7.3X
+Parquet Vectorized                            8028 / 8165          2.0         510.4       1.0X
+Parquet Vectorized (Pushdown)                 8349 / 8674          1.9         530.8       1.0X
+Native ORC Vectorized                         7107 / 7354          2.2         451.8       1.1X
+Native ORC Vectorized (Pushdown)              1175 / 1207         13.4          74.7       6.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            6762 / 6775          2.3         429.9       1.0X
-Parquet Vectorized (Pushdown)                 6911 / 6970          2.3         439.4       1.0X
-Native ORC Vectorized                         5884 / 5960          2.7         374.1       1.1X
-Native ORC Vectorized (Pushdown)              1028 / 1052         15.3          65.4       6.6X
+Parquet Vectorized                            8041 / 8195          2.0         511.2       1.0X
+Parquet Vectorized (Pushdown)                 8466 / 8604          1.9         538.2       0.9X
+Native ORC Vectorized                         7116 / 7286          2.2         452.4       1.1X
+Native ORC Vectorized (Pushdown)              1197 / 1214         13.1          76.1       6.7X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            6718 / 6767          2.3         427.1       1.0X
-Parquet Vectorized (Pushdown)                 6812 / 6909          2.3         433.1       1.0X
-Native ORC Vectorized                         5842 / 5883          2.7         371.4       1.1X
-Native ORC Vectorized (Pushdown)              1040 / 1058         15.1          66.1       6.5X
+Parquet Vectorized                            7998 / 8311          2.0         508.5       1.0X
+Parquet Vectorized (Pushdown)                9366 / 11257          1.7         595.5       0.9X
+Native ORC Vectorized                         7856 / 9273          2.0         499.5       1.0X
+Native ORC Vectorized (Pushdown)              1350 / 1747         11.7          85.8       5.9X
 
 
 ================================================================================================
index b86b97e..efddf8d 100644 (file)
@@ -334,17 +334,15 @@ class ParquetFileFormat
     val enableVectorizedReader: Boolean =
       sqlConf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-    val enableRecordFilter: Boolean =
-      sparkSession.sessionState.conf.parquetRecordFilterEnabled
-    val timestampConversion: Boolean =
-      sparkSession.sessionState.conf.isParquetINT96TimestampConversion
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
-    val enableParquetFilterPushDown: Boolean =
-      sparkSession.sessionState.conf.parquetFilterPushDown
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -368,12 +366,13 @@ class ParquetFileFormat
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
           .getFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(pushDownDate,
+          pushDownStringStartWith, pushDownInFilterThreshold)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
-          .createFilter(parquetSchema, _))
+          .flatMap(parquetFilters.createFilter(parquetSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None
index 4c9b940..e590c15 100644 (file)
@@ -37,7 +37,10 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+    pushDownDate: Boolean,
+    pushDownStartWith: Boolean,
+    pushDownInFilterThreshold: Int) {
 
   private case class ParquetSchemaType(
       originalType: OriginalType,
@@ -232,6 +235,15 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
     // See SPARK-20364.
     def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
 
+    // All DataTypes that support `makeEq` can provide better performance.
+    def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match {
+      case ParquetBooleanType | ParquetByteType | ParquetShortType | ParquetIntegerType
+           | ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
+           | ParquetBinaryType => true
+      case ParquetDateType if pushDownDate => true
+      case _ => false
+    }
+
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -295,6 +307,12 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
       case sources.Not(pred) =>
         createFilter(schema, pred).map(FilterApi.not)
 
+      case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name)
+        && values.distinct.length <= pushDownInFilterThreshold =>
+        values.distinct.flatMap { v =>
+          makeEq.lift(nameToType(name)).map(_(name, v))
+        }.reduceLeftOption(FilterApi.or)
+
       case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(name),
index 067d2fe..00c191f 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.nio.charset.StandardCharsets
 import java.sql.Date
 
-import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 
@@ -56,7 +56,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
   private lazy val parquetFilters =
-    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith)
+    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith,
+      conf.parquetFilterPushDownInFilterThreshold)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -803,6 +804,67 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // Test inverseCanDrop() has taken effect
     testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
   }
+
+  test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+    assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(null)))
+    }
+
+    assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10)))
+    }
+
+    // Remove duplicates
+    assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 10)))
+    }
+
+    assertResult(Some(or(or(
+      FilterApi.eq(intColumn("a"), 10: Integer),
+      FilterApi.eq(intColumn("a"), 20: Integer)),
+      FilterApi.eq(intColumn("a"), 30: Integer)))
+    ) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20, 30)))
+    }
+
+    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+      Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
+    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+      Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty)
+
+    import testImplicits._
+    withTempPath { path =>
+      val data = 0 to 1024
+      data.toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null
+        .coalesce(1).write.option("parquet.block.size", 512)
+        .parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      Seq(true, false).foreach { pushEnabled =>
+        withSQLConf(
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) {
+          Seq(1, 5, 10, 11).foreach { count =>
+            val filter = s"a in(${Range(0, count).mkString(",")})"
+            assert(df.where(filter).count() === count)
+            val actual = stripSparkFilter(df.where(filter)).collect().length
+            if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
+              assert(actual > 1 && actual < data.length)
+            } else {
+              assert(actual === data.length)
+            }
+          }
+          assert(df.where("a in(null)").count() === 0)
+          assert(df.where("a = null").count() === 0)
+          assert(df.where("a is null").count() === 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {