[SPARK-8029][core] shuffle output per attempt #6648
Changes from 86 commits
UnsafeShuffleWriter.java

@@ -75,6 +75,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final ShuffleWriteMetrics writeMetrics;
   private final int shuffleId;
   private final int mapId;
+  private final int stageAttemptId;
   private final TaskContext taskContext;
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
@@ -91,6 +92,11 @@ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream

   private MyByteArrayOutputStream serBuffer;
   private SerializationStream serOutputStream;
+  /**
+   * This is just to allow tests to explore more code paths, without requiring too much complexity
+   * in the test cases. In normal usage, it will be true.
+   */
+  private final boolean allowSpillMove;

   /**
    * Are we in the process of stopping? Because map tasks can call stop() with success = true
@@ -106,6 +112,7 @@ public UnsafeShuffleWriter(
       ShuffleMemoryManager shuffleMemoryManager,
       UnsafeShuffleHandle<K, V> handle,
       int mapId,
+      int stageAttemptId,
       TaskContext taskContext,
       SparkConf sparkConf) throws IOException {
     final int numPartitions = handle.dependency().partitioner().numPartitions();
@@ -119,6 +126,7 @@ public UnsafeShuffleWriter(
     this.memoryManager = memoryManager;
     this.shuffleMemoryManager = shuffleMemoryManager;
     this.mapId = mapId;
+    this.stageAttemptId = stageAttemptId;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.shuffleId = dep.shuffleId();
     this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
@@ -127,6 +135,7 @@ public UnsafeShuffleWriter(
     taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
+    this.allowSpillMove = sparkConf.getBoolean("spark.shuffle.unsafe.testing.allowSpillMove", true);
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
     open();
   }
@@ -226,8 +235,9 @@ void closeAndWriteOutput() throws IOException {
         }
       }
     }
-    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(
+      blockManager.shuffleServerId(), stageAttemptId, partitionLengths);
   }

   @VisibleForTesting
@@ -260,7 +270,7 @@ void forceSorterToSpill() throws IOException {
    * @return the partition lengths in the merged file.
    */
   private long[] mergeSpills(SpillInfo[] spills) throws IOException {
-    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId);
     final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
     final boolean fastMergeEnabled =
@@ -271,7 +281,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
     if (spills.length == 0) {
       new FileOutputStream(outputFile).close(); // Create an empty file
       return new long[partitioner.numPartitions()];
-    } else if (spills.length == 1) {
+    } else if (spills.length == 1 && allowSpillMove) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
       Files.move(spills[0].file, outputFile);
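The allowSpillMove flag gating this branch is read from spark.shuffle.unsafe.testing.allowSpillMove and defaults to true, so production behavior is unchanged. A minimal sketch of how a test might turn it off to push even a single spill through the general merge paths (the object name and the println below are illustrative only, not part of this patch):

    import org.apache.spark.SparkConf

    object AllowSpillMoveSketch {
      def main(args: Array[String]): Unit = {
        // With the flag off, a lone spill is merged via mergeSpillsWithFileStream /
        // mergeSpillsWithTransferTo instead of being renamed with Files.move.
        val conf = new SparkConf()
          .set("spark.shuffle.unsafe.testing.allowSpillMove", "false") // test-only switch added in this patch
          .set("spark.file.transferTo", "false") // also steer merging toward the file-stream path
        println(conf.getBoolean("spark.shuffle.unsafe.testing.allowSpillMove", defaultValue = true))
      }
    }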
@@ -336,7 +346,7 @@ private long[] mergeSpillsWithFileStream(
       SpillInfo[] spills,
       File outputFile,
       @Nullable CompressionCodec compressionCodec) throws IOException {
-    assert (spills.length >= 2);
+    assert (spills.length >= 2 || !allowSpillMove);

     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
     final InputStream[] spillInputStreams = new FileInputStream[spills.length];
@@ -390,7 +400,7 @@ private long[] mergeSpillsWithFileStream(
    * @return the partition lengths in the merged file.
    */
   private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
-    assert (spills.length >= 2);
+    assert (spills.length >= 2 || !allowSpillMove);
     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
@@ -474,7 +484,7 @@ public Option<MapStatus> stop(boolean success) {
       return Option.apply(mapStatus);
     } else {
       // The map task failed, so delete our output data.
-      shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+      shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId);
       return Option.apply(null);
     }
   }
MapStatus.scala
@@ -32,6 +32,9 @@ private[spark] sealed trait MapStatus {
   /** Location where this task was run. */
   def location: BlockManagerId

+  /** stage attempt for the ShuffleMapTask */
+  def stageAttemptId: Int
+
   /**
    * Estimated size for the reduce block, in bytes.
    *

Contributor (Author), on the new stageAttemptId member: I needed to update the serialization of …
@@ -44,11 +47,11 @@ private[spark] sealed trait MapStatus {

 private[spark] object MapStatus {

-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+  def apply(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]): MapStatus = {
     if (uncompressedSizes.length > 2000) {
-      HighlyCompressedMapStatus(loc, uncompressedSizes)
+      HighlyCompressedMapStatus(loc, stageAttemptId, uncompressedSizes)
     } else {
-      new CompressedMapStatus(loc, uncompressedSizes)
+      new CompressedMapStatus(loc, stageAttemptId, uncompressedSizes)
     }
   }
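MapStatus.apply now threads the stage attempt id through to whichever concrete status it builds, still switching to HighlyCompressedMapStatus above 2000 partitions. A rough sketch of the new call shape (values are made up, and these classes are private[spark], so this only compiles from code inside the org.apache.spark package):

    import org.apache.spark.scheduler.MapStatus
    import org.apache.spark.storage.BlockManagerId

    val loc = BlockManagerId("exec-1", "host-a", 7337)

    // 3 partitions: backed by CompressedMapStatus.
    val small = MapStatus(loc, stageAttemptId = 0, uncompressedSizes = Array(10L, 0L, 42L))

    // 5000 partitions: backed by HighlyCompressedMapStatus.
    val large = MapStatus(loc, stageAttemptId = 1, uncompressedSizes = Array.fill(5000)(100L))

    assert(small.stageAttemptId == 0 && large.stageAttemptId == 1)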
@@ -91,29 +94,34 @@ private[spark] object MapStatus {
  */
 private[spark] class CompressedMapStatus(
     private[this] var loc: BlockManagerId,
+    private[this] var _stageAttemptId: Int,
     private[this] var compressedSizes: Array[Byte])
   extends MapStatus with Externalizable {

-  protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+  protected def this() = this(null, 0, null.asInstanceOf[Array[Byte]]) // For deserialization only

-  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
-    this(loc, uncompressedSizes.map(MapStatus.compressSize))
+  def this(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]) {
+    this(loc, stageAttemptId, uncompressedSizes.map(MapStatus.compressSize))
   }

   override def location: BlockManagerId = loc

+  override def stageAttemptId: Int = _stageAttemptId
+
   override def getSizeForBlock(reduceId: Int): Long = {
     MapStatus.decompressSize(compressedSizes(reduceId))
   }

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
+    out.writeInt(_stageAttemptId)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
+    _stageAttemptId = in.readInt()
     val len = in.readInt()
     compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
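Because MapStatus objects travel from executors back to the driver, the new field also has to survive the Externalizable round trip, which is why writeExternal and readExternal gained a matching writeInt/readInt. A rough sketch of the kind of round-trip check a test might perform (illustrative only; it has to live inside the org.apache.spark package since the class is private[spark]):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.spark.scheduler.CompressedMapStatus
    import org.apache.spark.storage.BlockManagerId

    // A status for attempt 3 of some map task (values are made up).
    val original = new CompressedMapStatus(BlockManagerId("exec-1", "host-a", 7337), 3, Array(10L, 0L, 42L))

    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    original.writeExternal(out) // writes loc, then the new stage attempt id, then the sizes
    out.flush()

    // Start from a throwaway instance and let readExternal overwrite its fields.
    val restored = new CompressedMapStatus(BlockManagerId("x", "y", 1), 0, Array.empty[Long])
    restored.readExternal(new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    assert(restored.stageAttemptId == 3)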
@@ -132,7 +140,8 @@ private[spark] class CompressedMapStatus(
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
-    private[this] var numNonEmptyBlocks: Int,
+    private[this] var _stageAttemptId: Int,
+    numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
@@ -141,10 +150,12 @@ private[spark] class HighlyCompressedMapStatus private (
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")

-  protected def this() = this(null, -1, null, -1) // For deserialization only
+  protected def this() = this(null, 0, -1, null, -1) // For deserialization only

   override def location: BlockManagerId = loc

+  override def stageAttemptId: Int = _stageAttemptId
+
   override def getSizeForBlock(reduceId: Int): Long = {
     if (emptyBlocks.contains(reduceId)) {
       0
@@ -155,20 +166,25 @@ private[spark] class HighlyCompressedMapStatus private (

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
+    out.writeInt(_stageAttemptId)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
   }

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
+    _stageAttemptId = in.readInt()
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
 }

 private[spark] object HighlyCompressedMapStatus {
-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
+  def apply(
+      loc: BlockManagerId,
+      stageAttemptId: Int,
+      uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
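For stages with many partitions, HighlyCompressedMapStatus keeps only a bitmap of empty blocks plus an average block size, and it now records the stage attempt id alongside them. A small sketch of what that summarization means for callers (illustrative values; same private[spark] caveat as above):

    import org.apache.spark.scheduler.HighlyCompressedMapStatus
    import org.apache.spark.storage.BlockManagerId

    // 3000 partitions, alternating empty and 100-byte outputs.
    val sizes = Array.tabulate(3000)(i => if (i % 2 == 0) 0L else 100L)
    val status = HighlyCompressedMapStatus(
      BlockManagerId("exec-2", "host-b", 7337), stageAttemptId = 2, uncompressedSizes = sizes)

    assert(status.stageAttemptId == 2)
    assert(status.getSizeForBlock(0) == 0L)   // tracked as an empty block
    assert(status.getSizeForBlock(1) == 100L) // non-empty blocks all report the average size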
@@ -194,6 +210,6 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    new HighlyCompressedMapStatus(loc, stageAttemptId, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }
Comment (PR author): this is kinda ugly, it's just so I can test the different paths through this code without requiring a really complicated example. Open to other suggestions.

Comment (reviewer): Can you add a comment in the code explaining it?

Comment (PR author): I think at the same time you were adding this comment, I put in a comment, but on the variable declaration: https://github.com/apache/spark/pull/6648/files#diff-642ce9f439435408382c3ac3b5c5e0a0R99