
Commit 137eebc

c21 authored and chenzhx committed
[SPARK-34960][SQL] Aggregate push down for ORC
### What changes were proposed in this pull request?

This PR adds the aggregate push down feature for the ORC data source v2 reader. At a high level, the PR does:

* The supported aggregate expressions are MIN/MAX/COUNT, the same as [Parquet aggregate push down](apache#33639).
* BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType and DateType are allowed in MIN/MAX aggregate push down. All other column types are not allowed in MIN/MAX aggregate push down.
* All column types are supported in COUNT aggregate push down.
* Nested columns' sub-fields are disallowed in aggregate push down.
* If the file does not have valid statistics, Spark throws an exception and fails the query.
* If the aggregate has a filter or group-by columns, it is not pushed down.

At the code level, the PR does:

* `OrcScanBuilder`: `pushAggregation()` checks whether the aggregation can be pushed down. Most of the checking logic is shared between Parquet and ORC, extracted into `AggregatePushDownUtils.getSchemaForPushedAggregation()`. `OrcScanBuilder` creates an `OrcScan` with the aggregation and the aggregation data schema.
* `OrcScan`: `createReaderFactory` creates an ORC reader factory with the aggregation and schema. Similar to the change in `ParquetScan`.
* `OrcPartitionReaderFactory`: `buildReaderWithAggregates` creates an ORC reader with aggregate push down (i.e. it reads the ORC file footer to process column statistics, instead of reading the actual data in the file). `buildColumnarReaderWithAggregates` creates a columnar ORC reader similarly. Both delegate the real work of reading the footer to `OrcUtils.createAggInternalRowFromFooter`.
* `OrcUtils.createAggInternalRowFromFooter`: reads the ORC file footer to process column statistics (the real heavy lifting happens here). Similar to `ParquetUtils.createAggInternalRowFromFooter`. It leverages utility methods such as `OrcFooterReader.readStatistics`.
* `OrcFooterReader`: `readStatistics` reads the ORC `ColumnStatistics[]` into Spark's `OrcColumnStatistics`. The transformation is needed because ORC `ColumnStatistics[]` stores all column statistics in a flattened array, which is hard to process, whereas Spark's `OrcColumnStatistics` stores the statistics in a nested tree structure (e.g. like `StructType`). This is used by `OrcUtils.createAggInternalRowFromFooter`.
* `OrcColumnStatistics`: the easy-to-manipulate structure for ORC `ColumnStatistics`. This is used by `OrcFooterReader.readStatistics`.

### Why are the changes needed?

To improve the performance of queries with aggregates.

### Does this PR introduce _any_ user-facing change?

Yes. A user-facing config `spark.sql.orc.aggregatePushdown` is added to control enabling/disabling the aggregate push down for ORC. By default the feature is disabled.

### How was this patch tested?

Added unit tests in `FileSourceAggregatePushDownSuite.scala`. Refactored all unit tests in apache#33639, and they now work for both Parquet and ORC.

Closes apache#34298 from c21/orc-agg.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent a429aaf commit 137eebc

16 files changed

Lines changed: 870 additions & 322 deletions
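
To make the feature concrete, here is a minimal usage sketch (illustrative, not part of the commit). It assumes the ORC source runs through the DataSource V2 read path; the path and column names are made up, and the exact plan output is not guaranteed:

  // Enable the new flag added by this commit (off by default).
  spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
  // Aggregate push down only applies to the V2 ORC reader, so route "orc"
  // off the V1 path (assumption about the session setup).
  spark.conf.set("spark.sql.sources.useV1SourceList", "")

  spark.range(100).selectExpr("id AS c1", "id % 10 AS c2")
    .write.format("orc").mode("overwrite").save("/tmp/orc_agg_demo")

  val df = spark.read.format("orc").load("/tmp/orc_agg_demo")
    .selectExpr("min(c1)", "max(c1)", "count(*)")

  // With no filter or group-by, the aggregates can be answered from ORC
  // footer statistics; the pushed aggregates should appear in the scan node
  // of the plan (e.g. a PushedAggregation entry in the DSv2 scan description).
  df.explain()
  df.show()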


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -950,6 +950,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
+      "COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
+      "type. For COUNT, support all data types.")
+    .version("3.3.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema")
     .doc("When true, the Orc data source merges schemas collected from all data files, " +
       "otherwise the schema is picked from a random data file.")
@@ -3691,6 +3699,8 @@ class SQLConf extends Serializable with Logging {
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
+  def orcAggregatePushDown: Boolean = getConf(ORC_AGGREGATE_PUSHDOWN_ENABLED)
+
   def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
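
A short sketch of how the flag flows from a session to the reader (illustrative):

  import org.apache.spark.sql.internal.SQLConf

  // Users toggle the key added above...
  spark.sql("SET spark.sql.orc.aggregatePushdown=true")
  // ...and the ORC scan builder consults the new helper rather than the raw key:
  val enabled: Boolean = SQLConf.get.orcAggregatePushDown  // now true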

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   def names: Array[String] = fieldNames
 
   private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
-  private[sql] lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+  private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
   private lazy val nameToIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
 
   override def equals(that: Any): Boolean = {
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.orc;

import org.apache.orc.ColumnStatistics;

import java.util.ArrayList;
import java.util.List;

/**
 * Column statistics interface wrapping ORC {@link ColumnStatistics}s.
 *
 * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
 * this class is used to convert ORC {@link ColumnStatistics}s from the array to a nested tree
 * structure, according to data types. The flattened array stores all data types (including
 * nested types) in tree pre-order. This is used for aggregate push down in ORC.
 *
 * For nested data types (array, map and struct), the sub-field statistics are stored recursively
 * inside the parent column's children field. Here is an example of {@link OrcColumnStatistics}:
 *
 * Data schema:
 * c1: int
 * c2: struct<f1: int, f2: float>
 * c3: map<key: int, value: string>
 * c4: array<int>
 *
 *                        OrcColumnStatistics
 *                                | (children)
 *        ---------------------------------------------
 *       /            |                  \              \
 *      c1           c2                  c3             c4
 *  (integer)     (struct)             (map)         (array)
 *  (min:1,          | (children)        | (children)   | (children)
 *   max:10)       -----               -----          element
 *                /     \             /     \        (integer)
 *             c2.f1   c2.f2        key    value
 *          (integer) (float)   (integer) (string)
 *                   (min:0.1,           (min:"a",
 *                    max:100.5)          max:"zzz")
 */
public class OrcColumnStatistics {
  private final ColumnStatistics statistics;
  private final List<OrcColumnStatistics> children;

  public OrcColumnStatistics(ColumnStatistics statistics) {
    this.statistics = statistics;
    this.children = new ArrayList<>();
  }

  public ColumnStatistics getStatistics() {
    return statistics;
  }

  public OrcColumnStatistics get(int ordinal) {
    if (ordinal < 0 || ordinal >= children.size()) {
      throw new IndexOutOfBoundsException(
        String.format("Ordinal %d out of bounds of statistics size %d", ordinal, children.size()));
    }
    return children.get(ordinal);
  }

  public void add(OrcColumnStatistics newChild) {
    children.add(newChild);
  }
}
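
A sketch of navigating such a tree (Scala; `fileStats` stands in for a tree produced by `OrcFooterReader.readStatistics`, and the min/max come from ORC's `IntegerColumnStatistics` API):

  import org.apache.orc.IntegerColumnStatistics
  import org.apache.spark.sql.execution.datasources.orc.OrcColumnStatistics

  // For the example schema in the class doc:
  //   fileStats.get(0)        -> c1
  //   fileStats.get(1).get(0) -> c2.f1
  //   fileStats.get(2).get(1) -> c3's value column
  //   fileStats.get(3).get(0) -> c4's element column
  val fileStats: OrcColumnStatistics = ???  // from OrcFooterReader.readStatistics

  val c1MinMax = fileStats.get(0).getStatistics match {
    case s: IntegerColumnStatistics => Some((s.getMinimum, s.getMaximum))
    case _ => None  // this column type carries no usable min/max
  }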
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcFooterReader.java

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.orc;

import org.apache.orc.ColumnStatistics;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.spark.sql.types.*;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

/**
 * {@link OrcFooterReader} is a utility class that encapsulates helper
 * methods for reading the ORC file footer.
 */
public class OrcFooterReader {

  /**
   * Read the column statistics from the ORC file footer.
   *
   * @param orcReader the reader to read the ORC file footer.
   * @return statistics for all columns in the file.
   */
  public static OrcColumnStatistics readStatistics(Reader orcReader) {
    TypeDescription orcSchema = orcReader.getSchema();
    ColumnStatistics[] orcStatistics = orcReader.getStatistics();
    StructType sparkSchema = OrcUtils.toCatalystSchema(orcSchema);
    return convertStatistics(sparkSchema, new LinkedList<>(Arrays.asList(orcStatistics)));
  }

  /**
   * Convert a queue of ORC {@link ColumnStatistics}s into Spark {@link OrcColumnStatistics}.
   * The queue of ORC {@link ColumnStatistics}s is assumed to be in tree pre-order.
   */
  private static OrcColumnStatistics convertStatistics(
      DataType sparkSchema, Queue<ColumnStatistics> orcStatistics) {
    OrcColumnStatistics statistics = new OrcColumnStatistics(orcStatistics.remove());
    if (sparkSchema instanceof StructType) {
      for (StructField field : ((StructType) sparkSchema).fields()) {
        statistics.add(convertStatistics(field.dataType(), orcStatistics));
      }
    } else if (sparkSchema instanceof MapType) {
      statistics.add(convertStatistics(((MapType) sparkSchema).keyType(), orcStatistics));
      statistics.add(convertStatistics(((MapType) sparkSchema).valueType(), orcStatistics));
    } else if (sparkSchema instanceof ArrayType) {
      statistics.add(convertStatistics(((ArrayType) sparkSchema).elementType(), orcStatistics));
    }
    return statistics;
  }
}
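
A sketch of calling it directly on a file (the file path is hypothetical, and it assumes an ORC release where `Reader` is `Closeable`):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.orc.OrcFile
  import org.apache.spark.sql.execution.datasources.orc.OrcFooterReader

  val reader = OrcFile.createReader(
    new Path("/tmp/orc_agg_demo/part-00000.orc"),
    OrcFile.readerOptions(new Configuration()))
  try {
    // Only the footer is consulted; no row data is read.
    val fileStats = OrcFooterReader.readStatistics(reader)
    // The root statistics cover the whole file, e.g. the row count backing COUNT(*):
    println(fileStats.getStatistics.getNumberOfValues)
  } finally {
    reader.close()
  }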
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
 * Utility class for aggregate push down to Parquet and ORC.
 */
object AggregatePushDownUtils {

  /**
   * Get the data schema for aggregate to be pushed down.
   */
  def getSchemaForPushedAggregation(
      aggregation: Aggregation,
      schema: StructType,
      partitionNames: Set[String],
      dataFilters: Seq[Expression]): Option[StructType] = {

    var finalSchema = new StructType()

    def getStructFieldForCol(col: NamedReference): StructField = {
      schema.apply(col.fieldNames.head)
    }

    def isPartitionCol(col: NamedReference) = {
      partitionNames.contains(col.fieldNames.head)
    }

    def processMinOrMax(agg: AggregateFunc): Boolean = {
      val (column, aggType) = agg match {
        case max: Max => (max.column, "max")
        case min: Min => (min.column, "min")
        case _ =>
          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
      }

      if (isPartitionCol(column)) {
        // don't push down partition column, footer doesn't have max/min for partition column
        return false
      }
      val structField = getStructFieldForCol(column)

      structField.dataType match {
        // not push down complex type
        // not push down Timestamp because INT96 sort order is undefined,
        // Parquet doesn't return statistics for INT96
        // not push down Parquet Binary because min/max could be truncated
        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
        // could be Spark StringType, BinaryType or DecimalType.
        // not push down for ORC with same reason.
        case BooleanType | ByteType | ShortType | IntegerType
             | LongType | FloatType | DoubleType | DateType =>
          finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
          true
        case _ =>
          false
      }
    }

    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
      // Parquet/ORC footer has max/min/count for columns
      // e.g. SELECT COUNT(col1) FROM t
      // but footer doesn't have max/min/count for a column if max/min/count
      // are combined with filter or group by
      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
      //      SELECT COUNT(col1) FROM t GROUP BY col2
      // However, if the filter is on partition column, max/min/count can still be pushed down
      // Todo: add support if groupby column is partition col
      //       (https://issues.apache.org/jira/browse/SPARK-36646)
      return None
    }

    aggregation.aggregateExpressions.foreach {
      case max: Max =>
        if (!processMinOrMax(max)) return None
      case min: Min =>
        if (!processMinOrMax(min)) return None
      case count: Count =>
        if (count.column.fieldNames.length != 1 || count.isDistinct) return None
        finalSchema =
          finalSchema.add(StructField(s"count(" + count.column.fieldNames.head + ")", LongType))
      case _: CountStar =>
        finalSchema = finalSchema.add(StructField("count(*)", LongType))
      case _ =>
        return None
    }

    Some(finalSchema)
  }

  /**
   * Check if two `Aggregation`s `a` and `b` are equal.
   */
  def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {
    a.aggregateExpressions.sortBy(_.hashCode())
      .sameElements(b.aggregateExpressions.sortBy(_.hashCode())) &&
      a.groupByColumns.sortBy(_.hashCode()).sameElements(b.groupByColumns.sortBy(_.hashCode()))
  }

  /**
   * Convert the aggregates result from `InternalRow` to `ColumnarBatch`.
   * This is used for columnar reader.
   */
  def convertAggregatesRowToBatch(
      aggregatesAsRow: InternalRow,
      aggregatesSchema: StructType,
      offHeap: Boolean): ColumnarBatch = {
    val converter = new RowToColumnConverter(aggregatesSchema)
    val columnVectors = if (offHeap) {
      OffHeapColumnVector.allocateColumns(1, aggregatesSchema)
    } else {
      OnHeapColumnVector.allocateColumns(1, aggregatesSchema)
    }
    converter.convert(aggregatesAsRow, columnVectors.toArray)
    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
  }
}
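
As a worked example (a hand-written expectation, not generated output): for `SELECT MIN(c1), MAX(c1), COUNT(c2), COUNT(*) FROM t` with data schema `c1: INT, c2: STRING`, no data filters and no partition columns, `getSchemaForPushedAggregation` would return a schema shaped like:

  import org.apache.spark.sql.types._

  val pushedSchema = new StructType()
    .add(StructField("min(c1)", IntegerType))  // MIN/MAX copy the source column's type
    .add(StructField("max(c1)", IntegerType))
    .add(StructField("count(c2)", LongType))   // COUNT results are always LongType
    .add(StructField("count(*)", LongType))
  // Any data filter or group-by column makes the method return None,
  // and Spark evaluates the aggregate normally.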

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala

Lines changed: 16 additions & 0 deletions
@@ -68,6 +68,22 @@ class OrcDeserializer(
     resultRow
   }
 
+  def deserializeFromValues(orcValues: Seq[WritableComparable[_]]): InternalRow = {
+    var targetColumnIndex = 0
+    while (targetColumnIndex < fieldWriters.length) {
+      if (fieldWriters(targetColumnIndex) != null) {
+        val value = orcValues(requestedColIds(targetColumnIndex))
+        if (value == null) {
+          resultRow.setNullAt(targetColumnIndex)
+        } else {
+          fieldWriters(targetColumnIndex)(value)
+        }
+      }
+      targetColumnIndex += 1
+    }
+    resultRow
+  }
+
   /**
    * Creates a writer to write ORC values to Catalyst data structure at the given ordinal.
    */
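
A sketch of the intended call pattern for the new method (names are illustrative; `deserializer` stands in for an `OrcDeserializer` built over the aggregation result schema):

  import org.apache.hadoop.io.{IntWritable, LongWritable, WritableComparable}

  // One Writable per pushed-down aggregate, extracted from footer statistics
  // (values here are made up):
  val footerValues: Seq[WritableComparable[_]] = Seq(
    new IntWritable(1),     // e.g. min(c1)
    new IntWritable(99),    // e.g. max(c1)
    new LongWritable(100L)  // e.g. count(*)
  )
  // Reuses the existing per-field writers; null statistics become null fields.
  val row = deserializer.deserializeFromValues(footerValues)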
