Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
68fc364
[SPARK-32919][CORE] Driver side changes for coordinating push based s…
venkata91 Aug 6, 2020
2688df2
Empty commit to add Min Shen to authors list
Victsm Oct 27, 2020
0423970
Addressed some of the comments both from Mridul and Thomas
venkata91 Nov 2, 2020
eb93fe1
Address style check issues
venkata91 Nov 3, 2020
f2f61e2
Address style checks and Mridul's comment
venkata91 Nov 3, 2020
3a6219f
Address Tom's comment
venkata91 Nov 3, 2020
ca44d03
Prefer active executors for merger locations
venkata91 Nov 5, 2020
2172f65
Addressed comments from ngone51 and otterc
venkata91 Nov 5, 2020
1e76824
Address ngone51 review comments
venkata91 Nov 6, 2020
3e03109
Address Mridul's review comments
venkata91 Nov 9, 2020
cbb66eb
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 9, 2020
a1c8831
Addressed review comments
venkata91 Nov 10, 2020
047ad0c
Address review comments
venkata91 Nov 11, 2020
2d6d266
Address ngone51 review comments
venkata91 Nov 12, 2020
2b0c073
pick hosts in random order and remove host from shuffle push merger if a
venkata91 Nov 12, 2020
9ba4dfb
Addressed ngone51 comments
venkata91 Nov 13, 2020
5127d8b
Address review comments
venkata91 Nov 15, 2020
e320ac0
Address attila comments
venkata91 Nov 17, 2020
a2d85ef
Addressed ngone51 review comment
venkata91 Nov 17, 2020
affa8a0
Address attilapiros review comments
venkata91 Nov 17, 2020
46f5670
Add test in UtilsSuite to check push based shuffle is enabled or not
venkata91 Nov 18, 2020
2467a61
Merge branch 'upstream-master' into upstream-SPARK-32919
venkata91 Nov 18, 2020
050a5ae
Address Dongjoon comments
venkata91 Nov 18, 2020
1714829
Merge branch 'upstream-master' into upstream-SPARK-32919
venkata91 Nov 18, 2020
1ba7668
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 19, 2020
5ce2934
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

/**
* Stores the location of the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null) {
this.mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1938,4 +1938,42 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push based shuffle on the client side and this works in" +
"conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" +
"which needs to be set with the appropriate" +
"org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" +
"shuffle to be enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
.doc("Maximum number of shuffle push mergers locations cached for push based shuffle." +
"Currently Shuffle push merger locations are nothing but shuffle services where an" +
"executor is launched in the case of Push based shuffle.")
.version("3.1.0")
.intConf
.createWithDefault(500)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("Minimum percentage of shuffle push mergers locations required to enable push based" +
"shuffle for the stage with respect to number of partitions of the child stage. This is" +
" the number of unique Node Manager locations needed to enable push based shuffle.")
.version("3.1.0")
.doubleConf
.createWithDefault(0.05)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc("Minimum static number of of shuffle push mergers locations should be available in" +
" order to enable push based shuffle for a stage. Note this config works in" +
" conjunction with spark.shuffle.push.mergersMinThresholdRatio")
.version("3.1.0")
.doubleConf
.createWithDefault(5)
}
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
Expand Down Expand Up @@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even with dynamic resource allocation kicking in and significantly reducing the number
* of available active executors, we would still be able to get sufficient shuffle service
* locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
// TODO: disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
.format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
} else {
logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
}

logDebug(s"List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
Expand Down Expand Up @@ -1281,6 +1309,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger location for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (pushBasedShuffleEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
Expand Down Expand Up @@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of both active and dead executors host locations for push based shuffle
*
* Currently push based shuffle is disabled for both stage retry and stage reuse cases
* (for eg: in the case where few partitions are lost due to failure). Hence this method
* should be invoked only once for a ShuffleDependency.
* @return List of external shuffle services locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.storage.BlockManagerMessages.{GetShufflePushMergerLocations, _}
import org.apache.spark.util.{RpcUtils, ThreadUtils}

private[spark]
Expand Down Expand Up @@ -125,6 +125,17 @@ class BlockManagerMaster(
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
* Get a list of unique shuffle service locations where an executor is successfully
* registered in the past for block push/merge with push based shuffle.
*/
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

// Mapping from host name to shuffle (mergers) services where the current app
// registered an executor in the past. Older hosts are removed when the
// maxRetainedMergerLocations size is reached in favor of newer locations.
private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()

// Maximum number of merger locations to cache
private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS)

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
Expand Down Expand Up @@ -139,6 +147,9 @@ class BlockManagerMasterEndpoint(
case GetBlockStatus(blockId, askStorageEndpoints) =>
context.reply(blockStatus(blockId, askStorageEndpoints))

case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

case IsExecutorAlive(executorId) =>
context.reply(blockManagerIdByExecutor.contains(executorId))

Expand Down Expand Up @@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(

}

private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) {
val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
StorageUtils.externalShuffleServicePort(conf))
if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
shuffleMergerLocations -= shuffleMergerLocations.head._1
}
shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
}
}

