Skip to content

Commit e0ba2f8

Browse files
committed
Fix test failures caused by race condition in processing/mutating events
An event can be mutated by the DAGScheduler in between being processed by one listener and being processed by another. This causes the ReplayListenerSuite to be flaky. This commit ensures that the event logged is the same as the original event received by the EventLoggingListener.
1 parent b990453 commit e0ba2f8

3 files changed

Lines changed: 61 additions & 55 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.spark.scheduler
1919

2020
import scala.collection.mutable
21+
import scala.collection.mutable.ArrayBuffer
2122

2223
import org.apache.hadoop.conf.Configuration
2324
import org.apache.hadoop.fs.{FileSystem, Path}
25+
import org.json4s.JsonAST.JValue
2426
import org.json4s.jackson.JsonMethods._
2527

2628
import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -48,14 +50,18 @@ private[spark] class EventLoggingListener(
4850

4951
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
5052
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
53+
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
5154
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
5255
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
5356
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
5457
val logDir = logBaseDir + "/" + name
5558

56-
private val logger =
59+
protected val logger =
5760
new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)
5861

62+
// For testing. Keep track of all JSON serialized events that have been logged.
63+
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
64+
5965
/**
6066
* Begin logging events.
6167
* If compression is used, log a file that indicates which compression library is used.
@@ -73,11 +79,14 @@ private[spark] class EventLoggingListener(
7379

7480
/** Log the event as JSON. */
7581
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
76-
val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
77-
logger.logLine(eventJson)
82+
val eventJson = JsonProtocol.sparkEventToJson(event)
83+
logger.logLine(compact(render(eventJson)))
7884
if (flushLogger) {
7985
logger.flush()
8086
}
87+
if (testing) {
88+
loggedEvents += eventJson
89+
}
8190
}
8291

8392
// Events that do not trigger a flush

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ object EventLoggingListenerSuite {
380380
compressionCodec: Option[String] = None) = {
381381
val conf = new SparkConf
382382
conf.set("spark.eventLog.enabled", "true")
383+
conf.set("spark.eventLog.testing", "true")
383384
logDir.foreach { dir =>
384385
conf.set("spark.eventLog.dir", dir)
385386
}

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ package org.apache.spark.scheduler
1919

2020
import java.io.PrintWriter
2121

22-
import scala.collection.mutable.ArrayBuffer
22+
import scala.util.Try
2323

2424
import org.apache.hadoop.fs.Path
2525
import org.json4s.jackson.JsonMethods._
2626
import org.scalatest.{BeforeAndAfter, FunSuite}
2727

2828
import org.apache.spark.SparkContext._
2929
import org.apache.spark.{SparkConf, SparkContext}
30-
import org.apache.spark.util.{JsonProtocol, Utils}
3130
import org.apache.spark.io.CompressionCodec
31+
import org.apache.spark.util.{JsonProtocol, Utils}
3232

3333
/**
3434
* Test for whether ReplayListenerBus replays events from logs correctly.
@@ -41,7 +41,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
4141
)
4242

4343
after {
44-
44+
Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
45+
Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
4546
}
4647

4748
test("Simple replay") {
@@ -54,10 +55,18 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
5455
}
5556
}
5657

58+
// This assumes the correctness of EventLoggingListener
5759
test("End-to-end replay") {
5860
testApplicationReplay()
5961
}
6062

63+
// This assumes the correctness of EventLoggingListener
64+
test("End-to-end replay with compression") {
65+
allCompressionCodecs.foreach { codec =>
66+
testApplicationReplay(Some(codec))
67+
}
68+
}
69+
6170

6271
/* ----------------- *
6372
* Actual test logic *
@@ -78,31 +87,37 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
7887
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
7988
writer.close()
8089
val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
81-
val eventKeeper = new EventKeeper
82-
replayer.addListener(eventKeeper)
90+
val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
91+
val eventMonster = new EventMonster(conf)
92+
replayer.addListener(eventMonster)
8393
replayer.replay()
84-
assert(eventKeeper.events.size === 2)
85-
assert(eventKeeper.events(0) === applicationStart)
86-
assert(eventKeeper.events(1) === applicationEnd)
94+
assert(eventMonster.loggedEvents.size === 2)
95+
assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
96+
assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
8797
}
8898

8999
/**
100+
* Test end-to-end replaying of events.
90101
*
102+
* This test runs a few simple jobs with event logging enabled, and compares each emitted
103+
* event to the corresponding event replayed from the event logs. This test makes the
104+
* assumption that the event logging behavior is correct (tested in a separate suite).
91105
*/
92106
private def testApplicationReplay(codecName: Option[String] = None) {
93107
val logDir = "/tmp/test-replay"
94-
val logDirPath = new Path(logDir)
95108
val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
96109
val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
97-
val eventKeeper = new EventKeeper
98-
sc.addSparkListener(eventKeeper)
99110

100-
// Run a job
101-
sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().cache().count()
111+
// Run a few jobs
112+
sc.parallelize(1 to 100, 1).count()
113+
sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
114+
sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
115+
sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
102116
sc.stop()
103117

104-
// Find the log file
105-
val applications = fileSystem.listStatus(logDirPath)
118+
// Prepare information needed for replay
119+
val codec = codecName.map(getCompressionCodec)
120+
val applications = fileSystem.listStatus(new Path(logDir))
106121
assert(applications != null && applications.size > 0)
107122
val eventLogDir =
108123
applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last
@@ -111,53 +126,34 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
111126
assert(logFiles != null && logFiles.size > 0)
112127
val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
113128
assert(logFile.isDefined)
114-
val codec = codecName.map(getCompressionCodec)
115129

116130
// Replay events
117131
val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec)
118-
val replayEventKeeper = new EventKeeper
119-
replayer.addListener(replayEventKeeper)
132+
val eventMonster = new EventMonster(conf)
133+
replayer.addListener(eventMonster)
120134
replayer.replay()
121135

122136
// Verify the same events are replayed in the same order
123-
val filteredEvents = filterSchedulerEvents(eventKeeper.events)
124-
val filteredReplayEvents = filterSchedulerEvents(replayEventKeeper.events)
125-
assert(filteredEvents.size === filteredReplayEvents.size)
126-
filteredEvents.zip(filteredReplayEvents).foreach { case (e1, e2) =>
127-
assert(JsonProtocol.sparkEventToJson(e1) === JsonProtocol.sparkEventToJson(e2))
128-
}
137+
assert(sc.eventLogger.isDefined)
138+
val originalEvents = sc.eventLogger.get.loggedEvents
139+
val replayedEvents = eventMonster.loggedEvents
140+
originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
129141
}
130142

