core/src/main/scala/org/apache/spark/TaskContextImpl.scala (2 additions, 0 deletions)

@@ -178,4 +178,6 @@ private[spark] class TaskContextImpl(

   private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
 
+  // TODO: shall we publish it and define it in `TaskContext`?
+  private[spark] def getLocalProperties(): Properties = localProperties
 }
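For background on the method added here: local properties are set on the driver thread (for example via `SparkContext.setLocalProperty`) and shipped to executors with every task, where `TaskContext.getLocalProperty` reads them back. A minimal, hedged sketch of that round trip (the object name and property key are made up); `ReadOnlySQLConf` below builds on exactly this mechanism:

```scala
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.sql.SparkSession

object LocalPropertyRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("local-prop-demo").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // Set a local property on the driver thread; it is attached to every task
    // submitted from this thread.
    sc.setLocalProperty("demo.key", "demo-value")

    val seen = sc.parallelize(1 to 4, numSlices = 2).map { _ =>
      // Read the propagated property back inside the task.
      TaskContext.get().getLocalProperty("demo.key")
    }.collect()

    seen.foreach(println) // prints "demo-value" four times
    spark.stop()
  }
}
```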
CheckAnalysis.scala

@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

/**
@@ -261,9 +260,7 @@ trait CheckAnalysis extends PredicateHelper {
           // Check if the data types match.
           dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
             // SPARK-18058: we shall not care about the nullability of columns
-            val widerType = TypeCoercion.findWiderTypeForTwo(
-              dt1.asNullable, dt2.asNullable, SQLConf.get.caseSensitiveAnalysis)
-            if (widerType.isEmpty) {
+            if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty) {
               failAnalysis(
                 s"""
                    |${operator.nodeName} can only be performed on tables with the compatible
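To make the check above concrete, a hedged illustration (the view names are made up): a UNION whose corresponding columns share a wider common type analyzes fine, while columns with no common type hit the failAnalysis branch.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("union-type-check").getOrCreate()

spark.range(3).selectExpr("CAST(id AS INT) AS c").createOrReplaceTempView("ints")
spark.range(3).selectExpr("CAST(id AS DOUBLE) AS c").createOrReplaceTempView("doubles")
spark.range(3).selectExpr("ARRAY(id) AS c").createOrReplaceTempView("arrays")

// INT and DOUBLE widen to DOUBLE, so this resolves.
spark.sql("SELECT c FROM ints UNION ALL SELECT c FROM doubles").printSchema()

// INT and ARRAY<BIGINT> have no common wider type, so analysis fails with the
// "... can only be performed on tables with the compatible column types" error.
// spark.sql("SELECT c FROM ints UNION ALL SELECT c FROM arrays")
```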
ResolveInlineTables.scala

@@ -83,9 +83,7 @@ case class ResolveInlineTables(conf: SQLConf) extends Rule[LogicalPlan] with Cas
     // For each column, traverse all the values and find a common data type and nullability.
     val fields = table.rows.transpose.zip(table.names).map { case (column, name) =>
       val inputTypes = column.map(_.dataType)
-      val wideType = TypeCoercion.findWiderTypeWithoutStringPromotion(
-        inputTypes, conf.caseSensitiveAnalysis)
-      val tpe = wideType.getOrElse {
+      val tpe = TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
         table.failAnalysis(s"incompatible types found in column $name for inline table")
       }
       StructField(name, tpe, nullable = column.exists(_.nullable))
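A hedged example of the rule above, reusing the `spark` session from the earlier sketch (the alias and column names are made up): an inline table whose first column mixes INT and DECIMAL literals is widened to a common decimal type, while mixing a numeric with an array literal in one column produces the "incompatible types found in column ..." error.

```scala
// Column c1 contains an INT and a DECIMAL literal; the rule widens it to a
// common decimal type.
spark.sql("SELECT * FROM VALUES (1, 'a'), (20.5, 'b') AS t(c1, c2)").printSchema()

// An INT and an ARRAY literal in the same column have no common type, so the
// inline table fails to resolve.
// spark.sql("SELECT * FROM VALUES (1, 'a'), (array(1), 'b') AS t(c1, c2)")
```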

Large diffs are not rendered by default.

CreateJacksonParser.scala

@@ -78,17 +78,4 @@ private[sql] object CreateJacksonParser extends Serializable {
   def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
     jsonFactory.createParser(new InputStreamReader(is, enc))
   }
-
-  def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
-    val ba = row.getBinary(0)
-
-    jsonFactory.createParser(ba, 0, ba.length)
-  }
-
-  def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
-    val binary = row.getBinary(0)
-    val sd = getStreamDecoder(enc, binary, binary.length)
-
-    jsonFactory.createParser(sd)
-  }
 }

Review comment (Member): Why these two removed? Looks like no SQLConf involved here?
ReadOnlySQLConf.scala (new file)

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal

import java.util.{Map => JMap}

import org.apache.spark.{TaskContext, TaskContextImpl}
import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}

/**
* A readonly SQLConf that will be created by tasks running at the executor side. It reads the
* configs from the local properties which are propagated from driver to executors.
*/
class ReadOnlySQLConf(context: TaskContext) extends SQLConf {

Review comment (Contributor): This is nice :)

  @transient override val settings: JMap[String, String] = {
    context.asInstanceOf[TaskContextImpl].getLocalProperties().asInstanceOf[JMap[String, String]]
  }

  @transient override protected val reader: ConfigReader = {
    new ConfigReader(new TaskContextConfigProvider(context))
  }

  override protected def setConfWithCheck(key: String, value: String): Unit = {
    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
  }

  override def unsetConf(key: String): Unit = {
    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
  }

  override def unsetConf(entry: ConfigEntry[_]): Unit = {
    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
  }

  override def clear(): Unit = {
    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
  }

  override def clone(): SQLConf = {
    throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
  }

  override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
    throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
  }
}

Review comment (Member): Do we need to allow clone here? clone will create a mutable SQLConf.

Review comment (Contributor, author): I don't think we need to clone or copy SQLConf in tasks, let's ban it.

class TaskContextConfigProvider(context: TaskContext) extends ConfigProvider {
  override def get(key: String): Option[String] = Option(context.getLocalProperty(key))
}
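The driver-side half of this design is in the diff that is collapsed above, so the following is only a hedged sketch of the idea the provider relies on: before jobs are submitted, the active session's SQL conf entries are copied into the submitting thread's local properties, which is what lets `TaskContextConfigProvider.get` find them by key inside a task. The helper name below is made up and is not this PR's actual API.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.internal.SQLConf

// Sketch only: mirror every SQL conf entry into thread-local properties so
// that tasks submitted from this thread can read them back via
// TaskContext.getLocalProperty(key).
def propagateSQLConfToTasks(sc: SparkContext, conf: SQLConf): Unit = {
  conf.getAllConfs.foreach { case (key, value) =>
    sc.setLocalProperty(key, value)
  }
}
```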
SQLConf.scala

@@ -33,7 +33,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
-import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -108,11 +107,11 @@ object SQLConf {
    * run unit tests (that does not involve SparkSession) in serial order.
    */
   def get: SQLConf = {
-    if (Utils.isTesting && TaskContext.get != null) {
-      // we're accessing it during task execution, fail.
-      throw new IllegalStateException("SQLConf should only be created and accessed on the driver.")
+    if (TaskContext.get != null) {
+      new ReadOnlySQLConf(TaskContext.get())
+    } else {
+      confGetter.get()()
     }
-    confGetter.get()()
   }
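With the change to `def get` above, calling `SQLConf.get` inside a task no longer throws; it returns a `ReadOnlySQLConf` backed by the task's local properties. A hedged usage sketch (the UDF and session setup are made up; whether non-default session values are visible inside the task depends on the driver actually propagating them into local properties, which is handled elsewhere in this PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[2]").appName("sqlconf-in-task").getOrCreate()
import spark.implicits._

// Reads a SQL conf entry while running inside a task on the executor side.
val readCaseSensitive = udf { (_: Long) => SQLConf.get.getConf(SQLConf.CASE_SENSITIVE) }

spark.range(3).select(readCaseSensitive($"id")).show()
```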

val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
@@ -1284,7 +1283,7 @@ class SQLConf extends Serializable with Logging {
   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
-  @transient private val reader = new ConfigReader(settings)
+  @transient protected val reader = new ConfigReader(settings)

/** ************************ Spark SQL Params/Hints ******************* */

@@ -1734,7 +1733,7 @@ class SQLConf extends Serializable with Logging {
     settings.containsKey(key)
   }
 
-  private def setConfWithCheck(key: String, value: String): Unit = {
+  protected def setConfWithCheck(key: String, value: String): Unit = {
     settings.put(key, value)
   }

DataType.scala

@@ -81,7 +81,11 @@ abstract class DataType extends AbstractDataType {
    * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
    */
   private[spark] def sameType(other: DataType): Boolean =
-    DataType.equalsIgnoreNullability(this, other)
+    if (SQLConf.get.caseSensitiveAnalysis) {
+      DataType.equalsIgnoreNullability(this, other)
+    } else {
+      DataType.equalsIgnoreCaseAndNullability(this, other)
+    }

/**
* Returns the same data type but set all nullability fields are true
@@ -214,7 +218,7 @@
   /**
    * Compares two types, ignoring nullability of ArrayType, MapType, StructType.
    */
-  private[sql] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+  private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
     (left, right) match {
       case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
         equalsIgnoreNullability(leftElementType, rightElementType)
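To make the `sameType` change above concrete, a hedged sketch (the struct layouts are made up, and `sameType` is internal API, so the calls are shown as comments): two struct types whose fields differ only in name casing now compare as the same type under the default case-insensitive analysis, and as different types when `spark.sql.caseSensitive` is `true`.

```scala
import org.apache.spark.sql.types._

val a = StructType(Seq(StructField("Col", IntegerType, nullable = true)))
val b = StructType(Seq(StructField("col", IntegerType, nullable = false)))

// spark.sql.caseSensitive = false (default): field-name case and nullability
// are both ignored, so the two types are considered the same.
// a.sameType(b)  // true

// spark.sql.caseSensitive = true: only nullability is ignored, so the
// differing field names make the comparison fail.
// a.sameType(b)  // false
```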