From bfea9f59fd7587b87de0ddb4601f76786671f38a Mon Sep 17 00:00:00 2001
From: jinxing
Date: Thu, 18 May 2017 23:29:13 +0800
Subject: [PATCH 1/3] Record accurate size of blocks in MapStatus when it's above threshold.

---
 .../spark/internal/config/package.scala    | 18 ++++++
 .../apache/spark/scheduler/MapStatus.scala | 57 +++++++++++++++++--
 .../spark/scheduler/MapStatusSuite.scala   | 56 +++++++++++++++++-
 docs/configuration.md                      | 20 +++++++
 4 files changed, 144 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f7921d56f49..d29b2ff58d40 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,22 @@ package object config {
         "spark.io.compression.codec.")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
+    ConfigBuilder("spark.shuffle.accurateBlockThreshold")
+      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+        "record the size accurately if it's above this config and " +
+        "spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to prevent" +
+        " OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(100 * 1024 * 1024)
+
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE =
+    ConfigBuilder("spark.shuffle.accurateBlockThresholdByTimesAverage")
+      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+        "record the size accurately if it's above this config * averageSize and " +
+        "spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding " +
+        "underestimating shuffle block size when fetch shuffle blocks.")
+      .intConf
+      .createWithDefault(2)
 }
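
Taken together, the two settings above describe a single rule: a shuffle block's exact size is
recorded only when it exceeds both spark.shuffle.accurateBlockThreshold and
spark.shuffle.accurateBlockThresholdByTimesAverage times the average block size, so the effective
threshold is the larger of the two bars. A minimal sketch of that rule, using made-up local values
in place of the real SparkConf lookups:

```scala
// Sketch only: the combined-threshold rule described in the .doc strings above.
// The three values are hypothetical; the real code reads them via SparkEnv.get.conf.
val accurateBlockThreshold = 100L * 1024 * 1024   // spark.shuffle.accurateBlockThreshold
val timesAverage = 2                              // spark.shuffle.accurateBlockThresholdByTimesAverage
val avgSize = 10L * 1024 * 1024                   // average size of the non-empty blocks

// A block gets an exact (byte-compressed) size entry only if it clears both bars.
val effectiveThreshold = math.max(accurateBlockThreshold, timesAverage * avgSize)

def recordAccurately(blockSize: Long): Boolean = blockSize > effectiveThreshold
```
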
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index b2e9a97129f0..08ebbeca988e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler

 import java.io.{Externalizable, ObjectInput, ObjectOutput}

+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.roaringbitmap.RoaringBitmap

+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils

@@ -121,34 +126,42 @@ private[spark] class CompressedMapStatus(
 }

 /**
- * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty.
+ * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
+ * than both spark.shuffle.accurateBlockThreshold and
+ * spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
+ * average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
+ * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
-    private[this] var avgSize: Long)
+    private[this] var avgSize: Long,
+    @transient private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {

   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")

-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  protected def this() = this(null, -1, null, -1, null)  // For deserialization only

   override def location: BlockManagerId = loc

   override def getSizeForBlock(reduceId: Int): Long = {
+    assert(hugeBlockSizes != null)
     if (emptyBlocks.contains(reduceId)) {
       0
     } else {
-      avgSize
+      hugeBlockSizes.get(reduceId) match {
+        case Some(size) => MapStatus.decompressSize(size)
+        case None => avgSize
+      }
     }
   }

@@ -156,6 +169,11 @@
     loc.writeExternal(out)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
+    out.writeInt(hugeBlockSizes.size)
+    hugeBlockSizes.foreach { kv =>
+      out.writeInt(kv._1)
+      out.writeByte(kv._2)
+    }
   }

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -163,6 +181,14 @@ private[spark] class HighlyCompressedMapStatus private (
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
+    val count = in.readInt()
+    val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
+    (0 until count).foreach { _ =>
+      val block = in.readInt()
+      val size = in.readByte()
+      hugeBlockSizesArray += Tuple2(block, size)
+    }
+    hugeBlockSizes = hugeBlockSizesArray.toMap
   }
 }

@@ -193,8 +219,27 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
+    val threshold1 = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold2 = avgSize * Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
+    val threshold = math.max(threshold1, threshold2)
+    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
+    if (numNonEmptyBlocks > 0) {
+      i = 0
+      while (i < totalNumBlocks) {
+        if (uncompressedSizes(i) > threshold) {
+          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+
+        }
+        i += 1
+      }
+    }
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
-    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
+      hugeBlockSizesArray.toMap)
   }
 }
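
The hugeBlockSizes map keeps one byte per huge block, reusing MapStatus.compressSize and
MapStatus.decompressSize, so "accurate" here means accurate to a log-scale bucket rather than
byte-exact. A rough sketch of that kind of one-byte encoding (the constants below are
illustrative; the real ones live in MapStatus and are not changed by this patch) shows why even a
bucketed size is far safer than the plain average for a skewed block:

```scala
// Illustrative log-bucketed one-byte encoding, in the spirit of compressSize/decompressSize.
def compress(size: Long): Byte =
  if (size <= 1L) size.toByte
  else math.min(255, math.ceil(math.log(size.toDouble) / math.log(1.1)).toInt).toByte

def decompress(b: Byte): Long =
  if (b == 0) 0L else math.pow(1.1, b & 0xFF).toLong

val twoGiB = 2L * 1024 * 1024 * 1024
// decompress(compress(twoGiB)) comes back within roughly ten percent above 2 GiB, which is
// close enough for fetch-memory planning; an average dominated by small blocks can be off by
// orders of magnitude for the same block.
```
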
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 759d52fca5ce..c940f7d88f83 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -17,11 +17,15 @@

 package org.apache.spark.scheduler

+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import scala.util.Random

+import org.mockito.Mockito._
 import org.roaringbitmap.RoaringBitmap

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.BlockManagerId

@@ -128,4 +132,54 @@ class MapStatusSuite extends SparkFunSuite {
     assert(size1 === size2)
     assert(!success)
   }
+
+  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD is 0, blocks which are bigger than " +
+    "SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE * averageSize should not be " +
+    "underestimated.") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "0")
+      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "2")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    // Value of element in sizes is equal to the corresponding index when index >= 1000.
+    val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
+    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+    val arrayStream = new ByteArrayOutputStream(102400)
+    val objectOutputStream = new ObjectOutputStream(arrayStream)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    objectOutputStream.writeObject(status1)
+    objectOutputStream.flush()
+    val array = arrayStream.toByteArray
+    val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
+    val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
+    val avg = sizes.sum / 2001
+    ((2 * avg + 1) to 2000).foreach {
+      case part =>
+        assert(status2.getSizeForBlock(part.toInt) >= sizes(part.toInt))
+    }
+  }
+
+  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE is 0, blocks which are bigger than" +
+    " SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.")
+  {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
+      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "0")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    // Value of element in sizes is equal to the corresponding index.
+    val sizes = (0L to 2000L).toArray
+    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+    val arrayStream = new ByteArrayOutputStream(102400)
+    val objectOutputStream = new ObjectOutputStream(arrayStream)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    objectOutputStream.writeObject(status1)
+    objectOutputStream.flush()
+    val array = arrayStream.toByteArray
+    val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
+    val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
+    (1001 to 2000).foreach {
+      case part => assert(status2.getSizeForBlock(part) >= sizes(part))
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 1d8d963016c7..936de3645680 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -612,6 +612,26 @@ Apart from these, the following properties are also available, and may be useful
     spark.io.compression.codec.
   </td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.accurateBlockThreshold</code></td>
+  <td>100 * 1024 * 1024</td>
+  <td>
+    When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
+    size accurately if it's above this config and
+    spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to
+    prevent OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.accurateBlockThresholdByTimesAverage</code></td>
+  <td>2</td>
+  <td>
+    When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
+    size accurately if it's above this config * averageSize and
+    spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding
+    underestimating shuffle block size when fetch shuffle blocks.
+  </td>
+</tr>
 <tr>
   <td><code>spark.io.encryption.enabled</code></td>
   <td>false</td>
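
For reference, a job would opt into this behaviour through the two keys documented above; a
hedged example with illustrative values (these are not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning: lower the absolute bar to 64 MiB and require a block to be at least
// 3x the average before its exact size is kept in the map status.
val conf = new SparkConf()
  .set("spark.shuffle.accurateBlockThreshold", (64 * 1024 * 1024).toString)
  .set("spark.shuffle.accurateBlockThresholdByTimesAverage", "3")
```
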
From ca65544fb9d31bcccf51bf360218af9e8eafdf94 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Mon, 22 May 2017 10:52:43 +0800
Subject: [PATCH 2/3] remove spark.shuffle.accurateBlockThresholdByTimesAverage

---
 .../spark/internal/config/package.scala    | 13 +-----
 .../apache/spark/scheduler/MapStatus.scala | 41 ++++++++-----------
 .../spark/scheduler/MapStatusSuite.scala   | 30 +-------------
 docs/configuration.md                      | 15 +------
 4 files changed, 21 insertions(+), 78 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d29b2ff58d40..e193ed222e22 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -282,18 +282,9 @@ package object config {
   private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
     ConfigBuilder("spark.shuffle.accurateBlockThreshold")
       .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
-        "record the size accurately if it's above this config and " +
-        "spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to prevent" +
-        " OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.")
+        "record the size accurately if it's above this config. This helps to prevent OOM by " +
+        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)

-  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE =
-    ConfigBuilder("spark.shuffle.accurateBlockThresholdByTimesAverage")
-      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
-        "record the size accurately if it's above this config * averageSize and " +
-        "spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding " +
-        "underestimating shuffle block size when fetch shuffle blocks.")
-      .intConf
-      .createWithDefault(2)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 08ebbeca988e..b93f2232a9d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -127,14 +127,13 @@ private[spark] class CompressedMapStatus(

 /**
  * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
- * than both spark.shuffle.accurateBlockThreshold and
- * spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
- * average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
+ * than both spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty
+ * blocks, plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
  * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
   extends MapStatus with Externalizable {

   // loc could be null when the default constructor is called during deserialization
-  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+  require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")

   protected def this() = this(null, -1, null, -1, null)  // For deserialization only
@@ -204,11 +203,21 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
+    val threshold = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
     while (i < totalNumBlocks) {
-      var size = uncompressedSizes(i)
+      val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        totalSize += size
+        // Remove the huge blocks from the calculation for average size and have accurate size for
+        // smaller blocks.
+        if (size < threshold) {
+          totalSize += size
+        } else {
+          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+        }
       } else {
         emptyBlocks.add(i)
       }
@@ -219,24 +228,6 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    val threshold1 = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
-    val threshold2 = avgSize * Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
-    val threshold = math.max(threshold1, threshold2)
-    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
-    if (numNonEmptyBlocks > 0) {
-      i = 0
-      while (i < totalNumBlocks) {
-        if (uncompressedSizes(i) > threshold) {
-          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
-
-        }
-        i += 1
-      }
-    }
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
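
The effect of dropping huge blocks from totalSize is easiest to see with numbers. A hypothetical
map output with one straggler, evaluated against the default 100 MiB threshold (dividing by the
full non-empty count mirrors the avgSize computation in apply above):

```scala
// Hypothetical sizes: 2000 blocks of 1 MiB plus one 2 GiB straggler.
val MiB = 1024L * 1024
val sizes = Array.fill(2000)(1 * MiB) :+ 2048 * MiB
val threshold = 100 * MiB

val small = sizes.filter(_ < threshold)
val avgWithHuge = sizes.sum / sizes.length  // ~2 MiB: the previous average, inflated by the straggler
val avgWithout = small.sum / sizes.length   // ~1 MiB: the new average; the straggler is instead
                                            // reported through its own byte-compressed entry
```
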
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index c940f7d88f83..3ec37f674c77 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -133,37 +133,9 @@ class MapStatusSuite extends SparkFunSuite {
     assert(!success)
   }

-  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD is 0, blocks which are bigger than " +
-    "SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE * averageSize should not be " +
+  test("Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be " +
     "underestimated.") {
-    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "0")
-      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "2")
-    val env = mock(classOf[SparkEnv])
-    doReturn(conf).when(env).conf
-    SparkEnv.set(env)
-    // Value of element in sizes is equal to the corresponding index when index >= 1000.
-    val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
-    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
-    val arrayStream = new ByteArrayOutputStream(102400)
-    val objectOutputStream = new ObjectOutputStream(arrayStream)
-    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
-    objectOutputStream.writeObject(status1)
-    objectOutputStream.flush()
-    val array = arrayStream.toByteArray
-    val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
-    val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
-    val avg = sizes.sum / 2001
-    ((2 * avg + 1) to 2000).foreach {
-      case part =>
-        assert(status2.getSizeForBlock(part.toInt) >= sizes(part.toInt))
-    }
-  }
-
-  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE is 0, blocks which are bigger than" +
-    " SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.")
-  {
     val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
-      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "0")
     val env = mock(classOf[SparkEnv])
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
diff --git a/docs/configuration.md b/docs/configuration.md
index 936de3645680..a6b6d5dfa5f9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -617,21 +617,10 @@ Apart from these, the following properties are also available, and may be useful
   <td>100 * 1024 * 1024</td>
   <td>
     When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
-    size accurately if it's above this config and
-    spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to
-    prevent OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.shuffle.accurateBlockThresholdByTimesAverage</code></td>
-  <td>2</td>
-  <td>
-    When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
-    size accurately if it's above this config * averageSize and
-    spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding
+    size accurately if it's above this config. This helps to prevent OOM by avoiding
     underestimating shuffle block size when fetch shuffle blocks.
-  </td>
+  </td>
 </tr>
 <tr>
   <td><code>spark.io.encryption.enabled</code></td>
   <td>false</td>
From 66aa56fd4d23645ec1d2f0e253ea4888b4882f9d Mon Sep 17 00:00:00 2001
From: jinxing
Date: Mon, 22 May 2017 18:14:35 +0800
Subject: [PATCH 3/3] Refine docs

---
 .../main/scala/org/apache/spark/scheduler/MapStatus.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index b93f2232a9d8..048e0d018659 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -127,8 +127,8 @@ private[spark] class CompressedMapStatus(

 /**
  * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
- * than both spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty
- * blocks, plus a bitmap for tracking which blocks are empty.
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks,
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
@@ -211,8 +211,8 @@ private[spark] object HighlyCompressedMapStatus {
       val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        // Remove the huge blocks from the calculation for average size and have accurate size for
-        // smaller blocks.
+        // Huge blocks are not included in the calculation for average size, thus size for smaller
+        // blocks is more accurate.
         if (size < threshold) {
           totalSize += size
         } else {
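
Putting the series together, an end-to-end sketch in the spirit of the updated test (the sizes
and the BlockManagerId are made up, and with no SparkEnv set the default 100 MiB threshold
applies; like the test, it relies on Spark-internal APIs):

```scala
import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus}
import org.apache.spark.storage.BlockManagerId

// 2001 non-empty blocks, so MapStatus picks the highly compressed representation just as in the
// tests above; block 2000 is a 512 MiB straggler, well above the 100 MiB default threshold.
val sizes = Array.fill[Long](2000)(1024L) :+ 512L * 1024 * 1024
val status = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)

assert(status.isInstanceOf[HighlyCompressedMapStatus])
// Before this series the straggler would have been reported at the ~1 KiB average; now it comes
// back at no less than its real size (log-bucketed, so at most about ten percent above it).
assert(status.getSizeForBlock(2000) >= 512L * 1024 * 1024)
```
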