Skip to content
Closed
15 changes: 12 additions & 3 deletions python/pyspark/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,10 @@ def getOrCreate(self) -> "SparkSession":
# Do not update `SparkConf` for existing `SparkContext`, as it's shared
# by all sessions.
session = SparkSession(sc, options=self._options)
for key, value in self._options.items():
session._jsparkSession.sessionState().conf().setConfString(key, value)
else:
getattr(
getattr(session._jvm, "SparkSession$"), "MODULE$"
).applyModifiableSettings(session._jsparkSession, self._options)
return session

builder = Builder()
Expand All @@ -291,7 +293,7 @@ def __init__(
self,
sparkContext: SparkContext,
jsparkSession: Optional[JavaObject] = None,
options: Optional[Dict[str, Any]] = {},
options: Dict[str, Any] = {},
):
from pyspark.sql.context import SQLContext

Expand All @@ -304,8 +306,15 @@ def __init__(
and not self._jvm.SparkSession.getDefaultSession().get().sparkContext().isStopped()
):
jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings(
Copy link
Member

@HyukjinKwon HyukjinKwon Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu, this actually shows a lot of new warnings (see also #34893). Another reproducer:

./bin/spark-shell --conf spark.executor.memory=8g --conf spark.driver.memory=8g
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)("a")
21/12/15 14:03:15 WARN SparkSession: Using an existing SparkSession; the static sql configurations will not take effect.
Column<'<lambda>(a)'>

There are more places to fix like this:

ml/util.py:            self._sparkSession = SparkSession.builder.getOrCreate()
sql/column.py:            spark = SparkSession.builder.getOrCreate()
sql/context.py:            sparkSession = SparkSession.builder.getOrCreate()
sql/readwriter.py:        spark = SparkSession.builder.getOrCreate()
sql/readwriter.py:        spark = SparkSession.builder.getOrCreate()
sql/session.py:                return SparkSession.builder.getOrCreate()
sql/session.py:        return SparkSession.builder.getOrCreate()
sql/streaming.py:        spark = SparkSession.builder.getOrCreate()
sql/streaming.py:        spark = SparkSession.builder.getOrCreate()
sql/udf.py:        spark = SparkSession.builder.getOrCreate()

Copy link
Member

@HyukjinKwon HyukjinKwon Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can't make it in Spark 3.3, I think maybe it's just safer to revert #34757 #34732 and #34559 for now because each patch here will introduce either:

  1. Unexpected configuration propagation of static SQL configuration, or
  2. Too much warnings

Separately, I still feel 8424f55 is inefficient. We don't know which configurations don't take effect, or why it keeps complaining (see the example above) for which configuration. We should probably at least print out the keys or lower the level of log.

cc @AngersZhuuuu @yaooqinn @maropu @dongjoon-hyun FYI

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xinrong-databricks actually this is more Python side codes. Are you interested in creating a followup?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably at least print out the keys or lower the level of log.

+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the head-ups, @HyukjinKwon .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly, I will fix it and keep you updated. Thanks!

Copy link
Contributor Author

@AngersZhuuuu AngersZhuuuu Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably at least print out the keys or lower the level of log.

+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a PR at #35001 👍

jsparkSession, options
)
else:
jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a short comment here that this is the case when we can set static configurations

else:
getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings(
jsparkSession, options
)
self._jsparkSession = jsparkSession
self._jwrapped = self._jsparkSession.sqlContext()
self._wrapped = SQLContext(self._sc, self, self._jwrapped)
Expand Down
8 changes: 5 additions & 3 deletions python/pyspark/sql/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,14 @@ def test_another_spark_session(self):
session2 = None
try:
session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
session2 = SparkSession.builder.config(
"spark.sql.codegen.comments", "true"
).getOrCreate()

self.assertEqual(session1.conf.get("key1"), "value1")
self.assertEqual(session2.conf.get("key1"), "value1")
self.assertEqual(session1.conf.get("key2"), "value2")
self.assertEqual(session2.conf.get("key2"), "value2")
self.assertEqual(session1.conf.get("spark.sql.codegen.comments"), "false")
self.assertEqual(session2.conf.get("spark.sql.codegen.comments"), "false")
self.assertEqual(session1.sparkContext, session2.sparkContext)

self.assertEqual(session1.sparkContext.getConf().get("key1"), "value1")
Expand Down
42 changes: 24 additions & 18 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ object SparkSession extends Logging {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
applyModifiableSettings(session)
applyModifiableSettings(session, new java.util.HashMap[String, String](options.asJava))
return session
}

Expand All @@ -939,7 +939,7 @@ object SparkSession extends Logging {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
applyModifiableSettings(session)
applyModifiableSettings(session, new java.util.HashMap[String, String](options.asJava))
return session
}

Expand Down Expand Up @@ -967,22 +967,6 @@ object SparkSession extends Logging {

return session
}

private def applyModifiableSettings(session: SparkSession): Unit = {
val (staticConfs, otherConfs) =
options.partition(kv => SQLConf.isStaticConfigKey(kv._1))

otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }

if (staticConfs.nonEmpty) {
logWarning("Using an existing SparkSession; the static sql configurations will not take" +
" effect.")
}
if (otherConfs.nonEmpty) {
logWarning("Using an existing SparkSession; some spark core configurations may not take" +
" effect.")
}
}
}

/**
Expand Down Expand Up @@ -1074,6 +1058,28 @@ object SparkSession extends Logging {
throw new IllegalStateException("No active or default Spark session found")))
}

/**
* Apply modifiable settings to an existing [[SparkSession]]. This method are used
* both in Scala and Python, so put this under [[SparkSession]] object.
*/
private[sql] def applyModifiableSettings(
session: SparkSession,
options: java.util.HashMap[String, String]): Unit = {
val (staticConfs, otherConfs) =
options.asScala.partition(kv => SQLConf.isStaticConfigKey(kv._1))

otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }

if (staticConfs.nonEmpty) {
logWarning("Using an existing SparkSession; the static sql configurations will not take" +
" effect.")
}
if (otherConfs.nonEmpty) {
logWarning("Using an existing SparkSession; some spark core configurations may not take" +
" effect.")
}
}

/**
* Returns a cloned SparkSession with all specified configurations disabled, or
* the original SparkSession if all configurations are already disabled.
Expand Down