private def removeExecutor(execId: String): Unit = {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
Expand Down Expand Up @@ -526,6 +548,8 @@ class BlockManagerMasterEndpoint(

blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)

addMergerLocation(id)
Copy link
Member

@Ngone51 Ngone51 Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about only adding the merger location when we know there're no active executors on it. Since we'd prefer active executor locations first now, I think it's almost a redundant copy of the active executor locations normally.

The idea is we could add it when we find there're no active executors after removeExecutor and remove it when there're new executors register on the same host. This's definitely helpful for static resource allocation, although I'm a little bit hesitant about the possible overhead for dynamic resource allocation.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a copy, but a materialized view of candidate hosts where external shuffle has been configured for the current application. It becomes a copy only when there is only 1 executor per host.
The cardinality of this map is, btw, low in comparison to total executors - given multi tenancy and given maxRetainedMergerLocations threshold.

}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
Expand Down Expand Up @@ -657,6 +681,31 @@ class BlockManagerMasterEndpoint(
}
}

private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
.mapValues(_.head).values.map(_._2).toSet
val filteredBlockManagersWithExecutors = blockManagersWithExecutors
.filterNot(x => hostsToFilter.contains(x.host))
val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))

// Enough mergers are available as part of active executors list
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// Delta mergers added from inactive mergers list to the active mergers list
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors = shuffleMergerLocations.values
.filterNot(
mergerHost => filteredMergersWithExecutorsHosts.contains(mergerHost.host))
filteredMergersWithExecutors.toSeq ++
filteredMergersWithoutExecutors.toSeq
.take(numMergersNeeded - filteredMergersWithExecutors.size)
}
}

/**
* Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,8 @@ private[spark] object BlockManagerMessages {
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster

case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
extends ToBlockManagerMaster

}
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.SecureRandom
import java.util.{Arrays, Locale, Properties, Random, UUID}
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.zip.GZIPInputStream
Expand Down Expand Up @@ -2541,6 +2541,15 @@ private[spark] object Utils extends Logging {
master == "local" || master.startsWith("local[")
}

/**
* Push based shuffle can only be enabled when external shuffle service is enabled.
* In the initial version, we cannot support pushed based shuffle and adaptive execution
* at the same time. Will improve this in a later version.
*/
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I look forward to the day when the second condition will be disabled :-)
It will be relevant for both k8s and spark streaming !

+CC @dongjoon-hyun you might be interested in this in future.

}

/**
* Return whether dynamic allocation is enabled in the given conf.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,48 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

test("Shuffle push merger locations should be bounded with in" +
" spark.shuffle.push.retainedMergerLocations") {
assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty)
makeBlockManager(100, "execA",
transferService = Some(new MockBlockTransferService(10, "hostA")))
makeBlockManager(100, "execB",
transferService = Some(new MockBlockTransferService(10, "hostB")))
makeBlockManager(100, "execC",
transferService = Some(new MockBlockTransferService(10, "hostC")))
makeBlockManager(100, "execD",
transferService = Some(new MockBlockTransferService(10, "hostD")))
makeBlockManager(100, "execE",
transferService = Some(new MockBlockTransferService(10, "hostA")))
assert(master.getShufflePushMergerLocations(10, Set.empty).size == 4)
assert(master.getShufflePushMergerLocations(10, Set.empty)
.exists(x => Seq("hostC", "hostD", "hostA", "hostB").contains(x.host)))
assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3)
}

test("Prefer active executors locations for shuffle push mergers") {
makeBlockManager(100, "execA",
transferService = Some(new MockBlockTransferService(10, "hostA")))
makeBlockManager(100, "execB",
transferService = Some(new MockBlockTransferService(10, "hostB")))
makeBlockManager(100, "execC",
transferService = Some(new MockBlockTransferService(10, "hostC")))
makeBlockManager(100, "execD",
transferService = Some(new MockBlockTransferService(10, "hostD")))
makeBlockManager(100, "execE",
transferService = Some(new MockBlockTransferService(10, "hostA")))
assert(master.getShufflePushMergerLocations(5, Set.empty).size == 4)

master.removeExecutor("execA")
master.removeExecutor("execE")

assert(master.getShufflePushMergerLocations(3, Set.empty).size == 3)
val expectedHosts = Set("hostB", "hostC", "hostD")
val shufflePushMergers = master
.getShufflePushMergerLocations(3, Set.empty).map(x => x.host).toSet
assert(expectedHosts.forall(x => shufflePushMergers.contains(x)))
}

test("SPARK-33387 Support ordered shuffle block migration") {
val blocks: Seq[ShuffleBlockInfo] = Seq(
ShuffleBlockInfo(1, 0L),
Expand All @@ -1995,7 +2037,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(sortedBlocks.sameElements(decomManager.shufflesToMigrate.asScala.map(_._1)))
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
class MockBlockTransferService(
val maxFailures: Int,
hostname: String = "MockBlockTransferServiceHost") extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null

Expand All @@ -2013,7 +2057,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

override def close(): Unit = {}

override def hostName: String = { "MockBlockTransferServiceHost" }
override def hostName: String = { hostname }

override def port: Int = { 63332 }

Expand Down
Loading