
Commit 14e3f11

rxin authored and marmbrus committed
[SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
This change should be binary and source backward compatible since we didn't change any user facing APIs.

Author: Reynold Xin <rxin@databricks.com>

Closes #3965 from rxin/SPARK-5168-sqlconf and squashes the following commits:

42eec09 [Reynold Xin] Fix default conf value.
0ef86cc [Reynold Xin] Fix constructor ordering.
4d7f910 [Reynold Xin] Properly override config.
ccc8e6a [Reynold Xin] [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
1 parent 6463e0b commit 14e3f11
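
In outline, the commit turns inherited configuration into a delegated field. A minimal sketch of the before/after shape, with simplified names that are not the actual Spark source:

// Before: configuration is mixed in, so its methods land directly on the context.
trait ConfAsMixin {
  private val settings = new java.util.concurrent.ConcurrentHashMap[String, String]()
  def setConf(key: String, value: String): Unit = settings.put(key, value)
  def getConf(key: String): String = settings.get(key)
}
class ContextBefore extends ConfAsMixin

// After: configuration lives in a field, and the context keeps thin delegating
// wrappers. Callers still write context.setConf(...), which is why the patch
// can claim source and binary compatibility for the public API.
class ConfAsField {
  private val settings = new java.util.concurrent.ConcurrentHashMap[String, String]()
  def setConf(key: String, value: String): Unit = settings.put(key, value)
  def getConf(key: String): String = settings.get(key)
}
class ContextAfter {
  // lazy, so a subclass can substitute its own conf before first use
  protected lazy val conf: ConfAsField = new ConfAsField
  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
  def getConf(key: String): String = conf.getConf(key)
}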

33 files changed: 124 additions & 92 deletions

sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala

Lines changed: 2 additions & 2 deletions
@@ -91,8 +91,8 @@ private[sql] trait CacheManager {
       CachedData(
         planToCache,
         InMemoryRelation(
-          useCompression,
-          columnBatchSize,
+          conf.useCompression,
+          conf.columnBatchSize,
           storageLevel,
           query.queryExecution.executedPlan,
           tableName))

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] trait SQLConf {
+private[sql] class SQLConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
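
With SQLConf now a concrete class rather than a trait, it can be instantiated and exercised on its own. A small illustrative snippet, assuming only the key constant and accessor already referenced elsewhere in this diff (SQLConf.SHUFFLE_PARTITIONS and numShufflePartitions); since SQLConf is private[sql], the sketch must live in its package:

package org.apache.spark.sql

// Illustrative only, not part of the patch.
object SQLConfSketch {
  def main(args: Array[String]): Unit = {
    // A standalone SQLConf, no SQLContext required.
    val conf = new SQLConf
    conf.setConf(SQLConf.SHUFFLE_PARTITIONS, "10")
    assert(conf.numShufflePartitions == 10)  // the accessor reads the key set above
  }
}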

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 33 additions & 7 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -49,14 +52,37 @@ import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
   extends org.apache.spark.Logging
-  with SQLConf
   with CacheManager
   with ExpressionConversions
   with UDFRegistration
   with Serializable {
 
   self =>
 
+  // Note that this is a lazy val so we can override the default value in subclasses.
+  private[sql] lazy val conf: SQLConf = new SQLConf
+
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = conf.setConf(props)
+
+  /** Set the given Spark SQL configuration property. */
+  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConf(key: String): String = conf.getConf(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`.
+   */
+  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+
+  /**
+   * Return all the configuration properties that have been set (i.e. not the default).
+   * This creates a new copy of the config properties in the form of a Map.
+   */
+  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
 
@@ -212,7 +238,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       Option(schema).getOrElse(
         JsonRDD.nullTypeToStringType(
@@ -226,7 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
@@ -299,10 +325,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def sql(sqlText: String): SchemaRDD = {
-    if (dialect == "sql") {
+    if (conf.dialect == "sql") {
       new SchemaRDD(this, parseSql(sqlText))
     } else {
-      sys.error(s"Unsupported SQL dialect: $dialect")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
     }
   }
 
@@ -323,9 +349,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     val sqlContext: SQLContext = self
 
-    def codegenEnabled = self.codegenEnabled
+    def codegenEnabled = self.conf.codegenEnabled
 
-    def numPartitions = self.numShufflePartitions
+    def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
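
The comment on the new conf field explains why it is a lazy val: a subclass gets a chance to substitute its own configuration before first use. A hypothetical sketch of that pattern, not code from this patch (the subclass sits in org.apache.spark.sql because conf is private[sql]):

package org.apache.spark.sql

import org.apache.spark.SparkContext

// Hypothetical subclass that pre-seeds its configuration, e.g. for local tests.
class TweakedSQLContext(sc: SparkContext) extends SQLContext(sc) {
  private[sql] override lazy val conf: SQLConf = {
    val c = new SQLConf
    c.setConf(SQLConf.SHUFFLE_PARTITIONS, "5")  // fewer shuffle partitions
    c
  }
}

Existing call sites are untouched: sqlContext.setConf("spark.sql.shuffle.partitions", "5") still works, since the public methods above simply forward to the field.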

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 3 additions & 3 deletions
@@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * @group userf
    */
   def sql(sqlText: String): JavaSchemaRDD = {
-    if (sqlContext.dialect == "sql") {
+    if (sqlContext.conf.dialect == "sql") {
       new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
@@ -164,7 +164,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
@@ -182,7 +182,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
         JsonRDD.nullTypeToStringType(

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 2 additions & 2 deletions
@@ -82,7 +82,7 @@ private[sql] case class InMemoryRelation(
     if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
       // Underlying columnar RDD has been materialized, required information has also been collected
       // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`.
@@ -233,7 +233,7 @@ private[sql] case class InMemoryColumnarTableScan(
   val readPartitions = sparkContext.accumulator(0)
   val readBatches = sparkContext.accumulator(0)
 
-  private val inMemoryPartitionPruningEnabled = sqlContext.inMemoryPartitionPruning
+  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
   override def execute() = {
     readPartitions.setValue(0)

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  */
 private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions = sqlContext.numShufflePartitions
+  def numPartitions = sqlContext.conf.numShufflePartitions
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -69,7 +69,7 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }
 
@@ -106,6 +106,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   // sqlContext will be null when we are being deserialized on the slaves. In this instance
   // the value of codegenEnabled will be set by the desserializer after the constructor has run.
   val codegenEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.codegenEnabled
+    sqlContext.conf.codegenEnabled
   } else {
     false
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 9 deletions
@@ -35,8 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object LeftSemiJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         val semiJoin = joins.BroadcastLeftSemiJoinHash(
           leftKeys, rightKeys, planLater(left), planLater(right))
         condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
@@ -81,13 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-          left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -215,7 +215,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sqlContext.parquetFilterPushDown) {
+          if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
               // Note: filters cannot be pushed down to Parquet if they contain more complex
               // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
@@ -237,7 +237,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
+            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }
@@ -270,7 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
-      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+      case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
         execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.Sort(sortExprs, global, planLater(child)):: Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ case class SetCommand(
         logWarning(
           s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
             s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
+        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
 
       // Queries a single property.
       case Some((key, None)) =>
