Commit 29da8d6
[SPARK-34796][SQL][3.1] Initialize counter variable for LIMIT code-gen in doProduce()
This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed.
Here is an example:
```
test("failed limit query") {
withTable("left_table", "empty_right_table", "output_table") {
spark.range(5).toDF("k").write.saveAsTable("left_table")
spark.range(0).toDF("k").write.saveAsTable("empty_right_table")
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
spark.sql("CREATE TABLE output_table (k INT) USING parquet")
spark.sql(
s"""
|INSERT INTO TABLE output_table
|SELECT t1.k FROM left_table t1
|JOIN empty_right_table t2
|ON t1.k = t2.k
|LIMIT 3
|""".stripMargin)
}
}
}
```
Query plan:
```
Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
|-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
+- *(3) GlobalLimit 3
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=apache#179]
+- *(2) LocalLimit 3
+- *(2) Project [k#228L]
+- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
:- *(2) Filter isnotnull(k#228L)
: +- *(2) ColumnarToRow
: +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=apache#173]
+- *(1) Filter isnotnull(k#229L)
+- *(1) ColumnarToRow
+- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
```
Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .
The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable.
The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called.
Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled.
Fix query failure.
No.
Added unit test in `SQLQuerySuite.scala`.
Closes apache#31911 from c21/limit-fix-3.1.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>1 parent e640212 commit 29da8d6
2 files changed
Lines changed: 27 additions & 4 deletions
File tree
- sql/core/src
- main/scala/org/apache/spark/sql/execution
- test/scala/org/apache/spark/sql
Lines changed: 8 additions & 4 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
122 | 122 | | |
123 | 123 | | |
124 | 124 | | |
125 | | - | |
126 | | - | |
127 | | - | |
128 | | - | |
129 | 125 | | |
130 | 126 | | |
131 | 127 | | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
132 | 132 | | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
133 | 137 | | |
134 | 138 | | |
135 | 139 | | |
| |||
Lines changed: 19 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
3950 | 3950 | | |
3951 | 3951 | | |
3952 | 3952 | | |
| 3953 | + | |
| 3954 | + | |
| 3955 | + | |
| 3956 | + | |
| 3957 | + | |
| 3958 | + | |
| 3959 | + | |
| 3960 | + | |
| 3961 | + | |
| 3962 | + | |
| 3963 | + | |
| 3964 | + | |
| 3965 | + | |
| 3966 | + | |
| 3967 | + | |
| 3968 | + | |
| 3969 | + | |
| 3970 | + | |
| 3971 | + | |
3953 | 3972 | | |
3954 | 3973 | | |
3955 | 3974 | | |
0 commit comments