Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2599,19 +2599,19 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
*/
private def resolveSubQueries(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.resolved =>
case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.analyzed =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we ever set the analyzed flag to true for plans in SubqueryExpression?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it runs execute instead of executeCheck. It will re-enter this every call.

Hmm, maybe we should change to executeCheck?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark ScalaUDF as unresolved if the encoder is not resolved yet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a checkAnalysis after analysis of the subquery plan now.

Copy link
Member Author

@viirya viirya Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

I meant to check sub plan included in the subquery, not the subquery expression itself. It shouldn't be recursive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark ScalaUDF as unresolved if the encoder is not resolved yet?

I also did it before, but I saw some side effect that causes the MergeInto query to fail. So I removed it before submitting this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

I meant to check sub plan included in the subquery, not the subquery expression itself. It shouldn't be recursive.

It isn't recursive, but InlineCTE.buildCTEMap has some issues on it.

resolveSubQuery(s, outer)(ScalarSubquery(_, _, exprId))
case e @ Exists(sub, _, exprId, _, _) if !sub.resolved =>
case e @ Exists(sub, _, exprId, _, _) if !sub.analyzed =>
resolveSubQuery(e, outer)(Exists(_, _, exprId))
case InSubquery(values, l @ ListQuery(_, _, exprId, _, _, _))
if values.forall(_.resolved) && !l.resolved =>
case InSubquery(values, l @ ListQuery(sub, _, exprId, _, _, _))
if values.forall(_.resolved) && !sub.analyzed =>
val expr = resolveSubQuery(l, outer)((plan, exprs) => {
ListQuery(plan, exprs, exprId, plan.output.length)
})
InSubquery(values, expr.asInstanceOf[ListQuery])
case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.analyzed =>
resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
case a: FunctionTableSubqueryArgumentExpression if !a.plan.analyzed =>
resolveSubQuery(a, outer)(
(plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, UnresolvedNamedLambdaVariable}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Exists, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, Literal, OuterReference, ScalarSubquery, ScalaUDF, UnresolvedNamedLambdaVariable}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.{ArrayType, IntegerType}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}

/**
* Unit tests for [[ResolveSubquery]].
Expand Down Expand Up @@ -299,4 +300,58 @@ class ResolveSubquerySuite extends AnalysisTest {
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.HIGHER_ORDER_FUNCTION",
expectedMessageParameters = Map.empty[String, String])
}

val testRelation = LocalRelation($"a".int, $"b".double)
val testRelation2 = LocalRelation($"c".int, $"d".string)

test("SPARK-48921: ScalaUDF in subquery should run through analyzer") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding this.

val integer = testRelation2.output(0)
val udf = ScalaUDF((_: Int) => "x", StringType, integer :: Nil,
Option(ExpressionEncoder[Int]()) :: Nil)

var subPlan =
testRelation2
.where($"d" === udf)
.select(Literal(1)).analyze

// Manually remove the resolved encoder from the UDF.
// This simulates the case where the UDF is resolved during analysis
// but its encoders are not resolved.
subPlan = subPlan.transformUp {
case f: Filter =>
f.copy(condition = f.condition transform {
case s: ScalaUDF =>
val newUDF = s.copy(inputEncoders = Option(ExpressionEncoder[Int]()) :: Nil)
assert(newUDF.resolved)
newUDF
})
}

val existsSubquery =
testRelation
.where(Exists(subPlan))
.select($"a").analyze
assert(existsSubquery.resolved)

val existPlan = existsSubquery.collect {
case f: Filter =>
f.condition.collect {
case e: Exists =>
e.plan.collect {
case f: Filter => f
}
}
}.flatten.flatten.head

val udfs = existPlan.expressions.flatMap(e => e.collect {
case s: ScalaUDF =>
assert(s.inputEncoders.nonEmpty)
val encoder = s.inputEncoders.head
assert(encoder.isDefined)
assert(encoder.get.objDeserializer.resolved)

s
})
assert(udfs.size == 1)
}
}