2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2609,7 +2609,7 @@ test_that("enableHiveSupport on SparkSession", {
unsetHiveContext()
# if we are still here, it must be built with hive
conf <- callJMethod(sparkSession, "conf")
value <- callJMethod(conf, "get", "spark.sql.catalogImplementation", "")
value <- callJMethod(conf, "get", "spark.sql.catalogImplementation")
expect_equal(value, "hive")
})

14 changes: 0 additions & 14 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -91,20 +91,6 @@ package object config {
.toSequence
.createWithDefault(Nil)

// Note: This is a SQL config but needs to be in core because the REPL depends on it
private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

// Note: This is a SQL config but needs to be in core because it's cross-session and can not put
// in SQLConf.
private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
2 changes: 1 addition & 1 deletion python/pyspark/sql/session.py
@@ -176,7 +176,7 @@ def getOrCreate(self):
sc._conf.set(key, value)
session = SparkSession(sc)
for key, value in self._options.items():
session.conf.set(key, value)
session._jsparkSession.sessionState().conf().setConfString(key, value)
for key, value in self._options.items():
session.sparkContext._conf.set(key, value)
return session
@@ -22,9 +22,9 @@ import java.io.File
import scala.tools.nsc.GenericRunnerSettings

import org.apache.spark._
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils

object Main extends Logging {
@@ -24,8 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils

class ReplSuite extends SparkFunSuite {
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -63,7 +62,7 @@ class SessionCatalog(
conf: CatalystConf) {
this(
externalCatalog,
new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
new GlobalTempViewManager("global_temp"),
DummyFunctionResourceLoader,
functionRegistry,
conf,
11 changes: 10 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql

import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}


/**
@@ -36,6 +36,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: String): Unit = {
requireNonStaticConf(key)
sqlConf.setConfString(key, value)
}

@@ -45,6 +46,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: Boolean): Unit = {
requireNonStaticConf(key)
set(key, value.toString)
}

@@ -54,6 +56,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: Long): Unit = {
requireNonStaticConf(key)
set(key, value.toString)
}

@@ -122,6 +125,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def unset(key: String): Unit = {
requireNonStaticConf(key)
sqlConf.unsetConf(key)
}

@@ -132,4 +136,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
sqlConf.contains(key)
}

private def requireNonStaticConf(key: String): Unit = {
if (StaticSQLConf.globalConfKeys.contains(key)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $key")
}
}
}
@@ -29,7 +29,6 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
@@ -41,6 +40,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
@@ -791,7 +791,7 @@ object SparkSession {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }

Contributor:
Is this change related?

Contributor Author:
Yes, it is related. The goal is to forbid users from setting or unsetting static (global) SQL confs via the public API, but here we do need to apply SQL confs that come from SparkConf, so we use the internal API instead.

if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
@@ -803,7 +803,7 @@ object SparkSession {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
@@ -829,7 +829,7 @@ object SparkSession {
sc
}
session = new SparkSession(sparkContext)
options.foreach { case (k, v) => session.conf.set(k, v) }
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
defaultSession.set(session)

// Register a successfully instantiated context to the singleton. This should be at the
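The review exchange above draws the line between the public spark.conf API, which now rejects static confs, and the internal sessionState.conf path the builder uses to honor values supplied through SparkConf. A minimal sketch of that behavior from user code, assuming a build that includes this change (illustrative only, not part of the diff):

import org.apache.spark.sql.{AnalysisException, SparkSession}

object StaticConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Static confs are still readable through the public API.
    println(spark.conf.get("spark.sql.catalogImplementation")) // "in-memory" or "hive"

    // But set/unset now fail fast instead of silently writing a value that
    // could never take effect.
    try {
      spark.conf.set("spark.sql.catalogImplementation", "hive")
    } catch {
      case e: AnalysisException =>
        assert(e.message.contains("Cannot modify the value of a static config"))
    }

    // The builder still has to propagate values passed via SparkConf / .config(...),
    // so it writes through session.sessionState.conf.setConfString(...), an internal
    // API reachable only from Spark's own org.apache.spark.sql package.
    spark.stop()
  }
}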
@@ -28,11 +28,11 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.r.SerDe
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._

private[sql] object SQLUtils extends Logging {
@@ -64,7 +64,7 @@ private[sql] object SQLUtils extends Logging {
spark: SparkSession,
sparkConfigMap: JMap[Object, Object]): Unit = {
for ((name, value) <- sparkConfigMap.asScala) {
spark.conf.set(name.toString, value.toString)
spark.sessionState.conf.setConfString(name.toString, value.toString)
}
for ((name, value) <- sparkConfigMap.asScala) {
spark.sparkContext.conf.set(name.toString, value.toString)
56 changes: 39 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -41,7 +41,7 @@ object SQLConf {
private val sqlConfEntries = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, ConfigEntry[_]]())

private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
require(!sqlConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
sqlConfEntries.put(entry.key, entry)
@@ -326,18 +326,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
// to its length exceeds the threshold.
val SCHEMA_STRING_LENGTH_THRESHOLD =
SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.internal()
.intConf
.createWithDefault(4000)

val PARTITION_COLUMN_TYPE_INFERENCE =
SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
.doc("When true, automatically infer the data types for partitioned columns.")
@@ -736,10 +724,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)

def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
@@ -886,3 +870,41 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
}
}

/**
* Static SQL configuration is a cross-session, immutable Spark configuration. External users can
* see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
*/
object StaticSQLConf {
val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())

private def buildConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate { entry =>
globalConfKeys.add(entry.key)
SQLConf.register(entry)
}
}

val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")

// This is used to control when we split a schema's JSON string into multiple pieces in order
// to fit it in a metastore table property (by default, such a value has a length restriction
// of 4000 characters, so do not use a default larger than 4000 for this property). We split
// the JSON string of a schema if its length exceeds the threshold. Note that this conf is only
// read in HiveExternalCatalog, which is cross-session; that's why it has to be a static SQL
// conf.
val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.internal()
.intConf
.createWithDefault(4000)
}
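The comment on SCHEMA_STRING_LENGTH_THRESHOLD above explains why a schema's JSON string is chunked before it is stored in metastore table properties. A rough sketch of the mechanics, using grouped just as the HiveExternalCatalog hunk further down does; the spark.sql.sources.schema.* property names are an assumption based on Spark's existing convention and do not appear in this diff:

import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

object SchemaSplitSketch {
  def main(args: Array[String]): Unit = {
    val threshold = 4000 // assumed value of spark.sql.sources.schemaStringLengthThreshold

    // A wide schema whose JSON form can exceed a single metastore property cell.
    val schema = StructType((1 to 500).map(i => StructField(s"col_$i", StringType)))
    val schemaJson = schema.json

    // Split into pieces of at most `threshold` characters, one table property per piece.
    val parts = schemaJson.grouped(threshold).toSeq
    val tableProps =
      Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
        parts.zipWithIndex.map { case (part, i) =>
          s"spark.sql.sources.schema.part.$i" -> part
        }

    // Reading back: concatenate the parts in order and re-parse the JSON.
    val numParts = tableProps("spark.sql.sources.schema.numParts").toInt
    val restored = DataType.fromJson(
      (0 until numParts).map(i => tableProps(s"spark.sql.sources.schema.part.$i")).mkString
    ).asInstanceOf[StructType]
    assert(restored == schema)
  }
}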
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.{MutableURLClassLoader, Utils}


@@ -22,7 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -19,11 +19,14 @@ package org.apache.spark.sql.internal

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}

class SQLConfSuite extends QueryTest with SharedSQLContext {
import testImplicits._

private val testKey = "test.key.0"
private val testVal = "test.val.0"

@@ -250,4 +253,22 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
}
}

test("global SQL conf comes from SparkConf") {
val newSession = SparkSession.builder()
.config(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000")
.getOrCreate()

assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD.key) == "2000")
checkAnswer(
newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
}

test("cannot set/unset global SQL conf") {
val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
assert(e1.message.contains("Cannot modify the value of a static config"))
val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
assert(e2.message.contains("Cannot modify the value of a static config"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.{DataType, StructType}


@@ -201,11 +202,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
// TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`,
// however the current SQLConf is session isolated, which is not applicable to external
// catalog. We should re-enable this conf instead of hard code the value here, after we have
// global SQLConf.
val threshold = 4000
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = tableDefinition.schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.language.implicitConversions

@@ -36,11 +35,11 @@ import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
@@ -40,6 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.{ShutdownHookManager, Utils}

// SPARK-3729: Test key required to check for initialization errors with config.