From d3934180d56dbc00714c22655ac508c2503bec3f Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Fri, 5 Jul 2019 14:05:02 +0800 Subject: [PATCH 1/5] Use ConfigEntry for hardcoded configs in SQL --- .../KafkaDontFailOnDataLossSuite.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 18 ++++++++++++ .../catalyst/analysis/TypeCoercionSuite.scala | 10 +++---- .../aggregate/HashAggregateExec.scala | 14 ++++----- .../columnar/InMemoryTableScanExec.scala | 3 +- .../spark/sql/AggregateHashMapSuite.scala | 29 ++++++++++--------- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../spark/sql/FileBasedDataSourceSuite.scala | 4 +-- .../org/apache/spark/sql/JoinSuite.scala | 8 ++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 8 ++--- .../columnar/PartitionBatchPruningSuite.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../sql/sources/BucketedWriteSuite.scala | 2 +- .../sources/CreateTableAsSelectSuite.scala | 3 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 8 ++--- .../ContinuousAggregationSuite.scala | 5 ++-- .../execution/AggregationQuerySuite.scala | 2 +- 18 files changed, 70 insertions(+), 54 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala index e089e36eba5f3..ba8340ea59c14 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala @@ -135,7 +135,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe test("failOnDataLoss=false should not return duplicated records: microbatch v1") { withSQLConf( - "spark.sql.streaming.disabledV2MicroBatchReaders" -> + SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key -> classOf[KafkaSourceProvider].getCanonicalName) { verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) => val query = df.writeStream.format("memory").queryName(table).start() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index feb3b46df0cd1..94d197bbbb3d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -234,6 +234,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED = + buildConf("spark.sql.inMemoryTableScanStatistics.enable") + .internal() + .doc("When true, enable in-memory table scan accumulators.") + .booleanConf + .createWithDefault(false) + val CACHE_VECTORIZED_READER_ENABLED = buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader") .doc("Enables vectorized reader for columnar caching.") @@ -1024,6 +1031,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ENABLE_VECTORIZED_HASH_MAP = + buildConf("spark.sql.codegen.aggregate.map.vectorized.enable") + .internal() + .doc("Enable vectorized aggregate hash map. 
This is for testing/benchmarking only.") + .booleanConf + .createWithDefault(false) + val MAX_NESTED_VIEW_DEPTH = buildConf("spark.sql.view.maxNestedViewDepth") .internal() @@ -2109,6 +2123,8 @@ class SQLConf extends Serializable with Logging { def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) + def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED) + def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED) def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD) @@ -2148,6 +2164,8 @@ class SQLConf extends Serializable with Logging { def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP) + def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP) + def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG) def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index 2c3ba1b0daf41..a725e4b608a5f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -1126,14 +1126,14 @@ class TypeCoercionSuite extends AnalysisTest { Concat(Seq(Cast(Literal(new java.sql.Date(0)), StringType), Cast(Literal(new Timestamp(0)), StringType)))) - withSQLConf("spark.sql.function.concatBinaryAsString" -> "true") { + withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "true") { ruleTest(rule, Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))), Concat(Seq(Cast(Literal("123".getBytes), StringType), Cast(Literal("456".getBytes), StringType)))) } - withSQLConf("spark.sql.function.concatBinaryAsString" -> "false") { + withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "false") { ruleTest(rule, Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))), Concat(Seq(Literal("123".getBytes), Literal("456".getBytes)))) @@ -1180,14 +1180,14 @@ class TypeCoercionSuite extends AnalysisTest { Elt(Seq(Literal(2), Cast(Literal(new java.sql.Date(0)), StringType), Cast(Literal(new Timestamp(0)), StringType)))) - withSQLConf("spark.sql.function.eltOutputAsString" -> "true") { + withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "true") { ruleTest(rule, Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))), Elt(Seq(Literal(1), Cast(Literal("123".getBytes), StringType), Cast(Literal("456".getBytes), StringType)))) } - withSQLConf("spark.sql.function.eltOutputAsString" -> "false") { + withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "false") { ruleTest(rule, Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))), Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes)))) @@ -1498,7 +1498,7 @@ class TypeCoercionSuite extends AnalysisTest { DoubleType))) Seq(true, false).foreach { convertToTS => withSQLConf( - "spark.sql.legacy.compareDateTimestampInTimestamp" -> convertToTS.toString) { + SQLConf.COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP.key -> convertToTS.toString) { val date0301 = Literal(java.sql.Date.valueOf("2017-03-01")) val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00")) val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01")) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 25ff6584360e6..4a95f76381339 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.MutableColumnarRow +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DecimalType, StringType, StructType} import org.apache.spark.unsafe.KVIterator import org.apache.spark.util.Utils @@ -559,7 +560,7 @@ case class HashAggregateExec( private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = { if (!checkIfFastHashMapSupported(ctx)) { if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) { - logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but" + logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but" + " current version of codegened fast hashmap does not support this aggregate.") } } else { @@ -567,8 +568,7 @@ case class HashAggregateExec( // This is for testing/benchmarking only. // We enforce to first level to be a vectorized hashmap, instead of the default row-based one. - isVectorizedHashMapEnabled = sqlContext.getConf( - "spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true" + isVectorizedHashMapEnabled = sqlContext.conf.enableVectorizedHashMap } } @@ -576,12 +576,8 @@ case class HashAggregateExec( val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg") if (sqlContext.conf.enableTwoLevelAggMap) { enableTwoLevelHashMap(ctx) - } else { - sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match { - case "true" => - logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.") - case _ => - } + } else if (sqlContext.conf.enableVectorizedHashMap) { + logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.") } val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 06634c13ec439..bb490c3c692f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -294,8 +294,7 @@ case class InMemoryTableScanExec( } } - lazy val enableAccumulatorsForTest: Boolean = - sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean + lazy val enableAccumulatorsForTest: Boolean = sqlContext.conf.inMemoryTableScanStatisticsEnabled // Accumulators used for testing purposes lazy val readPartitions = sparkContext.longAccumulator diff --git a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala index 938d76c9f0837..b253c4a70bbf9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala @@ 
-20,33 +20,34 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfter import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter { override protected def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.codegen.fallback", "false") - .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "false") // adding some checking after each test is run, assuring that the configs are not changed // in test code after { - assert(sparkConf.get("spark.sql.codegen.fallback") == "false", + assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false", "configuration parameter changed in test body") - assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "false", + assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "false", "configuration parameter changed in test body") } } class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter { override protected def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.codegen.fallback", "false") - .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true") // adding some checking after each test is run, assuring that the configs are not changed // in test code after { - assert(sparkConf.get("spark.sql.codegen.fallback") == "false", + assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false", "configuration parameter changed in test body") - assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true", + assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true", "configuration parameter changed in test body") } } @@ -56,18 +57,18 @@ class TwoLevelAggregateHashMapWithVectorizedMapSuite with BeforeAndAfter { override protected def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.codegen.fallback", "false") - .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - .set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true") + .set(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key, "true") // adding some checking after each test is run, assuring that the configs are not changed // in test code after { - assert(sparkConf.get("spark.sql.codegen.fallback") == "false", + assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false", "configuration parameter changed in test body") - assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true", + assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true", "configuration parameter changed in test body") - assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") == "true", + assert(sparkConf.get(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key) == "true", "configuration parameter changed in test body") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 98936702a013d..e8ddd4e1fd974 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1672,7 +1672,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("reuse exchange") { - 
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") { val df = spark.range(100).toDF() val join = df.join(df, "id") val plan = join.queryExecution.executedPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index fffe52d52dec0..89195284a5b5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -671,8 +671,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") { Seq(1.0, 0.5).foreach { compressionFactor => - withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString, - "spark.sql.autoBroadcastJoinThreshold" -> "250") { + withSQLConf(SQLConf.FILE_COMRESSION_FACTOR.key -> compressionFactor.toString, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "250") { withTempPath { workDir => // the file size is 486 bytes val workDirPath = workDir.getAbsolutePath diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 32cddc94166b7..531cc8660b6ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -72,7 +72,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { test("join operator selection") { spark.sharedState.cacheManager.clearCache() - withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0", + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", @@ -651,7 +651,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { test("test SortMergeJoin (without spill)") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", - "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> Int.MaxValue.toString) { + SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> Int.MaxValue.toString) { assertNotSpilled(sparkContext, "inner join") { checkAnswer( @@ -708,8 +708,8 @@ class JoinSuite extends QueryTest with SharedSQLContext { test("test SortMergeJoin (with spill)") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", - "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0", - "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") { + SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0", + SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") { assertSpilled(sparkContext, "inner join") { checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 2cc1be9fdda2f..972950669198c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1896,7 +1896,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("Star Expansion - group by") { - withSQLConf("spark.sql.retainGroupColumns" -> "false") { + withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") { checkAnswer( testData2.groupBy($"a", $"b").agg($"*"), sql("SELECT * FROM testData2 group by a, b")) @@ -1936,7 +1936,7 @@ class 
SQLQuerySuite extends QueryTest with SharedSQLContext { test("Common subexpression elimination") { // TODO: support subexpression elimination in whole stage codegen - withSQLConf("spark.sql.codegen.wholeStage" -> "false") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { // select from a table to prevent constant folding. val df = sql("SELECT a, b from testData2 limit 1") checkAnswer(df, Row(1, 1)) @@ -1985,9 +1985,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) // Try disabling it via configuration. - spark.conf.set("spark.sql.subexpressionElimination.enabled", "false") + spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false") verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) - spark.conf.set("spark.sql.subexpressionElimination.enabled", "true") + spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "true") verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala index b3a5c687f775d..e74099202a1df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala @@ -50,7 +50,7 @@ class PartitionBatchPruningSuite // Enable in-memory partition pruning spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, true) // Enable in-memory table scan accumulators - spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", "true") + spark.conf.set(SQLConf.IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED.key, "true") } override protected def afterAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5f27e75addcff..a28f4e49ffe1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1322,7 +1322,7 @@ class JDBCSuite extends QueryTest testJdbcParitionColumn("THEID", "THEID") testJdbcParitionColumn("\"THEID\"", "THEID") - withSQLConf("spark.sql.caseSensitive" -> "false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { testJdbcParitionColumn("ThEiD", "THEID") } testJdbcParitionColumn("THE ID", "THE ID") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index fc61050dc7458..75f68dea96bf0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -63,7 +63,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog - withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) { + withSQLConf(SQLConf.BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) { // within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index d46029e84433c..ab3494c2253da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -252,7 +253,7 @@ class CreateTableAsSelectSuite val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog - withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) { + withSQLConf(SQLConf.BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) { // Within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 2b8d77386925f..72f893845172d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1310,7 +1310,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val start = startId.map(new FileStreamSourceOffset(_)) val end = FileStreamSourceOffset(endId) - withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false") { assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index a5cb25c49b869..e6b56e5f46f89 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -413,9 +413,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi sources.nonEmpty } // Disabled by default - assert(spark.conf.get("spark.sql.streaming.metricsEnabled").toBoolean === false) + assert(spark.conf.get(SQLConf.STREAMING_METRICS_ENABLED.key).toBoolean === false) - withSQLConf("spark.sql.streaming.metricsEnabled" -> "false") { + withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "false") { testStream(inputData.toDF)( AssertOnQuery { q => !isMetricsRegistered(q) }, StopStream, @@ -424,7 +424,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } // Registered when enabled - withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") { + withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") { testStream(inputData.toDF)( AssertOnQuery { q => isMetricsRegistered(q) }, StopStream, @@ -434,7 +434,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } test("SPARK-22975: MetricsReporter defaults when there was no progress reported") { - withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") { + withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") { BlockingSource.latch = new CountDownLatch(1) withTempDir { tempDir => val sq = spark.readStream diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala index c5b95fa9b64a9..62d4af9575f38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming.continuous import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode class ContinuousAggregationSuite extends ContinuousSuiteBase { @@ -36,7 +37,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } test("basic") { - withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) { + withSQLConf((SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { val input = ContinuousMemoryStream.singlePartition[Int] testStream(input.toDF().agg(max('value)), OutputMode.Complete)( @@ -112,7 +113,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } test("repeated restart") { - withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) { + withSQLConf((SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { val input = ContinuousMemoryStream.singlePartition[Int] testStream(input.toDF().agg(max('value)), OutputMode.Complete)( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index ecd428780c671..d06cc1c0a88ac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -1028,7 +1028,7 @@ class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySu override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { Seq("true", "false").foreach { enableTwoLevelMaps => - withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enabled" -> + withSQLConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> enableTwoLevelMaps) { (1 to 3).foreach { fallbackStartsAt => withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> From c9a9ad76c768b77a482410933788f14e9af4ce15 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sat, 6 Jul 2019 14:07:36 +0800 Subject: [PATCH 2/5] replace more config --- .../org/apache/spark/sql/avro/AvroSuite.scala | 2 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../apache/spark/sql/RuntimeConfigSuite.scala | 5 ++-- .../sql/SparkSessionExtensionSuite.scala | 9 +++--- .../spark/sql/execution/SQLViewSuite.scala | 7 +++-- .../execution/SQLWindowFunctionSuite.scala | 5 ++-- .../arrow/ArrowConvertersSuite.scala | 4 +-- .../benchmark/AggregateBenchmark.scala | 20 ++++++------- .../BuiltInDataSourceWriteBenchmark.scala | 6 ++-- .../columnar/InMemoryColumnarQuerySuite.scala | 2 +- .../command/PlanResolutionSuite.scala | 3 +- .../datasources/FileSourceStrategySuite.scala | 4 +-- .../execution/datasources/csv/CSVSuite.scala | 4 +-- .../datasources/json/JsonSuite.scala | 4 +-- .../datasources/parquet/ParquetIOSuite.scala | 4 +-- .../ParquetPartitionDiscoverySuite.scala | 2 +- 
.../execution/metric/SQLMetricsSuite.scala | 6 ++-- .../metric/SQLMetricsTestUtils.scala | 3 +- .../state/StateStoreCoordinatorSuite.scala | 3 +- .../streaming/state/StateStoreSuite.scala | 2 +- .../ui/SQLAppStatusListenerSuite.scala | 3 +- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 5 ++-- .../streaming/EventTimeWatermarkSuite.scala | 2 +- .../spark/sql/streaming/StreamSuite.scala | 6 ++-- .../StreamingQueryListenersConfSuite.scala | 3 +- .../continuous/ContinuousSuite.scala | 3 +- .../sources/StreamingDataSourceV2Suite.scala | 2 +- .../test/DataStreamReaderWriterSuite.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 2 +- .../HiveThriftServer2Suites.scala | 11 +++---- .../HiveExternalCatalogVersionsSuite.scala | 23 ++++++++------- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 29 ++++++++++--------- .../apache/spark/sql/hive/test/TestHive.scala | 9 +++--- 33 files changed, 108 insertions(+), 89 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 40bf3b1530fbb..924bf374c7370 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -50,7 +50,7 @@ abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUti override protected def beforeAll(): Unit = { super.beforeAll() - spark.conf.set("spark.sql.files.maxPartitionBytes", 1024) + spark.conf.set(SQLConf.FILES_MAX_PARTITION_BYTES.key, 1024) } def checkReloadMatchesSaved(originalFile: String, newFile: String): Unit = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 3d14ebe267c49..bb9b3696fe8f6 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1066,7 +1066,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { override def beforeAll(): Unit = { super.beforeAll() spark.conf.set( - "spark.sql.streaming.disabledV2MicroBatchReaders", + SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key, classOf[KafkaSourceProvider].getCanonicalName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala index 3284231606966..d8d0211e553b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite import org.apache.spark.internal.config +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} class RuntimeConfigSuite extends SparkFunSuite { @@ -60,8 +61,8 @@ class RuntimeConfigSuite extends SparkFunSuite { val conf = newConf() // SQL configs - assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold")) - assert(conf.isModifiable("spark.sql.streaming.checkpointLocation")) + assert(!conf.isModifiable(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key)) + assert(conf.isModifiable(SQLConf.CHECKPOINT_LOCATION.key)) // Core configs assert(!conf.isModifiable(config.CPUS_PER_TASK.key)) assert(!conf.isModifiable("spark.executor.cores")) 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 2e2e61b438963..542ece5796803 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, Metadata, StructType} import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, ColumnarMap, ColumnVector} import org.apache.spark.unsafe.types.UTF8String @@ -152,7 +153,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use custom class for extensions") { val session = SparkSession.builder() .master("local[1]") - .config("spark.sql.extensions", classOf[MyExtensions].getCanonicalName) + .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName) .getOrCreate() try { assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session))) @@ -173,7 +174,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use multiple custom class for extensions in the specified order") { val session = SparkSession.builder() .master("local[1]") - .config("spark.sql.extensions", Seq( + .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions2].getCanonicalName, classOf[MyExtensions].getCanonicalName).mkString(",")) .getOrCreate() @@ -201,7 +202,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("allow an extension to be duplicated") { val session = SparkSession.builder() .master("local[1]") - .config("spark.sql.extensions", Seq( + .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions].getCanonicalName, classOf[MyExtensions].getCanonicalName).mkString(",")) .getOrCreate() @@ -228,7 +229,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use the last registered function name when there are duplicates") { val session = SparkSession.builder() .master("local[1]") - .config("spark.sql.extensions", Seq( + .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions2].getCanonicalName, classOf[MyExtensions2Duplicate].getCanonicalName).mkString(",")) .getOrCreate() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 8269d4d3a285d..e503bc8698f25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext @@ -665,17 +666,17 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { sql(s"CREATE VIEW view${idx + 1} AS SELECT * FROM view$idx") } - withSQLConf("spark.sql.view.maxNestedViewDepth" -> 
"10") { + withSQLConf(SQLConf.MAX_NESTED_VIEW_DEPTH.key -> "10") { val e = intercept[AnalysisException] { sql("SELECT * FROM view10") }.getMessage assert(e.contains("The depth of view `default`.`view0` exceeds the maximum view " + "resolution depth (10). Analysis is aborted to avoid errors. Increase the value " + - "of spark.sql.view.maxNestedViewDepth to work around this.")) + s"of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work around this.")) } val e = intercept[IllegalArgumentException] { - withSQLConf("spark.sql.view.maxNestedViewDepth" -> "0") {} + withSQLConf(SQLConf.MAX_NESTED_VIEW_DEPTH.key -> "0") {} }.getMessage assert(e.contains("The maximum depth of a view reference in a nested view must be " + "positive.")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index 1c6fc3530cbe1..3547c7dac84ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.TestUtils.assertSpilled import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext case class WindowData(month: Int, area: String, product: Int) @@ -477,8 +478,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW) """.stripMargin) - withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1", - "spark.sql.windowExec.buffer.spill.threshold" -> "2") { + withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") { assertSpilled(sparkContext, "test with low buffer spill threshold") { checkAnswer(actual, expected) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 86874b9817c20..67c3fa0d3bf59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1191,7 +1191,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { test("max records in batch conf") { val totalRecords = 10 val maxRecordsPerBatch = 3 - spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", maxRecordsPerBatch) + spark.conf.set(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key, maxRecordsPerBatch) val df = spark.sparkContext.parallelize(1 to totalRecords, 2).toDF("i") val arrowBatches = df.toArrowBatchRdd.collect() assert(arrowBatches.length >= 4) @@ -1206,7 +1206,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } assert(recordCount == totalRecords) allocator.close() - spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch") + spark.conf.unset(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key) } testQuietly("unsupported types") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 81158d9e54246..2776bc310fefe 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -83,7 +83,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") { f() } } @@ -92,7 +92,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") { f() } } @@ -119,7 +119,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") { f() } } @@ -128,7 +128,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") { f() } } @@ -154,7 +154,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") { f() } } @@ -163,7 +163,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") { f() } } @@ -189,7 +189,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") { f() } } @@ -198,7 +198,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") { f() } } @@ -234,7 +234,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") { f() } } @@ -243,7 +243,7 @@ object AggregateBenchmark extends SqlBasedBenchmark { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", - "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") { f() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala 
index cd97324c997f5..6925bdd72674f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.benchmark +import org.apache.spark.sql.internal.SQLConf + /** * Benchmark to measure built-in data sources write performance. * To run this benchmark: @@ -45,8 +47,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark { mainArgs } - spark.conf.set("spark.sql.parquet.compression.codec", "snappy") - spark.conf.set("spark.sql.orc.compression.codec", "snappy") + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") + spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") formats.foreach { format => runBenchmark(s"$format writer benchmark") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index d31e49cf8cd4c..008eaa2416f68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -437,7 +437,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-20356: pruned InMemoryTableScanExec should have correct ordering and partitioning") { - withSQLConf("spark.sql.shuffle.partitions" -> "200") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") { val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group") val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id") val df3 = df1.join(df2, Seq("item")).select($"id", $"group".as("item")).distinct() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 7df0dabd67f86..293fe56e3a8f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -77,7 +78,7 @@ class PlanResolutionSuite extends AnalysisTest { def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = { val newConf = conf.copy() - newConf.setConfString("spark.sql.default.catalog", "testcat") + newConf.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, "testcat") DataSourceResolution(newConf, if (withDefault) lookupWithDefault else lookupWithoutDefault) .apply(parsePlan(query)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index af524c7ca0255..eaff5a2352a0c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -201,7 +201,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } test("partitioned table - case insensitive") { - withSQLConf("spark.sql.caseSensitive" -> "false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { val table = createTable( files = Seq( @@ -437,7 +437,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } test("[SPARK-16818] exchange reuse respects differences in partition pruning") { - spark.conf.set("spark.sql.exchange.reuse", true) + spark.conf.set(SQLConf.EXCHANGE_REUSE_ENABLED.key, true) withTempPath { path => val tempDir = path.getCanonicalPath spark.range(10) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2e7d682a3bbca..fdb50a6dd929c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1399,8 +1399,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // that whole test file is mapped to only one partition. This will guarantee // reliable sampling of the input file. withSQLConf( - "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString, - "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString + SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString, + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString )(withTempPath { path => val ds = sampledTestData.coalesce(1) ds.write.text(path.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6316e89537ca1..34b44be576897 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2041,8 +2041,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // that whole test file is mapped to only one partition. This will guarantee // reliable sampling of the input file. 
withSQLConf( - "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString, - "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString + SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString, + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString )(withTempPath { path => val ds = sampledTestData.coalesce(1) ds.write.text(path.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 6b05b9c0f7207..6f2218ba82dc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -475,7 +475,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { val extraOptions = Map( SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, - "spark.sql.parquet.output.committer.class" -> + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName ) withTempPath { dir => @@ -505,7 +505,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // Using a output committer that always fail when committing a task, so that both // `commitTask()` and `abortTask()` are invoked. val extraOptions = Map[String, String]( - "spark.sql.parquet.output.committer.class" -> + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 6f3ed3d85e937..04ace0a236e6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -953,7 +953,7 @@ abstract class ParquetPartitionDiscoverySuite withSQLConf( ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL", - "spark.sql.sources.commitProtocolClass" -> + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { spark.range(3).write.parquet(s"$path/p0=0/p1=0") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a8d230870aeb2..ad13a80f90acf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -277,9 +277,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("ShuffledHashJoin metrics") { - withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40", - "spark.sql.shuffle.partitions" -> "2", - "spark.sql.join.preferSortMergeJoin" -> "false") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", 
"value") // Assume the execution plan is diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index f12eeaa580642..dc58bfb7b24b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -154,7 +155,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { expectedNodeIds: Set[Long], enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = { val previousExecutionIds = currentExecutionIds() - withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) { df.collect() } sparkContext.listenerBus.waitUntilEmpty(10000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala index 2a1e7d615e5e3..81758b8ec31fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.functions.count +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { @@ -124,7 +125,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { import spark.implicits._ coordRef = spark.streams.stateStoreCoordinator implicit val sqlContext = spark.sqlContext - spark.conf.set("spark.sql.shuffle.partitions", "1") + spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1") // Start a query and run a batch to load state stores val inputData = MemoryStream[Int] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index af4369de800ec..a84d107f2cbc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -569,7 +569,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] val spark = SparkSession.builder().master("local[2]").getOrCreate() SparkSession.setActiveSession(spark) implicit val sqlContext = spark.sqlContext - spark.conf.set("spark.sql.shuffle.partitions", "1") + spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1") import spark.implicits._ val inputData = MemoryStream[Int] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index e3e5ddff96378..c0a821adfd868 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.status.ElementTrackingStore @@ -647,7 +648,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { .setMaster("local") .setAppName("test") .set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly - .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly + .set(StaticSQLConf.UI_RETAINED_EXECUTIONS.key, "50") // Set it to 50 to run this test quickly .set(ASYNC_TRACKING_ENABLED, false) withSpark(new SparkContext(conf)) { sc => quietly { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 01752125ac265..88827a6bf1a0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} @@ -140,7 +141,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn test("CreateTable: use default catalog for v2 sources when default catalog is set") { val sparkSession = spark.newSession() sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - sparkSession.conf.set("spark.sql.default.catalog", "testcat") + sparkSession.conf.set(DEFAULT_V2_CATALOG.key, "testcat") sparkSession.sql(s"CREATE TABLE table_name (id bigint, data string) USING foo") val testCatalog = sparkSession.catalog("testcat").asTableCatalog @@ -255,7 +256,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn test("CreateTableAsSelect: use default catalog for v2 sources when default catalog is set") { val sparkSession = spark.newSession() sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - sparkSession.conf.set("spark.sql.default.catalog", "testcat") + sparkSession.conf.set(DEFAULT_V2_CATALOG.key, "testcat") val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 4bf49ff4d5c61..92ec2a0c172ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -305,7 +305,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("update mode") { val inputData = MemoryStream[Int] - spark.conf.set("spark.sql.shuffle.partitions", "10") + spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10") val windowedAggregation = inputData.toDF() .withColumn("eventTime", $"value".cast("timestamp")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f2f5fad59eb2a..1ed2599444c5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -871,7 +871,7 @@ class StreamSuite extends StreamTest { testQuietly("specify custom state store provider") { val providerClassName = classOf[TestStateStoreProvider].getCanonicalName - withSQLConf("spark.sql.streaming.stateStore.providerClass" -> providerClassName) { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName) { val input = MemoryStream[Int] val df = input.toDS().groupBy().count() val query = df.writeStream.outputMode("complete").format("memory").queryName("name").start() @@ -888,9 +888,9 @@ class StreamSuite extends StreamTest { testQuietly("custom state store provider read from offset log") { val input = MemoryStream[Int] val df = input.toDS().groupBy().count() - val providerConf1 = "spark.sql.streaming.stateStore.providerClass" -> + val providerConf1 = SQLConf.STATE_STORE_PROVIDER_CLASS.key -> "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider" - val providerConf2 = "spark.sql.streaming.stateStore.providerClass" -> + val providerConf2 = SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[TestStateStoreProvider].getCanonicalName def runQuery(queryName: String, checkpointLoc: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala index 88f510c726faa..530b833a60bfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkConf import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -29,7 +30,7 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { import testImplicits._ override protected def sparkConf: SparkConf = - super.sparkConf.set("spark.sql.streaming.streamingQueryListeners", + super.sparkConf.set(StaticSQLConf.STREAMING_QUERY_LISTENERS.key, "org.apache.spark.sql.streaming.TestListener") test("test if the configured query lister is loaded") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 9840c7f066780..45b359a2b40ea 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.test.TestSparkSession @@ -307,7 +308,7 @@ class ContinuousMetaSuite extends ContinuousSuiteBase { "local[10]", "continuous-stream-test-sql-context", sparkConf.set("spark.sql.testkey", "true") - .set("spark.sql.streaming.minBatchesToRetain", "2"))) + .set(SQLConf.MIN_BATCHES_TO_RETAIN.key, "2"))) test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint directory") { withTempDir { checkpointDir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 7b2c1a56e8baa..79016b5bdb527 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -242,7 +242,7 @@ class StreamingDataSourceV2Suite extends StreamTest { override def beforeAll(): Unit = { super.beforeAll() val fakeCheckpoint = Utils.createTempDir() - spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath) + spark.conf.set(SQLConf.CHECKPOINT_LOCATION.key, fakeCheckpoint.getCanonicalPath) } override def afterEach(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 8fb1400a9b5a7..c630f1497a17e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -203,7 +203,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .stop() assert(LastOptions.partitionColumns == Seq("a")) - withSQLConf("spark.sql.caseSensitive" -> "false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { df.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index e9ab62800f84f..126e23e6e5926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -409,7 +409,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be test("write path implements onTaskCommit API correctly") { withSQLConf( - "spark.sql.sources.commitProtocolClass" -> + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> classOf[MessageCapturingCommitProtocol].getCanonicalName) { withTempDir { dir => val path = dir.getCanonicalPath diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index b06856b054795..bef53f97762b3 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -44,6 +44,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.HiveTestUtils +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.{ThreadUtils, Utils} @@ -536,9 +537,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } if (HiveUtils.isHive23) { - assert(conf.get("spark.sql.hive.version") === Some("2.3.5")) + assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5")) } else { - assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1")) } } } @@ -553,9 +554,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } if (HiveUtils.isHive23) { - assert(conf.get("spark.sql.hive.version") === Some("2.3.5")) + assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5")) } else { - assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1")) } } } @@ -659,7 +660,7 @@ class SingleSessionSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.binary override protected def extraConf: Seq[String] = - "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil + s"--conf ${StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION.key}=true" :: Nil test("share the temporary functions across JDBC connections") { withMultipleConnectionJdbcStatement()( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 4351dc7036846..4789fbe5c6bae 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -27,9 +27,12 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.spark.{SecurityManager, SparkConf, TestUtils} +import org.apache.spark.internal.config +import org.apache.spark.internal.config.UI import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils @@ -184,11 +187,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { val args = Seq( "--name", "prepare testing tables", "--master", "local[2]", - "--conf", "spark.ui.enabled=false", - "--conf", "spark.master.rest.enabled=false", - "--conf", "spark.sql.hive.metastore.version=1.2.1", - "--conf", "spark.sql.hive.metastore.jars=maven", - "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}", + "--conf", s"${UI.UI_ENABLED.key}=false", + "--conf", 
s"${config.MASTER_REST_SERVER_ENABLED}=false", + "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1", + "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven", + "--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", "--conf", s"spark.sql.test.version.index=$index", "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", tempPyFile.getCanonicalPath) @@ -203,11 +206,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"), "--name", "HiveExternalCatalog backward compatibility test", "--master", "local[2]", - "--conf", "spark.ui.enabled=false", - "--conf", "spark.master.rest.enabled=false", - "--conf", "spark.sql.hive.metastore.version=1.2.1", - "--conf", "spark.sql.hive.metastore.jars=maven", - "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}", + "--conf", s"${UI.UI_ENABLED.key}=false", + "--conf", s"${config.MASTER_REST_SERVER_ENABLED}=false", + "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1", + "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven", + "--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", unusedJar.toString) runSparkSubmit(args) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 0ff22150658b9..4cc0cfc9a0e8c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHiveContext} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -338,10 +339,10 @@ object SetMetastoreURLTest extends Logging { val builder = SparkSession.builder() .config(sparkConf) .config(UI_ENABLED.key, "false") - .config("spark.sql.hive.metastore.version", "0.13.1") + .config(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1") // The issue described in SPARK-16901 only appear when // spark.sql.hive.metastore.jars is not set to builtin. - .config("spark.sql.hive.metastore.jars", "maven") + .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven") .enableHiveSupport() val spark = builder.getOrCreate() @@ -392,16 +393,16 @@ object SetWarehouseLocationTest extends Logging { // We are expecting that the value of spark.sql.warehouse.dir will override the // value of hive.metastore.warehouse.dir. 
val session = new TestHiveContext(new SparkContext(sparkConf - .set("spark.sql.warehouse.dir", warehouseLocation.toString) + .set(StaticSQLConf.WAREHOUSE_PATH.key, warehouseLocation.toString) .set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString))) .sparkSession (session, warehouseLocation.toString) } - if (sparkSession.conf.get("spark.sql.warehouse.dir") != expectedWarehouseLocation) { + if (sparkSession.conf.get(StaticSQLConf.WAREHOUSE_PATH.key) != expectedWarehouseLocation) { throw new Exception( - "spark.sql.warehouse.dir is not set to the expected warehouse location " + + s"${StaticSQLConf.WAREHOUSE_PATH.key} is not set to the expected warehouse location " + s"$expectedWarehouseLocation.") } @@ -564,7 +565,7 @@ object SparkSubmitClassLoaderTest extends Logging { val conf = new SparkConf() val hiveWarehouseLocation = Utils.createTempDir() conf.set(UI_ENABLED, false) - conf.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString) + conf.set(StaticSQLConf.WAREHOUSE_PATH.key, hiveWarehouseLocation.toString) val sc = new SparkContext(conf) val hiveContext = new TestHiveContext(sc) val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j") @@ -642,14 +643,14 @@ object SparkSQLConfTest extends Logging { val conf = new SparkConf() { override def getAll: Array[(String, String)] = { def isMetastoreSetting(conf: String): Boolean = { - conf == "spark.sql.hive.metastore.version" || conf == "spark.sql.hive.metastore.jars" + conf == HiveUtils.HIVE_METASTORE_VERSION.key || conf == HiveUtils.HIVE_METASTORE_JARS.key } // If there is any metastore settings, remove them. val filteredSettings = super.getAll.filterNot(e => isMetastoreSetting(e._1)) // Always add these two metastore settings at the beginning. - ("spark.sql.hive.metastore.version" -> "0.12") +: - ("spark.sql.hive.metastore.jars" -> "maven") +: + (HiveUtils.HIVE_METASTORE_VERSION.key -> "0.12") +: + (HiveUtils.HIVE_METASTORE_JARS.key -> "maven") +: filteredSettings } @@ -676,10 +677,10 @@ object SPARK_9757 extends QueryTest { val hiveWarehouseLocation = Utils.createTempDir() val sparkContext = new SparkContext( new SparkConf() - .set("spark.sql.hive.metastore.version", "0.13.1") - .set("spark.sql.hive.metastore.jars", "maven") + .set(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1") + .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") .set(UI_ENABLED, false) - .set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString)) + .set(StaticSQLConf.WAREHOUSE_PATH.key, hiveWarehouseLocation.toString)) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession @@ -725,7 +726,7 @@ object SPARK_11009 extends QueryTest { val sparkContext = new SparkContext( new SparkConf() .set(UI_ENABLED, false) - .set("spark.sql.shuffle.partitions", "100")) + .set(SQLConf.SHUFFLE_PARTITIONS.key, "100")) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession @@ -756,7 +757,7 @@ object SPARK_14244 extends QueryTest { val sparkContext = new SparkContext( new SparkConf() .set(UI_ENABLED, false) - .set("spark.sql.shuffle.partitions", "100")) + .set(SQLConf.SHUFFLE_PARTITIONS.key, "100")) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 8cdb8dd84fb2e..30ea0c84c7acc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf} +import org.apache.spark.sql.internal._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -57,9 +57,9 @@ object TestHive new SparkConf() .set("spark.sql.test", "") .set(SQLConf.CODEGEN_FALLBACK.key, "false") - .set("spark.sql.hive.metastore.barrierPrefixes", + .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, "org.apache.spark.sql.hive.execution.PairSerDe") - .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath) + .set(StaticSQLConf.WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) // SPARK-8910 .set(UI_ENABLED, false) .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) @@ -534,7 +534,8 @@ private[hive] class TestHiveSparkSession( } // Clean out the Hive warehouse between each suite - val warehouseDir = new File(new URI(sparkContext.conf.get("spark.sql.warehouse.dir")).getPath) + val warehouseDir = new File(new URI( + sparkContext.conf.get(StaticSQLConf.WAREHOUSE_PATH.key)).getPath) Utils.deleteRecursively(warehouseDir) warehouseDir.mkdir() From 89da7a99b1e42c2dd4fc17a6902dd26b22618580 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 11 Jul 2019 15:07:38 +0800 Subject: [PATCH 3/5] update --- .../spark/sql/SparkSessionExtensionSuite.scala | 10 +++++----- .../spark/sql/execution/SQLViewSuite.scala | 8 ++++---- .../state/StateStoreCoordinatorSuite.scala | 4 ++-- .../ui/SQLAppStatusListenerSuite.scala | 3 +-- .../sql/sources/CreateTableAsSelectSuite.scala | 4 ++-- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 10 +++++----- .../StreamingQueryListenersConfSuite.scala | 4 ++-- .../ContinuousAggregationSuite.scala | 6 +++--- .../streaming/continuous/ContinuousSuite.scala | 5 ++--- .../thriftserver/HiveThriftServer2Suites.scala | 4 ++-- .../HiveExternalCatalogVersionsSuite.scala | 18 +++++++++--------- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 17 +++++++++-------- .../apache/spark/sql/hive/test/TestHive.scala | 8 ++++---- 13 files changed, 50 insertions(+), 51 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 542ece5796803..74341f93dd5ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, Metadata, StructType} import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, ColumnarMap, ColumnVector} import org.apache.spark.unsafe.types.UTF8String @@ -153,7 +153,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use custom class for 
extensions") { val session = SparkSession.builder() .master("local[1]") - .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName) + .config(SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName) .getOrCreate() try { assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session))) @@ -174,7 +174,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use multiple custom class for extensions in the specified order") { val session = SparkSession.builder() .master("local[1]") - .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( + .config(SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions2].getCanonicalName, classOf[MyExtensions].getCanonicalName).mkString(",")) .getOrCreate() @@ -202,7 +202,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("allow an extension to be duplicated") { val session = SparkSession.builder() .master("local[1]") - .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( + .config(SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions].getCanonicalName, classOf[MyExtensions].getCanonicalName).mkString(",")) .getOrCreate() @@ -229,7 +229,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { test("use the last registered function name when there are duplicates") { val session = SparkSession.builder() .master("local[1]") - .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, Seq( + .config(SPARK_SESSION_EXTENSIONS.key, Seq( classOf[MyExtensions2].getCanonicalName, classOf[MyExtensions2Duplicate].getCanonicalName).mkString(",")) .getOrCreate() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index e503bc8698f25..64e305cd5c371 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.MAX_NESTED_VIEW_DEPTH import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext @@ -666,17 +666,17 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { sql(s"CREATE VIEW view${idx + 1} AS SELECT * FROM view$idx") } - withSQLConf(SQLConf.MAX_NESTED_VIEW_DEPTH.key -> "10") { + withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "10") { val e = intercept[AnalysisException] { sql("SELECT * FROM view10") }.getMessage assert(e.contains("The depth of view `default`.`view0` exceeds the maximum view " + "resolution depth (10). Analysis is aborted to avoid errors. 
Increase the value " + - s"of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work around this.")) + s"of ${MAX_NESTED_VIEW_DEPTH.key} to work around this.")) } val e = intercept[IllegalArgumentException] { - withSQLConf(SQLConf.MAX_NESTED_VIEW_DEPTH.key -> "0") {} + withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "0") {} }.getMessage assert(e.contains("The maximum depth of a view reference in a nested view must be " + "positive.")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala index 81758b8ec31fa..7bca225dfdd8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.functions.count -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS import org.apache.spark.util.Utils class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { @@ -125,7 +125,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { import spark.implicits._ coordRef = spark.streams.stateStoreCoordinator implicit val sqlContext = spark.sqlContext - spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1") + spark.conf.set(SHUFFLE_PARTITIONS.key, "1") // Start a query and run a batch to load state stores val inputData = MemoryStream[Int] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index c0a821adfd868..8edbb87706716 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.status.ElementTrackingStore @@ -648,7 +647,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { .setMaster("local") .setAppName("test") .set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly - .set(StaticSQLConf.UI_RETAINED_EXECUTIONS.key, "50") // Set it to 50 to run this test quickly + .set(UI_RETAINED_EXECUTIONS.key, "50") // Set it to 50 to run this test quickly .set(ASYNC_TRACKING_ENABLED, false) withSpark(new SparkContext(conf)) { sc => quietly { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index ab3494c2253da..5f9856656ac3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.BUCKETING_MAX_BUCKETS import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -253,7 +253,7 @@ class CreateTableAsSelectSuite val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog - withSQLConf(SQLConf.BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) { + withSQLConf(BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) { // Within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 88827a6bf1a0a..1a58516d18a78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 -import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} @@ -39,7 +39,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) - spark.conf.set("spark.sql.catalog.session", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set(V2_SESSION_CATALOG.key, classOf[TestInMemoryTableCatalog].getName) val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") @@ -141,7 +141,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn test("CreateTable: use default catalog for v2 sources when default catalog is set") { val sparkSession = spark.newSession() sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - sparkSession.conf.set(DEFAULT_V2_CATALOG.key, "testcat") + sparkSession.conf.set("spark.sql.default.catalog", "testcat") sparkSession.sql(s"CREATE TABLE table_name (id bigint, data string) USING foo") val testCatalog = sparkSession.catalog("testcat").asTableCatalog @@ -256,7 +256,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn test("CreateTableAsSelect: use default catalog for v2 sources when default catalog is set") { val sparkSession = spark.newSession() sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - sparkSession.conf.set(DEFAULT_V2_CATALOG.key, "testcat") + sparkSession.conf.set("spark.sql.default.catalog", "testcat") val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") 
df.createOrReplaceTempView("source") @@ -281,7 +281,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn test("CreateTableAsSelect: v2 session catalog can load v1 source table") { val sparkSession = spark.newSession() - sparkSession.conf.set("spark.sql.catalog.session", classOf[V2SessionCatalog].getName) + sparkSession.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala index 530b833a60bfc..da2f221aaf101 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkConf import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -30,7 +30,7 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { import testImplicits._ override protected def sparkConf: SparkConf = - super.sparkConf.set(StaticSQLConf.STREAMING_QUERY_LISTENERS.key, + super.sparkConf.set(STREAMING_QUERY_LISTENERS.key, "org.apache.spark.sql.streaming.TestListener") test("test if the configured query lister is loaded") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala index 62d4af9575f38..3ec4750c59fc5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming.continuous import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED import org.apache.spark.sql.streaming.OutputMode class ContinuousAggregationSuite extends ContinuousSuiteBase { @@ -37,7 +37,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } test("basic") { - withSQLConf((SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { + withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { val input = ContinuousMemoryStream.singlePartition[Int] testStream(input.toDF().agg(max('value)), OutputMode.Complete)( @@ -113,7 +113,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } test("repeated restart") { - withSQLConf((SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { + withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { val input = ContinuousMemoryStream.singlePartition[Int] testStream(input.toDF().agg(max('value)), OutputMode.Complete)( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 45b359a2b40ea..dca452062c70d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -25,8 +25,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE +import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, MIN_BATCHES_TO_RETAIN} import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.test.TestSparkSession @@ -308,7 +307,7 @@ class ContinuousMetaSuite extends ContinuousSuiteBase { "local[10]", "continuous-stream-test-sql-context", sparkConf.set("spark.sql.testkey", "true") - .set(SQLConf.MIN_BATCHES_TO_RETAIN.key, "2"))) + .set(MIN_BATCHES_TO_RETAIN.key, "2"))) test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint directory") { withTempDir { checkpointDir => diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index bef53f97762b3..dd18add53fde8 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -44,7 +44,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.HiveTestUtils -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.{ThreadUtils, Utils} @@ -660,7 +660,7 @@ class SingleSessionSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.binary override protected def extraConf: Seq[String] = - s"--conf ${StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION.key}=true" :: Nil + s"--conf ${HIVE_THRIFT_SERVER_SINGLESESSION.key}=true" :: Nil test("share the temporary functions across JDBC connections") { withMultipleConnectionJdbcStatement()( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 4789fbe5c6bae..9bc0be87be5af 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -27,12 +27,12 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.spark.{SecurityManager, SparkConf, TestUtils} -import org.apache.spark.internal.config -import org.apache.spark.internal.config.UI +import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED +import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{QueryTest, Row, SparkSession} 
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils @@ -187,11 +187,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { val args = Seq( "--name", "prepare testing tables", "--master", "local[2]", - "--conf", s"${UI.UI_ENABLED.key}=false", - "--conf", s"${config.MASTER_REST_SERVER_ENABLED}=false", + "--conf", s"${UI_ENABLED.key}=false", + "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false", "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1", "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven", - "--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", + "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", "--conf", s"spark.sql.test.version.index=$index", "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", tempPyFile.getCanonicalPath) @@ -206,11 +206,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"), "--name", "HiveExternalCatalog backward compatibility test", "--master", "local[2]", - "--conf", s"${UI.UI_ENABLED.key}=false", - "--conf", s"${config.MASTER_REST_SERVER_ENABLED}=false", + "--conf", s"${UI_ENABLED.key}=false", + "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false", "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1", "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven", - "--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", + "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}", "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", unusedJar.toString) runSparkSubmit(args) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 4cc0cfc9a0e8c..e2ddec3427665 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHiveContext} -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -393,16 +394,16 @@ object SetWarehouseLocationTest extends Logging { // We are expecting that the value of spark.sql.warehouse.dir will override the // value of hive.metastore.warehouse.dir. 
val session = new TestHiveContext(new SparkContext(sparkConf - .set(StaticSQLConf.WAREHOUSE_PATH.key, warehouseLocation.toString) + .set(WAREHOUSE_PATH.key, warehouseLocation.toString) .set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString))) .sparkSession (session, warehouseLocation.toString) } - if (sparkSession.conf.get(StaticSQLConf.WAREHOUSE_PATH.key) != expectedWarehouseLocation) { + if (sparkSession.conf.get(WAREHOUSE_PATH.key) != expectedWarehouseLocation) { throw new Exception( - s"${StaticSQLConf.WAREHOUSE_PATH.key} is not set to the expected warehouse location " + + s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location " + s"$expectedWarehouseLocation.") } @@ -565,7 +566,7 @@ object SparkSubmitClassLoaderTest extends Logging { val conf = new SparkConf() val hiveWarehouseLocation = Utils.createTempDir() conf.set(UI_ENABLED, false) - conf.set(StaticSQLConf.WAREHOUSE_PATH.key, hiveWarehouseLocation.toString) + conf.set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString) val sc = new SparkContext(conf) val hiveContext = new TestHiveContext(sc) val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j") @@ -680,7 +681,7 @@ object SPARK_9757 extends QueryTest { .set(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1") .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") .set(UI_ENABLED, false) - .set(StaticSQLConf.WAREHOUSE_PATH.key, hiveWarehouseLocation.toString)) + .set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString)) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession @@ -726,7 +727,7 @@ object SPARK_11009 extends QueryTest { val sparkContext = new SparkContext( new SparkConf() .set(UI_ENABLED, false) - .set(SQLConf.SHUFFLE_PARTITIONS.key, "100")) + .set(SHUFFLE_PARTITIONS.key, "100")) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession @@ -757,7 +758,7 @@ object SPARK_14244 extends QueryTest { val sparkContext = new SparkContext( new SparkConf() .set(UI_ENABLED, false) - .set(SQLConf.SHUFFLE_PARTITIONS.key, "100")) + .set(SHUFFLE_PARTITIONS.key, "100")) val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 30ea0c84c7acc..1a784fcfad319 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -44,8 +44,8 @@ import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal._ -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf} +import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH} import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. 
@@ -59,7 +59,7 @@ object TestHive .set(SQLConf.CODEGEN_FALLBACK.key, "false") .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, "org.apache.spark.sql.hive.execution.PairSerDe") - .set(StaticSQLConf.WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) + .set(WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) // SPARK-8910 .set(UI_ENABLED, false) .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) @@ -535,7 +535,7 @@ private[hive] class TestHiveSparkSession( // Clean out the Hive warehouse between each suite val warehouseDir = new File(new URI( - sparkContext.conf.get(StaticSQLConf.WAREHOUSE_PATH.key)).getPath) + sparkContext.conf.get(WAREHOUSE_PATH.key)).getPath) Utils.deleteRecursively(warehouseDir) warehouseDir.mkdir() From e601c74e8fbca8c1fab14483cd07113ace3e4322 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 11 Jul 2019 22:53:07 +0800 Subject: [PATCH 4/5] update --- .../spark/sql/execution/command/PlanResolutionSuite.scala | 4 ++-- .../spark/sql/execution/metric/SQLMetricsTestUtils.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 293fe56e3a8f0..ce209666024d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -78,7 +78,7 @@ class PlanResolutionSuite extends AnalysisTest { def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = { val newConf = conf.copy() - newConf.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, "testcat") + newConf.setConfString(DEFAULT_V2_CATALOG.key, "testcat") DataSourceResolution(newConf, if (withDefault) lookupWithDefault else lookupWithoutDefault) .apply(parsePlan(query)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index dc58bfb7b24b0..8f26c04307adc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED import org.apache.spark.sql.test.SQLTestUtils @@ -155,7 +155,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { expectedNodeIds: Set[Long], enableWholeStage: Boolean = false): Option[Map[Long, 
(String, Map[String, Any])]] = { val previousExecutionIds = currentExecutionIds() - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) { + withSQLConf(WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) { df.collect() } sparkContext.listenerBus.waitUntilEmpty(10000) From b39fb625b18d5a07d3076406a73e0db8d519ae4b Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Fri, 12 Jul 2019 09:03:16 +0800 Subject: [PATCH 5/5] update --- .../scala/org/apache/spark/sql/RuntimeConfigSuite.scala | 7 ++++--- .../spark/sql/execution/SQLWindowFunctionSuite.scala | 6 +++--- .../scala/org/apache/spark/sql/hive/test/TestHive.scala | 3 +-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala index d8d0211e553b3..720d570ca8384 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite import org.apache.spark.internal.config -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION +import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD class RuntimeConfigSuite extends SparkFunSuite { @@ -61,8 +62,8 @@ class RuntimeConfigSuite extends SparkFunSuite { val conf = newConf() // SQL configs - assert(!conf.isModifiable(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key)) - assert(conf.isModifiable(SQLConf.CHECKPOINT_LOCATION.key)) + assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key)) + assert(conf.isModifiable(CHECKPOINT_LOCATION.key)) // Core configs assert(!conf.isModifiable(config.CPUS_PER_TASK.key)) assert(!conf.isModifiable("spark.executor.cores")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index 3547c7dac84ea..971fd842f046a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.TestUtils.assertSpilled import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} import org.apache.spark.sql.test.SharedSQLContext case class WindowData(month: Int, area: String, product: Int) @@ -478,8 +478,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW) """.stripMargin) - withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", - SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") { + withSQLConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", + WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") { assertSpilled(sparkContext, "test with low buffer spill threshold") { checkAnswer(actual, expected) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1a784fcfad319..d68a47053f18c 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -534,8 +534,7 @@ private[hive] class TestHiveSparkSession( } // Clean out the Hive warehouse between each suite - val warehouseDir = new File(new URI( - sparkContext.conf.get(WAREHOUSE_PATH.key)).getPath) + val warehouseDir = new File(new URI(sparkContext.conf.get(WAREHOUSE_PATH.key)).getPath) Utils.deleteRecursively(warehouseDir) warehouseDir.mkdir()
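Note on the pattern applied across these patches: a hardcoded configuration key string is replaced by a reference to the corresponding config entry's .key, whether that key is passed to withSQLConf, to spark.conf.set, or interpolated into a --conf key=value argument for spark-submit (compare s"${config.MASTER_REST_SERVER_ENABLED}=false" in patch 2 with s"${MASTER_REST_SERVER_ENABLED.key}=false" in patch 3). The sketch below shows why .key, rather than the entry object itself, is the piece to interpolate. It is a minimal illustration in plain Scala using a hypothetical DemoConfigEntry stand-in, not Spark's actual ConfigEntry class; every name in it is made up for the example.

// Minimal sketch, assuming only standard Scala. DemoConfigEntry is a
// hypothetical stand-in for a typed config entry that owns its key string.
final case class DemoConfigEntry[T](key: String, defaultValue: T) {
  // toString deliberately carries more than the key, which is why the
  // patches interpolate .key instead of the entry object itself.
  override def toString: String = s"DemoConfigEntry(key=$key, default=$defaultValue)"
}

object ConfigEntryKeyDemo extends App {
  // A single source of truth for the key string; call sites can no longer
  // drift apart through typos in repeated string literals.
  val SHUFFLE_PARTITIONS = DemoConfigEntry("spark.sql.shuffle.partitions", 200)

  // Setting a config by entry reference instead of a raw string literal.
  val settings = Map(SHUFFLE_PARTITIONS.key -> "1")
  println(settings)                               // Map(spark.sql.shuffle.partitions -> 1)

  // Building a spark-submit style argument.
  println(s"--conf ${SHUFFLE_PARTITIONS.key}=1")  // --conf spark.sql.shuffle.partitions=1
  println(s"--conf ${SHUFFLE_PARTITIONS}=1")      // embeds the full toString, not just the key
}

In the test suites touched above, the same .key references feed withSQLConf(ENTRY.key -> "value") { ... } blocks, which keep the setting scoped to the enclosed test body, so the key string lives in exactly one place (the entry definition) instead of being repeated across suites.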