Skip to content

Commit 8db5a06

Browse files
author
Andrew Or
committed
Embed metadata in the event log file name instead
This makes the event logs much easier to parse than before. As of this commit the whole file is either entirely compressed or not compressed, but not somewhere in between.
1 parent 57566d0 commit 8db5a06

11 files changed

Lines changed: 103 additions & 96 deletions

File tree

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
5151
import org.apache.spark.executor.TriggerThreadDump
5252
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
5353
FixedLengthBinaryInputFormat}
54+
import org.apache.spark.io.CompressionCodec
5455
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
5556
import org.apache.spark.rdd._
5657
import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
233234
None
234235
}
235236
}
237+
private[spark] val eventLogCodec: Option[String] = {
238+
val compress = conf.getBoolean("spark.eventLog.compress", false)
239+
if (compress && isEventLogEnabled) {
240+
Some(CompressionCodec.createCodec(conf)).map(_.getClass.getCanonicalName)
241+
} else {
242+
None
243+
}
244+
}
236245

237246
// Generate the random name for a temp folder in Tachyon
238247
// Add a timestamp as the suffix here to make it more safe

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
2323
val memoryPerSlave: Int,
2424
val command: Command,
2525
var appUiUrl: String,
26-
val eventLogDir: Option[String] = None)
26+
val sparkVersion: String,
27+
val eventLogDir: Option[String] = None,
28+
val eventLogCodec: Option[String] = None)
2729
extends Serializable {
2830

2931
val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,11 @@ private[spark] class ApplicationDescription(
3436
memoryPerSlave: Int = memoryPerSlave,
3537
command: Command = command,
3638
appUiUrl: String = appUiUrl,
37-
eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
38-
new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
39+
sparkVersion: String = sparkVersion,
40+
eventLogDir: Option[String] = eventLogDir,
41+
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
42+
new ApplicationDescription(
43+
name, maxCores, memoryPerSlave, command, appUiUrl, sparkVersion, eventLogDir, eventLogCodec)
3944

4045
override def toString: String = "ApplicationDescription(" + name + ")"
4146
}

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ private[spark] object TestClient {
4949
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
5050
conf = conf, securityManager = new SecurityManager(conf))
5151
val desc = new ApplicationDescription("TestClient", Some(1), 512,
52-
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
52+
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()),
53+
"ignored", "1.2.3")
5354
val listener = new TestListener
5455
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
5556
client.start()

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
8181
= new mutable.LinkedHashMap()
8282

8383
// Constants used to parse Spark 1.0.0 log directories.
84-
private[history] val LOG_PREFIX = "EVENT_LOG_"
85-
private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
86-
private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
84+
private[history] val LOG_PREFIX = EventLoggingListener.EVENT_LOG_KEY + "_"
85+
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
86+
private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
8787
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
8888

