Skip to content

Commit 91442fd

Browse files
liyezhang556520 authored and srowen committed
[SPARK-6197][CORE] handle json exception when history file not finished writing
For details, please refer to [SPARK-6197](https://issues.apache.org/jira/browse/SPARK-6197) Author: Zhang, Liye <liye.zhang@intel.com> Closes #4927 from liyezhang556520/jsonParseError and squashes the following commits: 5cbdc82 [Zhang, Liye] without unnecessary wrap 2b48831 [Zhang, Liye] small changes with sean owen's comments 2973024 [Zhang, Liye] handle json exception when file not finished writing
1 parent d618df2 commit 91442fd

2 files changed

Lines changed: 23 additions & 5 deletions

File tree

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,8 +760,9 @@ private[spark] class Master(
760760
val replayBus = new ReplayListenerBus()
761761
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
762762
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
763+
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
763764
try {
764-
replayBus.replay(logInput, eventLogFile)
765+
replayBus.replay(logInput, eventLogFile, maybeTruncated)
765766
} finally {
766767
logInput.close()
767768
}

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.{InputStream, IOException}
2121

2222
import scala.io.Source
2323

24+
import com.fasterxml.jackson.core.JsonParseException
2425
import org.json4s.jackson.JsonMethods._
2526

2627
import org.apache.spark.Logging
@@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
4041
*
4142
* @param logData Stream containing event log data.
4243
* @param sourceName Filename (or other source identifier) from whence @logData is being read
44+
* @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
45+
* encountered, log file might not have finished writing) or not
4346
*/
44-
def replay(logData: InputStream, sourceName: String): Unit = {
47+
def replay(
48+
logData: InputStream,
49+
sourceName: String,
50+
maybeTruncated: Boolean = false): Unit = {
4551
var currentLine: String = null
4652
var lineNumber: Int = 1
4753
try {
4854
val lines = Source.fromInputStream(logData).getLines()
49-
lines.foreach { line =>
50-
currentLine = line
51-
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
55+
while (lines.hasNext) {
56+
currentLine = lines.next()
57+
try {
58+
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
59+
} catch {
60+
case jpe: JsonParseException =>
61+
// We can only ignore an exception from the last line of the file, which might be truncated
62+
if (!maybeTruncated || lines.hasNext) {
63+
throw jpe
64+
} else {
65+
logWarning(s"Got JsonParseException from log file $sourceName" +
66+
s" at line $lineNumber, the file might not have finished writing cleanly.")
67+
}
68+
}
5269
lineNumber += 1
5370
}
5471
} catch {

0 commit comments

Comments
 (0)