Skip to content

Commit e0939f0

Browse files
Kimahriman authored and srowen committed
[SPARK-38640][CORE] Fix NPE with memory-only cache blocks and RDD fetching
### What changes were proposed in this pull request?

Fixes a bug where if `spark.shuffle.service.fetch.rdd.enabled=true`, memory-only cached blocks will fail to unpersist.

### Why are the changes needed?

In #33020, when all RDD blocks are removed from `externalShuffleServiceBlockStatus`, the underlying Map is nulled to reduce memory. When persisting blocks we check if it's using disk before adding it to `externalShuffleServiceBlockStatus`, but when removing them there is no check, so a memory-only cached block will keep `externalShuffleServiceBlockStatus` null, and unpersisting it throws an NPE because it tries to remove from the null Map. This adds checks to the removal as well, to only remove if the block is on disk and therefore should have been added to `externalShuffleServiceBlockStatus` in the first place.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New and updated UT

Closes #35959 from Kimahriman/fetch-rdd-memory-only-unpersist.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 619b7b4 commit e0939f0

3 files changed

Lines changed: 29 additions & 3 deletions

File tree

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -838,9 +838,11 @@ private[spark] class BlockStatusPerBlockId {
838838
}
839839

840840
def remove(blockId: BlockId): Unit = {
841-
blocks.remove(blockId)
842-
if (blocks.isEmpty) {
843-
blocks = null
841+
if (blocks != null) {
842+
blocks.remove(blockId)
843+
if (blocks.isEmpty) {
844+
blocks = null
845+
}
844846
}
845847
}
846848

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,26 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
255255
}
256256
}
257257
}
258+
259+
test("SPARK-38640: memory only blocks can unpersist using shuffle service cache fetching") {
260+
for (enabled <- Seq(true, false)) {
261+
val confWithRddFetch =
262+
conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, enabled)
263+
sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetch)
264+
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
265+
sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
266+
try {
267+
val rdd = sc.parallelize(0 until 100, 2)
268+
.map { i => (i, 1) }
269+
.persist(StorageLevel.MEMORY_ONLY)
270+
271+
rdd.count()
272+
rdd.unpersist(true)
273+
assert(sc.persistentRdds.isEmpty)
274+
} finally {
275+
rpcHandler.applicationRemoved(sc.conf.getAppId, true)
276+
sc.stop()
277+
}
278+
}
279+
}
258280
}

core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class BlockManagerInfoSuite extends SparkFunSuite {
6363
if (svcEnabled) {
6464
assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
6565
}
66+
bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 0)
67+
assert(bmInfo.remainingMem === 30000)
6668
}
6769

6870
testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled, bmInfo) =>

0 commit comments

Comments (0)