131143
/**
132-
* A simple listener that keeps all events it receives
144+
* A simple listener that buffers all the events it receives.
145+
*
146+
* The event buffering functionality must be implemented within EventLoggingListener itself.
147+
* This is because of the following race condition: the event may be mutated between being
148+
* processed by one listener and being processed by another. Thus, in order to establish
149+
* a fair comparison between the original events and the replayed events, both functionalities
150+
* must be implemented within one listener (i.e. the EventLoggingListener).
151+
*
152+
* This child listener inherits only the event buffering functionality, but does not actually
153+
* log the events.
133154
*/
134-
private class EventKeeper extends SparkListener {
135-
val events = new ArrayBuffer[SparkListenerEvent]
136-
override def onStageSubmitted(e: SparkListenerStageSubmitted) { events += e }
137-
override def onStageCompleted(e: SparkListenerStageCompleted) { events += e }
138-
override def onTaskStart(e: SparkListenerTaskStart) { events += e }
139-
override def onTaskGettingResult(e: SparkListenerTaskGettingResult) { events += e }
140-
override def onTaskEnd(e: SparkListenerTaskEnd) { events += e }
141-
override def onJobStart(e: SparkListenerJobStart) { events += e }
142-
override def onJobEnd(e: SparkListenerJobEnd) { events += e }
143-
override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) { events += e }
144-
override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = { events += e }
145-
override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = { events += e }
146-
override def onUnpersistRDD(e: SparkListenerUnpersistRDD) { events += e }
147-
override def onApplicationStart(e: SparkListenerApplicationStart) { events += e }
148-
override def onApplicationEnd(e: SparkListenerApplicationEnd) { events += e }
149-
}
150-
151-
private def filterSchedulerEvents(events: Seq[SparkListenerEvent]): Seq[SparkListenerEvent] = {
152-
events.collect {
153-
case e: SparkListenerStageSubmitted => e
154-
case e: SparkListenerStageCompleted => e
155-
case e: SparkListenerTaskStart => e
156-
case e: SparkListenerTaskGettingResult => e
157-
case e: SparkListenerTaskEnd => e
158-
case e: SparkListenerJobStart => e
159-
case e: SparkListenerJobEnd => e
160-
}
155+
private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
156+
logger.close()
161157
}
162158

163159
private def getCompressionCodec(codecName: String) = {

0 commit comments

Comments
 (0)