Skip to content

Commit 3b634f6

Browse files
karenfeng and cloud-fan
authored and committed
[SPARK-34923][SQL] Metadata output should be empty for more plans
### What changes were proposed in this pull request?

Changes the metadata propagation framework. Previously, most `LogicalPlan`s propagated their `children`'s `metadataOutput`. This did not make sense in cases where the `LogicalPlan` did not even propagate their `children`'s `output`. I set the metadata output for plans that do not propagate their `children`'s `output` to be `Nil`. Notably, `Project` and `View` no longer have metadata output.

### Why are the changes needed?

Previously, `SELECT m from (SELECT a from tb)` would output `m` if it were metadata. This did not make sense.

### Does this PR introduce _any_ user-facing change?

Yes. Now, `SELECT m from (SELECT a from tb)` will encounter an `AnalysisException`.

### How was this patch tested?

Added unit tests. I did not cover all cases, as they are fairly extensive. However, the new tests cover major cases (and an existing test already covers Join).

Closes apache#32017 from karenfeng/spark-34923.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent caf04f9 commit 3b634f6

3 files changed

Lines changed: 132 additions & 1 deletion

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ abstract class LogicalPlan
3434
with QueryPlanConstraints
3535
with Logging {
3636

37-
/** Metadata fields that can be projected from this node */
37+
/**
38+
* Metadata fields that can be projected from this node.
39+
* Should be overridden if the plan does not propagate its children's output.
40+
*/
3841
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
3942

4043
/** Returns true if this subtree has data from a streaming data source. */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ object Subquery {
6161
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
6262
extends OrderPreservingUnaryNode {
6363
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
64+
override def metadataOutput: Seq[Attribute] = Nil
6465
override def maxRows: Option[Long] = child.maxRows
6566

6667
override lazy val resolved: Boolean = {
@@ -187,6 +188,8 @@ case class Intersect(
187188
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
188189
}
189190

191+
override def metadataOutput: Seq[Attribute] = Nil
192+
190193
override protected lazy val validConstraints: ExpressionSet =
191194
leftConstraints.union(rightConstraints)
192195

@@ -207,6 +210,8 @@ case class Except(
207210
/** We don't use right.output because those rows get excluded from the set. */
208211
override def output: Seq[Attribute] = left.output
209212

213+
override def metadataOutput: Seq[Attribute] = Nil
214+
210215
override protected lazy val validConstraints: ExpressionSet = leftConstraints
211216
}
212217

@@ -270,6 +275,8 @@ case class Union(
270275
}
271276
}
272277

278+
override def metadataOutput: Seq[Attribute] = Nil
279+
273280
override lazy val resolved: Boolean = {
274281
// allChildrenCompatible needs to be evaluated after childrenResolved
275282
def allChildrenCompatible: Boolean =
@@ -364,6 +371,17 @@ case class Join(
364371
}
365372
}
366373

374+
override def metadataOutput: Seq[Attribute] = {
375+
joinType match {
376+
case ExistenceJoin(_) =>
377+
left.metadataOutput
378+
case LeftExistence(_) =>
379+
left.metadataOutput
380+
case _ =>
381+
children.flatMap(_.metadataOutput)
382+
}
383+
}
384+
367385
override protected lazy val validConstraints: ExpressionSet = {
368386
joinType match {
369387
case _: InnerLike if condition.isDefined =>
@@ -440,6 +458,7 @@ case class InsertIntoDir(
440458
extends UnaryNode {
441459

442460
override def output: Seq[Attribute] = Seq.empty
461+
override def metadataOutput: Seq[Attribute] = Nil
443462
override lazy val resolved: Boolean = false
444463
}
445464

@@ -466,6 +485,8 @@ case class View(
466485

467486
override def output: Seq[Attribute] = child.output
468487

488+
override def metadataOutput: Seq[Attribute] = Nil
489+
469490
override def simpleString(maxFields: Int): String = {
470491
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
471492
}
@@ -647,6 +668,7 @@ case class Aggregate(
647668
}
648669

649670
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
671+
override def metadataOutput: Seq[Attribute] = Nil
650672
override def maxRows: Option[Long] = {
651673
if (groupingExpressions.isEmpty) {
652674
Some(1L)
@@ -782,6 +804,8 @@ case class Expand(
782804
override lazy val references: AttributeSet =
783805
AttributeSet(projections.flatten.flatMap(_.references))
784806

807+
override def metadataOutput: Seq[Attribute] = Nil
808+
785809
override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
786810

787811
// This operator can reuse attributes (for example making them null when doing a roll up) so
@@ -818,6 +842,7 @@ case class Pivot(
818842
}
819843
groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
820844
}
845+
override def metadataOutput: Seq[Attribute] = Nil
821846
}
822847

823848
/**

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2690,6 +2690,109 @@ class DataSourceV2SQLSuite
26902690
}
26912691
}
26922692

2693+
test("SPARK-34923: do not propagate metadata columns through Project") {
2694+
val t1 = s"${catalogAndNamespace}table"
2695+
withTable(t1) {
2696+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2697+
"PARTITIONED BY (bucket(4, id), id)")
2698+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2699+
2700+
assertThrows[AnalysisException] {
2701+
sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
2702+
}
2703+
assertThrows[AnalysisException] {
2704+
spark.table(t1).select("id", "data").select("index", "_partition")
2705+
}
2706+
}
2707+
}
2708+
2709+
test("SPARK-34923: do not propagate metadata columns through View") {
2710+
val t1 = s"${catalogAndNamespace}table"
2711+
val view = "view"
2712+
2713+
withTable(t1) {
2714+
withTempView(view) {
2715+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2716+
"PARTITIONED BY (bucket(4, id), id)")
2717+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2718+
sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
2719+
assertThrows[AnalysisException] {
2720+
sql(s"SELECT index, _partition FROM $view")
2721+
}
2722+
}
2723+
}
2724+
}
2725+
2726+
test("SPARK-34923: propagate metadata columns through Filter") {
2727+
val t1 = s"${catalogAndNamespace}table"
2728+
withTable(t1) {
2729+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2730+
"PARTITIONED BY (bucket(4, id), id)")
2731+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2732+
2733+
val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
2734+
val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
2735+
2736+
Seq(sqlQuery, dfQuery).foreach { query =>
2737+
checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
2738+
}
2739+
}
2740+
}
2741+
2742+
test("SPARK-34923: propagate metadata columns through Sort") {
2743+
val t1 = s"${catalogAndNamespace}table"
2744+
withTable(t1) {
2745+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2746+
"PARTITIONED BY (bucket(4, id), id)")
2747+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2748+
2749+
val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
2750+
val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
2751+
2752+
Seq(sqlQuery, dfQuery).foreach { query =>
2753+
checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
2754+
}
2755+
}
2756+
}
2757+
2758+
test("SPARK-34923: propagate metadata columns through RepartitionBy") {
2759+
val t1 = s"${catalogAndNamespace}table"
2760+
withTable(t1) {
2761+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2762+
"PARTITIONED BY (bucket(4, id), id)")
2763+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2764+
2765+
val sqlQuery = spark.sql(
2766+
s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
2767+
val tbl = spark.table(t1)
2768+
val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
2769+
.select("id", "data", "index", "_partition")
2770+
2771+
Seq(sqlQuery, dfQuery).foreach { query =>
2772+
checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
2773+
}
2774+
}
2775+
}
2776+
2777+
test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
2778+
val t1 = s"${catalogAndNamespace}table"
2779+
val sbq = "sbq"
2780+
withTable(t1) {
2781+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
2782+
"PARTITIONED BY (bucket(4, id), id)")
2783+
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2784+
2785+
val sqlQuery = spark.sql(
2786+
s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
2787+
val dfQuery = spark.table(t1).as(sbq).select(
2788+
s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
2789+
2790+
Seq(sqlQuery, dfQuery).foreach { query =>
2791+
checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
2792+
}
2793+
}
2794+
}
2795+
26932796
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
26942797
val e = intercept[AnalysisException] {
26952798
sql(s"$sqlCommand $sqlParams")

0 commit comments

Comments (0)