Commit 1202075

[SPARK-17484] Prevent invalid block locations from being reported after put() exceptions
## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's availability, then incomplete cleanup logic in a `finally` block would never send a second block status message to inform the master of the block's unavailability. This, in turn, leads to fetch failures and was capable of causing complete job failures before #15037 was fixed.

This patch addresses the issue through several small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up after a failed `put()`. In addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic did), this code (redundantly) tries to remove the block from the memory and disk stores (as an added layer of defense against bugs lower down in the stack) and optionally notifies the master of block removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it now notifies the master so that the master updates its block locations. This ensures that bad metadata pointing to non-existent blocks is eventually fixed. Note that this logic could have been implemented in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present.
- `NonFatal` is caught instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls.

This patch depends upon the refactorings in #15036, so that patch will also have to be backported when backporting this fix.

For more background on this issue, including example logs from a real production failure, see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen <[email protected]>

Closes #15085 from JoshRosen/SPARK-17484.
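The heart of the fix is replacing the old `blockWasSuccessfullyStored` flag with an `exceptionWasThrown` flag, so that exception-triggered cleanup runs in `finally` without having to catch and re-throw `InterruptedException`. The pattern can be sketched in isolation as follows (a minimal sketch; `doPut`, `body`, and `onException` are illustrative names, not the real Spark signatures):

```scala
object PutCleanupSketch {
  // Runs `body`, invoking `onException` from the finally block only when
  // `body` terminated abnormally. The flag starts true and is flipped to
  // false on the line after `body`, which is reached only on normal return.
  def doPut[T](body: => Option[T])(onException: () => Unit): Option[T] = {
    var exceptionWasThrown = true
    try {
      val res = body
      exceptionWasThrown = false // reached only if body completed normally
      res
    } finally {
      // Cleanup lives in `finally` rather than `catch` so that
      // InterruptedException (and any other throwable) propagates untouched.
      if (exceptionWasThrown) onException()
    }
  }
}
```

Because the flag, not a `catch` clause, decides whether cleanup runs, the original throwable is never intercepted, yet cleanup is guaranteed on every abnormal exit.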
1 parent a6b8182 commit 1202075

2 files changed

Lines changed: 63 additions & 8 deletions

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 29 additions & 8 deletions
```diff
@@ -283,7 +283,12 @@ private[spark] class BlockManager(
     } else {
       getLocalBytes(blockId) match {
         case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
-        case None => throw new BlockNotFoundException(blockId.toString)
+        case None =>
+          // If this block manager receives a request for a block that it doesn't have then it's
+          // likely that the master has outdated block statuses for this block. Therefore, we send
+          // an RPC so that this block is marked as being unavailable from this block manager.
+          reportBlockStatus(blockId, BlockStatus.empty)
+          throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
```
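The idea behind this hunk, stripped of Spark specifics, is that the serving side knows definitively when a block is absent, so it proactively corrects the master before failing the request. A toy sketch under that assumption (`Master`, `BlockServerSketch`, and `reportMissing` are hypothetical names, not Spark APIs):

```scala
// Hypothetical master interface: receives "this block is gone" reports.
trait Master { def reportMissing(blockId: String): Unit }

class BlockServerSketch(master: Master, blocks: Map[String, Array[Byte]]) {
  def getBlockData(blockId: String): Array[Byte] =
    blocks.getOrElse(blockId, {
      // The server knows the block is definitively absent, so reporting here
      // spares clients from having to distinguish transient failures
      // (network blips) from permanent ones (stale metadata).
      master.reportMissing(blockId)
      throw new NoSuchElementException(s"Block $blockId not found")
    })
}
```

Reporting on the server rather than the client is the design choice the commit message calls out: only the server can rule out a transient failure.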
```diff
@@ -859,22 +864,38 @@ private[spark] class BlockManager(
     }

     val startTimeMs = System.currentTimeMillis
-    var blockWasSuccessfullyStored: Boolean = false
+    var exceptionWasThrown: Boolean = true
     val result: Option[T] = try {
       val res = putBody(putBlockInfo)
-      blockWasSuccessfullyStored = res.isEmpty
-      res
-    } finally {
-      if (blockWasSuccessfullyStored) {
+      exceptionWasThrown = false
+      if (res.isEmpty) {
+        // the block was successfully stored
         if (keepReadLock) {
           blockInfoManager.downgradeLock(blockId)
         } else {
           blockInfoManager.unlock(blockId)
         }
       } else {
-        blockInfoManager.removeBlock(blockId)
+        removeBlockInternal(blockId, tellMaster = false)
         logWarning(s"Putting block $blockId failed")
       }
+      res
+    } finally {
+      // This cleanup is performed in a finally block rather than a `catch` to avoid having to
+      // catch and properly re-throw InterruptedException.
+      if (exceptionWasThrown) {
+        logWarning(s"Putting block $blockId failed due to an exception")
+        // If an exception was thrown then it's possible that the code in `putBody` has already
+        // notified the master about the availability of this block, so we need to send an update
+        // to remove this block location.
+        removeBlockInternal(blockId, tellMaster = tellMaster)
+        // The `putBody` code may have also added a new block status to TaskMetrics, so we need
+        // to cancel that out by overwriting it with an empty block status. We only do this if
+        // the finally block was entered via an exception because doing this unconditionally would
+        // cause us to send empty block statuses for every block that failed to be cached due to
+        // a memory shortage (which is an expected failure, unlike an uncaught exception).
+        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+      }
     }
     if (level.replication > 1) {
       logDebug("Putting block %s with replication took %s"
```
```diff
@@ -1173,7 +1194,7 @@ private[spark] class BlockManager(
         done = true // specified number of peers have been replicated to
       }
     } catch {
-      case e: Exception =>
+      case NonFatal(e) =>
         logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
         failures += 1
         replicationFailed = true
```
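The reason this one-line change matters is that Scala's `NonFatal` extractor deliberately refuses to match `InterruptedException` (along with `VirtualMachineError`, `ThreadDeath`, `LinkageError`, and `ControlThrowable`), so an interrupt raised during synchronous replication now propagates instead of being logged as an ordinary replication failure. A minimal standalone check of that behavior:

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  // Returns true exactly when a `case NonFatal(e)` handler would catch the
  // throwable, using the extractor's boolean apply method.
  def caughtByNonFatal(t: Throwable): Boolean = NonFatal(t)
}
```

With the old `case e: Exception` pattern, `InterruptedException` (a subclass of `Exception`) was swallowed and counted toward `failures`; with `NonFatal(e)` it escapes the catch entirely.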

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 34 additions & 0 deletions
```diff
@@ -861,6 +861,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
     memoryManager.setMemoryStore(store.memoryStore)
+    store.initialize("app-id")

     // The put should fail since a1 is not serializable.
     class UnserializableClass
```
```diff
@@ -1206,6 +1207,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     verify(mockBlockManagerMaster, times(2)).getLocations("item")
   }

+  test("SPARK-17484: block status is properly updated following an exception in put()") {
+    val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
+      override def uploadBlock(
+          hostname: String,
+          port: Int, execId: String,
+          blockId: BlockId,
+          blockData: ManagedBuffer,
+          level: StorageLevel,
+          classTag: ClassTag[_]): Future[Unit] = {
+        throw new InterruptedException("Intentional interrupt")
+      }
+    }
+    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+    store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
+    intercept[InterruptedException] {
+      store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
+    }
+    assert(store.getLocalBytes("item").isEmpty)
+    assert(master.getLocations("item").isEmpty)
+    assert(store2.getRemoteBytes("item").isEmpty)
+  }
+
+  test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(master.getLocations("item").nonEmpty)
+    store.removeBlock("item", tellMaster = false)
+    assert(master.getLocations("item").nonEmpty)
+    assert(store2.getRemoteBytes("item").isEmpty)
+    assert(master.getLocations("item").isEmpty)
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
```
