diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala index ef657ba35455f..b26a0785b7f4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.analysis import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Column, Dataset} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Equality, Expression, ExprId} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId} import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -86,6 +86,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] { val colRefs = colRefAttrs.map(toColumnReference).distinct val ambiguousColRefs = mutable.HashSet.empty[ColumnReference] val dsIdSet = colRefs.map(_.datasetId).toSet + val inputAttrs = AttributeSet(plan.children.flatMap(_.output)) plan.foreach { case LogicalPlanWithDatasetId(p, id) if dsIdSet.contains(id) => @@ -101,7 +102,15 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] { // the attribute of column reference, then the column reference is ambiguous, as it // refers to a column that gets regenerated by self-join. val actualAttr = p.output(ref.colPos).asInstanceOf[AttributeReference] - if (actualAttr.exprId != ref.exprId) { + // We should only count ambiguous column references if the attribute is available as + // the input attributes of the root node. For example: + // Join(b#1 = 3) + // TableScan(t, [a#0, b#1]) + // Project(a#2) + // TableScan(t, [a#2, b#3]) + // This query is a self-join. The column 'b' in the join condition is not ambiguous, + // as it can't come from the right side, which only has column 'a'. + if (actualAttr.exprId != ref.exprId && inputAttrs.contains(actualAttr)) { ambiguousColRefs += ref } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala index 50846d9d12b97..76f07b5b0132d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala @@ -248,4 +248,13 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { val ds_id2 = df.logicalPlan.getTagValue(Dataset.DATASET_ID_TAG) assert(ds_id1 === ds_id2) } + + test("SPARK-34200: ambiguous column reference should consider attribute availability") { + withTable("t") { + sql("CREATE TABLE t USING json AS SELECT 1 a, 2 b") + val df1 = spark.table("t") + val df2 = df1.select("a") + checkAnswer(df1.join(df2, df1("b") === 2), Row(1, 2, 1)) + } + } }