8989
/**

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -737,13 +737,13 @@ private[spark] class Master(
737737
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
738738
try {
739739
val eventLogFile = app.desc.eventLogDir
740-
.map { dir => EventLoggingListener.getLogPath(dir, app.id) }
740+
.map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
741741
.getOrElse {
742742
// Event logging is not enabled for this application
743743
app.desc.appUiUrl = notFoundBasePath
744744
return false
745745
}
746-
746+
747747
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
748748

749749
if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 63 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ private[spark] class EventLoggingListener(
6262
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
6363
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
6464
private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
65+
private val compressionCodec =
66+
if (shouldCompress) {
67+
Some(CompressionCodec.createCodec(sparkConf))
68+
} else {
69+
None
70+
}
71+
private val compressionCodecName = compressionCodec.map(_.getClass.getCanonicalName)
6572

6673
// Only defined if the file system scheme is not local
6774
private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -80,7 +87,7 @@ private[spark] class EventLoggingListener(
8087
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
8188

8289
// Visible for tests only.
83-
private[scheduler] val logPath = getLogPath(logBaseDir, appId)
90+
private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
8491

8592
/**
8693
* Creates the log file in the configured log directory.
@@ -110,17 +117,12 @@ private[spark] class EventLoggingListener(
110117
hadoopDataStream = Some(fileSystem.create(path))
111118
hadoopDataStream.get
112119
}
113-
114-
val compressionCodec =
115-
if (shouldCompress) {
116-
Some(CompressionCodec.createCodec(sparkConf))
117-
} else {
118-
None
119-
}
120+
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
121+
val bstream = new BufferedOutputStream(cstream, outputBufferSize)
120122

121123
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
122-
val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
123-
compressionCodec)
124+
125+
val logStream = initEventLog(bstream, compressionCodec)
124126
writer = Some(new PrintWriter(logStream))
125127

126128
logInfo("Logging events to %s".format(logPath))
@@ -202,11 +204,11 @@ private[spark] object EventLoggingListener extends Logging {
202204
val IN_PROGRESS = ".inprogress"
203205
val DEFAULT_LOG_DIR = "/tmp/spark-events"
204206

205-
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
207+
val EVENT_LOG_KEY = "EVENT_LOG"
208+
val SPARK_VERSION_KEY = "SPARK_VERSION"
209+
val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
206210

207-
// Marker for the end of header data in a log file. After this marker, log data, potentially
208-
// compressed, will be found.
209-
private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
211+
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
210212

211213
// To avoid corrupted files causing the heap to fill up. Value is arbitrary.
212214
private val MAX_HEADER_LINE_LENGTH = 4096
@@ -217,53 +219,60 @@ private[spark] object EventLoggingListener extends Logging {
217219
/**
218220
* Write metadata about the event log to the given stream.
219221
*
220-
* The header is a serialized version of a map, except it does not use Java serialization to
221-
* avoid incompatibilities between different JDKs. It writes one map entry per line, in
222-
* "key=value" format.
223-
*
224-
* The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
225-
* can know when to stop.
222+
* The header is a single line of JSON in the beginning of the file. Note that this
223+
* assumes all metadata necessary to parse the log is also included in the file name.
224+
* The format needs to be kept in sync with the `openEventLog()` method below. Also, it
225+
* cannot change in new Spark versions without some other way of detecting the change.
226226
*
227-
* The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
228-
* change in new Spark versions without some other way of detecting the change (like some
229-
* metadata encoded in the file name).
230-
*
231-
* @param logStream Raw output stream to the even log file.
227+
* @param logStream Raw output stream to the event log file.
232228
* @param compressionCodec Optional compression codec to use.
233-
* @return A stream where to write event log data. This may be a wrapper around the original
229+
* @return A stream to which event log data is written. This may be a wrapper around the original
234230
* stream (for example, when compression is enabled).
235231
*/
236232
def initEventLog(
237233
logStream: OutputStream,
238234
compressionCodec: Option[CompressionCodec]): OutputStream = {
239-
val meta = mutable.HashMap(("version" -> SPARK_VERSION))
235+
val metadata = new mutable.HashMap[String, String]
236+
// Some of these metadata are already encoded in the file name
237+
// Here we include them again within the file itself for completeness
238+
metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier))
239+
metadata += (SPARK_VERSION_KEY -> SPARK_VERSION)
240240
compressionCodec.foreach { codec =>
241-
meta += ("compressionCodec" -> codec.getClass().getName())
241+
metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName)
242242
}
243-
244-
def write(entry: String) = {
245-
val bytes = entry.getBytes(Charsets.UTF_8)
246-
if (bytes.length > MAX_HEADER_LINE_LENGTH) {
247-
throw new IOException(s"Header entry too long: ${entry}")
248-
}
249-
logStream.write(bytes, 0, bytes.length)
243+
val metadataJson = compact(render(JsonProtocol.mapToJson(metadata)))
244+
val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8)
245+
if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) {
246+
throw new IOException(s"Event log metadata too long: $metadataJson")
250247
}
251-
252-
meta.foreach { case (k, v) => write(s"$k=$v\n") }
253-
write(s"$HEADER_END_MARKER\n")
254-
compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
248+
logStream.write(metadataBytes, 0, metadataBytes.length)
249+
logStream
255250
}
256251

