MetadataColumn.java (new file)
@@ -0,0 +1,58 @@
package org.apache.spark.sql.connector.catalog;
Member: hmm @brkyvz @rdblue don't we require a license header for new files?

@HeartSaVioR (Contributor), Nov 19, 2020: Hmm, that's weird that RAT missed this.

Contributor Author: I'll open a new PR to add the license headers.

Contributor Author: Fixed in #30415

Member: It was due to an incorrect exclusion rule. I made a PR.


import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataType;

/**
* Interface for a metadata column.
* <p>
* A metadata column can expose additional metadata about a row. For example, rows from Kafka can
* use metadata columns to expose a message's topic, partition number, and offset.
* <p>
* A metadata column could also be the result of a transform applied to a value in the row. For
* example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
* this case, {@link #transform()} should return a non-null {@link Transform} that produced the
* metadata column's values.
*/
@Evolving
public interface MetadataColumn {
/**
* The name of this metadata column.
*
* @return a String name
*/
String name();

/**
* The data type of values in this metadata column.
*
* @return a {@link DataType}
*/
DataType dataType();

/**
* @return whether values produced by this metadata column may be null
*/
default boolean isNullable() {
return true;
}

/**
* Documentation for this metadata column, or null.
*
* @return a documentation String
*/
default String comment() {
return null;
}

/**
* The {@link Transform} used to produce this metadata column from data rows, or null.
*
* @return a {@link Transform} used to produce the column's values, or null if there isn't one
*/
default Transform transform() {
return null;
}
}
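To make the contract concrete, here is a minimal sketch of an implementation in Scala (the language the PR's own test code uses), based on the Kafka example in the Javadoc above; the object name and comment text are hypothetical:

    import org.apache.spark.sql.connector.catalog.MetadataColumn
    import org.apache.spark.sql.types.{DataType, LongType}

    // Hypothetical metadata column exposing each record's offset in its Kafka partition.
    object OffsetColumn extends MetadataColumn {
      override def name: String = "offset"
      override def dataType: DataType = LongType
      override def isNullable: Boolean = false  // every Kafka record has an offset
      override def comment: String = "Offset of the record within its topic partition"
    }

transform() is left at its default of null, since the offset is not derived from a data column.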
SupportsMetadataColumns.java (new file)
@@ -0,0 +1,37 @@
package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* An interface for exposing data columns for a table that are not in the table schema. For example,
* a file source could expose a "file" column that contains the path of the file that contained each
* row.
* <p>
Comment on lines +11 to +12

Contributor: Should we talk about the behavior of reserving names for metadata columns, or about what happens during name collisions (data columns will be selected over metadata)?

Contributor Author: Good idea. I'll add that and rebase to fix the conflicts. Thanks!

Contributor Author: I added this:

    If a table column and a metadata column have the same name, the metadata column will never be requested and is ignored. It is recommended that Table implementations reject data column names that conflict with metadata column names.

* The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
* requested projections. Sources that implement this interface and column projection using
* {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
* {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
* requested. It is recommended that Table implementations reject data column names that conflict
* with metadata column names.
*/
@Evolving
public interface SupportsMetadataColumns extends Table {
/**
* Metadata columns that are supported by this {@link Table}.
* <p>
* The columns returned by this method may be passed as {@link StructField} in requested
* projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
* requested and is ignored. It is recommended that Table implementations reject data column names
* that conflict with metadata column names.
*
* @return an array of {@link MetadataColumn}
*/
MetadataColumn[] metadataColumns();
}
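As a sketch of how the pieces fit together, a file-based source like the one in the Javadoc example might expose its "file" column as follows (the class and names here are hypothetical):

    import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, Table}
    import org.apache.spark.sql.types.{DataType, StringType}

    // Hypothetical table exposing the source file path of each row as metadata.
    abstract class FileBackedTable extends Table with SupportsMetadataColumns {
      private object FilePathColumn extends MetadataColumn {
        override def name: String = "file"
        override def dataType: DataType = StringType
        override def comment: String = "Path of the file this row came from"
      }

      override def metadataColumns: Array[MetadataColumn] = Array(FilePathColumn)
    }

Its scan builder would then also need to accept the "file" field when it is passed to pruneColumns, as described above.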
Analyzer.scala
@@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@@ -916,6 +917,29 @@
}
}

/**
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
* but the relation's output does not include the metadata columns until the relation is replaced
* using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
* relation's output, the analyzer will detect that nothing produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
* children. This ensures that metadata columns are not added to the plan unless they are used.
* Because it matches only resolved nodes, * expansion has already been done, so metadata columns
* are never accidentally selected by *.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
node resolveOperatorsUp {
case rel: DataSourceV2Relation =>
rel.withMetadataColumns()
}
}
}
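In query terms, the intended net effect of the rule is roughly the following (a sketch; testcat.t and its columns are hypothetical):

    // `_partition` resolves against metadataOutput; the node above the relation is then
    // resolved but missing input, so the rule rewrites the relation with withMetadataColumns().
    spark.sql("SELECT id, data, _partition FROM testcat.t")

    // Star expansion uses only output, and the rule matches only already-resolved nodes,
    // so metadata columns never leak into a star projection.
    spark.sql("SELECT * FROM testcat.t")  // returns just id and data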

/**
* Resolve table relations with concrete relations from v2 catalog.
*
LogicalPlan.scala
@@ -33,6 +33,9 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {

/** Metadata fields that can be projected from this node */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)

/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming)

@@ -86,7 +89,8 @@
}
}

private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
private[this] lazy val childAttributes =
AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))

private[this] lazy val outputAttributes = AttributeSeq(output)

basicLogicalOperators.scala
@@ -886,6 +886,12 @@ case class SubqueryAlias(
val qualifierList = identifier.qualifier :+ alias
child.output.map(_.withQualifier(qualifierList))
}

override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
}
Comment on lines +890 to +893

Contributor: Why is this differentiation needed? Won't the metadata columns be part of output?

Contributor Author: They are eventually part of the output, but they can't be at first, because * expansion uses all of output. If we added them immediately, we would get metadata columns in a SELECT *.

Instead, we add the metadata columns to metadataOutput and update column resolution to look them up there. The result is that we can resolve everything just like normal, including *, while the columns are still missing from output. The new analyzer rule then adds the columns to the output if they are resolved but missing. Since the parent node is already resolved, we know that this is safe and happens after * expansion.
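Since SubqueryAlias re-qualifies its child's metadataOutput the same way it re-qualifies output, metadata columns can also be referenced through an alias. A sketch of the expected behavior (table and column names are hypothetical):

    // The metadata column resolves through the alias qualifier, like any data column...
    spark.sql("SELECT a.id, a._partition FROM testcat.t AS a")

    // ...but a qualified star still expands from output only, so no metadata columns appear.
    spark.sql("SELECT a.* FROM testcat.t AS a")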


override def doCanonicalize(): LogicalPlan = child.canonicalized
}

DataSourceV2Implicits.scala
@@ -21,7 +21,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Implicits {
@@ -78,6 +80,18 @@
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}

implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
def asStruct: StructType = {
val fields = metadata.map { metaCol =>
val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
Option(metaCol.comment).map(field.withComment).getOrElse(field)
}
StructType(fields)
}

def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}

implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)
DataSourceV2Relation.scala
@@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -48,6 +49,21 @@ case class DataSourceV2Relation(

import DataSourceV2Implicits._

override lazy val metadataOutput: Seq[AttributeReference] = table match {
case hasMeta: SupportsMetadataColumns =>
val resolve = SQLConf.get.resolver
val outputNames = outputSet.map(_.name)
def isOutputColumn(col: MetadataColumn): Boolean = {
outputNames.exists(name => resolve(col.name, name))
}
// filter out metadata columns that have names conflicting with output columns. if the table
// has a column "line" and the table can produce a metadata column called "line", then the
// data column should be returned, not the metadata column.
hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
case _ =>
Nil
}

override def name: String = table.name()

override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -78,6 +94,14 @@
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}

def withMetadataColumns(): DataSourceV2Relation = {
if (metadataOutput.nonEmpty) {
DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
} else {
this
}
}
}

/**
InMemoryTable.scala
@@ -27,15 +27,17 @@ import scala.collection.mutable
import org.scalatest.Assertions._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType}
import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String

/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
@@ -45,7 +47,24 @@ class InMemoryTable(
val schema: StructType,
override val partitioning: Array[Transform],
override val properties: util.Map[String, String])
extends Table with SupportsRead with SupportsWrite with SupportsDelete {
extends Table with SupportsRead with SupportsWrite with SupportsDelete
with SupportsMetadataColumns {

private object PartitionKeyColumn extends MetadataColumn {
override def name: String = "_partition"
override def dataType: DataType = StringType
override def comment: String = "Partition key used to store the row"
}

private object IndexColumn extends MetadataColumn {
override def name: String = "index"
override def dataType: DataType = IntegerType
override def comment: String = "Metadata column used to conflict with a data column"
}

// purposely exposes a metadata column that conflicts with a data column in some tests
override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)

private val allowUnsupportedTransforms =
properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
@@ -146,7 +165,7 @@
val key = getKey(row)
dataMap += dataMap.get(key)
.map(key -> _.withRow(row))
.getOrElse(key -> new BufferedRows().withRow(row))
.getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
})
this
}
@@ -160,17 +179,38 @@
TableCapability.TRUNCATE).asJava

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
() => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]))
new InMemoryScanBuilder(schema)
}

class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
with SupportsPushDownRequiredColumns {
private var schema: StructType = tableSchema

override def build: Scan =
new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema)

override def pruneColumns(requiredSchema: StructType): Unit = {
// if metadata columns are projected, return the table schema and metadata columns
val hasMetadataColumns = requiredSchema.map(_.name).exists(metadataColumnNames.contains)
if (hasMetadataColumns) {
schema = StructType(tableSchema ++ metadataColumnNames
.flatMap(name => metadataColumns.find(_.name == name))
.map(col => StructField(col.name, col.dataType, col.isNullable)))
}
}
}

class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch {
class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch {
override def readSchema(): StructType = schema

override def toBatch: Batch = this

override def planInputPartitions(): Array[InputPartition] = data

override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
override def createReaderFactory(): PartitionReaderFactory = {
val metadataColumns = schema.map(_.name).filter(metadataColumnNames.contains)
new BufferedRowsReaderFactory(metadataColumns)
}
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@@ -340,7 +380,8 @@
}
}

class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
class BufferedRows(
val key: String = "") extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()

def withRow(row: InternalRow): BufferedRows = {
@@ -349,21 +390,32 @@
}
}

private object BufferedRowsReaderFactory extends PartitionReaderFactory {
private class BufferedRowsReaderFactory(
metadataColumns: Seq[String]) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new BufferedRowsReader(partition.asInstanceOf[BufferedRows])
new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns)
}
}

private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] {
private class BufferedRowsReader(
partition: BufferedRows,
metadataColumns: Seq[String]) extends PartitionReader[InternalRow] {
private def addMetadata(row: InternalRow): InternalRow = {
val metadataRow = new GenericInternalRow(metadataColumns.map {
case "index" => index
case "_partition" => UTF8String.fromString(partition.key)
}.toArray)
new JoinedRow(row, metadataRow)
}

private var index: Int = -1

override def next(): Boolean = {
index += 1
index < partition.rows.length
}

override def get(): InternalRow = partition.rows(index)
override def get(): InternalRow = addMetadata(partition.rows(index))

override def close(): Unit = {}
}
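Putting the pieces together, queries against this test table would be expected to behave roughly like this (a sketch; the actual assertions live in the PR's test suites):

    // _partition is exposed only as metadata, so it must be requested by name; the scan
    // builder then widens its schema and the reader joins the value onto each data row.
    spark.sql("SELECT id, data, _partition FROM testcat.t")

    // `index` deliberately collides: when the table schema also contains an `index` data
    // column, DataSourceV2Relation.metadataOutput filters out the metadata column, so
    // this reads the data column instead.
    spark.sql("SELECT index FROM testcat.t")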