Skip to content

Commit 328addd

Browse files
committed
Merge remote-tracking branch 'upstream/master' into testMaster22
2 parents e8bf33c + c44eb56 commit 328addd

333 files changed

Lines changed: 7724 additions & 3755 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

R/pkg/R/context.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ setCheckpointDirSC <- function(sc, dirName) {
305305
#' Currently directories are only supported for Hadoop-supported filesystems.
306306
#' Refer to Hadoop-supported filesystems at \url{https://wiki.apache.org/hadoop/HCFS}.
307307
#'
308+
#' Note: A path can be added only once. Subsequent additions of the same path are ignored.
309+
#'
308310
#' @rdname spark.addFile
309311
#' @param path The path of the file to be added
310312
#' @param recursive Whether to add files recursively from the path. Default is FALSE.

R/pkg/inst/tests/testthat/test_basic.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
context("basic tests for CRAN")
1919

2020
test_that("create DataFrame from list or data.frame", {
21-
tryCatch( checkJavaVersion(),
21+
tryCatch(checkJavaVersion(),
2222
error = function(e) { skip("error on Java check") },
23-
warning = function(e) { skip("warning on Java check") } )
23+
warning = function(e) { skip("warning on Java check") })
2424

2525
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
2626
sparkConfig = sparkRTestConfig)
@@ -54,9 +54,9 @@ test_that("create DataFrame from list or data.frame", {
5454
})
5555

5656
test_that("spark.glm and predict", {
57-
tryCatch( checkJavaVersion(),
57+
tryCatch(checkJavaVersion(),
5858
error = function(e) { skip("error on Java check") },
59-
warning = function(e) { skip("warning on Java check") } )
59+
warning = function(e) { skip("warning on Java check") })
6060

6161
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
6262
sparkConfig = sparkRTestConfig)

bin/docker-image-tool.sh

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ function build {
4949
# Set image build arguments accordingly if this is a source repo and not a distribution archive.
5050
IMG_PATH=resource-managers/kubernetes/docker/src/main/dockerfiles
5151
BUILD_ARGS=(
52+
${BUILD_PARAMS}
5253
--build-arg
5354
img_path=$IMG_PATH
5455
--build-arg
@@ -57,13 +58,14 @@ function build {
5758
else
5859
# Not passed as an argument to docker, but used to validate the Spark directory.
5960
IMG_PATH="kubernetes/dockerfiles"
60-
BUILD_ARGS=()
61+
BUILD_ARGS=(${BUILD_PARAMS})
6162
fi
6263

6364
if [ ! -d "$IMG_PATH" ]; then
6465
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
6566
fi
6667
local BINDING_BUILD_ARGS=(
68+
${BUILD_PARAMS}
6769
--build-arg
6870
base_img=$(image_ref spark)
6971
)
@@ -101,6 +103,8 @@ Options:
101103
-t tag Tag to apply to the built image, or to identify the image to be pushed.
102104
-m Use minikube's Docker daemon.
103105
-n Build docker image with --no-cache
106+
-b arg Build arg to build or push the image. For multiple build args, this option needs to
107+
be used separately for each build arg.
104108
105109
Using minikube when building images will do so directly into minikube's Docker daemon.
106110
There is no need to push the images into minikube in that case, they'll be automatically
@@ -130,7 +134,8 @@ TAG=
130134
BASEDOCKERFILE=
131135
PYDOCKERFILE=
132136
NOCACHEARG=
133-
while getopts f:mr:t:n option
137+
BUILD_PARAMS=
138+
while getopts f:p:mr:t:n:b: option
134139
do
135140
case "${option}"
136141
in
@@ -139,6 +144,7 @@ do
139144
r) REPO=${OPTARG};;
140145
t) TAG=${OPTARG};;
141146
n) NOCACHEARG="--no-cache";;
147+
b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
142148
m)
143149
if ! which minikube 1>/dev/null; then
144150
error "Cannot find minikube."

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,9 +488,15 @@ private[spark] class ExecutorAllocationManager(
488488
newExecutorTotal = numExistingExecutors
489489
if (testing || executorsRemoved.nonEmpty) {
490490
executorsRemoved.foreach { removedExecutorId =>
491+
// If the executor has cached blocks, cachedExecutorIdleTimeoutS is used as its idle timeout
492+
val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) {
493+
cachedExecutorIdleTimeoutS
494+
} else {
495+
executorIdleTimeoutS
496+
}
491497
newExecutorTotal -= 1
492498
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
493-
s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
499+
s"$idleTimeout seconds (new desired total will be $newExecutorTotal)")
494500
executorsPendingToRemove.add(removedExecutorId)
495501
}
496502
executorsRemoved

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,8 @@ class SparkContext(config: SparkConf) extends Logging {
14961496
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
14971497
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
14981498
* use `SparkFiles.get(fileName)` to find its download location.
1499+
*
1500+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
14991501
*/
15001502
def addFile(path: String): Unit = {
15011503
addFile(path, false)
@@ -1516,11 +1518,17 @@ class SparkContext(config: SparkConf) extends Logging {
15161518
* use `SparkFiles.get(fileName)` to find its download location.
15171519
* @param recursive if true, a directory can be given in `path`. Currently directories are
15181520
* only supported for Hadoop-supported filesystems.
1521+
*
1522+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
15191523
*/
15201524
def addFile(path: String, recursive: Boolean): Unit = {
15211525
val uri = new Path(path).toUri
15221526
val schemeCorrectedPath = uri.getScheme match {
1523-
case null | "local" => new File(path).getCanonicalFile.toURI.toString
1527+
case null => new File(path).getCanonicalFile.toURI.toString
1528+
case "local" =>
1529+
logWarning("File with 'local' scheme is not supported to add to file server, since " +
1530+
"it is already available on every node.")
1531+
return
15241532
case _ => path
15251533
}
15261534

@@ -1555,6 +1563,9 @@ class SparkContext(config: SparkConf) extends Logging {
15551563
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
15561564
env.securityManager, hadoopConfiguration, timestamp, useCache = false)
15571565
postEnvironmentUpdate()
1566+
} else {
1567+
logWarning(s"The path $path has been added already. Overwriting of added paths " +
1568+
"is not supported in the current version.")
15581569
}
15591570
}
15601571

@@ -1803,6 +1814,8 @@ class SparkContext(config: SparkConf) extends Logging {
18031814
*
18041815
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
18051816
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
1817+
*
1818+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
18061819
*/
18071820
def addJar(path: String) {
18081821
def addJarFile(file: File): String = {
@@ -1849,6 +1862,9 @@ class SparkContext(config: SparkConf) extends Logging {
18491862
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
18501863
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
18511864
postEnvironmentUpdate()
1865+
} else {
1866+
logWarning(s"The jar $path has been added already. Overwriting of added jars " +
1867+
"is not supported in the current version.")
18521868
}
18531869
}
18541870
}

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,8 @@ class JavaSparkContext(val sc: SparkContext)
668668
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
669669
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
670670
* use `SparkFiles.get(fileName)` to find its download location.
671+
*
672+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
671673
*/
672674
def addFile(path: String) {
673675
sc.addFile(path)
@@ -681,6 +683,8 @@ class JavaSparkContext(val sc: SparkContext)
681683
*
682684
* A directory can be given if the recursive option is set to true. Currently directories are only
683685
* supported for Hadoop-supported filesystems.
686+
*
687+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
684688
*/
685689
def addFile(path: String, recursive: Boolean): Unit = {
686690
sc.addFile(path, recursive)
@@ -690,6 +694,8 @@ class JavaSparkContext(val sc: SparkContext)
690694
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
691695
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
692696
* filesystems), or an HTTP, HTTPS or FTP URI.
697+
*
698+
* @note A path can be added only once. Subsequent additions of the same path are ignored.
693699
*/
694700
def addJar(path: String) {
695701
sc.addJar(path)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,11 @@ package object config {
432432
"external shuffle service, this feature can only be worked when external shuffle" +
433433
"service is newer than Spark 2.2.")
434434
.bytesConf(ByteUnit.BYTE)
435-
.createWithDefault(Long.MaxValue)
435+
// fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might
436+
// as well use fetch-to-disk in that case. The message includes some metadata in addition
437+
// to the block data itself (in particular UploadBlock has a lot of metadata), so we leave
438+
// extra room.
439+
.createWithDefault(Int.MaxValue - 512)
436440

437441
private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
438442
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
697697
* do not also submit those same tasks. That also means that a task completion from an earlier
698698
* attempt can lead to the entire stage getting marked as successful.
699699
*/
700-
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
700+
private[scheduler] def markPartitionCompletedInAllTaskSets(
701+
stageId: Int,
702+
partitionId: Int,
703+
taskInfo: TaskInfo) = {
701704
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
702-
tsm.markPartitionCompleted(partitionId)
705+
tsm.markPartitionCompleted(partitionId, taskInfo)
703706
}
704707
}
705708

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ private[spark] class TaskSetManager(
8484
val successful = new Array[Boolean](numTasks)
8585
private val numFailures = new Array[Int](numTasks)
8686

87-
// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
88-
// this happened while we set the `spark.speculation` to true. The task killed by others
87+
// Add the tid of task into this HashSet when the task is killed by other attempt tasks.
88+
// This happens when `spark.speculation` is set to true. The task killed by others
8989
// should not be resubmitted when its executor is lost.
90-
private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
90+
private val killedByOtherAttempt = new HashSet[Long]
9191

9292
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
9393
private[scheduler] var tasksSuccessful = 0
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
735735
logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
736736
s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
737737
s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
738-
killedByOtherAttempt(index) = true
738+
killedByOtherAttempt += attemptInfo.taskId
739739
sched.backend.killTask(
740740
attemptInfo.taskId,
741741
attemptInfo.executorId,
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
758758
}
759759
// There may be multiple tasksets for this stage -- we let all of them know that the partition
760760
// was completed. This may result in some of the tasksets getting completed.
761-
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
761+
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
762762
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
763763
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
764764
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
769769
maybeFinishTaskSet()
770770
}
771771

772-
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
772+
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
773773
partitionToIndex.get(partitionId).foreach { index =>
774774
if (!successful(index)) {
775+
if (speculationEnabled && !isZombie) {
776+
successfulTaskDurations.insert(taskInfo.duration)
777+
}
775778
tasksSuccessful += 1
776779
successful(index) = true
777780
if (tasksSuccessful == numTasks) {
@@ -944,7 +947,7 @@ private[spark] class TaskSetManager(
944947
&& !isZombie) {
945948
for ((tid, info) <- taskInfos if info.executorId == execId) {
946949
val index = taskInfos(tid).index
947-
if (successful(index) && !killedByOtherAttempt(index)) {
950+
if (successful(index) && !killedByOtherAttempt.contains(tid)) {
948951
successful(index) = false
949952
copiesRunning(index) -= 1
950953
tasksSuccessful -= 1

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ private[spark] class BlockManager(
130130

131131
private[spark] val externalShuffleServiceEnabled =
132132
conf.getBoolean("spark.shuffle.service.enabled", false)
133+
private val chunkSize =
134+
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
133135

134136
val diskBlockManager = {
135137
// Only perform cleanup if an external service is not serving our shuffle files.
@@ -660,6 +662,11 @@ private[spark] class BlockManager(
660662
* Get block from remote block managers as serialized bytes.
661663
*/
662664
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
665+
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
666+
// could just use the inputStream on the temp file, rather than memory-mapping the file.
667+
// Until then, replication can cause the process to use too much memory and get killed
668+
// by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
669+
// we've read the data to disk.
663670
logDebug(s"Getting remote block $blockId")
664671
require(blockId != null, "BlockId is null")
665672
var runningFailureCount = 0
@@ -690,7 +697,7 @@ private[spark] class BlockManager(
690697
logDebug(s"Getting remote block $blockId from $loc")
691698
val data = try {
692699
blockTransferService.fetchBlockSync(
693-
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
700+
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
694701
} catch {
695702
case NonFatal(e) =>
696703
runningFailureCount += 1
@@ -724,7 +731,7 @@ private[spark] class BlockManager(
724731
}
725732

726733
if (data != null) {
727-
return Some(new ChunkedByteBuffer(data))
734+
return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
728735
}
729736
logDebug(s"The value of block $blockId is null")
730737
}

0 commit comments

Comments
 (0)