-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace #19311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -407,4 +407,119 @@ class MemoryStoreSuite | |
| }) | ||
| assert(memoryStore.getSize(blockId) === 10000) | ||
| } | ||
|
|
||
| test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") { | ||
| // Setup a memory store with many blocks cached, and then one request which leads to multiple | ||
| // blocks getting evicted. We'll make the eviction throw an exception, and make sure that | ||
| // all locks are released. | ||
| val ct = implicitly[ClassTag[Array[Byte]]] | ||
| def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = { | ||
| val tc = TaskContext.empty() | ||
| val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1) | ||
| val blockInfoManager = new BlockInfoManager | ||
| blockInfoManager.registerTask(tc.taskAttemptId) | ||
| var droppedSoFar = 0 | ||
| val blockEvictionHandler = new BlockEvictionHandler { | ||
| var memoryStore: MemoryStore = _ | ||
|
|
||
| override private[storage] def dropFromMemory[T: ClassTag]( | ||
| blockId: BlockId, | ||
| data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { | ||
| if (droppedSoFar < failAfterDroppingNBlocks) { | ||
| droppedSoFar += 1 | ||
| memoryStore.remove(blockId) | ||
| if (readLockAfterDrop) { | ||
| // for testing purposes, we act like another thread gets the read lock on the new | ||
| // block | ||
| StorageLevel.DISK_ONLY | ||
| } else { | ||
| StorageLevel.NONE | ||
| } | ||
| } else { | ||
| throw new RuntimeException(s"Mock error dropping block $droppedSoFar") | ||
| } | ||
| } | ||
| } | ||
| val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager, | ||
| blockEvictionHandler) { | ||
| override def afterDropAction(blockId: BlockId): Unit = { | ||
| if (readLockAfterDrop) { | ||
| // pretend that we get a read lock on the block (now on disk) in another thread | ||
| TaskContext.setTaskContext(tc) | ||
| blockInfoManager.lockForReading(blockId) | ||
| TaskContext.unset() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| blockEvictionHandler.memoryStore = memoryStore | ||
| memManager.setMemoryStore(memoryStore) | ||
|
|
||
| // Put in some small blocks to fill up the memory store | ||
| val initialBlocks = (1 to 10).map { id => | ||
|
||
| val blockId = BlockId(s"rdd_1_$id") | ||
| val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) | ||
| val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo) | ||
| assert(initialWriteLock) | ||
| val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => { | ||
| new ChunkedByteBuffer(ByteBuffer.allocate(10)) | ||
| }) | ||
| assert(success) | ||
| blockInfoManager.unlock(blockId, None) | ||
| } | ||
| assert(blockInfoManager.size === 10) | ||
|
|
||
|
|
||
| // Add one big block, which will require evicting everything in the memorystore. However our | ||
| // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared. | ||
| val largeBlockId = BlockId(s"rdd_2_1") | ||
| val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) | ||
| val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo) | ||
| assert(initialWriteLock) | ||
| if (failAfterDroppingNBlocks < 10) { | ||
| val exc = intercept[RuntimeException] { | ||
| memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { | ||
| new ChunkedByteBuffer(ByteBuffer.allocate(100)) | ||
| }) | ||
| } | ||
| assert(exc.getMessage().startsWith("Mock error dropping block"), exc) | ||
| // BlockManager.doPut takes care of releasing the lock for the newly written block -- not | ||
| // testing that here, so do it manually | ||
| blockInfoManager.removeBlock(largeBlockId) | ||
| } else { | ||
| memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { | ||
| new ChunkedByteBuffer(ByteBuffer.allocate(100)) | ||
| }) | ||
| // BlockManager.doPut takes care of releasing the lock for the newly written block -- not | ||
| // testing that here, so do it manually | ||
| blockInfoManager.unlock(largeBlockId) | ||
| } | ||
|
|
||
| val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0 | ||
| val expBlocks = 10 + | ||
| (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) + | ||
| largeBlockInMemory | ||
| assert(blockInfoManager.size === expBlocks) | ||
|
|
||
| val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) => | ||
| assert(info.writerTask === BlockInfo.NO_WRITER, id) | ||
| // in this test, all the blocks in memory have no reader, but everything dropped to disk | ||
| // had another thread read the block. We shouldn't lose the other thread's reader lock. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am curious about this part of the test. Am I missing something ? Are we testing for write lock release resulting in read unlock for other task's as well ? (To nitpick, the write lock release and read lock acquire can be interspersed by another read or write acquire (ofcourse not in this test) )
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In an earlier version of this, I was always unconditionally releasing all locks that were held by anything in the Yes, there are many other possible interleavings of locks possible with other threads, but thats not the point of this test case. Its to make sure that the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for clarifying @squito ... I was assuming the test was for something along those line, but good to know I did not misunderstand ! |
||
| if (memoryStore.contains(id)) { | ||
| assert(info.readerCount === 0, id) | ||
| true | ||
| } else { | ||
| assert(info.readerCount === 1, id) | ||
| false | ||
| } | ||
| } | ||
| assert(blocksStillInMemory.size === (10 - failAfterDroppingNBlocks + largeBlockInMemory)) | ||
| } | ||
|
|
||
| Seq(0, 3, 10).foreach { failAfterDropping => | ||
| Seq(true, false).foreach { readLockAfterDropping => | ||
| testFailureOnNthDrop(failAfterDropping, readLockAfterDropping) | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: failAfterDroppingNBlocks -> numValidBlocks, readLockAfterDrop -> validBlock ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think
validBlockcaptures the intent here -- I don't see anything valid or invalid about it either way. The part of the behavior which changes is whether or not another thread grabs a reader lock on the thread after it gets dropped to disk.(To go along with that, we drop the block to disk, rather than just evicting it completely, as otherwise there is nothing to grab a lock of. I could always drop the block to disk, instead of having that depend on this, it just seemed like another useful thing to check, whether the number of blocks was successfully updated in
blockInfoManager, when the block was dropped completely.)