Skip to content

Commit b550b2a

Browse files
MaxGekk authored and gatorsmile committed
[SPARK-24325] Tests for Hadoop's LinesReader
## What changes were proposed in this pull request?

The tests cover basic functionality of [Hadoop LinesReader](https://github.com/apache/spark/blob/8d79113b812a91073d2c24a3a9ad94cc3b90b24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala#L42). In particular, the added tests check that:

- A split slices a line or a delimiter
- A split slices two consecutive lines and covers the delimiter between the lines
- Two splits slice a line and there are no duplicates
- The internal buffer size (`io.file.buffer.size`) is less than the line length
- The constraint on maximum line length (`mapreduce.input.linerecordreader.line.maxlength`) is enforced

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes apache#21377 from MaxGekk/line-reader-tests.
1 parent ffaefe7 commit b550b2a

1 file changed

Lines changed: 137 additions & 0 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources
19+
20+
import java.io.File
21+
import java.nio.charset.StandardCharsets
22+
import java.nio.file.Files
23+
24+
import org.apache.hadoop.conf.Configuration
25+
26+
import org.apache.spark.sql.catalyst.InternalRow
27+
import org.apache.spark.sql.test.SharedSQLContext
28+
29+
class HadoopFileLinesReaderSuite extends SharedSQLContext {
  /**
   * Writes `text` to `path` and reads it back through `HadoopFileLinesReader`,
   * creating one reader per simulated file split in `ranges`.
   *
   * @param path      temporary file the text is written into
   * @param text      file content to write (UTF-8)
   * @param ranges    byte `(offset, length)` pairs, one per simulated split
   * @param delimiter optional custom line delimiter; when `None`, Hadoop's
   *                  default delimiters (`\r`, `\n` or `\r\n`) apply
   * @param conf      optional Hadoop configuration for the readers; when
   *                  `None`, the shared session configuration is used
   * @return all lines produced by the readers, in split order
   */
  def getLines(
      path: File,
      text: String,
      ranges: Seq[(Long, Long)],
      delimiter: Option[String] = None,
      conf: Option[Configuration] = None): Seq[String] = {
    val delimOpt = delimiter.map(_.getBytes(StandardCharsets.UTF_8))
    Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8))

    ranges.flatMap { case (start, length) =>
      val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
      val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration)
      val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)

      reader.map(_.toString)
    }
  }

  test("A split ends at the delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 1), (1, 3)))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split cuts the delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 2), (2, 2)))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split ends at the end of the delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 3), (3, 1)))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split covers two lines") {
    withTempPath { path =>
      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 4), (4, 1)))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split ends at the custom delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 1), (1, 4)), Some("^_^"))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split slices the custom delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 2), (2, 3)), Some("^_^"))
      assert(lines == Seq("a", "b"))
    }
  }

  test("The first split covers the first line and the custom delimiter") {
    withTempPath { path =>
      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 4), (4, 1)), Some("^_^"))
      assert(lines == Seq("a", "b"))
    }
  }

  test("A split cuts the first line") {
    withTempPath { path =>
      val lines = getLines(path, text = "abc,def", ranges = Seq((0, 1)), Some(","))
      assert(lines == Seq("abc"))
    }
  }

  test("The split cuts both lines") {
    withTempPath { path =>
      val lines = getLines(path, text = "abc,def", ranges = Seq((2, 2)), Some(","))
      assert(lines == Seq("def"))
    }
  }

  test("io.file.buffer.size is less than line length") {
    // Use a private copy of the shared configuration so the tiny buffer
    // size does not leak into other tests running in the same session.
    val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
    conf.set("io.file.buffer.size", "2")
    withTempPath { path =>
      val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)),
        conf = Some(conf))
      assert(lines == Seq("123456"))
    }
  }

  test("line cannot be longer than line.maxlength") {
    // Copy the shared configuration instead of mutating it in place, so the
    // max-length constraint applies only to this test's readers.
    val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
    conf.set("mapreduce.input.linerecordreader.line.maxlength", "5")
    withTempPath { path =>
      val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)),
        conf = Some(conf))
      assert(lines == Seq("1234"))
    }
  }

  test("default delimiter is 0xd or 0xa or 0xd0xa") {
    withTempPath { path =>
      val lines = getLines(path, text = "1\r2\n3\r\n4", ranges = Seq((0, 3), (3, 5)))
      assert(lines == Seq("1", "2", "3", "4"))
    }
  }
}

0 commit comments

Comments
 (0)