Commit 56dec39
[SPARK-48666][SQL] Do not push down filter if it contains PythonUDFs
This PR proposes to prevent pushing down Python UDFs. This PR uses the same approach as #47033, therefore added the author as a co-author, but simplifies the change.
Extracting filters to push down happens first
https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L46
https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L211
https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L51
Before extracting Python UDFs
https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L80
Here is full stacktrace:
```
[INTERNAL_ERROR] Cannot evaluate expression: pyUDF(cast(input[0, bigint, true] as string)) SQLSTATE: XX000
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: pyUDF(cast(input[0, bigint, true] as string)) SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:65)
at org.apache.spark.sql.catalyst.expressions.FoldableUnevaluable.eval(Expression.scala:387)
at org.apache.spark.sql.catalyst.expressions.FoldableUnevaluable.eval$(Expression.scala:386)
at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:72)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:563)
at org.apache.spark.sql.catalyst.expressions.IsNotNull.eval(nullExpressions.scala:403)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:53)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1(ExternalCatalogUtils.scala:189)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1$adapted(ExternalCatalogUtils.scala:188)
at scala.collection.immutable.List.filter(List.scala:516)
at scala.collection.immutable.List.filter(List.scala:79)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.prunePartitionsByFilter(ExternalCatalogUtils.scala:188)
at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.listPartitionsByFilter(InMemoryCatalog.scala:604)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionsByFilter(ExternalCatalogWithListener.scala:262)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:1358)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.listPartitionsByFilter(ExternalCatalogUtils.scala:168)
at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:74)
at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:72)
at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:326)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1251)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1250)
at org.apache.spark.sql.catalyst.plans.logical.Join.mapChildren(basicLogicalOperators.scala:552)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:326)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:50)
at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:226)
at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
at scala.collection.immutable.List.foldLeft(List.scala:79)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:223)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:215)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:215)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:186)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:186)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:167)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:234)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:234)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:233)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:159)
at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$19(PythonUDFSuite.scala:136)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:307)
at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:305)
at org.apache.spark.sql.execution.python.PythonUDFSuite.withTable(PythonUDFSuite.scala:25)
at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$18(PythonUDFSuite.scala:130)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
```
In order for end users to use Python UDFs against partitioned columns.
Yes, this fixes a bug - this PR allows to use Python UDF in partitioned columns.
Unittest added.
No.
Closes #47033
Closes #47313 from HyukjinKwon/SPARK-48666.
Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Wei Zheng <weiz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit d747853)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>1 parent 596f680 commit 56dec39
4 files changed
Lines changed: 32 additions & 7 deletions
File tree
- sql
- core/src
- main/scala/org/apache/spark/sql/execution/datasources
- v2
- test/scala/org/apache/spark/sql/execution/python
- hive/src/main/scala/org/apache/spark/sql/hive/execution
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
Lines changed: 6 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
63 | 63 | | |
64 | 64 | | |
65 | 65 | | |
66 | | - | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
67 | 72 | | |
68 | 73 | | |
69 | 74 | | |
| |||
Lines changed: 5 additions & 2 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
19 | 19 | | |
20 | 20 | | |
21 | 21 | | |
22 | | - | |
| 22 | + | |
23 | 23 | | |
24 | 24 | | |
25 | 25 | | |
| |||
73 | 73 | | |
74 | 74 | | |
75 | 75 | | |
76 | | - | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
77 | 80 | | |
78 | 81 | | |
79 | 82 | | |
| |||
Lines changed: 14 additions & 2 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
17 | 17 | | |
18 | 18 | | |
19 | 19 | | |
20 | | - | |
21 | | - | |
| 20 | + | |
| 21 | + | |
22 | 22 | | |
23 | 23 | | |
24 | 24 | | |
| |||
111 | 111 | | |
112 | 112 | | |
113 | 113 | | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
114 | 126 | | |
Lines changed: 7 additions & 2 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
22 | 22 | | |
23 | 23 | | |
24 | 24 | | |
25 | | - | |
| 25 | + | |
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
| |||
50 | 50 | | |
51 | 51 | | |
52 | 52 | | |
53 | | - | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
54 | 59 | | |
55 | 60 | | |
56 | 61 | | |
| |||
0 commit comments