@@ -270,6 +270,11 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
      case sources.Not(pred) =>
        createFilter(schema, pred).map(FilterApi.not)

      case sources.In(name, values) if canMakeFilterOn(name) && values.length < 20 =>
Member Author:
The threshold is 20. Too many values may cause an OOM, for example:

spark.range(10000000).coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/SPARK-17091")
val df = spark.read.parquet("/tmp/spark/parquet/SPARK-17091/")
df.where(s"id in(${Range(1, 10000).mkString(",")})").count
Exception in thread "SIGINT handler" 18/06/21 13:00:54 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.StringBuilder.toString(StringBuilder.java:407)
        at org.apache.parquet.filter2.predicate.Operators$BinaryLogicalFilterPredicate.<init>(Operators.java:263)
        at org.apache.parquet.filter2.predicate.Operators$Or.<init>(Operators.java:316)
        at org.apache.parquet.filter2.predicate.FilterApi.or(FilterApi.java:261)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anonfun$createFilter$15.apply(ParquetFilters.scala:276)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anonfun$createFilter$15.apply(ParquetFilters.scala:276)
...

Contributor:
What about making this threshold configurable?

@gatorsmile (Member), Jun 21, 2018:
Make it configurable. Use spark.sql.parquet.pushdown.inFilterThreshold. By default, it should be around 10. Please also check the performance.

cc @jiangxb1987
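
For reference, here is a minimal sketch (not part of this diff) of how such an entry could be declared in SQLConf.scala, modeled on the existing PARQUET_FILTER_PUSHDOWN_ENABLED entry; the builder chain, doc text, and default shown here are assumptions based on the suggestion above:

    // Hypothetical SQLConf entry for the suggested key; wording and default are illustrative.
    val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
      buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
        .doc("The maximum number of values in an IN predicate that will be rewritten to " +
          "an OR of Parquet EQ filters and pushed down; larger IN lists are not pushed down.")
        .intConf
        .checkValue(threshold => threshold > 0, "The threshold must be positive.")
        .createWithDefault(10)

ParquetFilters could then compare values.length against this configured threshold instead of the hard-coded 20.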

Member:
+1

Member Author:
It seems that the push-down performance is better when the threshold is less than 300:
[chart: spark-17091-perf]

The code:

    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      import testImplicits._
      withTempPath { path =>
        val total = 10000000
        (0 to total).toDF().coalesce(1)
          .write.option("parquet.block.size", 512)
          .parquet(path.getAbsolutePath)
        val df = spark.read.parquet(path.getAbsolutePath)
        // scalastyle:off println
        var lastSize = -1
        var i = 16000
        while (i < total) {
          val filter = Range(0, total).filter(_ % i == 0)
          i += 100
          if (lastSize != filter.size) {
            if (lastSize == -1) println(s"start size: ${filter.size}")
            lastSize = filter.size
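            // time1 forces push-down (huge threshold); time2 uses the proposed default
            // of 10. "Max threshold" is printed while push-down is still at least as fast.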
            sql("set spark.sql.parquet.pushdown.inFilterThreshold=1000000")
            val begin1 = System.currentTimeMillis()
            df.where(s"id in(${filter.mkString(",")})").count()
            val end1 = System.currentTimeMillis()
            val time1 = end1 - begin1

            sql("set spark.sql.parquet.pushdown.inFilterThreshold=10")
            val begin2 = System.currentTimeMillis()
            df.where(s"id in(${filter.mkString(",")})").count()
            val end2 = System.currentTimeMillis()
            val time2 = end2 - begin2
            if (time1 <= time2) println(s"Max threshold: $lastSize")
          }
        }
      }
    }

Contributor:
Thanks for doing this benchmark; it should be useful. I still have some questions:

  1. For a small table (with rows <= 1000000), is the performance of InFilters still better than InSet?
  2. Can you also forge different filters? Currently your filters are distributed evenly, which doesn't always happen in real workloads. We should at least benchmark with different filter ratios (#rows filtered / #total rows).

Member Author:
It mainly depends on how many row groups can be skipped. For a small table (assuming only one row group), there is no obvious difference.
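
As a side note, here is a hedged sketch (not part of the patch) for counting the row groups in one of the generated files, so the benchmark numbers can be related to how much skipping is even possible; the part-file name is illustrative and the calls assume parquet-mr's standard footer API:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // Open the footer of one output file and count its row groups (blocks).
    // The part-file name below is hypothetical; list the directory to find the real one.
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(
      new Path("/tmp/spark/parquet/SPARK-17091/part-00000.parquet"), new Configuration()))
    try {
      println(s"row groups: ${reader.getFooter.getBlocks.size()}")
    } finally {
      reader.close()
    }

With a single row group, a pushed-down IN filter cannot skip anything, which matches the observation above.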

Member Author:
I have prepared a test case so that you can verify it:

  test("Benchmark") {
    def benchmark(func: () => Unit): Long = {
      val start = System.currentTimeMillis()
      func()
      val end = System.currentTimeMillis()
      end - start
    }
    // scalastyle:off
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      withTempPath { path =>
        Seq(1000000, 10000000).foreach { count =>
          Seq(1048576, 10485760, 104857600).foreach { blockSize =>
            spark.range(count).toDF().selectExpr("id", "cast(id as string) as d1",
              "cast(id as double) as d2", "cast(id as float) as d3", "cast(id as int) as d4",
              "cast(id as decimal(38)) as d5")
              .coalesce(1).write.mode("overwrite")
              .option("parquet.block.size", blockSize).parquet(path.getAbsolutePath)
            val df = spark.read.parquet(path.getAbsolutePath)
            println(s"path: ${path.getAbsolutePath}")
            Seq(1000, 100, 10, 1).foreach { ratio =>
              println(s"##########[ count: $count, blockSize: $blockSize, ratio: $ratio ]#########")
              var i = 1
              while (i < 300) {
                val filter = Range(0, i).map(r => scala.util.Random.nextInt(count / ratio))
                i += 4

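                // vanillaTime: threshold=1, so the IN list is (almost) never pushed down;
                // pushDownTime: threshold=1000, so it (almost) always is.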
                sql("set spark.sql.parquet.pushdown.inFilterThreshold=1")
                val vanillaTime = benchmark(() => df.where(s"id in(${filter.mkString(",")})").count())
                sql("set spark.sql.parquet.pushdown.inFilterThreshold=1000")
                val pushDownTime = benchmark(() => df.where(s"id in(${filter.mkString(",")})").count())

                if (pushDownTime > vanillaTime) {
                  println(s"vanilla is better, threshold: ${filter.size}, $pushDownTime, $vanillaTime")
                } else {
                  println(s"push down is better, threshold: ${filter.size}, $pushDownTime, $vanillaTime")
                }
              }
            }
          }
        }
      }
    }
  }

        values.flatMap { v =>
          makeEq.lift(nameToType(name)).map(_(name, v))
        }.reduceLeftOption(FilterApi.or)
Contributor:
What about null handling? Do we get the same result as before? Anyway, can we add a test for it?
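
A minimal sketch of the kind of test being asked for, assuming the existing ParquetFilterSuite helpers (withParquetDataFrame from ParquetTest and checkAnswer from QueryTest); the test name and data are illustrative, not part of this diff:

  // Hypothetical test: a NULL row must not match `IN (2, 3)`, with or without push-down.
  test("SPARK-17091: IN push-down keeps null semantics") {
    withParquetDataFrame((1 to 3).map(i => Tuple1(Option(i))) :+ Tuple1(Option.empty[Int])) {
      implicit df =>
        checkAnswer(df.where("_1 in (2, 3)"), Seq(Row(2), Row(3)))
    }
  }

Running it once with spark.sql.parquet.filterPushdown enabled and once with it disabled would show whether the results stay the same.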


      case _ => None
    }
  }
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.nio.charset.StandardCharsets
import java.sql.Date

-import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}

@@ -660,6 +660,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
      assert(df.where("col > 0").count() === 2)
    }
  }

test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))

assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(10)))
}

    assertResult(Some(or(
Member:
I think you can remove this test because it looks basically like a duplicate of the one below.

FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 20: Integer)))
) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 20)))
}

    assertResult(Some(or(or(
      FilterApi.eq(intColumn("a"), 10: Integer),
      FilterApi.eq(intColumn("a"), 20: Integer)),
      FilterApi.eq(intColumn("a"), 30: Integer)))
    ) {
      parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30)))
    }

    assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 20).toArray)).isDefined)
    assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 21).toArray)).isEmpty)
  }
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {