Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.Utils

import scala.collection.mutable.ArrayBuffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this scala.* import should go into their own block in between the java.* and org.* imports. See our style guide.


/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
* locations. By default, one block is mapped to one file with a name given by its BlockId.
Expand All @@ -44,6 +46,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private val localDirs: Array[File] = createLocalDirs()
if (localDirs.isEmpty) {
logError("Failed to create any local dir")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null

Expand Down Expand Up @@ -115,8 +121,9 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD

private def createLocalDirs(): Array[File] = {
logDebug(s"Creating local directories at root dirs '$rootDirs'")
val localDirsResult = ArrayBuffer[File]()
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
rootDirs.split(",").foreach { rootDir =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala style thing, you can use flatMap instead of foreach here and return None in the case where directory creation failed and Some(localDir) in the case where it worked.

var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
Expand All @@ -137,11 +144,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
}
if (!foundLocalDir) {
logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add to this log to say that you are ignoring this directory moving forward. e.g., something simple like,

logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir."
    + " Ignoring this directory.")

System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
} else {
logInfo(s"Created local directory at $localDir")
localDirsResult += localDir
}
logInfo(s"Created local directory at $localDir")
localDir
}
localDirsResult.toArray
}

private def addShutdownHook() {
Expand Down