Commit 66d5b45

Added a WholeTextFileSuite, covering more cases from the corresponding RDD version of the option.
1 parent 7e91020 commit 66d5b45
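
For context, the wholetext option makes the text data source read each input file into a single row instead of one row per line, mirroring the RDD-level SparkContext.wholeTextFiles API that the commit message refers to. A minimal, illustrative sketch (the path and app name are hypothetical, not part of this commit):

import org.apache.spark.sql.SparkSession

// Illustrative sketch only; "/tmp/text-dir" is a hypothetical directory of text files.
val spark = SparkSession.builder().master("local[*]").appName("wholetext-demo").getOrCreate()

// DataFrame reader: with wholetext=true, every file becomes one row in the "value" column.
val wholeDf = spark.read.option("wholetext", "true").text("/tmp/text-dir")

// The corresponding RDD version: one (filePath, fileContent) pair per file.
val wholeRdd = spark.sparkContext.wholeTextFiles("/tmp/text-dir")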

File tree

2 files changed: 108 additions, 48 deletions

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala

Lines changed: 0 additions & 48 deletions
@@ -39,54 +39,6 @@ class TextSuite extends QueryTest with SharedSQLContext {
     verifyFrame(spark.read.text(testFile))
   }
 
-  test("reading text file with option wholetext=true") {
-    val df = spark.read.option("wholetext", "true")
-      .format("text").load(testFile)
-    // schema
-    assert(df.schema == new StructType().add("value", StringType))
-
-    // verify content
-    val data = df.collect()
-    assert(data(0) ==
-      Row(
-        // scalastyle:off nonascii
-        """This is a test file for the text data source
-          |1+1
-          |数据砖头
-          |"doh"
-          |""".stripMargin))
-    // scalastyle:on nonascii
-    assert(data.length == 1)
-  }
-
-  test("reading multiple text files with option wholetext=true") {
-    import org.apache.spark.sql.catalyst.util._
-    withTempDir { dir =>
-      val file1 = new File(dir, "text1.txt")
-      stringToFile(file1,
-        """text file 1 contents.
-          |From: None to: ??
-        """.stripMargin)
-      val file2 = new File(dir, "text2.txt")
-      stringToFile(file2, "text file 2 contents.")
-      val file3 = new File(dir, "text3.txt")
-      stringToFile(file3, "text file 3 contents.")
-      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
-      // Since the wholetext option reads each file into a single row, the row count should equal the number of files.
-      val data = df.sort("value").collect()
-      assert(data.length == 3)
-      // Each file should be represented by a single Row/element in the DataFrame/Dataset.
-      assert(data(0) == Row(
-        """text file 1 contents.
-          |From: None to: ??
-        """.stripMargin))
-      assert(data(1) == Row(
-        """text file 2 contents.""".stripMargin))
-      assert(data(2) == Row(
-        """text file 3 contents.""".stripMargin))
-    }
-  }
-
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
     val df = spark.read.text(testFile).withColumnRenamed("value", "adwrasdf")

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class WholeTextFileSuite extends QueryTest with SharedSQLContext {
+
+  // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+  // can cause FileSystem.get(Configuration) to return a cached instance created with a different
+  // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+  // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+  // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+  // we disable FileSystem caching in this suite.
+  protected override def sparkConf =
+    super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+  private def testFile: String = {
+    Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
+  }
+
+  test("reading text file with option wholetext=true") {
+    val df = spark.read.option("wholetext", "true")
+      .format("text").load(testFile)
+    // schema
+    assert(df.schema == new StructType().add("value", StringType))
+
+    // verify content
+    val data = df.collect()
+    assert(data(0) ==
+      Row(
+        // scalastyle:off nonascii
+        """This is a test file for the text data source
+          |1+1
+          |数据砖头
+          |"doh"
+          |""".stripMargin))
+    // scalastyle:on nonascii
+    assert(data.length == 1)
+  }
+
+  test("correctness of wholetext option") {
+    import org.apache.spark.sql.catalyst.util._
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1,
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin)
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text file 2 contents.")
+      val file3 = new File(dir, "text3.txt")
+      stringToFile(file3, "text file 3 contents.")
+      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
+      // Since the wholetext option reads each file into a single row, the row count should equal the number of files.
+      val data = df.sort("value").collect()
+      assert(data.length == 3)
+      // Each file should be represented by a single Row/element in the DataFrame/Dataset.
+      assert(data(0) == Row(
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin))
+      assert(data(1) == Row(
+        """text file 2 contents.""".stripMargin))
+      assert(data(2) == Row(
+        """text file 3 contents.""".stripMargin))
+    }
+  }
+
+
+  test("Correctness of wholetext option with gzip compression mode.") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s")
+      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+
+      val expected = df1.collect()
+      Seq(10, 100, 1000).foreach { bytes =>
+        withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) {
+          val df2 = spark.read.option("wholetext", "true").format("text").load(path)
+          checkAnswer(df2, expected)
+        }
+      }
+    }
+  }
+
+
+}
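
As a side note on the gzip test above: SQLConf.FILES_MAX_PARTITION_BYTES.key corresponds to the user-facing setting spark.sql.files.maxPartitionBytes. A hedged sketch of exercising the same combination outside the test harness (the directory path and the 10-byte value are illustrative only):

// Illustrative only: force a tiny max partition size, then read gzip'd text files
// with wholetext=true; "/tmp/gzip-text-dir" is a hypothetical directory.
spark.conf.set("spark.sql.files.maxPartitionBytes", "10")
val df = spark.read.option("wholetext", "true").format("text").load("/tmp/gzip-text-dir")
df.show(truncate = false)
// Restore the setting so later reads use the default again.
spark.conf.unset("spark.sql.files.maxPartitionBytes")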
