
Commit 0a5c7ca

[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2.
1 parent f206bbd commit 0a5c7ca

9 files changed: 171 additions & 11 deletions

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a metadata column.
+ * <p>
+ * A metadata column can expose additional metadata about a row. For example, rows from Kafka can
+ * use metadata columns to expose a message's topic, partition number, and offset.
+ * <p>
+ * A metadata column could also be the result of a transform applied to a value in the row. For
+ * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
+ * this case, {@link #transform()} should return a non-null {@link Transform} that produced the
+ * metadata column's values.
+ */
+@Evolving
+public interface MetadataColumn {
+  /**
+   * The name of this metadata column.
+   *
+   * @return a String name
+   */
+  String name();
+
+  /**
+   * The data type of values in this metadata column.
+   *
+   * @return a {@link DataType}
+   */
+  DataType dataType();
+
+  /**
+   * @return whether values produced by this metadata column may be null
+   */
+  default boolean isNullable() {
+    return true;
+  }
+
+  /**
+   * Documentation for this metadata column, or null.
+   *
+   * @return a documentation String
+   */
+  String comment();
+
+  /**
+   * The {@link Transform} used to produce this metadata column from data rows, or null.
+   *
+   * @return a {@link Transform} used to produce the column's values, or null if there isn't one
+   */
+  Transform transform();
+}
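
To illustrate the interface, here is a minimal sketch of an implementation; the PartitionIdColumn object and its semantics are hypothetical, not part of this commit:

import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical: exposes the partition number each row was read from.
object PartitionIdColumn extends MetadataColumn {
  override def name: String = "_partition"
  override def dataType: DataType = IntegerType
  override def isNullable: Boolean = false
  override def comment: String = "Partition number of this row"
  override def transform: Transform = null // not derived from a data column
}
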
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for exposing data columns for a table that are not in the table schema. For example,
+ * a file source could expose a "file" column that contains the path of the file that contained each
+ * row.
+ * <p>
+ * The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
+ * requested projections. Sources that implement this interface and column projection using
+ * {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
+ * {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+ */
+@Evolving
+public interface SupportsMetadataColumns extends Table {
+  /**
+   * Metadata columns that are supported by this {@link Table}.
+   * <p>
+   * The columns returned by this method may be passed as {@link StructField} in requested
+   * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+   *
+   * @return an array of {@link MetadataColumn}
+   */
+  MetadataColumn[] metadataColumns();
+}
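
And a hedged sketch of a table exposing it, reusing the hypothetical PartitionIdColumn above; the class name, schema, and capabilities are assumptions for illustration:

import java.util

import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical table: one metadata column beside a two-column data schema.
class ExampleTable extends SupportsMetadataColumns {
  override def name: String = "example"
  override def schema: StructType = new StructType().add("id", "long").add("data", "string")
  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(TableCapability.BATCH_READ)
  override def metadataColumns: Array[MetadataColumn] = Array(PartitionIdColumn)
}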

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 24 additions & 0 deletions
@@ -217,6 +217,7 @@ class Analyzer(
       ResolveInsertInto ::
       ResolveRelations ::
       ResolveTables ::
+      AddMetadataColumns ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::
       ResolveDeserializer ::
@@ -803,6 +804,29 @@ class Analyzer(
     }
   }

+  /**
+   * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
+   *
+   * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
+   * but the relation's output does not include the metadata columns until the relation is replaced
+   * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
+   * relation's output, the analyzer will detect that nothing produces the columns.
+   *
+   * This rule only adds metadata columns when a node is resolved but is missing input from its
+   * children. This ensures that metadata columns are not added to the plan unless they are used. By
+   * checking only resolved nodes, this ensures that * expansion is already done so that metadata
+   * columns are not accidentally selected by *.
+   */
+  object AddMetadataColumns extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+      case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
+        node resolveOperatorsUp {
+          case rel: DataSourceV2Relation =>
+            rel.withMetadataColumns()
+        }
+    }
+  }
+
   /**
    * Resolve table relations with concrete relations from v2 catalog.
    *
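
In effect, a resolved reference to a metadata column widens the scan, while unresolved plans and * expansion are left alone. A hedged usage sketch, assuming a v2 table t whose source declares the hypothetical _partition column:

// _partition resolves against metadataOutput; AddMetadataColumns then replaces the
// DataSourceV2Relation using withMetadataColumns() so the scan actually produces it.
spark.sql("SELECT id, _partition FROM t")

// SELECT * does not pick up metadata columns: the rule fires only on resolved nodes,
// after * expansion, so _partition must be referenced explicitly.
spark.sql("SELECT * FROM t")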

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 1 deletion
@@ -33,6 +33,9 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {

+  /** Metadata fields that can be projected from this node */
+  def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
+
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming)

@@ -86,7 +89,8 @@
     }
   }

-  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+  private[this] lazy val childAttributes =
+    AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))

   private[this] lazy val outputAttributes = AttributeSeq(output)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 6 additions & 0 deletions
@@ -869,6 +869,12 @@ case class SubqueryAlias(
     val qualifierList = identifier.qualifier :+ alias
     child.output.map(_.withQualifier(qualifierList))
   }
+
+  override def metadataOutput: Seq[Attribute] = {
+    val qualifierList = identifier.qualifier :+ alias
+    child.metadataOutput.map(_.withQualifier(qualifierList))
+  }
+
   override def doCanonicalize(): LogicalPlan = child.canonicalized
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 15 additions & 1 deletion
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 object DataSourceV2Implicits {
@@ -57,6 +59,18 @@ object DataSourceV2Implicits {
     def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
   }

+  implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
+    def asStruct: StructType = {
+      val fields = metadata.map { metaCol =>
+        val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
+        Option(metaCol.comment).map(field.withComment).getOrElse(field)
+      }
+      StructType(fields)
+    }
+
+    def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
+  }
+
   implicit class OptionsHelper(options: Map[String, String]) {
     def asOptions: CaseInsensitiveStringMap = {
       new CaseInsensitiveStringMap(options.asJava)
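
A small usage sketch of the new implicit helper; the values reuse the hypothetical PartitionIdColumn from the first sketch:

import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

val metaCols: Array[MetadataColumn] = Array(PartitionIdColumn)
val struct = metaCols.asStruct    // StructField("_partition", IntegerType, nullable = false) with comment
val attrs = metaCols.toAttributes // Seq[AttributeReference] for use in logical plans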

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 18 additions & 1 deletion
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.connector.write.WriteBuilder
@@ -48,6 +48,15 @@ case class DataSourceV2Relation(

   import DataSourceV2Implicits._

+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case hasMeta: SupportsMetadataColumns =>
+      val attrs = hasMeta.metadataColumns
+      val outputNames = outputSet.map(_.name).toSet
+      attrs.filterNot(col => outputNames.contains(col.name)).toAttributes
+    case _ =>
+      Nil
+  }
+
   override def name: String = table.name()

   override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -78,6 +87,14 @@
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+
+  def withMetadataColumns(): DataSourceV2Relation = {
+    if (metadataOutput.nonEmpty) {
+      DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
+    } else {
+      this
+    }
+  }
 }

 /**
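
One design note on metadataOutput above: a metadata column whose name collides with a data column is hidden, so the data column wins. A minimal self-contained sketch of that filter, with assumed names:

// Mirrors attrs.filterNot(col => outputNames.contains(col.name)), names only:
val outputNames = Set("id", "_partition")      // columns already in relation.output
val metaNames = Seq("_partition", "_file")     // declared metadata column names
val exposed = metaNames.filterNot(outputNames) // Seq("_file"); the conflicting one is hidden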

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala

Lines changed: 15 additions & 1 deletion
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
 import org.apache.spark.sql.types.StructType

 case class DescribeTableExec(
@@ -39,6 +39,7 @@ case class DescribeTableExec(
     addPartitioning(rows)

     if (isExtended) {
+      addMetadataColumns(rows)
       addTableDetails(rows)
     }
     rows
@@ -70,6 +71,19 @@ case class DescribeTableExec(
     }
   }

+  private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
+    case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty =>
+      rows += emptyRow()
+      rows += toCatalystRow("# Metadata Columns", "", "")
+      rows ++= hasMeta.metadataColumns.map { column =>
+        toCatalystRow(
+          column.name,
+          column.dataType.simpleString,
+          Option(column.comment()).getOrElse(""))
+      }
+    case _ =>
+  }
+
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
     rows += emptyRow()
     rows += toCatalystRow("# Partitioning", "", "")
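
With this change, DESCRIBE TABLE EXTENDED gains a metadata-column section for tables whose source implements SupportsMetadataColumns. A hedged sketch of the output shape, with assumed table and column names:

spark.sql("DESCRIBE TABLE EXTENDED testcat.t").show(truncate = false)
// ...
// # Partitioning
// ...
// # Metadata Columns
// _partition  int  Partition number of this row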

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 5 additions & 7 deletions
@@ -95,13 +95,11 @@ object PushDownUtils extends PredicateHelper {
       val exprs = projects ++ filters
       val requiredColumns = AttributeSet(exprs.flatMap(_.references))
       val neededOutput = relation.output.filter(requiredColumns.contains)
-      if (neededOutput != relation.output) {
-        r.pruneColumns(neededOutput.toStructType)
-        val scan = r.build()
-        scan -> toOutputAttrs(scan.readSchema(), relation)
-      } else {
-        r.build() -> relation.output
-      }
+      r.pruneColumns(neededOutput.toStructType)
+      val scan = r.build()
+      // always project, in case the relation's output has been updated and doesn't match
+      // the underlying table schema
+      scan -> toOutputAttrs(scan.readSchema(), relation)

     case _ => scanBuilder.build() -> relation.output
   }
