
Commit a5a4b52

Merge remote-tracking branch 'upstream/master' into newJson

2 parents: 4325475 + d45e0c6

127 files changed: 2194 additions & 459 deletions. This is a large commit, so only a subset of the changed files is shown below.

.gitignore

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 sbt/*.jar
 .settings
 .cache
-.generated-mima-excludes
+.generated-mima*
 /build/
 work/
 out/

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -80,7 +80,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
   test("large number of iterations") {
     // This tests whether jobs with a large number of iterations finish in a reasonable time,
     // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
-    failAfter(10 seconds) {
+    failAfter(30 seconds) {
       sc = new SparkContext("local", "test")
       val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
       val msgs = sc.parallelize(Array[(String, TestMessage)]())

@@ -101,7 +101,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
       sc = new SparkContext("local", "test")
       val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
       val msgs = sc.parallelize(Array[(String, TestMessage)]())
-      val numSupersteps = 50
+      val numSupersteps = 20
       val result =
        Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>

bin/pyspark

Lines changed: 12 additions & 2 deletions
@@ -45,7 +45,7 @@ fi
 . $FWDIR/bin/load-spark-env.sh

 # Figure out which Python executable to use
-if [ -z "$PYSPARK_PYTHON" ] ; then
+if [[ -z "$PYSPARK_PYTHON" ]]; then
   PYSPARK_PYTHON="python"
 fi
 export PYSPARK_PYTHON

@@ -59,7 +59,7 @@ export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

 # If IPython options are specified, assume user wants to run IPython
-if [ -n "$IPYTHON_OPTS" ]; then
+if [[ -n "$IPYTHON_OPTS" ]]; then
   IPYTHON=1
 fi

@@ -76,6 +76,16 @@ for i in "$@"; do
 done
 export PYSPARK_SUBMIT_ARGS

+# For pyspark tests
+if [[ -n "$SPARK_TESTING" ]]; then
+  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
+    exec "$PYSPARK_PYTHON" -m doctest $1
+  else
+    exec "$PYSPARK_PYTHON" $1
+  fi
+  exit
+fi
+
 # If a python file is provided, directly run spark-submit.
 if [[ "$1" =~ \.py$ ]]; then
   echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 0 deletions
@@ -823,9 +823,11 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
+   * :: DeveloperApi ::
    * Return information about what RDDs are cached, if they are in mem or on disk, how much space
    * they take, etc.
    */
+  @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
     StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
   }

@@ -837,8 +839,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

   /**
+   * :: DeveloperApi ::
    * Return information about blocks stored in all of the slaves
    */
+  @DeveloperApi
   def getExecutorStorageStatus: Array[StorageStatus] = {
     env.blockManager.master.getStorageStatus
   }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 14 additions & 3 deletions
@@ -23,9 +23,10 @@ import akka.actor.ActorRef
 import com.google.common.base.Charsets
 import com.google.common.io.Files

-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.logging.FileAppender

 /**
  * Manages the execution of one executor process.

@@ -42,12 +43,15 @@ private[spark] class ExecutorRunner(
     val sparkHome: File,
     val workDir: File,
     val workerUrl: String,
+    val conf: SparkConf,
     var state: ExecutorState.Value)
   extends Logging {

   val fullId = appId + "/" + execId
   var workerThread: Thread = null
   var process: Process = null
+  var stdoutAppender: FileAppender = null
+  var stderrAppender: FileAppender = null

   // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
   // make sense to remove this in the future.

@@ -76,6 +80,13 @@ private[spark] class ExecutorRunner(
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
+      process.waitFor()
+      if (stdoutAppender != null) {
+        stdoutAppender.stop()
+      }
+      if (stderrAppender != null) {
+        stderrAppender.stop()
+      }
       val exitCode = process.waitFor()
       worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }

@@ -137,11 +148,11 @@ private[spark] class ExecutorRunner(

       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      CommandUtils.redirectStream(process.getInputStream, stdout)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, Charsets.UTF_8)
-      CommandUtils.redirectStream(process.getErrorStream, stderr)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

       // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
       // long-lived processes only. However, in the future, we might restart the executor a few
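
The executor runner now wraps each child-process stream in a FileAppender instead of copying it straight into a file. A rough sketch of the new call sites in isolation, using only the FileAppender(inputStream, file, conf) signature visible in this diff (the helper name attachAppenders is made up for illustration):

  import java.io.File
  import org.apache.spark.SparkConf
  import org.apache.spark.util.logging.FileAppender

  // Hypothetical helper: attach appenders to a freshly launched executor process.
  def attachAppenders(process: Process, executorDir: File, conf: SparkConf): (FileAppender, FileAppender) = {
    // Each appender drains one stream of the child process into a file in the executor's work dir;
    // the SparkConf carries the appender's settings (presumably including log rolling, given the
    // RollingFileAppender usage later in this commit).
    val stdoutAppender = FileAppender(process.getInputStream, new File(executorDir, "stdout"), conf)
    val stderrAppender = FileAppender(process.getErrorStream, new File(executorDir, "stderr"), conf)
    (stdoutAppender, stderrAppender)
  }

On shutdown the runner now waits for the process first and then stops both appenders, so buffered output is flushed to disk before the exit code is reported back to the worker.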

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ private[spark] class Worker(
       val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
         self, workerId, host,
         appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-        workDir, akkaUrl, ExecutorState.RUNNING)
+        workDir, akkaUrl, conf, ExecutorState.RUNNING)
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 44 additions & 34 deletions
@@ -24,8 +24,10 @@ import scala.xml.Node

 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}

-private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
   private val workDir = parent.workDir

@@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

-    val path = (appId, executorId, driverId) match {
+    val logDir = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        s"${workDir.getPath}/$appId/$executorId/$logType"
+        s"${workDir.getPath}/$appId/$executorId/"
       case (None, None, Some(d)) =>
-        s"${workDir.getPath}/$driverId/$logType"
+        s"${workDir.getPath}/$driverId/"
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }

-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
-    pre + Utils.offsetBytes(path, startByte, endByte)
+    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
+    pre + logText
   }

   def render(request: HttpServletRequest): Seq[Node] = {

@@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

-    val (path, params) = (appId, executorId, driverId) match {
+    val (logDir, params) = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
       case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+        (s"${workDir.getPath}/$d/", s"driverId=$d")
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }

-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
     val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
     val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>

@@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     UIUtils.basicSparkPage(content, logType + " log page for " + appId)
   }

-  /** Determine the byte range for a log or log page. */
-  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
-    val defaultBytes = 100 * 1024
-    val maxBytes = 1024 * 1024
-    val file = new File(path)
-    val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength - defaultBytes)
-    val startByte =
-      if (getOffset < 0) {
-        0L
-      } else if (getOffset > logLength) {
-        logLength
-      } else {
-        getOffset
+  /** Get the part of the log files given the offset and desired length of bytes */
+  private def getLog(
+      logDirectory: String,
+      logType: String,
+      offsetOption: Option[Long],
+      byteLength: Int
+    ): (String, Long, Long, Long) = {
+    try {
+      val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
+      logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
+
+      val totalLength = files.map { _.length }.sum
+      val offset = offsetOption.getOrElse(totalLength - byteLength)
+      val startIndex = {
+        if (offset < 0) {
+          0L
+        } else if (offset > totalLength) {
+          totalLength
+        } else {
+          offset
+        }
       }
-    val logPageLength = math.min(byteLength, maxBytes)
-    val endByte = math.min(startByte + logPageLength, logLength)
-    (startByte, endByte)
+      val endIndex = math.min(startIndex + totalLength, totalLength)
+      logDebug(s"Getting log from $startIndex to $endIndex")
+      val logText = Utils.offsetBytes(files, startIndex, endIndex)
+      logDebug(s"Got log of length ${logText.length} bytes")
+      (logText, startIndex, endIndex, totalLength)
+    } catch {
+      case e: Exception =>
+        logError(s"Error getting $logType logs from directory $logDirectory", e)
+        ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
+    }
   }
 }
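
The log page no longer reads a single log file; it stitches together all rolled-over files for the requested log type. Stripped of the web-UI plumbing, the core of the new getLog path looks roughly like the sketch below (readLogWindow is a made-up name; getSortedRolledOverFiles and the multi-file offsetBytes overload are the ones added elsewhere in this commit):

  import org.apache.spark.util.Utils
  import org.apache.spark.util.logging.RollingFileAppender

  // Read roughly the last `byteLength` bytes of the combined log of the given type in `logDirectory`.
  def readLogWindow(logDirectory: String, logType: String, byteLength: Int): String = {
    // All rolled-over files of this type plus the active one, in sorted order
    val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
    val totalLength = files.map(_.length).sum
    val start = math.max(totalLength - byteLength, 0L)
    Utils.offsetBytes(files, start, totalLength)
  }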

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 53 additions & 0 deletions
@@ -862,6 +862,59 @@ private[spark] object Utils extends Logging {
     Source.fromBytes(buff).mkString
   }

+  /**
+   * Return a string containing data across a set of files. The `startIndex`
+   * and `endIndex` is based on the cumulative size of all the files take in
+   * the given order. See figure below for more details.
+   */
+  def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
+    val fileLengths = files.map { _.length }
+    val startIndex = math.max(start, 0)
+    val endIndex = math.min(end, fileLengths.sum)
+    val fileToLength = files.zip(fileLengths).toMap
+    logDebug("Log files: \n" + fileToLength.mkString("\n"))
+
+    val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
+    var sum = 0L
+    for (file <- files) {
+      val startIndexOfFile = sum
+      val endIndexOfFile = sum + fileToLength(file)
+      logDebug(s"Processing file $file, " +
+        s"with start index = $startIndexOfFile, end index = $endIndex")
+
+      /*
+                                          ____________
+       range 1:                          |            |
+                                         |   case A   |
+
+       files:   |==== file 1 ====|====== file 2 ======|===== file 3 =====|
+
+                     |   case B  .       case C       .    case D    |
+       range 2:      |___________.____________________.______________|
+       */
+
+      if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
+        // Case C: read the whole file
+        stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+      } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
+        // Case A and B: read from [start of required range] to [end of file / end of range]
+        val effectiveStartIndex = startIndex - startIndexOfFile
+        val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
+        stringBuffer.append(Utils.offsetBytes(
+          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+      } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
+        // Case D: read from [start of file] to [end of require range]
+        val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
+        val effectiveEndIndex = endIndex - startIndexOfFile
+        stringBuffer.append(Utils.offsetBytes(
+          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+      }
+      sum += fileToLength(file)
+      logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}")
+    }
+    stringBuffer.toString
+  }
+
   /**
    * Clone an object using a Spark serializer.
    */
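
Since this multi-file overload of offsetBytes is new in this commit, here is a hedged worked example of how one byte range maps across rolled-over files; the file names and sizes are invented purely for illustration:

  import java.io.File
  import org.apache.spark.util.Utils

  // Suppose stderr has rolled over into two old files of 1000 bytes each plus the active file.
  val rolled = Seq(new File("stderr.1"), new File("stderr.2"), new File("stderr"))

  // Bytes 900-1200 of the combined log: the last 100 bytes of stderr.1 (the "case A and B" branch)
  // followed by the first 200 bytes of stderr.2 (the "case D" branch).
  val window = Utils.offsetBytes(rolled, 900, 1200)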
