Skip to content

Commit 5c8d46b

Browse files
liancheng authored and pdeyhim committed
[SPARK-1913][SQL] Bug fix: column pruning error in Parquet support
JIRA issue: [SPARK-1913](https://issues.apache.org/jira/browse/SPARK-1913) When scanning Parquet tables, attributes referenced only in predicates that are pushed down are not passed to the `ParquetTableScan` operator and causes exception. Author: Cheng Lian <[email protected]> Closes apache#863 from liancheng/spark-1913 and squashes the following commits: f976b73 [Cheng Lian] Addressed the readability issue commented by @rxin f5b257d [Cheng Lian] Added back comments deleted by mistake ae60ab3 [Cheng Lian] [SPARK-1913] Attributes referenced only in predicates pushed down should remain in ParquetTableScan operator
1 parent bf9b3dc commit 5c8d46b

File tree

4 files changed

+22
-11
lines changed

4 files changed

+22
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
206206
* final desired output requires complex expressions to be evaluated or when columns can be
207207
* further eliminated out after filtering has been done.
208208
*
209+
* The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
210+
* away by the filter pushdown optimization.
211+
*
209212
* The required attributes for both filtering and expression evaluation are passed to the
210213
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
211214
*/
212215
def pruneFilterProject(
213216
projectList: Seq[NamedExpression],
214217
filterPredicates: Seq[Expression],
218+
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
215219
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
216220

217221
val projectSet = projectList.flatMap(_.references).toSet
218222
val filterSet = filterPredicates.flatMap(_.references).toSet
219-
val filterCondition = filterPredicates.reduceLeftOption(And)
223+
val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)
220224

221225
// Right now we still use a projection even if the only evaluation is applying an alias
222226
// to a column. Since this is a no-op, it could be avoided. However, using this

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
141141
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
142142
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
143143
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
144-
val remainingFilters =
144+
val prunePushedDownFilters =
145145
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
146-
filters.filter {
147-
// Note: filters cannot be pushed down to Parquet if they contain more complex
148-
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
149-
// all filters that have been pushed down. Note that a predicate such as
150-
// "(A AND B) OR C" can result in "A OR C" being pushed down.
151-
filter =>
146+
(filters: Seq[Expression]) => {
147+
filters.filter { filter =>
148+
// Note: filters cannot be pushed down to Parquet if they contain more complex
149+
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
150+
// all filters that have been pushed down. Note that a predicate such as
151+
// "(A AND B) OR C" can result in "A OR C" being pushed down.
152152
val recordFilter = ParquetFilters.createFilter(filter)
153153
if (!recordFilter.isDefined) {
154154
// First case: the pushdown did not result in any record filter.
@@ -159,13 +159,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
159159
// still want to keep "A AND B" in the higher-level filter, not just "B".
160160
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
161161
}
162+
}
162163
}
163164
} else {
164-
filters
165+
identity[Seq[Expression]] _
165166
}
166167
pruneFilterProject(
167168
projectList,
168-
remainingFilters,
169+
filters,
170+
prunePushedDownFilters,
169171
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
170172
}
171173

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,5 +358,9 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
358358
assert(stringResult(0).getString(2) == "100", "stringvalue incorrect")
359359
assert(stringResult(0).getInt(1) === 100)
360360
}
361-
}
362361

362+
test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
363+
val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
364+
assert(query.collect().size === 10)
365+
}
366+
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ private[hive] trait HiveStrategies {
6969
pruneFilterProject(
7070
projectList,
7171
otherPredicates,
72+
identity[Seq[Expression]],
7273
HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
7374
case _ =>
7475
Nil

0 commit comments

Comments (0)