Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ case class EventTimeWatermark(
a
}
}

override val metadataOutput: Seq[Attribute] = child.metadataOutput
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {

/** Metadata fields that can be projected from this node */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
/**
* Metadata fields that can be projected from this node.
* Should be non-empty if the plan propagates its children's output.
*/
def metadataOutput: Seq[Attribute] = Nil

/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.random.RandomSampler

/**
* When planning take() or collect() operations, this special node that is inserted at the top of
* When planning take() or collect() operations, this special node is inserted at the top of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: including unrelated changes tends to cause git conflicts.

* the logical plan before invoking the query planner.
*
* Rules can pattern-match on this node in order to apply transformations that only take effect
Expand All @@ -40,6 +40,7 @@ import org.apache.spark.util.random.RandomSampler
case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand All @@ -51,6 +52,7 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
*/
case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

object Subquery {
Expand Down Expand Up @@ -134,11 +136,13 @@ case class Generate(
}

def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

case class Filter(condition: Expression, child: LogicalPlan)
extends OrderPreservingUnaryNode with PredicateHelper {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def maxRows: Option[Long] = child.maxRows

Expand Down Expand Up @@ -364,6 +368,17 @@ case class Join(
}
}

override def metadataOutput: Seq[Attribute] = {
joinType match {
case ExistenceJoin(_) =>
left.metadataOutput
case LeftExistence(_) =>
left.metadataOutput
case _ =>
children.flatMap(_.metadataOutput)
}
}

override protected lazy val validConstraints: ExpressionSet = {
joinType match {
case _: InnerLike if condition.isDefined =>
Expand Down Expand Up @@ -520,6 +535,8 @@ object View {
case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def simpleString(maxFields: Int): String = {
val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields)
s"CTE $cteAliases"
Expand All @@ -532,6 +549,7 @@ case class WithWindowDefinition(
windowDefinitions: Map[String, WindowSpecDefinition],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand All @@ -545,6 +563,7 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
override def maxRows: Option[Long] = child.maxRows
override def outputOrdering: Seq[SortOrder] = order
}
Expand Down Expand Up @@ -669,6 +688,7 @@ case class Window(
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] =
child.output ++ windowExpressions.map(_.toAttribute)
override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def producedAttributes: AttributeSet = windowOutputSet

Expand Down Expand Up @@ -861,6 +881,7 @@ object Limit {
*/
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
Expand All @@ -877,6 +898,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderP
*/
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def maxRowsPerPartition: Option[Long] = {
limitExpr match {
Expand All @@ -898,6 +920,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
*/
case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
Expand All @@ -924,11 +947,6 @@ case class SubqueryAlias(
child.output.map(_.withQualifier(qualifierList))
}

override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the logic here removed? Won't this cause resolution failures when referencing a metadata column via an alias? Like SELECT s._file FROM (SELECT ...) s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, should we support this case if it requires changing the query during analysis after being resolved?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only expose the metadata column in a single SELECT group, e.g. SELECT _file FROM t, SELECT t1._file FROM t1 JOIN t2.

It's super weird if we can propagate the metadata column through SELECT groups, e.g. SELECT s._file FROM (SELECT a, b FROM t) s. The s is a subquery alias and the subquery has a clear output list which is a, b. It may surprise users if they can access s._file.

However, I do agree with @rdblue that simple alias should be supported. For example, SELECT t1._file FROM t t1 JOIN t t2. SELECT s._file FROM (SELECT ...) s won't work anyway because Project can't propagate the metadata columns.

That said, let's propagate metadata columns in SubqueryAlias


override def doCanonicalize(): LogicalPlan = child.canonicalized
}

Expand Down Expand Up @@ -983,6 +1001,7 @@ case class Sample(

override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand All @@ -991,6 +1010,7 @@ case class Sample(
case class Distinct(child: LogicalPlan) extends UnaryNode {
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand All @@ -1001,6 +1021,7 @@ abstract class RepartitionOperation extends UnaryNode {
def numPartitions: Int
override final def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
def partitioning: Partitioning
}

Expand Down Expand Up @@ -1095,6 +1116,7 @@ case class Deduplicate(
child: LogicalPlan) extends UnaryNode {
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand Down Expand Up @@ -1123,4 +1145,5 @@ case class CollectMetrics(
}

override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan

override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = child.output
override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/**
Expand All @@ -42,6 +43,8 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())

override def output: Seq[Attribute] = child.output

override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def doCanonicalize(): LogicalPlan = child.canonicalized
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ case class TypedFilter(

override def output: Seq[Attribute] = child.output

override def metadataOutput: Seq[Attribute] = child.metadataOutput

def withObjectProducerChild(obj: LogicalPlan): Filter = {
assert(obj.output.length == 1)
Filter(typedCondition(obj.output.head), obj)
Expand Down Expand Up @@ -333,6 +335,8 @@ case class AppendColumns(

override def output: Seq[Attribute] = child.output ++ newColumns

override def metadataOutput: Seq[Attribute] = child.metadataOutput

def newColumns: Seq[Attribute] = serializer.map(_.toAttribute)
}

Expand All @@ -346,6 +350,8 @@ case class AppendColumnsWithObject(
child: LogicalPlan) extends ObjectConsumer {

override def output: Seq[Attribute] = (childSerializer ++ newColumnsSerializer).map(_.toAttribute)

override def metadataOutput: Seq[Attribute] = child.metadataOutput
}

/** Factory for constructing new `MapGroups` nodes. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ trait BaseEvalPython extends UnaryNode {

override def output: Seq[Attribute] = child.output ++ resultAttrs

override def metadataOutput: Seq[Attribute] = child.metadataOutput

override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,73 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-34923: do not propagate metadata columns through Project") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")

assertThrows[AnalysisException] {
sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
}
assertThrows[AnalysisException] {
spark.table(t1).select("id", "data").select("index", "_partition")
}
}
}

test("SPARK-34923: propagate metadata columns through Filter") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")

val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")

Seq(sqlQuery, dfQuery).foreach { query =>
checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
}
}
}

test("SPARK-34923: propagate metadata columns through Sort") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")

val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")

Seq(sqlQuery, dfQuery).foreach { query =>
checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
}
}
}

test("SPARK-34923: propagate metadata columns through RepartitionBy") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")

val sqlQuery = spark.sql(
s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
val tbl = spark.table(t1)
val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
.select("id", "data", "index", "_partition")

Seq(sqlQuery, dfQuery).foreach { query =>
checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
}
}
}

private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
Expand Down