Skip to content

Commit f5f21e8

Browse files
Glen Takahashi and cloud-fan
authored and committed
[SPARK-23249][SQL] Improved block merging logic for partitions
## What changes were proposed in this pull request? Change DataSourceScanExec so that when grouping blocks together into partitions, also checks the end of the sorted list of splits to more efficiently fill out partitions. ## How was this patch tested? Updated old test to reflect the new logic, which causes the # of partitions to drop from 4 -> 3 Also, a current test exists to test large non-splittable files at https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346 ## Rationale The current bin-packing method of next-fit descending for blocks into partitions is sub-optimal in a lot of cases and will result in extra partitions, un-even distribution of block-counts across partitions, and un-even distribution of partition sizes. As an example, 128 files ranging from 1MB, 2MB,...127MB,128MB. will result in 82 partitions with the current algorithm, but only 64 using this algorithm. Also in this example, the max # of blocks per partition in NFD is 13, while in this algorithm is is 2. More generally, running a simulation of 1000 runs using 128MB blocksize, between 1-1000 normally distributed file sizes between 1-500Mb, you can see an improvement of approx 5% reduction of partition counts, and a large reduction in standard deviation of blocks per partition. This algorithm also runs in O(n) time as NFD does, and in every case is strictly better results than NFD. Overall, the more even distribution of blocks across partitions and therefore reduced partition counts should result in a small but significant performance increase across the board Author: Glen Takahashi <[email protected]> Closes #20372 from glentakahashi/feature/improved-block-merging. (cherry picked from commit 8c21170) Signed-off-by: Wenchen Fan <[email protected]>
1 parent 33f17b2 commit f5f21e8

2 files changed

Lines changed: 27 additions & 17 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -445,16 +445,29 @@ case class FileSourceScanExec(
445445
currentSize = 0
446446
}
447447

448-
// Assign files to partitions using "Next Fit Decreasing"
449-
splitFiles.foreach { file =>
450-
if (currentSize + file.length > maxSplitBytes) {
451-
closePartition()
448+
def addFile(file: PartitionedFile): Unit = {
449+
currentFiles += file
450+
currentSize += file.length + openCostInBytes
451+
}
452+
453+
var frontIndex = 0
454+
var backIndex = splitFiles.length - 1
455+
456+
while (frontIndex <= backIndex) {
457+
addFile(splitFiles(frontIndex))
458+
frontIndex += 1
459+
while (frontIndex <= backIndex &&
460+
currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
461+
addFile(splitFiles(frontIndex))
462+
frontIndex += 1
463+
}
464+
while (backIndex > frontIndex &&
465+
currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
466+
addFile(splitFiles(backIndex))
467+
backIndex -= 1
452468
}
453-
// Add the given file to the current partition.
454-
currentSize += file.length + openCostInBytes
455-
currentFiles += file
469+
closePartition()
456470
}
457-
closePartition()
458471

459472
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
460473
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
141141
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
142142
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
143143
checkScan(table.select('c1)) { partitions =>
144-
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
145-
assert(partitions.size == 4, "when checking partitions")
146-
assert(partitions(0).files.size == 1, "when checking partition 1")
144+
// Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
145+
assert(partitions.size == 3, "when checking partitions")
146+
assert(partitions(0).files.size == 2, "when checking partition 1")
147147
assert(partitions(1).files.size == 2, "when checking partition 2")
148148
assert(partitions(2).files.size == 2, "when checking partition 3")
149-
assert(partitions(3).files.size == 1, "when checking partition 4")
150149

151-
// First partition reads (file1)
150+
// First partition reads (file1, file6)
152151
assert(partitions(0).files(0).start == 0)
153152
assert(partitions(0).files(0).length == 2)
153+
assert(partitions(0).files(1).start == 0)
154+
assert(partitions(0).files(1).length == 1)
154155

155156
// Second partition reads (file2, file3)
156157
assert(partitions(1).files(0).start == 0)
@@ -163,10 +164,6 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
163164
assert(partitions(2).files(0).length == 1)
164165
assert(partitions(2).files(1).start == 0)
165166
assert(partitions(2).files(1).length == 1)
166-
167-
// Final partition reads (file6)
168-
assert(partitions(3).files(0).start == 0)
169-
assert(partitions(3).files(0).length == 1)
170167
}
171168

172169
checkPartitionSchema(StructType(Nil))

0 commit comments

Comments (0)