Skip to content

Commit 35caf2d

Browse files
Ngone51 authored and dongjoon-hyun committed
Revert "[SPARK-35011][CORE][3.0] Avoid Block Manager registrations when StopExecutor msg is in-flight"
This reverts commit 0a31f1f.

### What changes were proposed in this pull request?

Revert #33782

### Why are the changes needed?

It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check: https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existing tests.

Closes #33961 from Ngone51/revert-35011-3.0.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8c00df3 commit 35caf2d

4 files changed

Lines changed: 32 additions & 101 deletions

File tree

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
8080
// executor ID -> timestamp of when the last heartbeat from this executor was received
8181
private val executorLastSeen = new HashMap[String, Long]
8282

83-
private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
83+
private val executorTimeoutMs = sc.conf.get(
84+
config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
85+
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
8486

8587
private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
8688

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 25 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,6 @@ class BlockManagerMasterEndpoint(
8282
mapper
8383
}
8484

85-
private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
86-
private val blockManagerInfoCleaner = {
87-
val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
88-
val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
89-
executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
90-
TimeUnit.MILLISECONDS)
91-
executor
92-
}
93-
9485
val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
9586

9687
val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -199,12 +190,12 @@ class BlockManagerMasterEndpoint(
199190
}
200191
}
201192
bmIdsExecutor.foreach { bmId =>
202-
aliveBlockManagerInfo(bmId).foreach { bmInfo =>
193+
blockManagerInfo.get(bmId).foreach { bmInfo =>
203194
bmInfo.removeBlock(blockId)
204195
}
205196
}
206197
}
207-
val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
198+
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
208199
bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
209200
case e: IOException =>
210201
logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
@@ -233,7 +224,7 @@ class BlockManagerMasterEndpoint(
233224
// Nothing to do in the BlockManagerMasterEndpoint data structures
234225
val removeMsg = RemoveShuffle(shuffleId)
235226
Future.sequence(
236-
allAliveBlockManagerInfos.map { bm =>
227+
blockManagerInfo.values.map { bm =>
237228
bm.slaveEndpoint.ask[Boolean](removeMsg)
238229
}.toSeq
239230
)
@@ -246,7 +237,7 @@ class BlockManagerMasterEndpoint(
246237
*/
247238
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
248239
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
249-
val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
240+
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
250241
removeFromDriver || !info.blockManagerId.isDriver
251242
}
252243
val futures = requiredBlockManagers.map { bm =>
@@ -264,23 +255,12 @@ class BlockManagerMasterEndpoint(
264255
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
265256
val info = blockManagerInfo(blockManagerId)
266257

267-
// SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
268-
// timestamp of the executor in BlockManagerInfo. This info will be removed from
269-
// blockManagerInfo map by the blockManagerInfoCleaner once
270-
// now() - info.executorRemovalTs > executorTimeoutMs.
271-
//
272-
// We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
273-
// while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
274-
// of executors in Spark.
275-
// Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
276-
// BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
277-
// executor to reregister on BlockManagerHeartbeat message.
278-
info.setExecutorRemovalTs()
279-
280258
// Remove the block manager from blockManagerIdByExecutor.
281259
blockManagerIdByExecutor -= blockManagerId.executorId
282260

283-
// remove all the blocks.
261+
// Remove it from blockManagerInfo and remove all the blocks.
262+
blockManagerInfo.remove(blockManagerId)
263+
284264
val iterator = info.blocks.keySet.iterator
285265
while (iterator.hasNext) {
286266
val blockId = iterator.next
@@ -301,7 +281,7 @@ class BlockManagerMasterEndpoint(
301281
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
302282
val blockLocations = locations.toSeq
303283
val candidateBMId = blockLocations(i)
304-
aliveBlockManagerInfo(candidateBMId).foreach { bm =>
284+
blockManagerInfo.get(candidateBMId).foreach { bm =>
305285
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
306286
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
307287
bm.slaveEndpoint.ask[Boolean](replicateMsg)
@@ -325,26 +305,27 @@ class BlockManagerMasterEndpoint(
325305
val locations = blockLocations.get(blockId)
326306
if (locations != null) {
327307
locations.foreach { blockManagerId: BlockManagerId =>
328-
aliveBlockManagerInfo(blockManagerId).foreach { bm =>
329-
// Remove the block from the BlockManager.
308+
val blockManager = blockManagerInfo.get(blockManagerId)
309+
if (blockManager.isDefined) {
310+
// Remove the block from the slave's BlockManager.
330311
// Doesn't actually wait for a confirmation and the message might get lost.
331312
// If message loss becomes frequent, we should add retry logic here.
332-
bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
313+
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
333314
}
334315
}
335316
}
336317
}
337318

338319
// Return a map from the block manager id to max memory and remaining memory.
339320
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
340-
allAliveBlockManagerInfos.map { info =>
341-
(info.blockManagerId, (info.maxMem, info.remainingMem))
321+
blockManagerInfo.map { case(blockManagerId, info) =>
322+
(blockManagerId, (info.maxMem, info.remainingMem))
342323
}.toMap
343324
}
344325

345326
private def storageStatus: Array[StorageStatus] = {
346-
allAliveBlockManagerInfos.map { info =>
347-
new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
327+
blockManagerInfo.map { case (blockManagerId, info) =>
328+
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
348329
Some(info.maxOffHeapMem), info.blocks.asScala)
349330
}.toArray
350331
}
@@ -366,7 +347,7 @@ class BlockManagerMasterEndpoint(
366347
* Futures to avoid potential deadlocks. This can arise if there exists a block manager
367348
* that is also waiting for this master endpoint's response to a previous message.
368349
*/
369-
allAliveBlockManagerInfos.map { info =>
350+
blockManagerInfo.values.map { info =>
370351
val blockStatusFuture =
371352
if (askSlaves) {
372353
info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -390,7 +371,7 @@ class BlockManagerMasterEndpoint(
390371
askSlaves: Boolean): Future[Seq[BlockId]] = {
391372
val getMatchingBlockIds = GetMatchingBlockIds(filter)
392373
Future.sequence(
393-
allAliveBlockManagerInfos.map { info =>
374+
blockManagerInfo.values.map { info =>
394375
val future =
395376
if (askSlaves) {
396377
info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -452,10 +433,9 @@ class BlockManagerMasterEndpoint(
452433

453434
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
454435
maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
455-
456-
listenerBus.post(SparkListenerBlockManagerAdded(time, id,
457-
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
458436
}
437+
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
438+
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
459439
id
460440
}
461441

@@ -525,7 +505,7 @@ class BlockManagerMasterEndpoint(
525505
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
526506
Option(blockStatusByShuffleService(bmId).get(blockId))
527507
} else {
528-
aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
508+
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
529509
}
530510
}
531511

@@ -536,7 +516,8 @@ class BlockManagerMasterEndpoint(
536516
// can be used to access this block even when the original executor is already stopped.
537517
loc.host == requesterHost &&
538518
(loc.port == externalShuffleServicePort ||
539-
aliveBlockManagerInfo(loc)
519+
blockManagerInfo
520+
.get(loc)
540521
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
541522
.getOrElse(false))
542523
}.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -553,7 +534,7 @@ class BlockManagerMasterEndpoint(
553534

554535
/** Get the list of the peers of the given block manager */
555536
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
556-
val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
537+
val blockManagerIds = blockManagerInfo.keySet
557538
if (blockManagerIds.contains(blockManagerId)) {
558539
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
559540
} else {
@@ -567,35 +548,15 @@ class BlockManagerMasterEndpoint(
567548
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
568549
for (
569550
blockManagerId <- blockManagerIdByExecutor.get(executorId);
570-
info <- aliveBlockManagerInfo(blockManagerId)
551+
info <- blockManagerInfo.get(blockManagerId)
571552
) yield {
572553
info.slaveEndpoint
573554
}
574555
}
575556

576557
override def onStop(): Unit = {
577558
askThreadPool.shutdownNow()
578-
blockManagerInfoCleaner.shutdownNow()
579-
}
580-
581-
private def cleanBlockManagerInfo(): Unit = {
582-
logDebug("Cleaning blockManagerInfo")
583-
val now = System.currentTimeMillis()
584-
val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
585-
// bmInfo.executorRemovalTs.get cannot be None when BM is not alive
586-
!bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
587-
}.keys
588-
expiredBmIds.foreach { bmId =>
589-
logInfo(s"Cleaning expired $bmId from blockManagerInfo")
590-
blockManagerInfo.remove(bmId)
591-
}
592559
}
593-
594-
@inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
595-
blockManagerInfo.get(bmId).filter(_.isAlive)
596-
597-
@inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
598-
blockManagerInfo.values.filter(_.isAlive)
599560
}
600561

601562
@DeveloperApi
@@ -623,7 +584,6 @@ private[spark] class BlockManagerInfo(
623584

624585
private var _lastSeenMs: Long = timeMs
625586
private var _remainingMem: Long = maxMem
626-
private var _executorRemovalTs: Option[Long] = None
627587

628588
// Mapping from block id to its status.
629589
private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -739,16 +699,4 @@ private[spark] class BlockManagerInfo(
739699
def clear(): Unit = {
740700
_blocks.clear()
741701
}
742-
743-
def executorRemovalTs: Option[Long] = _executorRemovalTs
744-
745-
def isAlive: Boolean = _executorRemovalTs.isEmpty
746-
747-
def setExecutorRemovalTs(): Unit = {
748-
if (!isAlive) {
749-
logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
750-
} else {
751-
_executorRemovalTs = Some(System.currentTimeMillis())
752-
}
753-
}
754702
}

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2934,13 +2934,6 @@ private[spark] object Utils extends Logging {
29342934
props.forEach((k, v) => resultProps.put(k, v))
29352935
resultProps
29362936
}
2937-
2938-
def executorTimeoutMs(conf: SparkConf): Long = {
2939-
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
2940-
// "milliseconds"
2941-
conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
2942-
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
2943-
}
29442937
}
29452938

29462939
private[util] object CallerContext extends Logging {

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.collection.mutable
2525
import scala.collection.mutable.ArrayBuffer
2626
import scala.concurrent.Future
2727
import scala.concurrent.duration._
28-
import scala.language.{implicitConversions, postfixOps}
28+
import scala.language.implicitConversions
2929
import scala.reflect.ClassTag
3030

3131
import org.apache.commons.lang3.RandomUtils
@@ -93,7 +93,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
9393
.set(MEMORY_STORAGE_FRACTION, 0.999)
9494
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
9595
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
96-
.set(STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "5s")
9796
}
9897

9998
private def makeBlockManager(
@@ -482,7 +481,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
482481
mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
483482
}
484483

485-
test("no reregistration on heart beat until executor timeout") {
484+
test("reregistration on heart beat") {
486485
val store = makeBlockManager(2000)
487486
val a1 = new Array[Byte](400)
488487

@@ -493,15 +492,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
493492

494493
master.removeExecutor(store.blockManagerId.executorId)
495494
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
495+
496496
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
497497
BlockManagerHeartbeat(store.blockManagerId))
498-
assert(reregister == false, "master told to re-register")
499-
500-
eventually(timeout(10 seconds), interval(1 seconds)) {
501-
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
502-
BlockManagerHeartbeat(store.blockManagerId))
503-
assert(reregister, "master did not tell to re-register")
504-
}
498+
assert(reregister)
505499
}
506500

507501
test("reregistration on block update") {
@@ -515,12 +509,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
515509
master.removeExecutor(store.blockManagerId.executorId)
516510
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
517511

518-
eventually(timeout(10 seconds), interval(1 seconds)) {
519-
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
520-
BlockManagerHeartbeat(store.blockManagerId))
521-
assert(reregister, "master did not tell to re-register")
522-
}
523-
524512
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
525513
store.waitForAsyncReregister()
526514

0 commit comments

Comments (0)