Skip to content

Conversation

@viirya
Copy link
Member

@viirya viirya commented Jul 17, 2024

What changes were proposed in this pull request?

We got a customer issue that a MergeInto query on Iceberg table works earlier but cannot work after upgrading to Spark 3.4.

The error looks like

Caused by: org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object
upcast(getcolumnbyordinal(0, StringType), StringType, - root class: java.lang.String).toString.

The source table of MergeInto uses ScalaUDF. The error happens when Spark invokes the deserializer of input encoder of the ScalaUDF and the deserializer is not resolved yet.

The encoders of ScalaUDF are resolved by the rule ResolveEncodersInUDF which will be applied at the end of analysis phase.

During rewriting MergeInto to ReplaceData query, Spark creates an Exists subquery and ScalaUDF is part of the plan of the subquery. Note that the ScalaUDF is already resolved by the analyzer.

Then, in ResolveSubquery rule which resolves the subquery, it will resolve the subquery plan if it is not resolved yet. Because the subquery containing ScalaUDF is resolved, the rule skips it so ResolveEncodersInUDF won't be applied on it. So the analyzed ReplaceData query contains a ScalaUDF with encoders unresolved that cause the error.

This patch modifies ResolveSubquery so it will resolve subquery plan if it is not analyzed to make sure subquery plan is fully analyzed.

This patch moves ResolveEncodersInUDF rule before rewriting MergeInto to make sure the ScalaUDF in the subquery plan is fully analyzed.

Why are the changes needed?

Fixing production query error.

Does this PR introduce any user-facing change?

Yes, fixing user-facing issue.

How was this patch tested?

Manually test with MergeInto query and add an unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jul 17, 2024
val testRelation = LocalRelation($"a".int, $"b".double)
val testRelation2 = LocalRelation($"c".int, $"d".string)

test("SPARK-48921: ScalaUDF in subquery should run through analyzer") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding this.

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jul 17, 2024

Do you happen to know which JIRA issue is related to this regression, @viirya ?

after upgrading to Spark 3.4.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending CIs).

cc @cloud-fan, @yaooqinn , too

@viirya
Copy link
Member Author

viirya commented Jul 17, 2024

Do you happen to know which JIRA issue is related to this regression, @viirya ?

after upgrading to Spark 3.4.

Thank you for review, @dongjoon-hyun.

It is not caused by a JIRA so I think that it is not a regression.

The Iceberg MergeInto query error is happened on the row-level group filter query. The feature is added in Spark 3.4.
So in the previous Spark version the customer uses, it doesn't trigger the issue.

@viirya
Copy link
Member Author

viirya commented Jul 17, 2024

I re-triggered the failed Run Docker integration tests.

All CIs are passed now: https://github.com/viirya/spark-1/actions/runs/9967182407/job/27542878853

@dongjoon-hyun
Copy link
Member

Got it. Feel free to merge and backport, @viirya ~

@viirya
Copy link
Member Author

viirya commented Jul 17, 2024

Thank you @dongjoon-hyun. I will keep it for a day and merge if no more comments.

private def resolveSubQueries(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.resolved =>
case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.analyzed =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we ever set the analyzed flag to true for plans in SubqueryExpression?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it runs execute instead of executeCheck. It will re-enter this every call.

Hmm, maybe we should change to executeCheck?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark ScalaUDF as unresolved if the encoder is not resolved yet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a checkAnalysis after analysis of the subquery plan now.

Copy link
Member Author

@viirya viirya Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

I meant to check sub plan included in the subquery, not the subquery expression itself. It shouldn't be recursive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark ScalaUDF as unresolved if the encoder is not resolved yet?

I also did it before, but I saw some side effect that causes the MergeInto query to fail. So I removed it before submitting this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckAnalysis will check subquery expressions recursively, so we shouldn't check it here.

I meant to check sub plan included in the subquery, not the subquery expression itself. It shouldn't be recursive.

It isn't recursive, but InlineCTE.buildCTEMap has some issues on it.

@cloud-fan
Copy link
Contributor

is this a rule order issue? Shall we run ResolveEncodersInUDF before rewriting MergeInto?

@viirya
Copy link
Member Author

viirya commented Jul 17, 2024

is this a rule order issue? Shall we run ResolveEncodersInUDF before rewriting MergeInto?

It is also working. Actually that is my first fix.

executeSameContext(e.plan)
}

