Commit 297ae42

[SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst
This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch uses a type alias to keep CatalystConf (now a type alias for SQLConf) and SimpleCatalystConf (now a concrete class that extends SQLConf). The motivation for the change is that it is odd to have SQLConf only in sql/core and then have to duplicate the config options that impact the optimizer/analyzer in sql/catalyst via CatalystConf.

How was this patch tested? N/A

Author: Reynold Xin <rxin@databricks.com>

Closes apache#17285 from rxin/SPARK-19944.
1 parent a0ce845 commit 297ae42

5 files changed

Lines changed: 154 additions & 131 deletions
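The compatibility trick here is an ordinary Scala type alias: the class moves to its new home, and the old location re-exports the old name. A minimal, self-contained sketch of the pattern (package and class names below are illustrative, not Spark's):

// The class now lives in the new package...
package newhome {
  class RealConf {
    def maxIterations: Int = 100
  }
}

// ...while the old package re-exports the old name as an alias.
package oldhome {
  object compat {
    type OldConf = newhome.RealConf // same runtime class, old name
  }
}

object AliasDemo extends App {
  // Code written against the old name keeps compiling unchanged.
  val conf: oldhome.compat.OldConf = new newhome.RealConf
  println(conf.maxIterations)
}

In the actual diff below, the alias sits directly in catalyst's package object (type CatalystConf = SQLConf), so no compat indirection is needed.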

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala

Lines changed: 0 additions & 66 deletions
This file was deleted.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.TimeZone
+
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * A SQLConf that can be used for local testing. This class is only here to minimize the change
+ * for ticket SPARK-19944 (moves SQLConf from sql/core to sql/catalyst). This class should
+ * eventually be removed (test cases should just create SQLConf and set values appropriately).
+ */
+case class SimpleCatalystConf(
+    override val caseSensitiveAnalysis: Boolean,
+    override val orderByOrdinal: Boolean = true,
+    override val groupByOrdinal: Boolean = true,
+    override val optimizerMaxIterations: Int = 100,
+    override val optimizerInSetConversionThreshold: Int = 10,
+    override val maxCaseBranchesForCodegen: Int = 20,
+    override val tableRelationCacheSize: Int = 1000,
+    override val runSQLonFile: Boolean = true,
+    override val crossJoinEnabled: Boolean = false,
+    override val cboEnabled: Boolean = false,
+    override val joinReorderEnabled: Boolean = false,
+    override val joinReorderDPThreshold: Int = 12,
+    override val warehousePath: String = "/user/hive/warehouse",
+    override val sessionLocalTimeZone: String = TimeZone.getDefault().getID)
+  extends SQLConf {
+
+  override def clone(): SimpleCatalystConf = this.copy()
+}
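
Because SimpleCatalystConf is a case class whose fields all have defaults except caseSensitiveAnalysis, test code can construct one tersely and clone it via copy(). A hypothetical usage sketch (not part of this commit):

import org.apache.spark.sql.catalyst.SimpleCatalystConf

object SimpleConfDemo extends App {
  // Only the one required field must be supplied; the rest fall back to defaults.
  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, optimizerMaxIterations = 50)

  // clone() delegates to the case-class copy(), so the clone compares equal
  // but is an independent instance.
  assert(conf.clone() == conf)
  assert(conf.groupByOrdinal) // default: true
  println(conf.sessionLocalTimeZone)
}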

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala

Lines changed: 7 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.internal.SQLConf
+
 /**
  * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
  * considered an internal API to Spark SQL and are subject to change between minor releases.
@@ -29,4 +31,9 @@ package object catalyst {
    */
   protected[sql] object ScalaReflectionLock
 
+  /**
+   * This class is only here to minimize the change for ticket SPARK-19944
+   * (moves SQLConf from sql/core to sql/catalyst). This class should eventually be removed.
+   */
+  type CatalystConf = SQLConf
 }
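
With the alias in place, catalyst code that was written against CatalystConf keeps compiling without edits. A hypothetical call site (illustrative only, not from this commit):

import org.apache.spark.sql.catalyst.CatalystConf // resolves to SQLConf through the alias

object AliasCallSite {
  // Any SQLConf (including SimpleCatalystConf) satisfies this parameter type.
  def chooseMaxIterations(conf: CatalystConf): Int = conf.optimizerMaxIterations
}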

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 22 additions & 65 deletions
@@ -24,15 +24,11 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 
 import org.apache.hadoop.fs.Path
-import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
-import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.analysis.Resolver
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -240,7 +236,7 @@
       "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
     .internal()
     .stringConf
-    .createWithDefault(classOf[ParquetOutputCommitter].getName)
+    .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
 
   val PARQUET_VECTORIZED_READER_ENABLED =
     SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
@@ -406,7 +402,8 @@
     SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
       .internal()
      .stringConf
-      .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
+      .createWithDefault(
+        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
@@ -552,7 +549,7 @@
     SQLConfigBuilder("spark.sql.streaming.commitProtocolClass")
       .internal()
       .stringConf
-      .createWithDefault(classOf[ManifestFileCommitProtocol].getName)
+      .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
 
   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
@@ -658,7 +655,7 @@
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
+class SQLConf extends Serializable with Logging {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -761,6 +758,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
+  /**
+   * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
+   * identifiers are equal.
+   */
+  def resolver: Resolver = {
+    if (caseSensitiveAnalysis) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+  }
+
   def subexpressionEliminationEnabled: Boolean =
     getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
 
@@ -818,7 +827,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
 
-  override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
+  def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
 
@@ -830,11 +839,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
-  override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+  def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
-  override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+  def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
 
-  override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
+  def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
 
   /** ********************** SQLConf functionality methods ************ */
@@ -956,55 +965,3 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
     settings.clear()
   }
 }
-
-/**
- * Static SQL configuration is a cross-session, immutable Spark configuration. External users can
- * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
- */
-object StaticSQLConf {
-  val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
-
-  private def buildConf(key: String): ConfigBuilder = {
-    ConfigBuilder(key).onCreate { entry =>
-      globalConfKeys.add(entry.key)
-      SQLConf.register(entry)
-    }
-  }
-
-  val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
-    .doc("The default location for managed databases and tables.")
-    .stringConf
-    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
-
-  val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
-    .internal()
-    .stringConf
-    .checkValues(Set("hive", "in-memory"))
-    .createWithDefault("in-memory")
-
-  val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
-    .internal()
-    .stringConf
-    .createWithDefault("global_temp")
-
-  // This is used to control when we will split a schema's JSON string to multiple pieces
-  // in order to fit the JSON string in metastore's table property (by default, the value has
-  // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
-  // value of this property). We will split the JSON string of a schema to its length exceeds the
-  // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
-  // that's why this conf has to be a static SQL conf.
-  val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
-    .doc("The maximum length allowed in a single cell when " +
-      "storing additional schema information in Hive's metastore.")
-    .internal()
-    .intConf
-    .createWithDefault(4000)
-
-  // When enabling the debug, Spark SQL internal table properties are not filtered out; however,
-  // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
-  val DEBUG_MODE = buildConf("spark.sql.debug")
-    .internal()
-    .doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
-    .booleanConf
-    .createWithDefault(false)
-}
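
Two patterns in this diff are worth calling out. First, defaults that were classOf[...].getName become plain string literals: after the move, sql/catalyst cannot reference classes that live in sql/core (such as SQLHadoopMapReduceCommitProtocol) without creating a circular module dependency, so the class names are kept as text and resolved at runtime. Second, the new resolver method turns the caseSensitiveAnalysis flag into the function used to compare identifiers. Catalyst's Resolver is a function of two names returning whether they match; a standalone sketch of the same logic (not Spark's source):

object ResolverSketch extends App {
  type Resolver = (String, String) => Boolean

  def resolver(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) (a, b) => a == b
    else (a, b) => a.equalsIgnoreCase(b)

  println(resolver(caseSensitiveAnalysis = true)("colA", "cola"))  // false
  println(resolver(caseSensitiveAnalysis = false)("colA", "cola")) // true
}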
StaticSQLConf.scala (new file, package org.apache.spark.sql.internal; carved out of the StaticSQLConf object deleted from SQLConf.scala above)

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.Utils
+
+
+/**
+ * Static SQL configuration is a cross-session, immutable Spark configuration. External users can
+ * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
+ *//**
+ * Static SQL configuration is a cross-session, immutable Spark configuration. External users can
+ * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
+ */
+object StaticSQLConf {
+  val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
+
+  private def buildConf(key: String): ConfigBuilder = {
+    ConfigBuilder(key).onCreate { entry =>
+      globalConfKeys.add(entry.key)
+      SQLConf.register(entry)
+    }
+  }
+
+  val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
+    .doc("The default location for managed databases and tables.")
+    .stringConf
+    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
+
+  val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
+    .internal()
+    .stringConf
+    .checkValues(Set("hive", "in-memory"))
+    .createWithDefault("in-memory")
+
+  val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
+    .internal()
+    .stringConf
+    .createWithDefault("global_temp")
+
+  // This is used to control when we will split a schema's JSON string to multiple pieces
+  // in order to fit the JSON string in metastore's table property (by default, the value has
+  // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
+  // value of this property). We will split the JSON string of a schema to its length exceeds the
+  // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
+  // that's why this conf has to be a static SQL conf.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
+    .doc("The maximum length allowed in a single cell when " +
+      "storing additional schema information in Hive's metastore.")
+    .internal()
+    .intConf
+    .createWithDefault(4000)
+
+  // When enabling the debug, Spark SQL internal table properties are not filtered out; however,
+  // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
+  val DEBUG_MODE = buildConf("spark.sql.debug")
+    .internal()
+    .doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
+    .booleanConf
+    .createWithDefault(false)
+}
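
Static configs are fixed for the lifetime of the shared session state: they are read from the initial SparkConf and are afterwards readable, but not settable, through the session's conf interface. A hedged sketch of that contract (assumes a Spark 2.x dependency; the warehouse path is a placeholder):

import org.apache.spark.sql.SparkSession

object StaticConfDemo extends App {
  // Static confs must be chosen before the session is created.
  val spark = SparkSession.builder()
    .master("local[1]")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse-demo") // placeholder path
    .getOrCreate()

  println(spark.conf.get("spark.sql.warehouse.dir")) // reading is fine

  // Setting a static conf at runtime is rejected; per the scaladoc above,
  // external users can NOT set/unset them:
  // spark.conf.set("spark.sql.warehouse.dir", "/elsewhere")

  spark.stop()
}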
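The SCHEMA_STRING_LENGTH_THRESHOLD comment describes chopping a schema's JSON into pieces that fit a metastore table-property cell. A standalone sketch of that split-and-reassemble idea (helper names here are hypothetical; HiveExternalCatalog's real code differs):

object SchemaChunkDemo extends App {
  val threshold = 4000 // mirrors spark.sql.sources.schemaStringLengthThreshold

  // Split one long JSON string into parts no longer than the threshold,
  // keyed part.0, part.1, ... so each fits in a property cell.
  def toParts(json: String): Seq[(String, String)] =
    json.grouped(threshold).zipWithIndex.map { case (s, i) => (s"part.$i", s) }.toSeq

  // Reassemble by numeric part index.
  def fromParts(parts: Seq[(String, String)]): String =
    parts.sortBy { case (k, _) => k.stripPrefix("part.").toInt }.map(_._2).mkString

  val schemaJson = """{"type":"struct","fields":[]}""" * 300 // pretend it is long
  val parts = toParts(schemaJson)
  assert(fromParts(parts) == schemaJson)
  println(s"${schemaJson.length} chars -> ${parts.size} parts")
}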
