Skip to content

Commit 27c9a6c

Browse files
author
Andrew Or
committed
Address review feedback
Things changed in this commit: (1) No more metadata in log content (2) No more Spark version in log file name (3) Use short name for compression codec in log file name
1 parent 519e51a commit 27c9a6c

17 files changed

Lines changed: 74 additions & 151 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
237237
private[spark] val eventLogCodec: Option[String] = {
238238
val compress = conf.getBoolean("spark.eventLog.compress", false)
239239
if (compress && isEventLogEnabled) {
240-
Some(CompressionCodec.createCodec(conf)).map(_.getClass.getCanonicalName)
240+
Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
241241
} else {
242242
None
243243
}

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ private[spark] class ApplicationDescription(
2323
val memoryPerSlave: Int,
2424
val command: Command,
2525
var appUiUrl: String,
26-
val sparkVersion: String,
2726
val eventLogDir: Option[String] = None,
27+
// short name of compression codec used when writing event logs, if any (e.g. lzf)
2828
val eventLogCodec: Option[String] = None)
2929
extends Serializable {
3030

@@ -36,11 +36,10 @@ private[spark] class ApplicationDescription(
3636
memoryPerSlave: Int = memoryPerSlave,
3737
command: Command = command,
3838
appUiUrl: String = appUiUrl,
39-
sparkVersion: String = sparkVersion,
4039
eventLogDir: Option[String] = eventLogDir,
4140
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
4241
new ApplicationDescription(
43-
name, maxCores, memoryPerSlave, command, appUiUrl, sparkVersion, eventLogDir, eventLogCodec)
42+
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
4443

4544
override def toString: String = "ApplicationDescription(" + name + ")"
4645
}

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ private[spark] object TestClient {
4949
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
5050
conf = conf, securityManager = new SecurityManager(conf))
5151
val desc = new ApplicationDescription("TestClient", Some(1), 512,
52-
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()),
53-
"ignored", "1.2.3")
52+
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
5453
val listener = new TestListener
5554
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
5655
client.start()

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
291291
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
292292
val logPath = eventLog.getPath()
293293
logInfo(s"Replaying log path: $logPath")
294-
val (logInput, sparkVersion) =
294+
val logInput =
295295
if (isLegacyLogDirectory(eventLog)) {
296296
openLegacyEventLog(logPath)
297297
} else {
@@ -300,7 +300,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
300300
try {
301301
val appListener = new ApplicationEventListener
302302
bus.addListener(appListener)
303-
bus.replay(logInput, sparkVersion, logPath.toString)
303+
bus.replay(logInput, logPath.toString)
304304
new FsApplicationHistoryInfo(
305305
logPath.getName(),
306306
appListener.appId.getOrElse(logPath.getName()),
@@ -322,28 +322,22 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
322322
*
323323
* @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
324324
*/
325-
private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
325+
private[history] def openLegacyEventLog(dir: Path): InputStream = {
326326
val children = fs.listStatus(dir)
327327
var eventLogPath: Path = null
328328
var codecName: Option[String] = None
329-
var sparkVersion: String = null
330329

331330
children.foreach { child =>
332331
child.getPath().getName() match {
333332
case name if name.startsWith(LOG_PREFIX) =>
334333
eventLogPath = child.getPath()
335-
336334
case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
337335
codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
338-
339-
case version if version.startsWith(SPARK_VERSION_PREFIX) =>
340-
sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
341-
342336
case _ =>
343337
}
344338
}
345339

346-
if (eventLogPath == null || sparkVersion == null) {
340+
if (eventLogPath == null) {
347341
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
348342
}
349343

@@ -355,7 +349,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
355349
}
356350

357351
val in = new BufferedInputStream(fs.open(eventLogPath))
358-
(codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
352+
codec.map(_.compressedInputStream(in)).getOrElse(in)
359353
}
360354

361355
/**

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,12 +756,12 @@ private[spark] class Master(
756756
return false
757757
}
758758

759-
val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
759+
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
760760
val replayBus = new ReplayListenerBus()
761761
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
762762
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
763763
try {
764-
replayBus.replay(logInput, sparkVersion, eventLogFile)
764+
replayBus.replay(logInput, eventLogFile)
765765
} finally {
766766
logInput.close()
767767
}

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
2626
import org.apache.spark.SparkConf
2727
import org.apache.spark.annotation.DeveloperApi
2828
import org.apache.spark.util.Utils
29-
import org.apache.spark.Logging
3029

3130
/**
3231
* :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
5352
"lzf" -> classOf[LZFCompressionCodec].getName,
5453
"snappy" -> classOf[SnappyCompressionCodec].getName)
5554

55+
def getCodecName(conf: SparkConf): String = {
56+
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
57+
}
58+
5659
def createCodec(conf: SparkConf): CompressionCodec = {
57-
createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
60+
createCodec(conf, getCodecName(conf))
5861
}
5962

6063
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,21 @@ private[spark] object CompressionCodec {
7174
s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
7275
}
7376

77+
/**
78+
* Return the short version of the given codec name.
79+
* If it is already a short name, just return it.
80+
*/
81+
def getShortName(codecName: String): String = {
82+
if (shortCompressionCodecNames.contains(codecName)) {
83+
codecName
84+
} else {
85+
shortCompressionCodecNames
86+
.collect { case (k, v) if v == codecName => k }
87+
.headOption
88+
.getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
89+
}
90+
}
91+
7492
val FALLBACK_COMPRESSION_CODEC = "lzf"
7593
val DEFAULT_COMPRESSION_CODEC = "snappy"
7694
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 20 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,13 @@ import java.net.URI
2323
import scala.collection.mutable
2424
import scala.collection.mutable.ArrayBuffer
2525

26-
import com.google.common.base.Charsets
2726
import org.apache.hadoop.conf.Configuration
2827
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
2928
import org.apache.hadoop.fs.permission.FsPermission
3029
import org.json4s.JsonAST.JValue
3130
import org.json4s.jackson.JsonMethods._
3231

33-
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION}
32+
import org.apache.spark.{Logging, SparkConf}
3433
import org.apache.spark.deploy.SparkHadoopUtil
3534
import org.apache.spark.io.CompressionCodec
3635
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -68,7 +67,9 @@ private[spark] class EventLoggingListener(
6867
} else {
6968
None
7069
}
71-
private val compressionCodecName = compressionCodec.map(_.getClass.getCanonicalName)
70+
private val compressionCodecName = compressionCodec.map { c =>
71+
CompressionCodec.getShortName(c.getClass.getName)
72+
}
7273

7374
// Only defined if the file system scheme is not local
7475
private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -121,11 +122,8 @@ private[spark] class EventLoggingListener(
121122
try {
122123
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
123124
val bstream = new BufferedOutputStream(cstream, outputBufferSize)
124-
125125
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
126-
127-
val logStream = initEventLog(bstream, compressionCodec)
128-
writer = Some(new PrintWriter(logStream))
126+
writer = Some(new PrintWriter(bstream))
129127
logInfo("Logging events to %s".format(logPath))
130128
} catch {
131129
case e: Exception =>
@@ -214,56 +212,21 @@ private[spark] object EventLoggingListener extends Logging {
214212

215213
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
216214

217-
// To avoid corrupted files causing the heap to fill up. Value is arbitrary.
218-
private val MAX_HEADER_LINE_LENGTH = 4096
219-
220215
// A cache for compression codecs to avoid creating the same codec many times
221216
private val codecMap = new mutable.HashMap[String, CompressionCodec]
222217

223-
/**
224-
* Write metadata about the event log to the given stream.
225-
*
226-
* The header is a single line of JSON in the beginning of the file. Note that this
227-
* assumes all metadata necessary to parse the log is also included in the file name.
228-
* The format needs to be kept in sync with the `openEventLog()` method below. Also, it
229-
* cannot change in new Spark versions without some other way of detecting the change.
230-
*
231-
* @param logStream Raw output stream to the event log file.
232-
* @param compressionCodec Optional compression codec to use.
233-
* @return A stream to which event log data is written. This may be a wrapper around the original
234-
* stream (for example, when compression is enabled).
235-
*/
236-
def initEventLog(
237-
logStream: OutputStream,
238-
compressionCodec: Option[CompressionCodec]): OutputStream = {
239-
val metadata = new mutable.HashMap[String, String]
240-
// Some of these metadata are already encoded in the file name
241-
// Here we include them again within the file itself for completeness
242-
metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier))
243-
metadata += (SPARK_VERSION_KEY -> SPARK_VERSION)
244-
compressionCodec.foreach { codec =>
245-
metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName)
246-
}
247-
val metadataJson = compact(render(JsonProtocol.mapToJson(metadata)))
248-
val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8)
249-
if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) {
250-
throw new IOException(s"Event log metadata too long: $metadataJson")
251-
}
252-
logStream.write(metadataBytes, 0, metadataBytes.length)
253-
logStream
254-
}
255-
256218
/**
257219
* Return a file-system-safe path to the log file for the given application.
258220
*
259221
* Note that because we currently only create a single log file for each application,
260222
* we must encode all the information needed to parse this event log in the file name
261223
* instead of within the file itself. Otherwise, if the file is compressed, for instance,
262-
* we won't know which codec to use to decompress the metadata.
224+
* we won't know which codec to use to decompress the metadata needed to open the file in
225+
* the first place.
263226
*
264227
* @param logBaseDir Directory where the log file will be written.
265228
* @param appId A unique app ID.
266-
* @param compressionCodecName Name of the compression codec used to compress the contents
229+
* @param compressionCodecName Name to identify the codec used to compress the contents
267230
* of the log, or None if compression is not enabled.
268231
* @return A path which consists of file-system-safe characters.
269232
*/
@@ -272,22 +235,19 @@ private[spark] object EventLoggingListener extends Logging {
272235
appId: String,
273236
compressionCodecName: Option[String] = None): String = {
274237
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
275-
// e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1
276-
// e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec
277-
val logName = s"${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" +
278-
compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("")
238+
// e.g. app_123, app_123_COMPRESSION_CODEC_lzf
239+
val logName = sanitizedAppId + compressionCodecName
240+
.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }
241+
.getOrElse("")
279242
Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
280243
}
281244

282245
/**
283246
* Opens an event log file and returns an input stream that contains the event data.
284247
*
285-
* The first line of the returned input stream is a JSON header that describes the metadata
286-
* of the event log.
287-
*
288-
* @return 2-tuple (event input stream, Spark version of event data)
248+
* @return input stream that holds one JSON serialized event per line
289249
*/
290-
def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
250+
def openEventLog(log: Path, fs: FileSystem): InputStream = {
291251
// It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
292252
// IOException when a file does not exist, so try our best to throw a proper exception.
293253
if (!fs.exists(log)) {
@@ -296,21 +256,19 @@ private[spark] object EventLoggingListener extends Logging {
296256

297257
val in = new BufferedInputStream(fs.open(log))
298258

299-
// Parse information from the log name
259+
// Parse compression codec from the log name
300260
val logName = log.getName
301-
val baseRegex = s"(.*)_${SPARK_VERSION_KEY}_(.*)".r
302-
val compressionRegex = (baseRegex + s"_${COMPRESSION_CODEC_KEY}_(.*)").r
303-
val (sparkVersion, codecName) = logName match {
304-
case compressionRegex(_, version, _codecName) => (version, Some(_codecName))
305-
case baseRegex(_, version) => (version, None)
306-
case _ => throw new IllegalArgumentException(s"Malformed event log name: $logName")
261+
val compressionRegex = s".*_${COMPRESSION_CODEC_KEY}_(.*)".r
262+
val codecName: Option[String] = logName match {
263+
case compressionRegex(_codecName) => Some(_codecName)
264+
case _ => None
307265
}
308266
val codec = codecName.map { c =>
309267
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
310268
}
311269

312270
try {
313-
(codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
271+
codec.map(_.compressedInputStream(in)).getOrElse(in)
314272
} catch {
315273
case e: Exception =>
316274
in.close()

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,16 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
3939
* error is thrown by this method.
4040
*
4141
* @param logData Stream containing event log data.
42-
* @param version Spark version that generated the events.
4342
* @param sourceName Filename (or other source identifier) from whence @logData is being read
4443
*/
45-
def replay(logData: InputStream, version: String, sourceName: String) {
44+
def replay(logData: InputStream, sourceName: String): Unit = {
4645
var currentLine: String = null
4746
var lineNumber: Int = 1
4847
try {
4948
val lines = Source.fromInputStream(logData).getLines()
5049
lines.foreach { line =>
5150
currentLine = line
52-
JsonProtocol.sparkEventFromJson(parse(line)) match {
53-
case SparkListenerMetadataIdentifier => // Ignore metadata for now
54-
case event => postToAll(event)
55-
}
51+
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
5652
lineNumber += 1
5753
}
5854
} catch {

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,6 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
9898
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
9999
extends SparkListenerEvent
100100

101-
/**
102-
* A special dummy event used to identify the metadata header in event logs.
103-
* This is not actually posted anywhere.
104-
*/
105-
private[spark] case object SparkListenerMetadataIdentifier extends SparkListenerEvent
106-
107101
/**
108102
* Periodic updates from executors.
109103
* @param execId executor id

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
5858
listener.onExecutorAdded(executorAdded)
5959
case executorRemoved: SparkListenerExecutorRemoved =>
6060
listener.onExecutorRemoved(executorRemoved)
61-
case SparkListenerMetadataIdentifier =>
6261
}
6362
}
6463

0 commit comments

Comments
 (0)