Commit c8cdde5
Author: Liupengcheng (committed)

Handle bad disk in DiskBlockManager

1 parent d53e11f · commit c8cdde5

File tree: 3 files changed, +74 −18 lines

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 0 deletions
@@ -284,6 +284,16 @@ package object config {
     .intConf
     .createWithDefault(64)
 
+  private[spark] val DISK_STORE_BLACKLIST_TIMEOUT =
+    ConfigBuilder("spark.diskStore.blacklist.timeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1d")
+
+  private[spark] val DISK_STORE_MAX_RETRIES =
+    ConfigBuilder("spark.diskStore.maxRetries")
+      .intConf
+      .createWithDefault(3)
+
   private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
     ConfigBuilder("spark.block.failures.beforeLocationRefresh")
       .doc("Max number of failures before this block manager refreshes " +

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 44 additions & 18 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -48,33 +50,57 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  private val badDirs = ArrayBuffer[File]()
+  private val maxRetries = conf.get(config.DISK_STORE_MAX_RETRIES)
+  private val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT)
+  private val dirToBlacklistExpiryTime = new HashMap[File, Long]
+
   private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
-    // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
-    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
-
-    // Create the subdirectory if it doesn't already exist
-    val subDir = subDirs(dirId).synchronized {
-      val old = subDirs(dirId)(subDirId)
-      if (old != null) {
-        old
-      } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
-        }
-        subDirs(dirId)(subDirId) = newDir
-        newDir
-      }
-    }
-
-    new File(subDir, filename)
+    var mostRecentFailure: Exception = null
+    // Purge directories whose blacklist period has expired so they become usable again
+    val now = System.currentTimeMillis()
+    badDirs --= badDirs.filter(dir => now > dirToBlacklistExpiryTime(dir))
+
+    for (attempt <- 0 until maxRetries) {
+      val goodDirs = localDirs.filterNot(badDirs.contains(_))
+      if (goodDirs.isEmpty) {
+        throw new IOException("No good disk directories available")
+      }
+      // Figure out which good directory it hashes to, and which subdirectory in that.
+      // The directory index is mapped back into localDirs so the subDirs cache stays consistent.
+      val hash = Utils.nonNegativeHash(filename)
+      val dirId = localDirs.indexOf(goodDirs(hash % goodDirs.length))
+      val subDirId = (hash / goodDirs.length) % subDirsPerLocalDir
+      try {
+        // Create the subdirectory if it doesn't already exist
+        val subDir = subDirs(dirId).synchronized {
+          val old = subDirs(dirId)(subDirId)
+          if (old != null) {
+            old
+          } else {
+            val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+            if (!newDir.exists() && !newDir.mkdir()) {
+              throw new IOException(s"Failed to create local dir in $newDir.")
+            }
+            subDirs(dirId)(subDirId) = newDir
+            newDir
+          }
+        }
+        return new File(subDir, filename)
+      } catch {
+        case e: IOException =>
+          logError(s"Failed to look up file $filename on attempt $attempt", e)
+          badDirs += localDirs(dirId)
+          dirToBlacklistExpiryTime.put(localDirs(dirId),
+            System.currentTimeMillis() + blacklistTimeout)
+          mostRecentFailure = e
+      }
+    }
+    throw mostRecentFailure
   }
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
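
The heart of the change is a time-bounded blacklist: a directory that throws an IOException is excluded from hashing for the configured timeout and re-admitted once the deadline passes. A standalone sketch of that expiry behaviour, with illustrative names that are not part of the patch:

  import scala.collection.mutable.{ArrayBuffer, HashMap}

  // Time-bounded blacklist sketch: directories are excluded until their expiry time passes.
  object BlacklistSketch {
    private val bad = ArrayBuffer[String]()
    private val expiry = new HashMap[String, Long]

    def markBad(dir: String, timeoutMs: Long): Unit = {
      bad += dir
      expiry.put(dir, System.currentTimeMillis() + timeoutMs)
    }

    // Everything not currently blacklisted, after purging entries whose window has elapsed.
    def usable(all: Seq[String]): Seq[String] = {
      val now = System.currentTimeMillis()
      bad --= bad.filter(dir => now > expiry(dir)) // purge in place so expired dirs come back
      all.filterNot(d => bad.contains(d))
    }
  }

Calling markBad("/data1", 60000L) hides "/data1" from usable(...) for one minute; after that, the purge in usable re-admits it, which mirrors how getFile treats dirToBlacklistExpiryTime.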

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

Lines changed: 20 additions & 0 deletions
@@ -91,4 +91,24 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     for (i <- 0 until numBytes) writer.write(i)
     writer.close()
   }
+
+  test("test blacklisting bad disk directory") {
+    val blockId = new TestBlockId("1")
+    val hash = Utils.nonNegativeHash(blockId)
+    val (badDiskDir, goodDiskDir) = if (hash % rootDirs.length == 0) {
+      (rootDir0, rootDir1)
+    } else {
+      (rootDir1, rootDir0)
+    }
+
+    // Delete dirs to simulate disk error
+    Utils.deleteRecursively(badDiskDir)
+    try {
+      val file = diskBlockManager.getFile(blockId)
+      val fileRootDir = file.getParentFile.getParentFile.getParentFile
+      assert(file != null && file.getParentFile.exists() && fileRootDir === goodDiskDir)
+    } finally {
+      badDiskDir.mkdirs()
+    }
+  }
 }
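
The test above only checks fallback to a healthy directory. To also exercise re-admission after the blacklist expires, a test could shrink the window through the new keys; the following configuration is only a sketch and is not part of the committed suite:

  import org.apache.spark.SparkConf

  // Hypothetical test settings: a very short blacklist window so expiry is observable
  // without long sleeps; values are arbitrary.
  val testConf = new SparkConf(false)
    .set("spark.diskStore.blacklist.timeout", "1s")
    .set("spark.diskStore.maxRetries", "2")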
