Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.analysis
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, Column, Dataset}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Equality, Expression, ExprId}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -86,6 +86,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
val colRefs = colRefAttrs.map(toColumnReference).distinct
val ambiguousColRefs = mutable.HashSet.empty[ColumnReference]
val dsIdSet = colRefs.map(_.datasetId).toSet
val inputAttrs = AttributeSet(plan.children.flatMap(_.output))

plan.foreach {
case LogicalPlanWithDatasetId(p, id) if dsIdSet.contains(id) =>
Expand All @@ -101,7 +102,15 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.
val actualAttr = p.output(ref.colPos).asInstanceOf[AttributeReference]
if (actualAttr.exprId != ref.exprId) {
// We should only count ambiguous column references if the attribute is available as
// the input attributes of the root node. For example:
// Join(b#1 = 3)
// TableScan(t, [a#0, b#1])
// Project(a#2)
// TableScan(t, [a#2, b#3])
// This query is a self-join. The column 'b' in the join condition is not ambiguous,
// as it can't come from the right side, which only has column 'a'.
if (actualAttr.exprId != ref.exprId && inputAttrs.contains(actualAttr)) {
ambiguousColRefs += ref
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,13 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
val ds_id2 = df.logicalPlan.getTagValue(Dataset.DATASET_ID_TAG)
assert(ds_id1 === ds_id2)
}

test("SPARK-34200: ambiguous column reference should consider attribute availability") {
withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 a, 2 b")
val df1 = spark.table("t")
val df2 = df1.select("a")
checkAnswer(df1.join(df2, df1("b") === 2), Row(1, 2, 1))
}
}
}