
Commit 40de176

ScrapCodes authored and gatorsmile committed
[SPARK-16496][SQL] Add wholetext as option for reading text in SQL.
## What changes were proposed in this pull request?

In many text-analysis problems it is often not desirable for rows to be split by "\n". A whole-text reader (`SparkContext.wholeTextFiles`) already exists in the RDD API; this JIRA adds the same support to the Dataset API.

## How was this patch tested?

Added relevant new tests for both the Scala and Java APIs.

Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>

Closes #14151 from ScrapCodes/SPARK-16496/wholetext.
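For reference, a minimal usage sketch of the new option (the input path is hypothetical, not part of the commit):

    // Each file under the (hypothetical) directory becomes one row.
    val df = spark.read
      .option("wholetext", true) // default is false: one row per line
      .text("/path/to/text/dir")
    df.show(truncate = false)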
1 parent 606ae49 commit 40de176

7 files changed: 221 additions & 10 deletions


python/pyspark/sql/readwriter.py

Lines changed: 6 additions & 1 deletion
@@ -304,7 +304,7 @@ def parquet(self, *paths):
 
     @ignore_unicode_prefix
     @since(1.6)
-    def text(self, paths):
+    def text(self, paths, wholetext=False):
         """
         Loads text files and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -313,11 +313,16 @@ def text(self, paths):
         Each line in the text file is a new row in the resulting DataFrame.
 
         :param paths: string, or list of strings, for input path(s).
+        :param wholetext: if true, read each file from input path(s) as a single row.
 
         >>> df = spark.read.text('python/test_support/sql/text-test.txt')
         >>> df.collect()
         [Row(value=u'hello'), Row(value=u'this')]
+        >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
+        >>> df.collect()
+        [Row(value=u'hello\\nthis')]
         """
+        self._set_opts(wholetext=wholetext)
         if isinstance(paths, basestring):
             paths = [paths]
         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 14 additions & 2 deletions
@@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
    *
-   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * You can set the following text-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * </ul>
+   * By default, each line in the text files is a new row in the resulting DataFrame.
+   *
+   * Usage example:
    * {{{
    *   // Scala:
    *   spark.read.text("/path/to/spark/README.md")
@@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * If the directory structure of the text files contains partitioning information, those are
    * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
-   * Each line in the text files is a new element in the resulting Dataset. For example:
+   * You can set the following textFile-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * </ul>
+   * By default, each line in the text files is a new element in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   spark.read.textFile("/path/to/spark/README.md")
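A hedged sketch of the `textFile` counterpart documented above (the path is illustrative): with `wholetext` enabled, each matched file should become a single `Dataset[String]` element rather than one element per line.

    // textFile returns a Dataset[String]; with wholetext, one element per file.
    val files = spark.read.option("wholetext", true).textFile("/path/to/spark/README.md")
    assert(files.count() == 1) // a single input file yields a single element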
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala (new file)

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.input.WholeTextFileRecordReader
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which is all of the lines
+ * in that file.
+ */
+class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
+  extends Iterator[Text] with Closeable {
+  private val iterator = {
+    val fileSplit = new CombineFileSplit(
+      Array(new Path(new URI(file.filePath))),
+      Array(file.start),
+      Array(file.length),
+      // TODO: Implement Locality
+      Array.empty[String])
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+    val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+    reader.initialize(fileSplit, hadoopAttemptContext)
+    new RecordReaderIterator(reader)
+  }
+
+  override def hasNext: Boolean = iterator.hasNext
+
+  override def next(): Text = iterator.next()
+
+  override def close(): Unit = iterator.close()
+}
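A hypothetical stand-alone use of this adaptor, assuming a `PartitionedFile` and a Hadoop `Configuration` are already in scope (the commit itself only uses it from `TextFileFormat`):

    // Drain the single whole-file record, then release the underlying reader.
    val reader = new HadoopFileWholeTextReader(partitionedFile, hadoopConf)
    try {
      reader.foreach(text => println(s"read ${text.getLength} bytes"))
    } finally {
      reader.close()
    }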

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala

Lines changed: 27 additions & 4 deletions
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.Closeable
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
@@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val textOptions = new TextOptions(options)
+    super.isSplitable(sparkSession, options, path) && !textOptions.wholeText
+  }
+
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     assert(
       requiredSchema.length <= 1,
       "Text data source only produces a single data column named \"value\".")
-
+    val textOptions = new TextOptions(options)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+    readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+  }
+
+  private def readToUnsafeMem(conf: Broadcast[SerializableConfiguration],
+      requiredSchema: StructType, wholeTextMode: Boolean):
+      (PartitionedFile) => Iterator[UnsafeRow] = {
+
     (file: PartitionedFile) => {
-      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      val confValue = conf.value.value
+      val reader = if (!wholeTextMode) {
+        new HadoopFileLinesReader(file, confValue)
+      } else {
+        new HadoopFileWholeTextReader(file, confValue)
+      }
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
-
       if (requiredSchema.isEmpty) {
         val emptyUnsafeRow = new UnsafeRow(0)
         reader.map(_ => emptyUnsafeRow)
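The `isSplitable` override above is what keeps each file in a single partition when `wholetext` is set; a sketch of how this could be observed (the path and assertion are illustrative, not from the commit):

    // Even with a tiny max partition size, wholetext files are never split,
    // so the row count should equal the number of input files.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 16L)
    val df = spark.read.option("wholetext", true).text("/path/to/text/dir")
    assert(df.count() == df.inputFiles.length)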

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala

Lines changed: 7 additions & 0 deletions
@@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
    * Compression codec to use.
    */
   val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+
+  /**
+   * wholetext - If true, read a file as a single row and not split by "\n".
+   */
+  val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
+
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
+  val WHOLETEXT = "wholetext"
 }
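Since the parameters arrive wrapped in a `CaseInsensitiveMap`, the option key should be honored regardless of casing, and any value accepted by Scala's `String.toBoolean` works. A small illustrative sketch (path assumed):

    // "wholetext", "wholeText" and "WHOLETEXT" should all resolve to the same option.
    val a = spark.read.option("wholeText", "true").text("/path/to/text/dir")
    val b = spark.read.option("WHOLETEXT", true).text("/path/to/text/dir")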

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -185,10 +185,9 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val data = df.collect()
     assert(data(0) == Row("This is a test file for the text data source"))
     assert(data(1) == Row("1+1"))
-    // non ascii characters are not allowed in the code, so we disable the scalastyle here.
-    // scalastyle:off
+    // scalastyle:off nonascii
     assert(data(2) == Row("数据砖头"))
-    // scalastyle:on
+    // scalastyle:on nonascii
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala (new file)

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class WholeTextFileSuite extends QueryTest with SharedSQLContext {
+
+  // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+  // can cause FileSystem.get(Configuration) to return a cached instance created with a different
+  // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+  // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+  // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+  // we disable FileSystem caching in this suite.
+  protected override def sparkConf =
+    super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+  private def testFile: String = {
+    Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
+  }
+
+  test("reading text file with option wholetext=true") {
+    val df = spark.read.option("wholetext", "true")
+      .format("text").load(testFile)
+    // verify the schema
+    assert(df.schema == new StructType().add("value", StringType))
+
+    // verify the content
+    val data = df.collect()
+    assert(data(0) ==
+      Row(
+        // scalastyle:off nonascii
+        """This is a test file for the text data source
+          |1+1
+          |数据砖头
+          |"doh"
+          |""".stripMargin))
+    // scalastyle:on nonascii
+    assert(data.length == 1)
+  }
+
+  test("correctness of wholetext option") {
+    import org.apache.spark.sql.catalyst.util._
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1,
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin)
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text file 2 contents.")
+      val file3 = new File(dir, "text3.txt")
+      stringToFile(file3, "text file 3 contents.")
+      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
+      // Since the wholetext option reads each file into a single row, the number of rows
+      // should equal the number of files.
+      val data = df.sort("value").collect()
+      assert(data.length == 3)
+      // Each file should become a single Row/element in the DataFrame/Dataset.
+      assert(data(0) == Row(
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin))
+      assert(data(1) == Row("text file 2 contents."))
+      assert(data(2) == Row("text file 3 contents."))
+    }
+  }
+
+  test("correctness of wholetext option with gzip compression mode") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
+      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      // When read in wholetext mode, each file becomes a single row, i.e. it is not
+      // split on the newline character.
+      val expected = Row(Range(0, 1000).mkString("", "\n", "\n"))
+      Seq(10, 100, 1000).foreach { bytes =>
+        withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) {
+          val df2 = spark.read.option("wholetext", "true").format("text").load(path)
+          val result = df2.collect().head
+          assert(result === expected)
+        }
+      }
+    }
+  }
+}
