Skip to content

Commit ad7ef81

Browse files
committed
FileStreamSource should not infer partitions in every batch
1 parent 57e97fc commit ad7ef81

3 files changed

Lines changed: 21 additions & 9 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ case class DataSource(
7575
bucketSpec: Option[BucketSpec] = None,
7676
options: Map[String, String] = Map.empty) extends Logging {
7777

78-
case class SourceInfo(name: String, schema: StructType)
78+
case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
7979

8080
lazy val providingClass: Class[_] = lookupDataSource(className)
8181
lazy val sourceInfo = sourceSchema()
@@ -186,8 +186,11 @@ case class DataSource(
186186
}
187187
}
188188

189-
private def inferFileFormatSchema(format: FileFormat): StructType = {
190-
userSpecifiedSchema.orElse {
189+
/**
190+
* Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
191+
*/
192+
private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
193+
userSpecifiedSchema.map(_ -> partitionColumns).orElse {
191194
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
192195
val allPaths = caseInsensitiveOptions.get("path")
193196
val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -197,14 +200,14 @@ case class DataSource(
197200
SparkHadoopUtil.get.globPathIfNecessary(qualified)
198201
}.toArray
199202
val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
200-
val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
203+
val partitionSchema = fileCatalog.partitionSpec().partitionColumns
201204
val inferred = format.inferSchema(
202205
sparkSession,
203206
caseInsensitiveOptions,
204207
fileCatalog.allFiles())
205208

206209
inferred.map { inferredSchema =>
207-
StructType(inferredSchema ++ partitionCols)
210+
StructType(inferredSchema ++ partitionSchema) -> partitionSchema.map(_.name)
208211
}
209212
}.getOrElse {
210213
throw new AnalysisException("Unable to infer schema. It must be specified manually.")
@@ -217,7 +220,7 @@ case class DataSource(
217220
case s: StreamSourceProvider =>
218221
val (name, schema) = s.sourceSchema(
219222
sparkSession.sqlContext, userSpecifiedSchema, className, options)
220-
SourceInfo(name, schema)
223+
SourceInfo(name, schema, Nil)
221224

222225
case format: FileFormat =>
223226
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -246,7 +249,8 @@ case class DataSource(
246249
"you may be able to create a static DataFrame on that directory with " +
247250
"'spark.read.load(directory)' and infer schema from it.")
248251
}
249-
SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
252+
val (schema, partCols) = inferFileFormatSchema(format)
253+
SourceInfo(s"FileSource[$path]", schema, partCols)
250254

251255
case _ =>
252256
throw new UnsupportedOperationException(
@@ -266,7 +270,13 @@ case class DataSource(
266270
throw new IllegalArgumentException("'path' is not specified")
267271
})
268272
new FileStreamSource(
269-
sparkSession, path, className, sourceInfo.schema, metadataPath, options)
273+
sparkSession = sparkSession,
274+
path = path,
275+
fileFormatClassName = className,
276+
schema = sourceInfo.schema,
277+
partitionColumns = sourceInfo.partitionColumns,
278+
metadataPath = metadataPath,
279+
options = options)
270280
case _ =>
271281
throw new UnsupportedOperationException(
272282
s"Data source $className does not support streamed reading")

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class FileStreamSource(
3535
path: String,
3636
fileFormatClassName: String,
3737
override val schema: StructType,
38+
partitionColumns: Seq[String],
3839
metadataPath: String,
3940
options: Map[String, String]) extends Source with Logging {
4041

@@ -142,6 +143,7 @@ class FileStreamSource(
142143
sparkSession,
143144
paths = files.map(_.path),
144145
userSpecifiedSchema = Some(schema),
146+
partitionColumns = partitionColumns,
145147
className = fileFormatClassName,
146148
options = optionsWithPartitionBasePath)
147149
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
9494
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
9595
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
9696

97-
val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
97+
val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
9898
dir.getAbsolutePath, Map.empty)
9999
// this method should throw an exception if `fs.exists` is called during resolveRelation
100100
newSource.getBatch(None, LongOffset(1))

0 commit comments

Comments
 (0)