Skip to content

Commit 0f2afd3

Browse files
Vinoo Ganesh (vinooganesh)
authored and committed
[SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener
## What changes were proposed in this pull request? This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here. This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus. Before running this PR, the following code: ``` SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() ``` would result in a SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener5f610071, org.apache.spark.HeartbeatReceiverd400c17, org.apache.spark.sql.SparkSession$$anon$125849aeb, <-First instance org.apache.spark.sql.SparkSession$$anon$1fadb9a0] <- Second instance ``` After this PR, the execution of the same code above results in SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener5f610071, org.apache.spark.HeartbeatReceiverd400c17, org.apache.spark.sql.SparkSession$$anon$125849aeb] <-One instance ``` ## How was this patch tested? * Unit test included as a part of the PR Closes #28128 from vinooganesh/vinooganesh/SPARK-27958. Lead-authored-by: Vinoo Ganesh <[email protected]> Co-authored-by: Vinoo Ganesh <[email protected]> Co-authored-by: Vinoo Ganesh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit dae7988) Signed-off-by: Wenchen Fan <[email protected]>
1 parent b4df7b5 commit 0f2afd3

2 files changed

Lines changed: 41 additions & 11 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 16 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql
1919

2020
import java.io.Closeable
2121
import java.util.concurrent.TimeUnit._
22 - import java.util.concurrent.atomic.AtomicReference
22 + import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
2323

2424
import scala.collection.JavaConverters._
2525
import scala.reflect.runtime.universe.TypeTag
@@ -49,7 +49,6 @@ import org.apache.spark.sql.types.{DataType, StructType}
4949
import org.apache.spark.sql.util.ExecutionListenerManager
5050
import org.apache.spark.util.{CallSite, Utils}
5151

52-
5352
/**
5453
* The entry point to programming Spark with the Dataset and DataFrame API.
5554
*
@@ -940,15 +939,7 @@ object SparkSession extends Logging {
940939
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
941940
setDefaultSession(session)
942941
setActiveSession(session)
943 -
944 -    // Register a successfully instantiated context to the singleton. This should be at the
945 -    // end of the class definition so that the singleton is updated only if there is no
946 -    // exception in the construction of the instance.
947 -    sparkContext.addSparkListener(new SparkListener {
948 -      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
949 -        defaultSession.set(null)
950 -      }
951 -    })
942 +    registerContextListener(sparkContext)
952943
}
953944

954945
return session
@@ -1064,6 +1055,20 @@ object SparkSession extends Logging {
10641055
// Private methods from now on
10651056
////////////////////////////////////////////////////////////////////////////////////////
10661057

1058 +  private val listenerRegistered: AtomicBoolean = new AtomicBoolean(false)
1059 +
1060 +  /** Register the AppEnd listener onto the Context */
1061 +  private def registerContextListener(sparkContext: SparkContext): Unit = {
1062 +    if (!listenerRegistered.get()) {
1063 +      sparkContext.addSparkListener(new SparkListener {
1064 +        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
1065 +          defaultSession.set(null)
1066 +        }
1067 +      })
1068 +      listenerRegistered.set(true)
1069 +    }
1070 +  }
1071 +
10671072
/** The active SparkSession for the current thread. */
10681073
private val activeThreadSession = new InheritableThreadLocal[SparkSession]
10691074

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -169,6 +169,31 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
169169
assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
170170
}
171171

172 +  test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
173 +    val conf = new SparkConf()
174 +      .setMaster("local")
175 +      .setAppName("test-app-SPARK-31354-1")
176 +    val context = new SparkContext(conf)
177 +    SparkSession
178 +      .builder()
179 +      .sparkContext(context)
180 +      .master("local")
181 +      .getOrCreate()
182 +    val postFirstCreation = context.listenerBus.listeners.size()
183 +    SparkSession.clearActiveSession()
184 +    SparkSession.clearDefaultSession()
185 +
186 +    SparkSession
187 +      .builder()
188 +      .sparkContext(context)
189 +      .master("local")
190 +      .getOrCreate()
191 +    val postSecondCreation = context.listenerBus.listeners.size()
192 +    SparkSession.clearActiveSession()
193 +    SparkSession.clearDefaultSession()
194 +    assert(postFirstCreation == postSecondCreation)
195 +  }
196 +
172197
test("SPARK-31532: should not propagate static sql configs to the existing" +
173198
" active/default SparkSession") {
174199
val session = SparkSession.builder()

0 commit comments

Comments (0)