From 7e47f436d67972d9fb99e283856e668f5dac51c9 Mon Sep 17 00:00:00 2001 From: Zee Chen Date: Thu, 12 Nov 2015 14:41:43 -0800 Subject: [PATCH] [SPARK-11390] [SQL] Query plan with/without filterPushdown indistinguishable Propagate pushed filters to PhyicalRDD in DataSourceStrategy.apply Add test case with parquet data source. Style fix. style fix Revert PhysicalRDD string formatting. --- .../apache/spark/sql/execution/ExistingRDD.scala | 6 ++++-- .../execution/datasources/DataSourceStrategy.scala | 6 ++++-- .../apache/spark/sql/execution/PlannerSuite.scala | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8b41d3d3d892..62620ec642c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,9 @@ private[sql] object PhysicalRDD { def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], - relation: BaseRelation): PhysicalRDD = { - PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation]) + relation: BaseRelation, + extraInformation: String = ""): PhysicalRDD = { + PhysicalRDD(output, rdd, relation.toString + extraInformation, + relation.isInstanceOf[HadoopFsRelation]) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bbbfa7c77cb..544d5eccec03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) + val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ") + if (projects.map(_.toAttribute) == projects && projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { @@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation) + relation.relation, pushedFiltersString) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. @@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation) + relation.relation, pushedFiltersString) execution.Project( projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be53ec3e271c..dfec139985f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext { } } + test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") { + withTempPath { file => + val path = file.getCanonicalPath + testData.write.parquet(path) + val df = sqlContext.read.parquet(path) + sqlContext.registerDataFrameAsTable(df, "testPushed") + + withTempTable("testPushed") { + val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan + assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]")) + } + } + } + test("efficient limit -> project -> sort") { { val query =