checkAnalysis(newSubqueryPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is risky as we may fail earlier than before, while before we can still resolve the subquery expression after more iterations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, makes sense. Maybe restoring to the fix of moving ResolveEncodersInUDF?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adjusting the rule order is probably the safest solution for now. The current way of resolving subquery expressions is quite fragile. Ideally we should recursively invoke the full analyzer only once (and must invoke once) for each subquery expression, instead of doing it again and again with the if resolved check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I will make the change (and maybe adjust the test).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, for the MergInto issue we encountered, because the ScalaUDF is put in a subquery plan by rewriting MergeInto rule. So moving ResolveEncodersInUDF before rewriting can fix the issue.

But it doesn't fix the general issue if ScalaUDF is in other subquery plan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to fix the general issue instead of just fixing the corner issue we encountered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me fix the issue we encounter first. We can consider the general issue later.

@viirya viirya changed the title [SPARK-48921][SQL] ScalaUDF in subquery should run through analyzer [SPARK-48921][SQL] ScalaUDF encoders in subquery should be resolved for MergeInto Jul 18, 2024
@viirya
Copy link
Member Author

viirya commented Jul 18, 2024

@cloud-fan I changed the rule order of ResolveEncodersInUDF. The unit test is updated too.

HandleNullInputsForUDF,
UpdateAttributeNullability),
Batch("UDF", Once,
ResolveEncodersInUDF),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, shall we move the MergeInto rewrite rule after ResolveEncodersInUDF instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC we need to run ResolveEncodersInUDF after the ScalaUDF Null Handling batch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

@viirya viirya force-pushed the fix_subquery_resolve branch from 4b62a8f to 544828d Compare July 18, 2024 02:08
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that new code part fails to compile.

@viirya
Copy link
Member Author

viirya commented Jul 18, 2024

Thanks for review @dongjoon-hyun @huaxingao @yaooqinn @cloud-fan

I got some error when running merge_spark_pr.py. Could you help merge and back port this PR? If there are any conflicts, I will open back port PRs.

Thank you.

@dongjoon-hyun
Copy link
Member

Merged to master.

Could you make backporting PRs to branch-3.5 and branch-3.4 because there are conflicts, @viirya ?

@viirya
Copy link
Member Author

viirya commented Jul 18, 2024

Thank you @dongjoon-hyun . I will create backporting PRs.

@viirya
Copy link
Member Author

viirya commented Jul 18, 2024

The backport PR for branch-3.5: #47406

For branch-3.4, although MergeInto is added there, it isn't really supported for executing it. We have it internally as we backported some changes to internal 3.4 branch. So we don't need a backport PR for branch-3.4.

jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
…or MergeInto

### What changes were proposed in this pull request?

We got a customer issue that a `MergeInto` query on Iceberg table works earlier but cannot work after upgrading to Spark 3.4.

The error looks like

```
Caused by: org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object
upcast(getcolumnbyordinal(0, StringType), StringType, - root class: java.lang.String).toString.
```

The source table of `MergeInto` uses `ScalaUDF`. The error happens when Spark invokes the deserializer of input encoder of the `ScalaUDF` and the deserializer is not resolved yet.

The encoders of ScalaUDF are resolved by the rule `ResolveEncodersInUDF` which will be applied at the end of analysis phase.

During rewriting `MergeInto` to `ReplaceData` query, Spark creates an `Exists` subquery and `ScalaUDF` is part of the plan of the subquery. Note that the `ScalaUDF` is already resolved by the analyzer.

Then, in `ResolveSubquery` rule which resolves the subquery, it will resolve the subquery plan if it is not resolved yet. Because the subquery containing `ScalaUDF` is resolved, the rule skips it so `ResolveEncodersInUDF` won't be applied on it. So the analyzed `ReplaceData` query contains a `ScalaUDF` with encoders unresolved that cause the error.

This patch modifies `ResolveSubquery` so it will resolve subquery plan if it is not analyzed to make sure subquery plan is fully analyzed.

This patch moves `ResolveEncodersInUDF` rule before rewriting `MergeInto` to make sure the `ScalaUDF` in the subquery plan is fully analyzed.

### Why are the changes needed?

Fixing production query error.

### Does this PR introduce _any_ user-facing change?

Yes, fixing user-facing issue.

### How was this patch tested?

Manually test with `MergeInto` query and add an unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47380 from viirya/fix_subquery_resolve.

Lead-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
gengliangwang added a commit that referenced this pull request May 22, 2025
### What changes were proposed in this pull request?

Follow-up of #47380, we should resolve the Scala UDF within all subqueries, instead of modifying the rule orders to make DML rewrites working.
This PR also moves the DML rewrite rules to the main resolution batch, so that the DML rewrite results can apply with other rules such as `ResolveTableConstraints`

### Why are the changes needed?

A better and simpler fix for SPARK-48921

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests
### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50973 from gengliangwang/fixDML.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
### What changes were proposed in this pull request?

Follow-up of apache#47380, we should resolve the Scala UDF within all subqueries, instead of modifying the rule orders to make DML rewrites working.
This PR also moves the DML rewrite rules to the main resolution batch, so that the DML rewrite results can apply with other rules such as `ResolveTableConstraints`

### Why are the changes needed?

A better and simpler fix for SPARK-48921

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests
### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#50973 from gengliangwang/fixDML.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants