Skip to content

Commit d84f1a3

Browse files
cxzl25Mridul Muralidharan
authored andcommitted
[SPARK-49217][CORE] Support separate buffer size configuration in UnsafeShuffleWriter
### What changes were proposed in this pull request? This PR aims to support separate buffer size configuration in UnsafeShuffleWriter. Introduce `spark.shuffle.file.merge.buffer` configuration. ### Why are the changes needed? `UnsafeShuffleWriter#mergeSpillsWithFileStream` uses `spark.shuffle.file.buffer` as the buffer for reading spill files, and this buffer is an off-heap buffer. In the spill process, we hope that the buffer size is larger, but once there are too many files in the spill, `UnsafeShuffleWriter#mergeSpillsWithFileStream` needs to create a lot of off-heap memory, which makes the executor easily killed by YARN. https://github.com/apache/spark/blob/e72d21c299a450e48b3cf6e5d36b8f3e9a568088/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L372-L375 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Production environment verification ### Was this patch authored or co-authored using generative AI tooling? No Closes #47733 from cxzl25/SPARK-49217. Authored-by: sychen <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 70c9b94 commit d84f1a3

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
8787
private final SparkConf sparkConf;
8888
private final boolean transferToEnabled;
8989
private final int initialSortBufferSize;
90-
private final int inputBufferSizeInBytes;
90+
private final int mergeBufferSizeInBytes;
9191

9292
@Nullable private MapStatus mapStatus;
9393
@Nullable private ShuffleExternalSorter sorter;
@@ -140,8 +140,8 @@ public UnsafeShuffleWriter(
140140
this.transferToEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
141141
this.initialSortBufferSize =
142142
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
143-
this.inputBufferSizeInBytes =
144-
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
143+
this.mergeBufferSizeInBytes =
144+
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_MERGE_BUFFER_SIZE()) * 1024;
145145
open();
146146
}
147147

@@ -372,7 +372,7 @@ private void mergeSpillsWithFileStream(
372372
for (int i = 0; i < spills.length; i++) {
373373
spillInputStreams[i] = new NioBufferedFileInputStream(
374374
spills[i].file,
375-
inputBufferSizeInBytes);
375+
mergeBufferSizeInBytes);
376376
// Only convert the partitionLengths when debug level is enabled.
377377
if (logger.isDebugEnabled()) {
378378
logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,14 @@ package object config {
14661466
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
14671467
.createWithDefaultString("32k")
14681468

1469+
private[spark] val SHUFFLE_FILE_MERGE_BUFFER_SIZE =
1470+
ConfigBuilder("spark.shuffle.file.merge.buffer")
1471+
.doc("Size of the in-memory buffer for each shuffle file input stream, in KiB unless " +
1472+
"otherwise specified. These buffers use off-heap buffers and are related to the number " +
1473+
"of files in the shuffle file. Too large buffers should be avoided.")
1474+
.version("4.0.0")
1475+
.fallbackConf(SHUFFLE_FILE_BUFFER_SIZE)
1476+
14691477
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
14701478
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
14711479
.doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")

docs/configuration.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,16 @@ Apart from these, the following properties are also available, and may be useful
10291029
</td>
10301030
<td>1.4.0</td>
10311031
</tr>
1032+
<tr>
1033+
<td><code>spark.shuffle.file.merge.buffer</code></td>
1034+
<td>32k</td>
1035+
<td>
1036+
Size of the in-memory buffer for each shuffle file input stream, in KiB unless otherwise
1037+
specified. These buffers use off-heap buffers and are related to the number of files in
1038+
the shuffle file. Too large buffers should be avoided.
1039+
</td>
1040+
<td>4.0.0</td>
1041+
</tr>
10321042
<tr>
10331043
<td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
10341044
<td>32k</td>

0 commit comments

Comments
 (0)