File tree Expand file tree Collapse file tree 2 files changed +6
-0
lines changed
sql/core/src/test/scala/org/apache/spark/sql Expand file tree Collapse file tree 2 files changed +6
-0
lines changed Original file line number Diff line number Diff line change @@ -155,19 +155,22 @@ class SessionStateSuite extends SparkFunSuite {
155155 assert(forkedSession ne activeSession)
156156 assert(forkedSession.listenerManager ne activeSession.listenerManager)
157157 runCollectQueryOn(forkedSession)
158+ activeSession.sparkContext.listenerBus.waitUntilEmpty(1000 )
158159 assert(collectorA.commands.length == 1 ) // forked should callback to A
159160 assert(collectorA.commands(0 ) == " collect" )
160161
161162 // independence
162163 // => changes to forked do not affect original
163164 forkedSession.listenerManager.register(collectorB)
164165 runCollectQueryOn(activeSession)
166+ activeSession.sparkContext.listenerBus.waitUntilEmpty(1000 )
165167 assert(collectorB.commands.isEmpty) // original should not callback to B
166168 assert(collectorA.commands.length == 2 ) // original should still callback to A
167169 assert(collectorA.commands(1 ) == " collect" )
168170 // <= changes to original do not affect forked
169171 activeSession.listenerManager.register(collectorC)
170172 runCollectQueryOn(forkedSession)
173+ activeSession.sparkContext.listenerBus.waitUntilEmpty(1000 )
171174 assert(collectorC.commands.isEmpty) // forked should not callback to C
172175 assert(collectorA.commands.length == 3 ) // forked should still callback to A
173176 assert(collectorB.commands.length == 1 ) // forked should still callback to B
Original file line number Diff line number Diff line change @@ -356,10 +356,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
356356 .withColumn(" b" , udf1($" a" , lit(10 )))
357357 df.cache()
358358 df.write.saveAsTable(" t" )
359+ sparkContext.listenerBus.waitUntilEmpty(1000 )
359360 assert(numTotalCachedHit == 1 , " expected to be cached in saveAsTable" )
360361 df.write.insertInto(" t" )
362+ sparkContext.listenerBus.waitUntilEmpty(1000 )
361363 assert(numTotalCachedHit == 2 , " expected to be cached in insertInto" )
362364 df.write.save(path.getCanonicalPath)
365+ sparkContext.listenerBus.waitUntilEmpty(1000 )
363366 assert(numTotalCachedHit == 3 , " expected to be cached in save for native" )
364367 }
365368 }
You can’t perform that action at this time.
0 commit comments