
Commit 4170f66

JoshRosen authored and uzadude committed
[SPARK-18362][SQL] Use TextFileFormat in implementation of CSVFileFormat
## What changes were proposed in this pull request? This patch significantly improves the IO / file listing performance of schema inference in Spark's built-in CSV data source. Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver. This patch refactors this logic to use Spark SQL's `text` data source to read files during this step. The text data source still performs some unnecessary file listing (since in theory we already have resolved the table prior to schema inference and therefore should be able to scan without performing _any_ extra listing), but that listing is much faster and takes place in parallel. In one production workload operating over tens of thousands of files, this change managed to reduce schema inference time from 7 minutes to 2 minutes. A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR. ## How was this patch tested? Existing unit tests, plus manual benchmarking on a production workload. Author: Josh Rosen <[email protected]> Closes apache#15813 from JoshRosen/use-text-data-source-in-csv-and-json.
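For intuition, here is a minimal, self-contained sketch (not code from this patch) contrasting the legacy RDD-based read path with reading the same files through the `text` data source as a `Dataset[String]`. The input paths and session setup are illustrative assumptions.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch only: contrasts the two read paths used for schema inference.
// The paths below are hypothetical placeholders.
object ReadPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csv-read-path-sketch").getOrCreate()
    val paths = Seq("/data/logs/part-00000.csv", "/data/logs/part-00001.csv")

    // Legacy path: RDD-based text read; file listing is driven serially from the driver.
    val legacyLines = spark.sparkContext.textFile(paths.mkString(","))

    // New path: the `text` data source yields a Dataset[String]; listing goes through
    // Spark SQL's file index and runs in parallel.
    val lines: Dataset[String] = spark.read.textFile(paths: _*)

    println(s"legacy count = ${legacyLines.count()}, dataset count = ${lines.count()}")
    spark.stop()
  }
}
```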
1 parent 7c5aaa2 commit 4170f66

2 files changed: +28 −36 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 26 additions & 34 deletions
@@ -27,10 +27,12 @@ import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.functions.{length, trim}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -52,17 +54,21 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    require(files.nonEmpty, "Cannot infer schema from an empty set of files")
     val csvOptions = new CSVOptions(options)
 
     // TODO: Move filtering.
     val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
-    val rdd = baseRdd(sparkSession, csvOptions, paths)
-    val firstLine = findFirstLine(csvOptions, rdd)
+    val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
+    val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     val header = makeSafeHeader(firstRow, csvOptions, caseSensitive)
 
-    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
+    val parsedRdd: RDD[Array[String]] = CSVRelation.univocityTokenizer(
+      lines,
+      firstLine = if (csvOptions.headerFlag) firstLine else null,
+      params = csvOptions)
     val schema = if (csvOptions.inferSchemaFlag) {
       CSVInferSchema.infer(parsedRdd, header, csvOptions)
     } else {
@@ -173,51 +179,37 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  private def baseRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      inputPaths: Seq[String]): RDD[String] = {
-    readText(sparkSession, options, inputPaths.mkString(","))
-  }
-
-  private def tokenRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      header: Array[String],
-      inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sparkSession, options, inputPaths)
-    // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
-    CSVRelation.univocityTokenizer(rdd, firstLine, options)
-  }
-
   /**
    * Returns the first line of the first non-empty file in path
    */
-  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
+  private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = {
+    import lines.sqlContext.implicits._
+    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      val comment = options.comment.toString
-      rdd.filter { line =>
-        line.trim.nonEmpty && !line.startsWith(comment)
-      }.first()
+      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).first()
     } else {
-      rdd.filter { line =>
-        line.trim.nonEmpty
-      }.first()
+      nonEmptyLines.first()
     }
   }
 
   private def readText(
       sparkSession: SparkSession,
       options: CSVOptions,
-      location: String): RDD[String] = {
+      inputPaths: Seq[String]): Dataset[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.sparkContext.textFile(location)
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = inputPaths,
+          className = classOf[TextFileFormat].getName
+        ).resolveRelation(checkFilesExist = false))
+        .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
-      sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](location)
+      val rdd = sparkSession.sparkContext
+        .hadoopFile[LongWritable, Text, TextInputFormat](inputPaths.mkString(","))
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
+      sparkSession.createDataset(rdd)(Encoders.STRING)
     }
   }
 
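In the UTF-8 branch of the new `readText`, the relation is built through the internal `DataSource` API with `TextFileFormat` and then narrowed to a `Dataset[String]`; in spirit this corresponds to the public `spark.read.textFile(...)` call. The Dataset-based filtering in `findFirstLine` can also be seen in isolation in the following standalone sketch, which uses hypothetical in-memory input rather than files and is not code from the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, trim}

object FirstLineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("first-line-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical input standing in for lines read from CSV files; the Dataset's
    // single column is named "value", as with the text data source.
    val lines = Seq("", "   ", "# a comment line", "a,b,c", "1,2,3").toDS()

    // Keep only lines with non-whitespace content, mirroring the patched findFirstLine.
    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)

    // With a comment character configured, also skip commented lines before taking the first.
    val firstLine = nonEmptyLines.filter(!$"value".startsWith("#")).first()
    println(firstLine) // prints "a,b,c"
    spark.stop()
  }
}
```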

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala

Lines changed: 2 additions & 2 deletions
@@ -34,12 +34,12 @@ import org.apache.spark.sql.types._
 object CSVRelation extends Logging {
 
   def univocityTokenizer(
-      file: RDD[String],
+      file: Dataset[String],
       firstLine: String,
       params: CSVOptions): RDD[Array[String]] = {
     // If header is set, make sure firstLine is materialized before sending to executors.
     val commentPrefix = params.comment.toString
-    file.mapPartitions { iter =>
+    file.rdd.mapPartitions { iter =>
       val parser = new CsvReader(params)
       val filteredIter = iter.filter { line =>
         line.trim.nonEmpty && !line.startsWith(commentPrefix)
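With this change, `univocityTokenizer` accepts a `Dataset[String]` and drops to the underlying RDD via `.rdd` for per-partition parsing. The sketch below mirrors that shape only; it uses hypothetical in-memory data and a naive comma split in place of the univocity `CsvReader`, so it is not the patch's parsing logic.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TokenizerShapeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("tokenizer-shape-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical CSV lines standing in for the Dataset[String] passed to univocityTokenizer.
    val lines = Seq("# header comment", "a,b,c", "1,2,3").toDS()

    // Drop to the underlying RDD and tokenize per partition, skipping empty and commented lines.
    val tokens: RDD[Array[String]] = lines.rdd.mapPartitions { iter =>
      iter
        .filter(line => line.trim.nonEmpty && !line.startsWith("#"))
        .map(_.split(","))
    }

    tokens.collect().foreach(arr => println(arr.mkString("|")))
    spark.stop()
  }
}
```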
