@@ -388,7 +388,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
- p.files.map { f =>
+ p.files.filter(_.getLen > 0).map { f =>
Member:
do the filtering inside the map?

Member:
Do we have a test case for this line?

Contributor:
do you mean changing filter...map... to flatMap? I don't have a strong preference about it.

The updated test cases and the new test case are for this change.

Member:
I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; two transformations vs having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.

Member:
It's a non-critical path in terms of performance. Should be okay.
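
For reference, the two shapes being compared look roughly like this. This is a minimal sketch only; buildPartitionedFile is a hypothetical stand-in for the PartitionedFile construction in the diff above:

// filter + map: two traversals, each step stays simple
p.files.filter(_.getLen > 0).map { f => buildPartitionedFile(p, f) }  // buildPartitionedFile is hypothetical

// single flatMap: one traversal, but each result has to be wrapped in an Option
p.files.flatMap { f =>
  if (f.getLen > 0) Some(buildPartitionedFile(p, f)) else None
}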

Member:
This createBucketedReadRDD is for the bucketed table, right?

Contributor:
Yes, and the same change is also in createNonBucketedReadRDD.

val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}
@@ -438,7 +438,7 @@ case class FileSourceScanExec(
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
- partition.files.flatMap { file =>
+ partition.files.filter(_.getLen > 0).flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {

@@ -1843,7 +1843,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
- .repartition(1)
.write
.text(path)

@@ -1898,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.text(path)

val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
- assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+ assert(jsonDF.count() === corruptRecordCount)
assert(jsonDF.schema === new StructType()
.add("_corrupt_record", StringType)
.add("dummy", StringType))
@@ -1911,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
- checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+ checkAnswer(counts, Row(1, 4, 6))
}
}

@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources

import java.io.File
import java.nio.file.{Files, Paths}

import org.scalatest.BeforeAndAfter

@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter
assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
}
}

test("skip empty files in load") {
withTempDir { dir =>
val path = dir.getCanonicalPath
Files.write(Paths.get(path, "empty"), Array.empty[Byte])
Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
Member:
Nit for consistency: .getBytes(StandardCharsets.UTF_8)
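
A sketch of what the suggested tweak would look like, assuming java.nio.charset.StandardCharsets is imported alongside the existing java.nio.file imports:

import java.nio.charset.StandardCharsets

// same write as above, but with an explicit charset for consistency
Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))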

val readback = spark.read.option("wholetext", true).text(path)

assert(readback.rdd.getNumPartitions == 1)
Member:
Do we need 1 === ... to get the right assert message? It's a tiny nit.

Member Author:
It seems the expected value should be on the right. I changed the order and got the following:

assert(123 === readback.rdd.getNumPartitions)
123 did not equal 1
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :1
Actual   :123

The current assert produces the correct message:

assert(readback.rdd.getNumPartitions == 123)
1 did not equal 123
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :123
Actual   :1

Member:
I am just referring to === and the order of args. I'm sure the test was right as-is in what it asserts.

}
}
}
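
For illustration, one way to combine the === suggestion above with keeping the expected value on the right; a sketch, not what the PR actually commits:

// actual value on the left, expected value on the right; ScalaTest's === still
// reports both sides in the failure message
assert(readback.rdd.getNumPartitions === 1)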