Closed
Changes from all commits (26 commits):
bcdd070  add counter to monitor number of active SparkSessions (Jun 5, 2019)
f333a30  clear session always in stop() (Jun 5, 2019)
8fc95e9  added tests (Jun 6, 2019)
ff60b84  Merge branch 'master' of github.com:apache/spark into vinooganesh/SPA… (Jun 10, 2019)
f69cabb  responding to PR comments (Jun 10, 2019)
3fafd2a  cleanup (Jun 10, 2019)
0c9c426  adding ticket number to test (Jun 10, 2019)
843491f  style (Jun 10, 2019)
92c7b22  addressing sean's PR and updating test (Jun 14, 2019)
d4e4e27  Merge branch 'master' into vinooganesh/SPARK-27958 (Mar 21, 2020)
61c6fed  first iteration of new proposal (Mar 27, 2020)
b586bf9  Merge remote-tracking branch 'origin/master' into vinooganesh/SPARK-2… (Apr 5, 2020)
387acb1  testing and style (Apr 5, 2020)
99c5f64  style fix (Apr 5, 2020)
7613046  Merge remote-tracking branch 'origin' into vinooganesh/SPARK-27958 (vinooganesh, Apr 30, 2020)
7a70369  Merge branch 'master' into vinooganesh/SPARK-27958 (vinooganesh, May 5, 2020)
5a1e0ab  remove unnecessary s (vinooganesh, May 6, 2020)
38dea00  Merge branch 'master' into vinooganesh/SPARK-27958 (vinooganesh, May 6, 2020)
0c7e9df  remove lifecycle methods - context tracks listener (vinooganesh, May 12, 2020)
0284c79  remove unnecessary import (vinooganesh, May 12, 2020)
19f45da  Merge branch 'master' into vinooganesh/SPARK-27958 (vinooganesh, May 12, 2020)
9573805  add ticket number to test (vinooganesh, May 13, 2020)
e877ee1  Merge branch 'master' into vinooganesh/SPARK-27958 (vinooganesh, May 18, 2020)
e5563a7  move logic to listener (vinooganesh, May 19, 2020)
e5ef33a  adding test and cleanup (vinooganesh, May 20, 2020)
cfa1462  same class, don't need SparkSession (vinooganesh, May 21, 2020)

27 changes: 16 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql

 import java.io.Closeable
 import java.util.concurrent.TimeUnit._
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
@@ -49,7 +49,6 @@ import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.{CallSite, Utils}

-
 /**
  * The entry point to programming Spark with the Dataset and DataFrame API.
  *
@@ -940,15 +939,7 @@ object SparkSession extends Logging {
       options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
       setDefaultSession(session)
       setActiveSession(session)
-
-      // Register a successfully instantiated context to the singleton. This should be at the
-      // end of the class definition so that the singleton is updated only if there is no
-      // exception in the construction of the instance.
-      sparkContext.addSparkListener(new SparkListener {
-        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-          defaultSession.set(null)
-        }
-      })
+      registerContextListener(sparkContext)
     }

     return session
@@ -1064,6 +1055,20 @@
   // Private methods from now on
   ////////////////////////////////////////////////////////////////////////////////////////

+  private val listenerRegistered: AtomicBoolean = new AtomicBoolean(false)
+
+  /** Register the AppEnd listener onto the Context */
+  private def registerContextListener(sparkContext: SparkContext): Unit = {
+    if (!listenerRegistered.get()) {
+      sparkContext.addSparkListener(new SparkListener {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+          defaultSession.set(null)
+        }
+      })
+      listenerRegistered.set(true)
+    }
+  }
+
   /** The active SparkSession for the current thread. */
   private val activeThreadSession = new InheritableThreadLocal[SparkSession]
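The change above replaces the inline listener registration in `getOrCreate()` with a guarded helper: an `AtomicBoolean` records whether the `ApplicationEnd` listener has already been attached to the `SparkContext`, so building further sessions on the same context no longer adds duplicate listeners. As a standalone illustration of that idempotent-registration pattern (not Spark's code: `EventBus`, `OneTimeRegistration`, and `registerCleanup` are hypothetical names, and `compareAndSet` is used here so the guard is atomic on its own, whereas the PR's `get()`/`set(true)` pair relies on the builder's surrounding synchronization):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical event-bus interface, standing in for SparkContext's listener bus.
trait EventBus {
  def addListener(listener: () => Unit): Unit
}

object OneTimeRegistration {
  private val registered = new AtomicBoolean(false)

  // Attach the cleanup callback at most once, no matter how many times this is called.
  // compareAndSet makes the check-and-mark step atomic, so two concurrent callers
  // cannot both slip past the guard.
  def registerCleanup(bus: EventBus)(cleanup: () => Unit): Unit = {
    if (registered.compareAndSet(false, true)) {
      bus.addListener(cleanup)
    }
  }
}
```
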
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -169,6 +169,31 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }

+  test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-app-SPARK-31354-1")
+    val context = new SparkContext(conf)
+    SparkSession
+      .builder()
+      .sparkContext(context)
+      .master("local")
+      .getOrCreate()
+    val postFirstCreation = context.listenerBus.listeners.size()
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+
+    SparkSession
+      .builder()
+      .sparkContext(context)
+      .master("local")
+      .getOrCreate()
+    val postSecondCreation = context.listenerBus.listeners.size()
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    assert(postFirstCreation == postSecondCreation)
+  }
+
   test("SPARK-31532: should not propagate static sql configs to the existing" +
     " active/default SparkSession") {
     val session = SparkSession.builder()
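The new test measures the listener-bus size after two `getOrCreate()` calls against the same `SparkContext` and asserts it does not grow. The scenario this guards against is an application that repeatedly creates and clears sessions on one long-lived context; with the previous code each newly constructed session would have added another `ApplicationEnd` listener. A rough sketch of that usage pattern (object name, loop count, and app name are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

object RepeatedSessionsDemo {
  def main(args: Array[String]): Unit = {
    // Build and discard several sessions; each getOrCreate() after a clear constructs
    // a new SparkSession but reuses the same underlying SparkContext. With the
    // one-time registration, the context's listener bus stays the same size instead
    // of gaining an extra ApplicationEnd listener per iteration.
    for (_ <- 1 to 5) {
      val spark = SparkSession.builder()
        .master("local")
        .appName("repeated-sessions-demo") // illustrative app name
        .getOrCreate()
      spark.range(10).count()
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
    }
  }
}
```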