From e62e2f760f2ed4683725cdb144920900fba8e92f Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Sun, 16 Jan 2022 23:39:30 -0500 Subject: [PATCH] Ensure touching sessionstate doesn't add repeated listeners --- .../spark/sql/internal/SharedState.scala | 2 +- .../spark/sql/SparkSessionBuilderSuite.scala | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index e894f39d9270..c945bc8829b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -109,7 +109,7 @@ private[sql] class SharedState( * A status store to query SQL status/metrics of this Spark application, based on SQL-specific * [[org.apache.spark.scheduler.SparkListenerEvent]]s. */ - val statusStore: SQLAppStatusStore = { + lazy val statusStore: SQLAppStatusStore = { val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] val listener = new SQLAppStatusListener(conf, kvStore, live = true) sparkContext.listenerBus.addToStatusQueue(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 0a7c684a6895..7ed3b7c89beb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -229,6 +229,35 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach wit SparkSession.clearActiveSession() SparkSession.clearDefaultSession() assert(postFirstCreation == postSecondCreation) + context.stop() + } + + test("SPARK-32165: Ensure Spark only initiates SharedState once across SparkSessions") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-32165-1") + val context = new SparkContext(conf) + SparkSession + .builder() + .master("local") + .getOrCreate() + .sessionState // this touches the sessionState + val postFirstCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + .sessionState // this touches the sessionState + val postSecondCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + + // minus 1 to account for ExecutionListenerBus + assert(postFirstCreation == postSecondCreation - 1) } test("SPARK-31532: should not propagate static sql configs to the existing" +