16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -874,11 +874,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -923,11 +923,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1100,13 +1100,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

@@ -1369,7 +1369,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
val isDir = fs.getFileStatus(hadoopPath).isDir
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,6 +25,7 @@ import java.util.Date
import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskType

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {

private val now = new Date()
private val conf = new SerializableJobConf(jobConf)
@@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf)

private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
jobContext = new JobContextImpl(conf.value, jID.value)
}
jobContext
}
@@ -143,14 +141,20 @@ class SparkHadoopWriter(jobConf: JobConf)
taskContext
}

protected def newTaskAttemptContext(
Contributor: Is this still needed? It seems you replaced the call with a direct call to the constructor in a lot of places.

Member (author): Yeah, it's overridden in a mock object in OutputCommitCoordinatorSuite, so I kept this.

(A hypothetical sketch of that kind of override follows this file's diff.)

conf: JobConf,
attemptId: TaskAttemptID): TaskAttemptContext = {
new TaskAttemptContextImpl(conf, attemptId)
}

private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
jobID = jobid
splitID = splitid
attemptID = attemptid

jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}

@@ -168,9 +172,9 @@ object SparkHadoopWriter {
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (outputPath == null || fs == null) {
if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs)
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
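
Regarding the review exchange above: newTaskAttemptContext stays a protected hook rather than being inlined because OutputCommitCoordinatorSuite overrides it on a mock writer. A hypothetical sketch of that kind of override, assuming code living in the org.apache.spark package so it can see the private[spark] class; the class and field names below are invented for illustration, this is not the actual suite code:

```scala
package org.apache.spark

import org.apache.hadoop.mapred.{JobConf, TaskAttemptContext, TaskAttemptID}

// Hypothetical test double: subclassing the writer and overriding the hook lets a
// test observe or stub task-attempt context creation.
private class RecordingSparkHadoopWriter(jobConf: JobConf) extends SparkHadoopWriter(jobConf) {
  @volatile var lastAttemptId: Option[TaskAttemptID] = None

  override protected def newTaskAttemptContext(
      conf: JobConf,
      attemptId: TaskAttemptID): TaskAttemptContext = {
    lastAttemptId = Some(attemptId)               // record which attempt was requested
    super.newTaskAttemptContext(conf, attemptId)  // delegate to the real TaskAttemptContextImpl
  }
}
```
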
41 changes: 4 additions & 37 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.annotation.DeveloperApi
@@ -76,9 +73,6 @@ class SparkHadoopUtil extends Logging {
}
}

@deprecated("use newConfiguration with SparkConf argument", "1.2.0")
def newConfiguration(): Configuration = newConfiguration(null)

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
@@ -190,33 +184,6 @@ class SparkHadoopUtil extends Logging {
statisticsDataClass.getDeclaredMethod(methodName)
}

/**
* Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
* call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
* for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
* while it's interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
// scalastyle:off jobconfig
val method = context.getClass.getMethod("getConfiguration")
// scalastyle:on jobconfig
method.invoke(context).asInstanceOf[Configuration]
}

/**
* Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly
* call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes
* for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+
* while it's interface in Hadoop 2.+.
*/
def getTaskAttemptIDFromTaskAttemptContext(
context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
// scalastyle:off jobconfig
val method = context.getClass.getMethod("getTaskAttemptID")
// scalastyle:on jobconfig
method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
@@ -233,11 +200,11 @@
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}

if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
}

def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
@@ -246,12 +213,12 @@

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}

assert(baseStatus.isDir)
assert(baseStatus.isDirectory)
recurse(baseStatus)
}
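
The two reflection helpers removed above existed only because JobContext and TaskAttemptContext were classes in Hadoop 1.x but interfaces in Hadoop 2.x, so direct calls compiled to different bytecode. With Hadoop 1.x support gone, call sites use the interface methods directly; a minimal sketch of the equivalents (illustrative wrapper with an invented object name, not real Spark code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext, TaskAttemptID}

// On Hadoop 2.x these are plain interface methods, so no reflective dispatch is needed.
object Hadoop2DirectCalls {
  def configurationOf(context: JobContext): Configuration =
    context.getConfiguration            // was SparkHadoopUtil.get.getConfigurationFromJobContext

  def taskAttemptIdOf(context: TaskAttemptContext): TaskAttemptID =
    context.getTaskAttemptID            // was SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext
}
```
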

@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -167,7 +168,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
throw new IllegalArgumentException(msg)
}
if (!fs.getFileStatus(path).isDir) {
if (!fs.getFileStatus(path).isDirectory) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}
@@ -304,7 +305,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
} finally {
if (!fs.delete(path)) {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
}
}
@@ -603,7 +604,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
* See SPARK-2261 for more detail.
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory

/**
* Returns the modification time of the given event log. If the status points at an empty
@@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

/**
* Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
* so we have to resort to ugly reflection (as usual...).
* Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
@@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
val action = actionClass.getField("SAFEMODE_GET").get(null)
val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
method.invoke(dfs, action).asInstanceOf[Boolean]
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}

}
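
The safe-mode probe above follows the same theme: with the Hadoop 2 HDFS classes on the compile classpath, the reflective lookup collapses to one typed call. A small sketch with an invented wrapper object, illustrative only:

```scala
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

// SAFEMODE_GET asks the NameNode whether it is in safe mode without changing its
// state; the call returns true while safe mode is on.
object SafeModeCheck {
  def isInSafeMode(dfs: DistributedFileSystem): Boolean =
    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}
```
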
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -36,7 +35,7 @@ private[spark] object FixedLengthBinaryInputFormat {

/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
}
}

@@ -24,7 +24,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.deploy.SparkHadoopUtil

/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -83,16 +82,16 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
val conf = context.getConfiguration
// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
val codec = new CompressionCodecFactory(conf).getCodec(file)
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files")
}
// get the record length
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
// get the filesystem
val fs = file.getFileSystem(job)
val fs = file.getFileSystem(conf)
// open the File
fileInputStream = fs.open(file)
// seek to the splitStart position
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.deploy.SparkHadoopUtil

/**
* A general format for reading whole files in as streams, byte arrays,
* or other functions to be added
@@ -44,7 +42,7 @@ private[spark] abstract class StreamFileInputFormat[T]
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
super.setMaxSplitSize(maxSplitSize)
}
@@ -135,8 +133,7 @@ class PortableDataStream(

private val confBytes = {
val baos = new ByteArrayOutputStream()
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
write(new DataOutputStream(baos))
context.getConfiguration.write(new DataOutputStream(baos))
baos.toByteArray
}

@@ -53,7 +53,7 @@ private[spark] class WholeTextFileInputFormat
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.deploy.SparkHadoopUtil


/**
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
@@ -52,8 +50,7 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[Text, Text] with Configurable {

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(
SparkHadoopUtil.get.getConfigurationFromJobContext(context))
private[this] val fs = path.getFileSystem(context.getConfiguration)

// True means the current file has been processed, then skip it.
private[this] var processed = false