257252
/**
258253
* Return a file-system-safe path to the log file for the given application.
259254
*
255+
* Note that because we currently only create a single log file for each application,
256+
* we must encode all the information needed to parse this event log in the file name
257+
* instead of within the file itself. Otherwise, if the file is compressed, for instance,
258+
* we won't know which codec to use to decompress the metadata.
259+
*
260260
* @param logBaseDir Directory where the log file will be written.
261261
* @param appId A unique app ID.
262+
* @param compressionCodecName Name of the compression codec used to compress the contents
263+
* of the log, or None if compression is not enabled.
262264
* @return A path which consists of file-system-safe characters.
263265
*/
264-
def getLogPath(logBaseDir: String, appId: String): String = {
265-
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
266-
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
266+
def getLogPath(
267+
logBaseDir: String,
268+
appId: String,
269+
compressionCodecName: Option[String]): String = {
270+
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
271+
// e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1
272+
// e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec
273+
val logName = s"${EVENT_LOG_KEY}_${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" +
274+
compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("")
275+
Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName.stripSuffix("/")
267276
}
268277

269278
/**
@@ -279,51 +288,21 @@ private[spark] object EventLoggingListener extends Logging {
279288
}
280289

281290
val in = new BufferedInputStream(fs.open(log))
282-
// Read a single line from the input stream without buffering.
283-
// We cannot use BufferedReader because we must avoid reading
284-
// beyond the end of the header, after which the content of the
285-
// file may be compressed.
286-
def readLine(): String = {
287-
val bytes = new ByteArrayOutputStream()
288-
var next = in.read()
289-
var count = 0
290-
while (next != '\n') {
291-
if (next == -1) {
292-
throw new IOException("Unexpected end of file.")
293-
}
294-
bytes.write(next)
295-
count = count + 1
296-
if (count > MAX_HEADER_LINE_LENGTH) {
297-
throw new IOException("Maximum header line length exceeded.")
298-
}
299-
next = in.read()
300-
}
301-
new String(bytes.toByteArray(), Charsets.UTF_8)
291+
292+
// Parse information from the log name
293+
val logName = log.getName
294+
val baseRegex = s"${EVENT_LOG_KEY}_(.*)_${SPARK_VERSION_KEY}_(.*)".r
295+
val compressionRegex = (baseRegex + s"_${COMPRESSION_CODEC_KEY}_(.*)").r
296+
val (sparkVersion, codecName) = logName match {
297+
case compressionRegex(_, version, _codecName) => (version, Some(_codecName))
298+
case baseRegex(_, version) => (version, None)
299+
case _ => throw new IllegalArgumentException(s"Malformed event log name: $logName")
300+
}
301+
val codec = codecName.map { c =>
302+
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
302303
}
303304

304-
// Parse the header metadata in the form of k=v pairs
305-
// This assumes that every line before the header end marker follows this format
306305
try {
307-
val meta = new mutable.HashMap[String, String]()
308-
var foundEndMarker = false
309-
while (!foundEndMarker) {
310-
readLine() match {
311-
case HEADER_END_MARKER =>
312-
foundEndMarker = true
313-
case entry =>
314-
val prop = entry.split("=", 2)
315-
if (prop.length != 2) {
316-
throw new IllegalArgumentException("Invalid metadata in log file.")
317-
}
318-
meta += (prop(0) -> prop(1))
319-
}
320-
}
321-
322-
val sparkVersion = meta.get("version").getOrElse(
323-
throw new IllegalArgumentException("Missing Spark version in log metadata."))
324-
val codec = meta.get("compressionCodec").map { codecName =>
325-
codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
326-
}
327306
(codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
328307
} catch {
329308
case e: Exception =>

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
4949
val lines = Source.fromInputStream(logData).getLines()
5050
lines.foreach { line =>
5151
currentLine = line
52-
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
52+
JsonProtocol.sparkEventFromJson(parse(line)) match {
53+
case SparkListenerMetadataIdentifier => // Ignore metadata for now
54+
case event => postToAll(event)
55+
}
5356
lineNumber += 1
5457
}
5558
} catch {

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
9898
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
9999
extends SparkListenerEvent
100100

101+
/**
102+
* A special dummy event used to identify the metadata header in event logs.
103+
* This is not actually posted anywhere.
104+
*/
105+
private[spark] case object SparkListenerMetadataIdentifier extends SparkListenerEvent
106+
101107
/**
102108
* Periodic updates from executors.
103109
* @param execId executor id

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
5858
listener.onExecutorAdded(executorAdded)
5959
case executorRemoved: SparkListenerExecutorRemoved =>
6060
listener.onExecutorRemoved(executorRemoved)
61+
case SparkListenerMetadataIdentifier =>
6162
}
6263
}
6364

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
1919

2020
import java.util.concurrent.Semaphore
2121

22-
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
22+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, SPARK_VERSION}
2323
import org.apache.spark.deploy.{ApplicationDescription, Command}
2424
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
2525
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend(
8585
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
8686
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
8787
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
88-
appUIAddress, sc.eventLogDir)
88+
appUIAddress, SPARK_VERSION, sc.eventLogDir, sc.eventLogCodec)
8989

9090
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
9191
client.start()

0 commit comments

Comments
 (0)