Commits (88):
e947bcb  [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs (Nov 20, 2017)
53affd4  Simplify length in convertMapStatuses (Nov 21, 2017)
e437a26  Simplify totalSize in convertMapStatuses (Nov 24, 2017)
12163f3  Modify for external shuffle service (Nov 24, 2017)
7aa805b  Solve mima issue (Nov 24, 2017)
9cb1f0f  Fix ExternalShuffleBlockResolverSuite (Nov 24, 2017)
2799886  Merge remote-tracking branch 'origin/master' into 19788.1 (Feb 1, 2018)
80c8da9  Remove unnecessary logDebug (Nov 24, 2017)
b410f9c  Improve indent (Nov 25, 2017)
85a4f49  Using ContinuousShuffleBlockId (Nov 28, 2017)
c7db28d  Support ContinuousShuffleBlockId in external shuffle service and fix … (Nov 28, 2017)
ff4bece  Fix tests failure (Nov 28, 2017)
80bb53c  Add comments for getBlockData in ExternalShuffleBlockResolver (Nov 28, 2017)
e18eb59  Update as per internal review (Nov 29, 2017)
69cde07  Minor fix (Nov 29, 2017)
978e10d  Fix conflict for SPARK-22982 (Feb 1, 2018)
2a1a97c  Rename ContinuousShuffleBlockId.length to ContinuousShuffleBlockId.nu… (Feb 1, 2018)
d8691ee  Rename length to numBlocks (Feb 2, 2018)
f30f057  Fix mima issue (Feb 2, 2018)
6c684bc  Disable continuous fetch for non-concatenatable compression format (Feb 11, 2018)
a9ab62e  continuousBlockBulkFetch should be enabled in adaptive execution only (Feb 11, 2018)
87c9af6  Enable ContinuousShuffleBlockId only: (Feb 22, 2018)
7fa0fb9  Using isShuffleBlock to simplify (Feb 22, 2018)
ab4aa5f  Remove getBlockData, this one is used in ExternalShuffleBlockResolver… (Feb 22, 2018)
e5e7c38  Make ShuffleBlockIdBase extend BlockId (Feb 22, 2018)
6ee294a  Merge remote-tracking branch 'upstream/master' into shuffle_fetch_opt (Feb 22, 2018)
583666b  Fix failure in org.apache.spark.shuffle.BlockStoreShuffleReaderSuite.… (Feb 22, 2018)
c133776  Use serializerRelocatable: Boolean (Mar 1, 2018)
fc0fe77  Keep FetchFailed unchanged (yucai, Mar 1, 2018)
7009a5e  Merge remote-tracking branch 'upstream/master' into shuffle_fetch_opt (yucai, Mar 3, 2018)
4a31e9c  minor (yucai, Mar 4, 2018)
08d2ca1  Merge remote-tracking branch 'origin/master' into pr19788 (yucai, Oct 22, 2018)
5efa1af  add spark.shuffle.continuousBlockBatchFetch (yucai, Oct 24, 2018)
3412bcb  make both server and client side back compatible (yucai, Dec 26, 2018)
63d9eb1  Use ContinuousShuffleBlockId for local shuffle read (yucai, Jan 2, 2019)
a0cd415  continuousBlockBatchFetch for remote shuffle (yucai, Jan 2, 2019)
3da48d8  Merge remote-tracking branch 'origin/master' into pr19788_server (yucai, Jan 2, 2019)
c650e65  external shuffle service (yucai, Jan 3, 2019)
752f90c  remove spark.shuffle.continuousBlockBatchFetch (yucai, Jan 4, 2019)
b91c3e9  improve encoders (yucai, Jan 4, 2019)
9ffe59d  rename shuffleBlocksBatchFetch (yucai, Jan 4, 2019)
9354ed0  address.executorId == blockManager.blockManagerId.executorId (yucai, Jan 4, 2019)
0f49142  enhance support for failure fetch (yucai, Jan 4, 2019)
535fe5f  DownloadCallback's onFailure (yucai, Jan 5, 2019)
087422f  Fix MapOutputTrackerSuite (yucai, Jan 5, 2019)
2191f8d  Fix tests (yucai, Jan 5, 2019)
2ca4092  minor (yucai, Jan 6, 2019)
6496abf  Fix ExternalShuffleIntegrationSuite (yucai, Jan 6, 2019)
e91dbd5  Fix ShuffleBlockFetcherIteratorSuite (yucai, Jan 7, 2019)
6cb7110  Fix ExternalShuffleServiceSuite (yucai, Jan 7, 2019)
05e4465  rename to ShuffleBlockBatchId (yucai, Jan 8, 2019)
eb752aa  Fix retry scenario (yucai, Jan 8, 2019)
694ec2d  add tests for ShuffleBlockFetcherIteratorSuite (yucai, Jan 8, 2019)
45d7096  enable optimization only when AE enables (yucai, Jan 8, 2019)
4850d33  add testBatchFetchThreeSort (yucai, Jan 9, 2019)
28355b1  add tests for OpenBlocks and StreamHandle backward compatibility (yucai, Jan 9, 2019)
7a65cc4  Fix MapOutputTrackerSuite (yucai, Jan 9, 2019)
a9ffdee  minor (yucai, Jan 10, 2019)
96b0082  Improve corrupt retry for external shuffle service (yucai, Jan 10, 2019)
ace48fd  improve getBlockData (yucai, Jan 10, 2019)
86aa1fb  minor (yucai, Jan 10, 2019)
fcf434b  minor (yucai, Jan 11, 2019)
a360bbf  fix all java style issue (yucai, Jan 11, 2019)
2f28a7e  fix ExternalShuffleBlockHandlerSuite (yucai, Jan 11, 2019)
5bd6d73  decrease 0-size array creation (yucai, Jan 14, 2019)
aa6134b  Dont change StreamHandle (yucai, Jan 15, 2019)
c6ebe0e  address bug (yucai, Jan 15, 2019)
be54f49  fix java style (yucai, Jan 15, 2019)
1c91d20  don't use batchId in refetch (yucai, Jan 15, 2019)
f810e28  java style (yucai, Jan 15, 2019)
8a1815f  return blockIds when failure (yucai, Jan 15, 2019)
076894f  Merge remote-tracking branch 'origin/master' into pr19788_server (yucai, Jan 15, 2019)
b33b4dd  using ArrayShuffleBlockId to improve retry (yucai, Jan 16, 2019)
91aeff9  fetchContinuousShuffleBlocksInBatch == true, OpenBlocks could contain… (yucai, Jan 16, 2019)
ea1795a  checkout MapOutputTracker.scala (yucai, Jan 16, 2019)
dd980dd  merge local in ShuffleBlockFetcherIterator (yucai, Jan 16, 2019)
e9d8620  revert MapOutputTrackerSuite.scala (yucai, Jan 16, 2019)
5933bf8  local merge (yucai, Jan 17, 2019)
57fab14  remove ArrayShuffleBlockString (yucai, Jan 17, 2019)
c75e016  use arrayBlockId in results always (yucai, Jan 17, 2019)
8398120  minor (yucai, Jan 17, 2019)
2424be0  remove ShuffleBlockBatchId (yucai, Jan 17, 2019)
bd9f70e  Merge remote-tracking branch 'origin/master' into pr19788_server (yucai, Jan 17, 2019)
5e4430a  simplify merge function (yucai, Jan 19, 2019)
401bddb  minor (yucai, Jan 20, 2019)
3d4fc7e  Merge remote-tracking branch 'origin/master' into pr19788_server (yucai, Jan 20, 2019)
039ae85  address comments (yucai, Jan 22, 2019)
92c0ab6  minor (yucai, Jan 22, 2019)
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (7 changes: 5 additions & 2 deletions)
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends Logging {
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
} else {
var totalSize = 0L
for (part <- startPartition until endPartition) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
totalSize += status.getSizeForBlock(part)
Contributor: This can be simplified like: val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum.
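A quick stand-alone check of the suggestion (a sketch; `getSizeForXXX` presumably stands for the `getSizeForBlock` call used in the diff, and `sizes` below is a stand-in array rather than a real MapStatus):

```scala
// Sizes reported for three partitions (stand-in for status.getSizeForBlock).
val sizes = Array(1000L, 10000L, 2000L)
val (startPartition, endPartition) = (0, 2)

// Accumulation as written in the diff:
var totalSize = 0L
for (part <- startPartition until endPartition) {
  totalSize += sizes(part)
}

// The reviewer's one-liner computes the same value:
val totalSizeSimplified = (startPartition until endPartition).map(sizes(_)).sum
assert(totalSize == totalSizeSimplified) // both 11000L
```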

}
Reviewer: n can be numPartitions, obtained directly as endPartition - startPartition?

splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, startPartition, endPartition - startPartition),
totalSize))
Contributor: This is going to create some very heavy shuffle fetches, and looks incorrect. This merge should not be happening here, but in ShuffleBlockFetcherIterator.

}
}

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
logDebug(s"Fetch block data for $blockId")
Contributor: Not necessary to add this; I guess this is mainly for your debug purposes.

yucai (author): Ok, I will remove it.

yucai (author, Nov 25, 2017): Without this info, it looks hard to know whether a continuous shuffle block read really happens, and getLocalBytes has similar debug info:

    logDebug(s"Getting local block $blockId as bytes")

How about keeping it?

val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
ByteStreams.skipFully(in, (blockId.length - 1) * 8)
Contributor: I suspect this line is not correct; it seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally it should be (3*8), but now it changes to (4*8). Can you please explain more?

Contributor: Also, if length is "1", then this will always be zero.

yucai (author): Sure. For example, when startPartition = 3 and endPartition = 8, we need [3, 8) and length = 5.

Line 204: ByteStreams.skipFully(3 * 8) skips entries 0, 1, 2.
Line 205: offset = in.readLong() reads startPartition(3)'s offset.
Line 206: ByteStreams.skipFully((5 - 1) * 8) skips entries 4, 5, 6, 7.
Line 207: nextOffset = in.readLong() reads endPartition(8)'s offset.

When length is "1", zero is correct: we don't need to skip anything, and Line 207's readLong will get endPartition's offset.

Contributor: I get your point, thanks for the explanation.

val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
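The walkthrough above can be captured in a small stand-alone sketch (a hypothetical helper, not the PR's code; it assumes the usual Spark index-file layout of numPartitions + 1 consecutive longs, where entry i is the byte offset at which partition i's data starts):

```scala
import java.io.DataInputStream
import java.nio.file.{Files, Paths}
import com.google.common.io.ByteStreams

// Returns the byte range [offset, nextOffset) covering `length` contiguous
// partitions starting at `reduceId`, mirroring the skip/read steps above.
def segmentBounds(indexPath: String, reduceId: Int, length: Int): (Long, Long) = {
  val in = new DataInputStream(Files.newInputStream(Paths.get(indexPath)))
  try {
    ByteStreams.skipFully(in, reduceId * 8L)     // skip entries 0 .. reduceId - 1
    val offset = in.readLong()                   // entry reduceId: start offset
    ByteStreams.skipFully(in, (length - 1) * 8L) // skip the intermediate entries
    val nextOffset = in.readLong()               // entry reduceId + length: end offset
    (offset, nextOffset)
  } finally {
    in.close()
  }
}
```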
core/src/main/scala/org/apache/spark/storage/BlockId.scala (11 changes: 6 additions & 5 deletions)
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
Contributor: nit: maybe s"shuffle_$shuffleId_$mapId_$reduceId_$length"?

Contributor: These are semi-public interfaces; can we create a new block id, ContinuousShuffleBlockIds?

yucai (author, Nov 25, 2017): ContinuousShuffleBlockIds is a good idea, let me try.
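A caveat on the interpolation nit above: in Scala, `$shuffleId_` parses as a single identifier named `shuffleId_`, so the interpolated form needs braces. A minimal sketch of the corrected suggestion:

```scala
// Without braces, s"shuffle_$shuffleId_..." would look up `shuffleId_` etc.
val (shuffleId, mapId, reduceId, length) = (1, 2, 3, 4)
val name = s"shuffle_${shuffleId}_${mapId}_${reduceId}_${length}"
assert(name == "shuffle_1_2_3_4")
```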

}

@DeveloperApi
@@ -103,7 +104,7 @@ class UnrecognizedBlockId(name: String)
@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
@@ -116,8 +117,8 @@ object BlockId {
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE(shuffleId, mapId, reduceId, n) =>
Reviewer: nit: length?

yucai (author): Yes, good catch! I will change this after switching to ContinuousShuffleBlockId.

ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt, n.toInt)
case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
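Taken together, the changes to `name` and the `SHUFFLE` pattern keep the id and its string form round-trippable. A sketch under this revision's four-field format (later renamed, per the review above, to ContinuousShuffleBlockId):

```scala
import org.apache.spark.storage.{BlockId, ShuffleBlockId}

val id = ShuffleBlockId(shuffleId = 1, mapId = 2, reduceId = 3, length = 4)
assert(id.name == "shuffle_1_2_3_4")     // four contiguous partitions from reduceId 3

// Parsing goes back through BlockId.apply via the extended SHUFFLE regex:
assert(BlockId("shuffle_1_2_3_4") == id)

// The default length of 1 keeps old call sites compiling, but the name now
// carries a trailing "_1" (which is why BlockIdSuite below is updated):
assert(ShuffleBlockId(1, 2, 3).name == "shuffle_1_2_3_1")
```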
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -515,7 +515,7 @@ final class ShuffleBlockFetcherIterator(

private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) =>
case ShuffleBlockId(shufId, mapId, reduceId, n) =>
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -531,7 +531,7 @@ class CleanerTester(

private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
case ShuffleBlockId(`shuffleId`, _, _) => true
case ShuffleBlockId(`shuffleId`, _, _, _) => true
case ShuffleIndexBlockId(`shuffleId`, _, _) => true
case _ => false
}, askSlaves = true)
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala (32 changes: 32 additions & 0 deletions)
@@ -275,4 +275,36 @@ class MapOutputTrackerSuite extends SparkFunSuite {
}
}

test("fetch contiguous partitions") {
val rpcEnv = createRpcEnv("test")
val tracker = newTrackerMaster()
tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
tracker.registerShuffle(10, 2)
assert(tracker.containsShuffle(10))
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size2000 = MapStatus.decompressSize(MapStatus.compressSize(2000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(1000L, 10000L, 2000L)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(10000L, 2000L, 1000L)))
val statuses1 = tracker.getMapSizesByExecutorId(10, 0, 2)
assert(statuses1.toSet ===
Seq((BlockManagerId("a", "hostA", 1000),
ArrayBuffer((ShuffleBlockId(10, 0, 0, 2), size1000 + size10000))),
(BlockManagerId("b", "hostB", 1000),
ArrayBuffer((ShuffleBlockId(10, 1, 0, 2), size10000 + size2000))))
.toSet)
val statuses2 = tracker.getMapSizesByExecutorId(10, 2, 3)
assert(statuses2.toSet ===
Seq((BlockManagerId("a", "hostA", 1000),
ArrayBuffer((ShuffleBlockId(10, 0, 2, 1), size2000))),
(BlockManagerId("b", "hostB", 1000),
ArrayBuffer((ShuffleBlockId(10, 1, 2, 1), size1000))))
.toSet)
assert(0 == tracker.getNumCachedSerializedBroadcast)
tracker.stop()
rpcEnv.shutdown()
}
}
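A note on why the test wraps every size in decompressSize(compressSize(...)): MapStatus stores each block size as a single byte on a logarithmic scale, so getSizeForBlock reports the lossy round-tripped value rather than the raw one. A sketch of the idea (MapStatus is private[spark], so this only compiles inside the org.apache.spark package, as the suite itself does):

```scala
import org.apache.spark.scheduler.MapStatus

val raw = 1000L
val compressed: Byte = MapStatus.compressSize(raw)        // log-scale, one byte
val reported: Long = MapStatus.decompressSize(compressed) // approximation of raw

// For in-range sizes the encoding rounds up, never under-reporting, which is
// why the assertions compare against the round-tripped value, not `raw`.
assert(reported >= raw)
```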
core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -55,7 +55,7 @@ class BlockIdSuite extends SparkFunSuite {
val id = ShuffleBlockId(1, 2, 3)
assertSame(id, ShuffleBlockId(1, 2, 3))
assertDifferent(id, ShuffleBlockId(3, 2, 3))
assert(id.name === "shuffle_1_2_3")
assert(id.name === "shuffle_1_2_3_1")
assert(id.asRDDId === None)
assert(id.shuffleId === 1)
assert(id.mapId === 2)