
Commit 0fd3a47

[SPARK-15103][SQL] Refactored FileCatalog class to allow StreamFileCatalog to infer partitioning
## What changes were proposed in this pull request?

File Stream Sink writes the list of written files to a metadata log, and StreamFileCatalog reads that list of files for processing. However, StreamFileCatalog does not infer partitioning the way HDFSFileCatalog does. This PR enables that by refactoring HDFSFileCatalog into an abstract class, PartitioningAwareFileCatalog, which contains all the functionality needed to infer partitions from a list of leaf files.

- HDFSFileCatalog has been renamed to ListingFileCatalog; it extends PartitioningAwareFileCatalog by providing a list of leaf files obtained from a recursive directory scan.
- StreamFileCatalog has been renamed to MetadataLogFileCatalog; it extends PartitioningAwareFileCatalog by providing a list of leaf files obtained from the metadata log.
- The two classes above have been moved into their own files, as they are not interfaces that belong in fileSourceInterfaces.scala.

## How was this patch tested?

- FileStreamSinkSuite was updated to verify that partitioning gets inferred and that, on read, partitions are pruned correctly based on the query.
- Other unit tests are unchanged and pass as expected.

Author: Tathagata Das <[email protected]>

Closes #12879 from tdas/SPARK-15103.
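The commit also introduces MetadataLogFileCatalog, whose file is not reproduced in the excerpted diffs below. As a rough, hypothetical sketch of the contract the new abstract class imposes, a metadata-log-backed subclass only needs to supply the leaf files (here via an assumed `readFilesFromMetadataLog` function passed in for illustration); partition inference and pruning come from PartitioningAwareFileCatalog. This is not the actual class from the commit.

```scala
import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileCatalog, PartitionSpec}

// Sketch only -- not the actual MetadataLogFileCatalog added by this commit.
// `readFilesFromMetadataLog` is an assumed helper that returns the FileStatuses
// recorded by the file stream sink's metadata log under `path`.
class MetadataLogFileCatalogSketch(
    sparkSession: SparkSession,
    path: Path,
    readFilesFromMetadataLog: Path => Seq[FileStatus])
  extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {

  override def paths: Seq[Path] = Seq(path)

  // Leaf files come from the metadata log instead of a recursive directory scan.
  private val filesFromLog: Seq[FileStatus] = readFilesFromMetadataLog(path)

  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= filesFromLog.map(f => f.getPath -> f)

  // Grouping by parent directory is what lets the base class infer partition columns
  // (e.g. .../date=2016-05-04/...) exactly as ListingFileCatalog does.
  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] =
    filesFromLog.toArray.groupBy(_.getPath.getParent)

  override def partitionSpec(): PartitionSpec = inferPartitioning()

  // The metadata log is the source of truth; there is nothing to re-scan.
  override def refresh(): Unit = ()
}
```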
1 parent 6274a52 commit 0fd3a47

8 files changed

Lines changed: 410 additions & 285 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 4 additions & 4 deletions
@@ -136,7 +136,7 @@ case class DataSource(
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
       SparkHadoopUtil.get.globPathIfNecessary(qualified)
     }.toArray
-    val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None)
+    val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
     format.inferSchema(
       sparkSession,
       caseInsensitiveOptions,
@@ -258,7 +258,7 @@ case class DataSource(
     case (format: FileFormat, _)
         if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
       val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-      val fileCatalog = new StreamFileCatalog(sparkSession, basePath)
+      val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
       val dataSchema = userSpecifiedSchema.orElse {
         format.inferSchema(
           sparkSession,
@@ -310,8 +310,8 @@ case class DataSource(
       })
     }

-    val fileCatalog: FileCatalog =
-      new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema)
+    val fileCatalog =
+      new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema)

     val dataSchema = userSpecifiedSchema.map { schema =>
       val equality =
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType


/**
 * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
 * files present in `paths`.
 *
 * @param parameters a set of options to control discovery
 * @param paths a list of paths to scan
 * @param partitionSchema an optional partition schema that will be used to provide types for
 *                        the discovered partitions
 */
class ListingFileCatalog(
    sparkSession: SparkSession,
    override val paths: Seq[Path],
    parameters: Map[String, String],
    partitionSchema: Option[StructType])
  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {

  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
  @volatile private var cachedPartitionSpec: PartitionSpec = _

  refresh()

  override def partitionSpec(): PartitionSpec = {
    if (cachedPartitionSpec == null) {
      cachedPartitionSpec = inferPartitioning()
    }
    cachedPartitionSpec
  }

  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
    cachedLeafFiles
  }

  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
    cachedLeafDirToChildrenFiles
  }

  override def refresh(): Unit = {
    val files = listLeafFiles(paths)
    cachedLeafFiles =
      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
    cachedPartitionSpec = null
  }

  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
    } else {
      val statuses: Seq[FileStatus] = paths.flatMap { path =>
        val fs = path.getFileSystem(hadoopConf)
        logInfo(s"Listing $path on driver")
        // Dummy jobconf to get to the pathFilter defined in configuration
        val jobConf = new JobConf(hadoopConf, this.getClass)
        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

        val statuses = {
          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
        }

        statuses.map {
          case f: LocatedFileStatus => f

          // NOTE:
          //
          // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
          //   operations, calling `getFileBlockLocations` does no harm here since these file
          //   system implementations don't actually issue RPCs for this method.
          //
          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should
          //   not be a big deal since we always use `listLeafFilesInParallel` when the number of
          //   paths exceeds the threshold.
          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
        }
      }.filterNot { status =>
        val name = status.getPath.getName
        HadoopFsRelation.shouldFilterOut(name)
      }

      val (dirs, files) = statuses.partition(_.isDirectory)

      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
      if (dirs.isEmpty) {
        mutable.LinkedHashSet(files: _*)
      } else {
        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
      }
    }
  }

  override def equals(other: Any): Boolean = other match {
    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
    case _ => false
  }

  override def hashCode(): Int = paths.toSet.hashCode()
}
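For orientation, here is a minimal, hypothetical sketch of driving this catalog directly. In the commit itself the construction happens inside DataSource.scala (see the diff above); the path and option values below are made up.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.ListingFileCatalog

val spark = SparkSession.builder().master("local[*]").appName("catalog-sketch").getOrCreate()

// Hypothetical input directory; partition inference would pick up dirs like date=2016-05-04/.
val catalog = new ListingFileCatalog(
  sparkSession = spark,
  paths = Seq(new Path("/data/events")),
  parameters = Map.empty,
  partitionSchema = None)

val files = catalog.allFiles()       // all leaf files found by the recursive scan
val spec = catalog.partitionSpec()   // partition columns inferred from directory names
catalog.refresh()                    // re-list the directories and invalidate cached results
```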
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}


/**
 * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
 * It provides the necessary methods to parse partition data based on a set of files.
 *
 * @param parameters a set of options to control partition discovery
 * @param partitionSchema an optional partition schema that will be used to provide types for
 *                        the discovered partitions
 */
abstract class PartitioningAwareFileCatalog(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    partitionSchema: Option[StructType])
  extends FileCatalog with Logging {

  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)

  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]

  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
    if (partitionSpec().partitionColumns.isEmpty) {
      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
    } else {
      prunePartitions(filters, partitionSpec()).map {
        case PartitionDirectory(values, path) =>
          Partition(
            values,
            leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
      }
    }
  }

  override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq

  protected def inferPartitioning(): PartitionSpec = {
    // We use leaf dirs containing data files to discover the schema.
    val leafDirs = leafDirToChildrenFiles.keys.toSeq
    partitionSchema match {
      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
        val spec = PartitioningUtils.parsePartitions(
          leafDirs,
          PartitioningUtils.DEFAULT_PARTITION_NAME,
          typeInference = false,
          basePaths = basePaths)

        // Without auto inference, all values in the `row` should be null or of StringType,
        // so we need to cast them into the data types that the user specified.
        def castPartitionValuesToUserSchema(row: InternalRow) = {
          InternalRow((0 until row.numFields).map { i =>
            Cast(
              Literal.create(row.getUTF8String(i), StringType),
              userProvidedSchema.fields(i).dataType).eval()
          }: _*)
        }

        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
          part.copy(values = castPartitionValuesToUserSchema(part.values))
        })
      case _ =>
        PartitioningUtils.parsePartitions(
          leafDirs,
          PartitioningUtils.DEFAULT_PARTITION_NAME,
          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
          basePaths = basePaths)
    }
  }

  private def prunePartitions(
      predicates: Seq[Expression],
      partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
    val PartitionSpec(partitionColumns, partitions) = partitionSpec
    val partitionColumnNames = partitionColumns.map(_.name).toSet
    val partitionPruningPredicates = predicates.filter {
      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    }

    if (partitionPruningPredicates.nonEmpty) {
      val predicate = partitionPruningPredicates.reduce(expressions.And)

      val boundPredicate = InterpretedPredicate.create(predicate.transform {
        case a: AttributeReference =>
          val index = partitionColumns.indexWhere(a.name == _.name)
          BoundReference(index, partitionColumns(index).dataType, nullable = true)
      })

      val selected = partitions.filter {
        case PartitionDirectory(values, _) => boundPredicate(values)
      }
      logInfo {
        val total = partitions.length
        val selectedSize = selected.length
        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
      }

      selected
    } else {
      partitions
    }
  }

  /**
   * Contains a set of paths that are considered as the base dirs of the input datasets.
   * The partitioning discovery logic will make sure it stops when it reaches any
   * base path. By default, the paths of the dataset provided by users will be base paths.
   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
   * `something`. If users want to override the basePath, they can set `basePath` in the options
   * to pass the new base path to the data source.
   * For the above example, if the user-provided base path is `/path/`, the returned
   * DataFrame will have the column of `something`.
   */
  private def basePaths: Set[Path] = {
    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
    userDefinedBasePath.getOrElse {
      // If the user does not provide basePath, we will just use paths.
      paths.toSet
    }.map { hdfsPath =>
      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
      val fs = hdfsPath.getFileSystem(hadoopConf)
      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    }
  }
}
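The `basePath` behavior documented in the scaladoc of `basePaths` above can be illustrated with the DataFrame reader; the paths are hypothetical and mirror the example in that comment, and `spark` is assumed to be an existing SparkSession.

```scala
// Default: the provided path itself is the base path, so partition discovery stops there
// and the result has no `something` column.
val df1 = spark.read.parquet("/path/something=true/")

// Overriding basePath moves the base up to /path/, so `something=true` is parsed as a
// partition directory and `something` shows up as a column with value `true`.
val df2 = spark.read
  .option("basePath", "/path/")
  .parquet("/path/something=true/")
```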
