@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
2727import org .json4s .jackson .JsonMethods ._
2828import org .scalatest .{BeforeAndAfter , FunSuite }
2929
30- import org .apache .spark .{Logging , SparkConf , SparkContext }
30+ import org .apache .spark .{Logging , SparkConf , SparkContext , SPARK_VERSION }
3131import org .apache .spark .deploy .SparkHadoopUtil
3232import org .apache .spark .io ._
3333import org .apache .spark .util .{JsonProtocol , Utils }
@@ -104,6 +104,18 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
104104 testEventLogging(extraConf = Map (" spark.eventLog.overwrite" -> " true" ))
105105 }
106106
107+ test(" Event log name" ) {
108+ // without compression
109+ assert(s " file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_ $SPARK_VERSION" ===
110+ EventLoggingListener .getLogPath(" /base-dir" , " app1" ))
111+ // with compression
112+ assert(s " file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_ ${SPARK_VERSION }_COMPRESSION_CODEC_lzf " ===
113+ EventLoggingListener .getLogPath(" /base-dir" , " app1" , Some (" lzf" )))
114+ // illegal characters in app ID
115+ assert(s " file:/base-dir/EVENT_LOG_a-fine-mind_dollar_bills_1_SPARK_VERSION_ $SPARK_VERSION" ===
116+ EventLoggingListener .getLogPath(" /base-dir" , " a fine:mind$dollar{bills}1" ))
117+ }
118+
107119 /* ----------------- *
108120 * Actual test logic *
109121 * ----------------- */
@@ -145,13 +157,26 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
145157 assert(lines(0 ).contains(" SparkListenerMetadataIdentifier" ))
146158 assert(lines(1 ).contains(" SparkListenerApplicationStart" ))
147159 assert(lines(2 ).contains(" SparkListenerApplicationEnd" ))
160+ assertMetadataValid(lines(0 ), compressionCodec)
161+ assert(JsonProtocol .sparkEventFromJson(parse(lines(0 ))) === SparkListenerMetadataIdentifier )
148162 assert(JsonProtocol .sparkEventFromJson(parse(lines(1 ))) === applicationStart)
149163 assert(JsonProtocol .sparkEventFromJson(parse(lines(2 ))) === applicationEnd)
150164 } finally {
151165 logData.close()
152166 }
153167 }
154168
169+ /**
170+ * Assert that the line is a correct JSON representation of the event log metadata.
171+ */
172+ private def assertMetadataValid (line : String , compressionCodec : Option [String ] = None ): Unit = {
173+ val metadata = JsonProtocol .mapFromJson(parse(line))
174+ assert(metadata.size === 2 + compressionCodec.size)
175+ assert(metadata.get(" Event" ) === Some (SparkListenerMetadataIdentifier .toString))
176+ assert(metadata.get(EventLoggingListener .SPARK_VERSION_KEY ) === Some (SPARK_VERSION ))
177+ assert(metadata.get(EventLoggingListener .COMPRESSION_CODEC_KEY ) === compressionCodec)
178+ }
179+
155180 /**
156181 * Test end-to-end event logging functionality in an application.
157182 * This runs a simple Spark job and asserts that the expected events are logged when expected.
@@ -161,8 +186,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
161186 val sc = new SparkContext (" local-cluster[2,2,512]" , " test" , conf)
162187 assert(sc.eventLogger.isDefined)
163188 val eventLogger = sc.eventLogger.get
189+ val eventLogPath = eventLogger.logPath
164190 val expectedLogDir = testDir.toURI().toString()
165- assert(eventLogger.logPath.startsWith(expectedLogDir))
191+ assert(eventLogPath === EventLoggingListener .getLogPath(
192+ expectedLogDir, sc.applicationId, compressionCodec))
166193
167194 // Begin listening for events that trigger asserts
168195 val eventExistenceListener = new EventExistenceListener (eventLogger)
@@ -191,6 +218,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
191218 SparkListenerTaskStart ,
192219 SparkListenerTaskEnd ,
193220 SparkListenerApplicationEnd ).map(Utils .getFormattedClassName)
221+ // Verify that the first line is valid metadata
222+ assertMetadataValid(lines(0 ), compressionCodec)
194223 lines.foreach { line =>
195224 eventSet.foreach { event =>
196225 if (line.contains(event)) {
0 commit comments