File tree Expand file tree Collapse file tree
core/src/main/scala/org/apache/spark Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -300,6 +300,13 @@ package object config {
300300       .booleanConf
301301       .createWithDefault(false)
302302
303+   private[spark] val SHUFFLE_HIGHLY_COMPRESSED_MAPSTATUS_THRESHOLD =
304+     ConfigBuilder("spark.shuffle.highlyCompressedMapStatusThreshold")
305+       .doc("Compress the size of shuffle blocks in HighlyCompressedMapStatus when the number of" +
306+         " reduce partitions is above this threshold.")
307+       .intConf
308+       .createWithDefault(2000)
309+
303310   private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
304311     ConfigBuilder("spark.shuffle.accurateBlockThreshold")
305312       .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
Original file line number Diff line number Diff line change @@ -50,7 +50,10 @@ private[spark] sealed trait MapStatus {
5050 private[spark] object MapStatus {
5151
5252   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
53-      if (uncompressedSizes.length > 2000) {
53+      val threshold = Option(SparkEnv.get)
54+        .map(_.conf.get(config.SHUFFLE_HIGHLY_COMPRESSED_MAPSTATUS_THRESHOLD))
55+        .getOrElse(config.SHUFFLE_HIGHLY_COMPRESSED_MAPSTATUS_THRESHOLD.defaultValue.get)
56+      if (uncompressedSizes.length > threshold) {
5457       HighlyCompressedMapStatus(loc, uncompressedSizes)
5558     } else {
5659       new CompressedMapStatus(loc, uncompressedSizes)
You can’t perform that action at this time.
0 commit comments