@@ -24,12 +24,16 @@ import scala.util.Random
2424import org .apache .spark .SparkConf
2525import org .apache .spark .benchmark .{Benchmark , BenchmarkBase }
2626import org .apache .spark .internal .config .UI ._
27- import org .apache .spark .sql .{DataFrame , SparkSession }
27+ import org .apache .spark .sql .{Column , DataFrame , SparkSession }
28+ import org .apache .spark .sql .catalyst .dsl .expressions ._
29+ import org .apache .spark .sql .catalyst .expressions .{AttributeReference , EqualTo , Expression , Literal , Or }
2830import org .apache .spark .sql .catalyst .plans .SQLHelper
29- import org .apache .spark .sql .functions .monotonically_increasing_id
31+ import org .apache .spark .sql .execution .datasources .DataSourceStrategy
32+ import org .apache .spark .sql .execution .datasources .orc .OrcFilters
33+ import org .apache .spark .sql .functions ._
3034import org .apache .spark .sql .internal .SQLConf
3135import org .apache .spark .sql .internal .SQLConf .ParquetOutputTimestampType
32- import org .apache .spark .sql .types .{ ByteType , Decimal , DecimalType , TimestampType }
36+ import org .apache .spark .sql .types ._
3337
3438/**
3539 * Benchmark to measure read performance with Filter pushdown.
@@ -135,6 +139,34 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
135139 benchmark.run()
136140 }
137141
142+ def filterPushDownBenchmarkWithColumn (
143+ values : Int ,
144+ title : String ,
145+ whereColumn : Column ,
146+ selectExpr : String = " *"
147+ ): Unit = {
148+ val benchmark = new Benchmark (title, values, minNumIters = 5 , output = output)
149+ benchmark.addCase(" Native ORC Vectorized (Pushdown)" ) { _ =>
150+ withSQLConf(SQLConf .ORC_FILTER_PUSHDOWN_ENABLED .key -> " true" ) {
151+ spark
152+ .table(" orcTable" )
153+ .select(selectExpr)
154+ .filter(whereColumn)
155+ .collect()
156+ }
157+ }
158+ benchmark.addCase(" Native Parquet Vectorized (Pushdown)" ) { _ =>
159+ withSQLConf(SQLConf .PARQUET_FILTER_PUSHDOWN_ENABLED .key -> " true" ) {
160+ spark
161+ .table(" parquetTable" )
162+ .select(selectExpr)
163+ .filter(whereColumn)
164+ .collect()
165+ }
166+ }
167+ benchmark.run()
168+ }
169+
138170 private def runIntBenchmark (numRows : Int , width : Int , mid : Int ): Unit = {
139171 Seq (" value IS NULL" , s " $mid < value AND value < $mid" ).foreach { whereExpr =>
140172 val title = s " Select 0 int row ( $whereExpr) " .replace(" value AND value" , " value" )
@@ -174,10 +206,10 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
174206 private def runStringBenchmark (
175207 numRows : Int , width : Int , searchValue : Int , colType : String ): Unit = {
176208 Seq (" value IS NULL" , s " ' $searchValue' < value AND value < ' $searchValue' " )
177- .foreach { whereExpr =>
178- val title = s " Select 0 $colType row ( $whereExpr) " .replace(" value AND value" , " value" )
179- filterPushDownBenchmark(numRows, title, whereExpr)
180- }
209+ .foreach { whereExpr =>
210+ val title = s " Select 0 $colType row ( $whereExpr) " .replace(" value AND value" , " value" )
211+ filterPushDownBenchmark(numRows, title, whereExpr)
212+ }
181213
182214 Seq (
183215 s " value = ' $searchValue' " ,
@@ -378,5 +410,56 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
378410 }
379411 }
380412 }
413+
414+ runBenchmark(s " Predicate conversion benchmark with unbalanced Column " ) {
415+ // This benchmark tests a very isolated part of the predicate pushdown process - specifically,
416+ // the individual action of converting a Spark `Expression` to an ORC `SearchArgument`.
417+ // This results in more granular numbers that can help highlight small performance
418+ // differences in this part of the code that would be hidden by slower components that
419+ // get run when a full Spark job is executed.
420+ // The benchmark below runs a more complete, end-to-end test which covers the whole pipeline
421+ // and can uncover high-level performance problems, but is bad at discriminating details.
422+ val numRows = 1
423+ val width = 2000
424+
425+ val columns = (1 to width).map(i => s " id c $i" )
426+ val df = spark.range(1 ).selectExpr(columns : _* )
427+ Seq (25 , 5000 , 15000 ).foreach { numFilter =>
428+ val whereColumn = (1 to numFilter)
429+ .map(i => col(" c1" ) === lit(i))
430+ .foldLeft(lit(false ))(_ || _)
431+ val benchmark = new Benchmark (
432+ s " Convert a filter with $numFilter columns to ORC filter " ,
433+ numRows, minNumIters = 5 , output = output)
434+ val name = s " Native ORC Vectorized (Pushdown) "
435+ benchmark.addCase(name) { _ =>
436+ OrcFilters .createFilter(df.schema,
437+ DataSourceStrategy .translateFilter(whereColumn.expr).toSeq)
438+ }
439+ benchmark.run()
440+ }
441+ }
442+
443+ runBenchmark(s " Pushdown benchmark with unbalanced Column " ) {
444+ val numRows = 1
445+ val width = 200
446+
447+ withTempPath { dir =>
448+ val columns = (1 to width).map(i => s " id c $i" )
449+ val df = spark.range(1 ).selectExpr(columns : _* )
450+ withTempTable(" orcTable" , " parquetTable" ) {
451+ saveAsTable(df, dir)
452+ Seq (25 , 500 , 1000 ).foreach { numFilter =>
453+ val whereColumn = (1 to numFilter)
454+ .map(i => col(" c1" ) === lit(i))
455+ .foldLeft(lit(false ))(_ || _)
456+ filterPushDownBenchmarkWithColumn(
457+ numRows,
458+ s " Select 1 row with $numFilter filters " ,
459+ whereColumn)
460+ }
461+ }
462+ }
463+ }
381464 }
382465}
0 commit comments