Commit 62cf4d4

Yaohua628 authored and cloud-fan committed
[SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL
### What changes were proposed in this pull request?

This PR proposes a new interface in Spark SQL that allows users to query the metadata of the input files for all file formats. Spark SQL will expose them as **built-in hidden columns**, meaning **users can only see them when they explicitly reference them**.

Currently, this PR proposes to support the following metadata columns inside a metadata struct `_metadata`:

| Name | Type | Description | Example |
| ------------- | ------------- | ------------- | ------------- |
| _metadata.file_path | String | The absolute path of the input file. | file:/tmp/spark-7f600b30-b3ec-43a8-8cd2-686491654f9b/f0.csv |
| _metadata.file_name | String | The name of the input file, including its extension. | f0.csv |
| _metadata.file_size | Long | The length of the input file, in bytes. | 628 |
| _metadata.file_modification_time | Timestamp | The modification timestamp of the input file. | 2021-12-20 20:05:21 |

The proposed hidden file metadata interface has the following behaviors:

- **Hidden**: metadata columns are hidden. They will not show up when selecting only data columns or selecting all columns (`SELECT *`); in other words, they are not returned unless explicitly referenced.
- **Does not overwrite the data schema**: in the case of a name collision with a data column, the data column is returned instead of the metadata column. In other words, metadata columns cannot overwrite user data in any case.

### Why are the changes needed?

To improve Spark SQL observability for **all file formats** that still leverage DSV1.

### Does this PR introduce _any_ user-facing change?

Yes.

```
spark.read.format("csv")
  .schema(schema)
  .load("file:/tmp/*")
  .select("name", "age", "_metadata.file_path", "_metadata.file_name",
    "_metadata.file_size", "_metadata.file_modification_time")
```

Example return:

| name | age | file_path | file_name | file_size | file_modification_time |
| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |
| Debbie | 18 | file:/tmp/f0.csv | f0.csv | 12 | 2021-07-02 01:05:21 |
| Frank | 24 | file:/tmp/f1.csv | f1.csv | 11 | 2021-12-20 02:06:21 |

### How was this patch tested?

Added a new test suite: FileMetadataColumnsSuite.

Closes #34575 from Yaohua628/spark-37273.

Authored-by: yaohua <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
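The example above selects individual fields; a minimal sketch (not part of this patch) of selecting the whole `_metadata` struct instead, assuming the struct column can be referenced as a single column and that `spark` and `schema` are already defined:

```
import org.apache.spark.sql.functions.col

// Reference the hidden struct once, then drill into its fields.
val df = spark.read.format("csv")
  .schema(schema)
  .load("file:/tmp/*")
  .select(col("name"), col("age"), col("_metadata"))

// e.g. keep only the rows that came from f0.csv
df.filter(col("_metadata.file_name") === "f0.csv").show()
```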
1 parent 86b9592 commit 62cf4d4

File tree

13 files changed (+633 -22 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -972,7 +972,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   }

   private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-    case r: DataSourceV2Relation => r.withMetadataColumns()
+    case s: ExposesMetadataColumns => s.withMetadataColumns()
     case p: Project =>
       p.copy(
         projectList = p.metadataOutput ++ p.projectList,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 20 additions & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.BitSet
@@ -432,3 +432,22 @@ object VirtualColumn {
   val groupingIdName: String = "spark_grouping_id"
   val groupingIdAttribute: UnresolvedAttribute = UnresolvedAttribute(groupingIdName)
 }
+
+/**
+ * The internal representation of the hidden metadata struct:
+ * set `__metadata_col` to `true` in AttributeReference metadata
+ * - apply() will create a metadata attribute reference
+ * - unapply() will check if an attribute reference is the metadata attribute reference
+ */
+object MetadataAttribute {
+  def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
+    AttributeReference(name, dataType, nullable,
+      new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()
+
+  def unapply(attr: AttributeReference): Option[AttributeReference] = {
+    if (attr.metadata.contains(METADATA_COL_ATTR_KEY)
+      && attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)) {
+      Some(attr)
+    } else None
+  }
+}
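For context, a minimal usage sketch of the extractor added above (illustrative only; it assumes Spark's catalyst internals are on the classpath):

```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute}
import org.apache.spark.sql.types.StringType

// apply() builds an AttributeReference whose metadata carries `__metadata_col = true`.
val filePath: AttributeReference = MetadataAttribute("file_path", StringType)

// unapply() lets planner code pattern-match metadata attributes out of a plan's output.
filePath match {
  case MetadataAttribute(attr) => println(s"${attr.name} is a hidden metadata column")
  case _                       => println("regular data column")
}
```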

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 7 additions & 0 deletions
@@ -277,3 +277,10 @@ object LogicalPlanIntegrity {
     checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
   }
 }
+
+/**
+ * A logical plan node that can generate metadata columns
+ */
+trait ExposesMetadataColumns extends LogicalPlan {
+  def withMetadataColumns(): LogicalPlan
+}
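A minimal sketch of what an implementation of this trait can look like (`MyFileRelation` is hypothetical; in this patch the trait is implemented by DataSourceV2Relation, shown in the next file):

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan}
import org.apache.spark.sql.execution.datasources.FileFormat

// Hypothetical leaf relation that exposes the `_metadata` struct only when referenced.
case class MyFileRelation(
    dataOutput: Seq[AttributeReference],
    metadataCol: AttributeReference = FileFormat.createFileMetadataCol)
  extends LeafNode with ExposesMetadataColumns {

  override def output: Seq[AttributeReference] = dataOutput
  override def metadataOutput: Seq[AttributeReference] = Seq(metadataCol)

  // Called by the analyzer's addMetadataCol when a query references a metadata column.
  override def withMetadataColumns(): LogicalPlan = copy(dataOutput = dataOutput :+ metadataCol)
}
```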

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
@@ -44,7 +44,7 @@ case class DataSourceV2Relation(
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     options: CaseInsensitiveStringMap)
-  extends LeafNode with MultiInstanceRelation with NamedRelation {
+  extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {

   import DataSourceV2Implicits._

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 17 additions & 4 deletions
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -198,6 +199,9 @@ case class FileSourceScanExec(
     disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {

+  lazy val metadataColumns: Seq[AttributeReference] =
+    output.collect { case MetadataAttribute(attr) => attr }
+
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
@@ -216,7 +220,10 @@ case class FileSourceScanExec(
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
       partitionSchema = relation.partitionSchema,
-      relation.sparkSession.sessionState.conf)
+      relation.sparkSession.sessionState.conf).map { vectorTypes =>
+        // for column-based file format, append metadata struct column's vector type classes if any
+        vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[OnHeapColumnVector].getName)
+      }

   private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

@@ -359,7 +366,11 @@ case class FileSourceScanExec(
   @transient
   private lazy val pushedDownFilters = {
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
-    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+    // TODO: should be able to push filters containing metadata columns down to skip files
+    dataFilters.filterNot(_.references.exists {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
   }

   override lazy val metadata: Map[String, String] = {
@@ -601,7 +612,8 @@ case class FileSourceScanExec(
       }
     }

-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
+      requiredSchema, metadataColumns)
   }

   /**
@@ -657,7 +669,8 @@ case class FileSourceScanExec(
     val partitions =
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
+      requiredSchema, metadataColumns)
   }

   // Filters unused DynamicPruningExpression expressions - one which has been replaced

sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala

Lines changed: 4 additions & 2 deletions
@@ -36,7 +36,8 @@ object PartitionedFileUtil {
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file), offset, size)
-        PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
+        PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
+          file.getModificationTime, file.getLen)
       }
     } else {
       Seq(getPartitionedFile(file, filePath, partitionValues))
@@ -48,7 +49,8 @@ object PartitionedFileUtil {
       filePath: Path,
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
-    PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
+    PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
+      file.getModificationTime, file.getLen)
   }

   private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
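Not part of this file, but for illustration: with the two extra arguments, a `PartitionedFile` now carries the per-file size and modification time straight from the Hadoop `FileStatus`, roughly as `getPartitionedFile` does above (the helper below is hypothetical):

```
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Build a single, unsplit PartitionedFile carrying the file-level metadata.
def toPartitionedFile(status: FileStatus, partitionValues: InternalRow): PartitionedFile =
  PartitionedFile(
    partitionValues,
    status.getPath.toUri.toString,
    start = 0L,
    length = status.getLen,
    locations = Array.empty,
    modificationTime = status.getModificationTime, // milliseconds
    fileSize = status.getLen)
```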

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 24 additions & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}


 /**
@@ -171,6 +171,29 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }

+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"
+
+  val FILE_SIZE = "file_size"
+
+  val FILE_MODIFICATION_TIME = "file_modification_time"
+
+  val METADATA_NAME = "_metadata"
+
+  // supported metadata struct fields for hadoop fs relation
+  val METADATA_STRUCT: StructType = new StructType()
+    .add(StructField(FILE_PATH, StringType))
+    .add(StructField(FILE_NAME, StringType))
+    .add(StructField(FILE_SIZE, LongType))
+    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+
+  // create a file metadata struct col
+  def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+}
+
 /**
  * The base class file format that is based on text file.
  */
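For reference, a small sketch of what the new constants resolve to (assumes the Spark internals above are importable):

```
import org.apache.spark.sql.execution.datasources.FileFormat

// The hidden column is a single struct named `_metadata` with four fields.
val metadataCol = FileFormat.createFileMetadataCol
println(metadataCol.name)                                    // _metadata
println(FileFormat.METADATA_STRUCT.fieldNames.mkString(", "))
// file_path, file_name, file_size, file_modification_time
println(metadataCol.dataType == FileFormat.METADATA_STRUCT)  // true
```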

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 113 additions & 4 deletions
@@ -21,13 +21,20 @@ import java.io.{Closeable, FileNotFoundException, IOException}

 import scala.util.control.NonFatal

+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.FileFormat._
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator

 /**
@@ -38,14 +45,17 @@ import org.apache.spark.util.NextIterator
 * @param filePath URI of the file to read
 * @param start the beginning offset (in bytes) of the block.
 * @param length number of bytes to read.
- * @param locations locality information (list of nodes that have the data).
+ * @param modificationTime The modification time of the input file, in milliseconds.
+ * @param fileSize The length of the input file (not the block), in bytes.
 */
case class PartitionedFile(
    partitionValues: InternalRow,
    filePath: String,
    start: Long,
    length: Long,
-    @transient locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty,
+    modificationTime: Long = 0L,
+    fileSize: Long = 0L) {
  override def toString: String = {
    s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
  }
@@ -57,7 +67,9 @@ case class PartitionedFile(
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
+    @transient val filePartitions: Seq[FilePartition],
+    val readDataSchema: StructType,
+    val metadataColumns: Seq[AttributeReference] = Seq.empty)
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

   private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
@@ -103,6 +115,101 @@ class FileScanRDD(
         context.killTaskIfInterrupted()
         (currentIterator != null && currentIterator.hasNext) || nextIterator()
       }
+
+      ///////////////////////////
+      // FILE METADATA METHODS //
+      ///////////////////////////
+
+      // a metadata internal row, will only be updated when the current file is changed
+      val metadataRow: InternalRow = new GenericInternalRow(metadataColumns.length)
+
+      // an unsafe projection to convert a joined internal row to an unsafe row
+      private lazy val projection = {
+        val joinedExpressions =
+          readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+        UnsafeProjection.create(joinedExpressions)
+      }
+
+      /**
+       * For each partitioned file, metadata columns for each record in the file are exactly same.
+       * Only update metadata row when `currentFile` is changed.
+       */
+      private def updateMetadataRow(): Unit = {
+        if (metadataColumns.nonEmpty && currentFile != null) {
+          val path = new Path(currentFile.filePath)
+          metadataColumns.zipWithIndex.foreach { case (attr, i) =>
+            attr.name match {
+              case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
+              case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
+              case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
+              case FILE_MODIFICATION_TIME =>
+                // the modificationTime from the file is in millisecond,
+                // while internally, the TimestampType is stored in microsecond
+                metadataRow.update(i, currentFile.modificationTime * 1000L)
+            }
+          }
+        }
+      }
+
+      /**
+       * Create a writable column vector containing all required metadata columns
+       */
+      private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+        val path = new Path(currentFile.filePath)
+        val filePathBytes = path.toString.getBytes
+        val fileNameBytes = path.getName.getBytes
+        var rowId = 0
+        metadataColumns.map(_.name).map {
+          case FILE_PATH =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+            rowId = 0
+            // use a tight-loop for better performance
+            while (rowId < c.numRows()) {
+              columnVector.putByteArray(rowId, filePathBytes)
+              rowId += 1
+            }
+            columnVector
+          case FILE_NAME =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+            rowId = 0
+            // use a tight-loop for better performance
+            while (rowId < c.numRows()) {
+              columnVector.putByteArray(rowId, fileNameBytes)
+              rowId += 1
+            }
+            columnVector
+          case FILE_SIZE =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+            columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+            columnVector
+          case FILE_MODIFICATION_TIME =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+            // the modificationTime from the file is in millisecond,
+            // while internally, the TimestampType is stored in microsecond
+            columnVector.putLongs(0, c.numRows(), currentFile.modificationTime * 1000L)
+            columnVector
+        }.toArray
+      }
+
+      /**
+       * Add metadata columns at the end of nextElement if needed.
+       * For different row implementations, use different methods to update and append.
+       */
+      private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
+        if (metadataColumns.nonEmpty) {
+          nextElement match {
+            case c: ColumnarBatch =>
+              new ColumnarBatch(
+                Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),
+                c.numRows())
+            case u: UnsafeRow => projection.apply(new JoinedRow(u, metadataRow))
+            case i: InternalRow => new JoinedRow(i, metadataRow)
          }
+        } else {
+          nextElement
+        }
+      }
+
       def next(): Object = {
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch based scan, so that we
@@ -118,7 +225,7 @@
           }
           inputMetrics.incRecordsRead(1)
         }
-        nextElement
+        addMetadataColumnsIfNeeded(nextElement)
       }

       private def readCurrentFile(): Iterator[InternalRow] = {
@@ -134,6 +241,7 @@
      private def nextIterator(): Boolean = {
        if (files.hasNext) {
          currentFile = files.next()
+          updateMetadataRow()
          logInfo(s"Reading File $currentFile")
          // Sets InputFileBlockHolder for the file block's information
          InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
@@ -201,6 +309,7 @@
          }
        } else {
          currentFile = null
+          updateMetadataRow()
          InputFileBlockHolder.unset()
          false
        }
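A small illustration of the unit conversion applied in `updateMetadataRow` and `createMetadataColumnVector` above (the values are examples only): Hadoop reports file modification times in milliseconds, while Spark stores TimestampType values in microseconds, hence the `* 1000L`:

```
// 2021-12-20 20:05:21 UTC expressed as epoch milliseconds (what FileStatus.getModificationTime returns)
val modificationTimeMillis = 1640030721000L

// The value written into the metadata row / column vector for `file_modification_time`
val timestampMicros = modificationTimeMillis * 1000L
```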