Commit 730fd83

Merge branch 'master' into SPARK-19276
2 parents: bbef893 + 640f942

78 files changed, +1524 −543 lines

R/pkg/R/install.R

Lines changed: 40 additions & 15 deletions
@@ -54,7 +54,7 @@
 #' }
 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
 #'                  and force re-install Spark (in case the local directory or file is corrupted)
-#' @return \code{install.spark} returns the local directory where Spark is found or installed
+#' @return the (invisible) local directory where Spark is found or installed
 #' @rdname install.spark
 #' @name install.spark
 #' @aliases install.spark
@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   } else {
     if (releaseUrl != "") {
       message("Downloading from alternate URL:\n- ", releaseUrl)
-      downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
+      success <- downloadUrl(releaseUrl, packageLocalPath)
+      if (!success) {
+        unlink(packageLocalPath)
+        stop(paste0("Fetch failed from ", releaseUrl))
+      }
     } else {
       robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
     }
   }

   message(sprintf("Installing to %s", localDir))
-  untar(tarfile = packageLocalPath, exdir = localDir)
-  if (!tarExists || overwrite) {
+  # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
+  # or, tar command can return failure code
+  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
+                      error = function(e) {
+                        message(e)
+                        message()
+                        FALSE
+                      },
+                      warning = function(w) {
+                        # Treat warning as error, add an empty line with message()
+                        message(w)
+                        message()
+                        FALSE
+                      })
+  if (!tarExists || overwrite || !success) {
     unlink(packageLocalPath)
   }
+  if (!success) stop("Extract archive failed.")
   message("DONE.")
   Sys.setenv(SPARK_HOME = packageLocalDir)
   message(paste("SPARK_HOME set to", packageLocalDir))
@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
-    msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
-    message(msg)
+    message("Use user-provided mirror site: ", mirrorUrl)
     success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
     if (success) {
@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
                                  packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find preferred mirror site.")
+    message("Unable to download from preferred mirror site: ", mirrorUrl)
   }

   # step 3: use backup option
@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                packageName, packageLocalPath)
   if (success) {
-    return(packageLocalPath)
+    return()
   } else {
+    # remove any partially downloaded file
+    unlink(packageLocalPath)
+    message("Unable to download from default mirror site: ", mirrorUrl)
     msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                          "Please check network connection, Hadoop version,",
                          "or provide other mirror sites."),
@@ -201,14 +221,20 @@ directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
-  downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl))
+  downloadUrl(packageRemotePath, packageLocalPath)
 }

-downloadUrl <- function(remotePath, localPath, errorMessage) {
+downloadUrl <- function(remotePath, localPath) {
   isFail <- tryCatch(download.file(remotePath, localPath),
                      error = function(e) {
-                       message(errorMessage)
-                       print(e)
+                       message(e)
+                       message()
+                       TRUE
+                     },
+                     warning = function(w) {
+                       # Treat warning as error, add an empty line with message()
+                       message(w)
+                       message()
                        TRUE
                      })
   !isFail
@@ -234,10 +260,9 @@ sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
-      msg <- paste("%LOCALAPPDATA% not found.",
+      stop(paste("%LOCALAPPDATA% not found.",
                  "Please define the environment variable",
-                 "or restart and enter an installation path in localDir.")
-      stop(msg)
+                 "or restart and enter an installation path in localDir."))
     } else {
       path <- file.path(winAppPath, "Apache", "Spark", "Cache")
     }

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 20 additions & 0 deletions
@@ -113,6 +113,26 @@ public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved
         onEvent(executorRemoved);
     }

+    @Override
+    public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
+        onEvent(executorBlacklisted);
+    }
+
+    @Override
+    public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
+        onEvent(executorUnblacklisted);
+    }
+
+    @Override
+    public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
+        onEvent(nodeBlacklisted);
+    }
+
+    @Override
+    public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
+        onEvent(nodeUnblacklisted);
+    }
+
     @Override
     public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
         onEvent(blockUpdated);

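Note (not part of the commit): the four new callbacks above are declared final and simply forward to onEvent, so any existing SparkFirehoseListener subclass picks up the blacklist events through its single onEvent method. A minimal Scala sketch of consuming them this way; the BlacklistEventCounter class and its registration are hypothetical, not from this diff:

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler._

// Hypothetical listener: keeps a running count of currently blacklisted executors
// by pattern-matching on the events that the firehose funnels into onEvent.
class BlacklistEventCounter extends SparkFirehoseListener {
  @volatile private var blacklistedExecutors = 0

  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerExecutorBlacklisted => blacklistedExecutors += 1
    case _: SparkListenerExecutorUnblacklisted => blacklistedExecutors -= 1
    case _ => // all other events are ignored
  }

  def currentCount: Int = blacklistedExecutors
}

// Registration on an existing SparkContext (sc) would look like:
//   sc.addSparkListener(new BlacklistEventCounter())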
core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html

Lines changed: 5 additions & 0 deletions
@@ -45,6 +45,11 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
           title="Bytes and records written to disk in order to be read by a shuffle in a future stage.">
       Shuffle Write</span>
     </th>
+    <th>
+      <span data-toggle="tooltip" data-placement="left"
+            title="Number of executors blacklisted by the scheduler due to task failures.">
+        Blacklisted</span>
+    </th>
   </thead>
   <tbody>
   </tbody>

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 27 additions & 12 deletions
@@ -182,7 +182,7 @@ $(document).ready(function () {
     executorsSummary = $("#active-executors");

     getStandAloneppId(function (appId) {
-
+
       var endPoint = createRESTEndPoint(appId);
       $.getJSON(endPoint, function (response, status, jqXHR) {
         var summary = [];
@@ -202,7 +202,8 @@ $(document).ready(function () {
         var allTotalInputBytes = 0;
         var allTotalShuffleRead = 0;
         var allTotalShuffleWrite = 0;
-
+        var allTotalBlacklisted = 0;
+
         var activeExecCnt = 0;
         var activeRDDBlocks = 0;
         var activeMemoryUsed = 0;
@@ -219,7 +220,8 @@ $(document).ready(function () {
         var activeTotalInputBytes = 0;
         var activeTotalShuffleRead = 0;
         var activeTotalShuffleWrite = 0;
-
+        var activeTotalBlacklisted = 0;
+
         var deadExecCnt = 0;
         var deadRDDBlocks = 0;
         var deadMemoryUsed = 0;
@@ -236,7 +238,8 @@ $(document).ready(function () {
         var deadTotalInputBytes = 0;
         var deadTotalShuffleRead = 0;
         var deadTotalShuffleWrite = 0;
-
+        var deadTotalBlacklisted = 0;
+
         response.forEach(function (exec) {
             allExecCnt += 1;
             allRDDBlocks += exec.rddBlocks;
@@ -254,6 +257,7 @@ $(document).ready(function () {
             allTotalInputBytes += exec.totalInputBytes;
             allTotalShuffleRead += exec.totalShuffleRead;
             allTotalShuffleWrite += exec.totalShuffleWrite;
+            allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
             if (exec.isActive) {
                 activeExecCnt += 1;
                 activeRDDBlocks += exec.rddBlocks;
@@ -271,6 +275,7 @@ $(document).ready(function () {
                 activeTotalInputBytes += exec.totalInputBytes;
                 activeTotalShuffleRead += exec.totalShuffleRead;
                 activeTotalShuffleWrite += exec.totalShuffleWrite;
+                activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
             } else {
                 deadExecCnt += 1;
                 deadRDDBlocks += exec.rddBlocks;
@@ -288,9 +293,10 @@ $(document).ready(function () {
                 deadTotalInputBytes += exec.totalInputBytes;
                 deadTotalShuffleRead += exec.totalShuffleRead;
                 deadTotalShuffleWrite += exec.totalShuffleWrite;
+                deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
             }
         });
-
+
         var totalSummary = {
             "execCnt": ( "Total(" + allExecCnt + ")"),
             "allRDDBlocks": allRDDBlocks,
@@ -307,7 +313,8 @@ $(document).ready(function () {
             "allTotalGCTime": allTotalGCTime,
             "allTotalInputBytes": allTotalInputBytes,
             "allTotalShuffleRead": allTotalShuffleRead,
-            "allTotalShuffleWrite": allTotalShuffleWrite
+            "allTotalShuffleWrite": allTotalShuffleWrite,
+            "allTotalBlacklisted": allTotalBlacklisted
         };
         var activeSummary = {
             "execCnt": ( "Active(" + activeExecCnt + ")"),
@@ -325,7 +332,8 @@ $(document).ready(function () {
             "allTotalGCTime": activeTotalGCTime,
             "allTotalInputBytes": activeTotalInputBytes,
             "allTotalShuffleRead": activeTotalShuffleRead,
-            "allTotalShuffleWrite": activeTotalShuffleWrite
+            "allTotalShuffleWrite": activeTotalShuffleWrite,
+            "allTotalBlacklisted": activeTotalBlacklisted
         };
         var deadSummary = {
             "execCnt": ( "Dead(" + deadExecCnt + ")" ),
@@ -343,12 +351,13 @@ $(document).ready(function () {
             "allTotalGCTime": deadTotalGCTime,
            "allTotalInputBytes": deadTotalInputBytes,
            "allTotalShuffleRead": deadTotalShuffleRead,
-           "allTotalShuffleWrite": deadTotalShuffleWrite
+           "allTotalShuffleWrite": deadTotalShuffleWrite,
+           "allTotalBlacklisted": deadTotalBlacklisted
         };
-
+
         var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
         $.get(createTemplateURI(appId), function (template) {
-
+
             executorsSummary.append(Mustache.render($(template).filter("#executors-summary-template").html(), data));
             var selector = "#active-executors-table";
             var conf = {
@@ -360,7 +369,12 @@ $(document).ready(function () {
                     }
                 },
                 {data: 'hostPort'},
-                {data: 'isActive', render: formatStatus},
+                {data: 'isActive', render: function (data, type, row) {
+                    if (type !== 'display') return data;
+                    if (row.isBlacklisted) return "Blacklisted";
+                    else return formatStatus (data, type);
+                    }
+                },
                 {data: 'rddBlocks'},
                 {
                     data: function (row, type) {
@@ -474,7 +488,8 @@ $(document).ready(function () {
                 },
                 {data: 'allTotalInputBytes', render: formatBytes},
                 {data: 'allTotalShuffleRead', render: formatBytes},
-                {data: 'allTotalShuffleWrite', render: formatBytes}
+                {data: 'allTotalShuffleWrite', render: formatBytes},
+                {data: 'allTotalBlacklisted'}
             ],
             "paging": false,
             "searching": false,

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 14 additions & 2 deletions
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -48,9 +48,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  * one exception is [[nodeBlacklist()]], which can be called without holding a lock.
  */
 private[scheduler] class BlacklistTracker (
+    private val listenerBus: LiveListenerBus,
     conf: SparkConf,
     clock: Clock = new SystemClock()) extends Logging {

+  def this(sc: SparkContext) = {
+    this(sc.listenerBus, sc.conf)
+  }
+
   BlacklistTracker.validateBlacklistConfs(conf)
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
@@ -103,6 +108,7 @@ private[scheduler] class BlacklistTracker (
       execsToUnblacklist.foreach { exec =>
         val status = executorIdToBlacklistStatus.remove(exec).get
         val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+        listenerBus.post(SparkListenerExecutorUnblacklisted(now, exec))
         failedExecsOnNode.remove(exec)
         if (failedExecsOnNode.isEmpty) {
           nodeToBlacklistedExecs.remove(status.node)
@@ -114,7 +120,10 @@ private[scheduler] class BlacklistTracker (
       // Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
       logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
         s"has timed out")
-      nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+      nodesToUnblacklist.foreach { node =>
+        nodeIdToBlacklistExpiryTime.remove(node)
+        listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+      }
       _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
     }
     updateNextExpiryTime()
@@ -161,6 +170,8 @@ private[scheduler] class BlacklistTracker (
         s" task failures in successful task sets")
       val node = failuresInTaskSet.node
       executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+      listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+      executorIdToFailureList.remove(exec)
       updateNextExpiryTime()

       // In addition to blacklisting the executor, we also update the data for failures on the
@@ -174,6 +185,7 @@ private[scheduler] class BlacklistTracker (
         logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
           s"executors blacklisted: ${blacklistedExecsOnNode}")
         nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+        listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
         _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
       }
     }

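Note (not part of the commit): the tracker now posts blacklist and un-blacklist events to the LiveListenerBus, so they reach every registered listener and, via EventLoggingListener below, the event log. A rough Scala sketch of a plain SparkListener reacting to them; the BlacklistAuditListener name is hypothetical, and the event field names (executorId, taskFailures, hostId, executorFailures) are assumptions inferred from the constructor arguments posted above:

import org.apache.spark.scheduler._

// Hypothetical listener that prints every blacklisting decision.
// Field names below are assumptions mirroring the arguments the tracker posts.
class BlacklistAuditListener extends SparkListener {
  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
    println(s"Executor ${event.executorId} blacklisted after ${event.taskFailures} task failures")

  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit =
    println(s"Executor ${event.executorId} un-blacklisted")

  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
    println(s"Node ${event.hostId} blacklisted (${event.executorFailures} blacklisted executors)")

  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit =
    println(s"Node ${event.hostId} un-blacklisted")
}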
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 16 additions & 0 deletions
@@ -193,6 +193,22 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }

+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   // No-op because logging every update would be overkill
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}

0 commit comments