Commit 7561a62
[SPARK-21351][SQL] Update nullability based on children's output
## What changes were proposed in this pull request?
This PR adds a new optimizer rule, `UpdateNullabilityInAttributeReferences`, which updates the nullability of attribute references when a `Filter` with `IsNotNull` changes it. On master, optimized plans do not reflect the nullability change that such a `Filter` introduces, so unnecessary null-checking code is generated. For example:
```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true
scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]
Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */
                // We don't need this NULL check because NULL is filtered out in `$"b" =!= 2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```
At line 143, the NULL check is unnecessary because NULLs are already filtered out by `$"b" =!= 2`.
With this PR, the NULL check is removed:
```
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false
scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```
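The idea in the title, updating nullability based on children's output, can be sketched with a toy plan model. This is only an illustration under stated assumptions, not the rule added by this PR: `Attr`, `Plan`, `Scan`, `Filter`, `Project`, and `UpdateNullability` are hypothetical stand-ins and not Catalyst classes, and the toy `Filter` is assumed to narrow the nullability of the columns it checks with `IsNotNull`, as the description above states.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
final case class Attr(name: String, nullable: Boolean)

sealed trait Plan { def output: Seq[Attr] }

// Leaf node that simply exposes its columns.
final case class Scan(output: Seq[Attr]) extends Plan

// Assumption: a filter carrying IsNotNull on `notNullCols` reports those
// columns as non-nullable in its own output.
final case class Filter(notNullCols: Set[String], child: Plan) extends Plan {
  def output: Seq[Attr] =
    child.output.map(a => if (notNullCols(a.name)) a.copy(nullable = false) else a)
}

// A projection keeps its own copies of attribute references, so their
// nullability can be stale with respect to the child's output.
final case class Project(refs: Seq[Attr], child: Plan) extends Plan {
  def output: Seq[Attr] = refs
}

object UpdateNullability {
  // Bottom-up pass: rewrite the child first, then refresh each reference
  // from the nullability that the rewritten child reports.
  def apply(plan: Plan): Plan = plan match {
    case s: Scan => s
    case Filter(cols, child) => Filter(cols, apply(child))
    case Project(refs, child) =>
      val newChild = apply(child)
      val nullability = newChild.output.map(a => a.name -> a.nullable).toMap
      val newRefs = refs.map(r => r.copy(nullable = nullability.getOrElse(r.name, r.nullable)))
      Project(newRefs, newChild)
  }
}

object NullabilityDemo {
  def main(args: Array[String]): Unit = {
    val scan = Scan(Seq(Attr("a", nullable = true), Attr("b", nullable = true)))
    // The projection still claims `b` is nullable although the filter removed NULLs.
    val stale = Project(Seq(Attr("b", nullable = true)), Filter(Set("b"), scan))
    val updated = UpdateNullability(stale)
    println(stale.output)
    println(updated.output)
  }
}
```

In this sketch the stale projection reports `b` as nullable, and the rewritten one reports it as non-nullable, which corresponds to the change from `true` to `false` in the `nullable` check shown above.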
## How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes apache#18576 from maropu/SPARK-21351.
1 parent 699cc97, commit 7561a62
4 files changed
Lines changed: 75 additions & 15 deletions
File tree
- sql
  - catalyst/src
    - main/scala/org/apache/spark/sql/catalyst/optimizer
    - test/scala/org/apache/spark/sql/catalyst/optimizer
  - core/src/test/scala/org/apache/spark/sql
Lines changed: 18 additions & 1 deletion
Lines changed: 57 additions & 0 deletions
Lines changed: 0 additions & 9 deletions
Lines changed: 0 additions & 5 deletions