
Commit 322ec0b

gengliangwang authored and dongjoon-hyun committed
[SPARK-28885][SQL] Follow ANSI store assignment rules in table insertion by default
### What changes were proposed in this pull request?

When inserting a value into a column with a different data type, Spark performs type coercion. Currently, we support 3 policies for the store assignment rules: ANSI, legacy and strict, which can be set via the option "spark.sql.storeAssignmentPolicy":

1. ANSI: Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions, such as converting `string` to `int` or `double` to `boolean`, and it throws a runtime exception if the value is out of range (overflow).
2. Legacy: Spark allows the type coercion as long as it is a valid `Cast`, which is very loose; e.g., converting either `string` to `int` or `double` to `boolean` is allowed. This is the behavior of Spark 2.x, kept for compatibility with Hive. When an out-of-range value is inserted into an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting); for example, inserting 257 into a field of byte type yields 1.
3. Strict: Spark doesn't allow any possible precision loss or data truncation in store assignment; e.g., converting either `double` to `int` or `decimal` to `double` is not allowed. These rules were originally written for the Dataset encoder. As far as I know, no mainstream DBMS uses this policy by default.

Currently, the V1 data source uses the "Legacy" policy by default, while V2 uses "Strict". This proposal is to use the "ANSI" policy by default for both V1 and V2 in Spark 3.0.

### Why are the changes needed?

Following the ANSI SQL standard is the most reasonable choice among the 3 policies.

### Does this PR introduce any user-facing change?

Yes. The default store assignment policy is ANSI for both V1 and V2 data sources.

### How was this patch tested?

Unit tests.

Closes #26107 from gengliangwang/ansiPolicyAsDefault.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
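The policy difference is easiest to see with a small V1 table insertion. Below is a minimal spark-shell sketch, not part of this commit: the table name `ints` and the inserted values are invented for illustration, and it assumes a build that contains this patch.

```scala
// Hypothetical spark-shell session illustrating the store assignment policies.
spark.sql("CREATE TABLE ints (i INT) USING parquet")

// New default: ANSI. Unreasonable coercions such as string -> int are rejected
// at analysis time instead of silently producing nulls.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
// spark.sql("INSERT INTO ints VALUES ('not a number')")  // expected to fail analysis

// The Spark 2.x behavior can be restored per session: the value goes through a
// plain Cast, so a non-numeric string becomes null.
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO ints VALUES ('not a number')")     // stores null under LEGACY
```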
1 parent 2e28622 · commit 322ec0b

19 files changed: 92 additions & 57 deletions


docs/sql-migration-guide.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -23,6 +23,7 @@ license: |
 {:toc}
 
 ## Upgrading from Spark SQL 2.4 to 3.0
+- Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception will be thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are valid `Cast`. When inserting an out-of-range value to a integral field, the low-order bits of the value is inserted(the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value as "ANSI". Setting the option as "Legacy" restores the previous behavior.
 
 - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
 
```
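The out-of-range behavior described in the migration note above can be sketched the same way (hypothetical table name; assumes a build that includes this patch):

```scala
// Hypothetical illustration of the overflow semantics described above.
spark.sql("CREATE TABLE bytes (b BYTE) USING parquet")

// Spark 2.4 semantics: only the low-order bits survive, so 257 is stored as 1.
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO bytes VALUES (257)")

// Spark 3.0 default (ANSI): per the migration note, the out-of-range value is
// expected to raise a runtime exception instead of wrapping around.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
// spark.sql("INSERT INTO bytes VALUES (257)")
```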

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 8 additions & 10 deletions
```diff
@@ -2502,9 +2502,9 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case append @ AppendData(table, query, _, isByName)
           if table.resolved && query.resolved && !append.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           append.copy(query = projection)
@@ -2514,9 +2514,9 @@ class Analyzer(
 
       case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2526,9 +2526,9 @@ class Analyzer(
 
       case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2538,16 +2538,14 @@ class Analyzer(
       }
     }
 
-    private def storeAssignmentPolicy: StoreAssignmentPolicy.Value = {
-      val policy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.STRICT)
+    private def validateStoreAssignmentPolicy(): Unit = {
       // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
-      if (policy == StoreAssignmentPolicy.LEGACY) {
+      if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
        val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
        throw new AnalysisException(s"""
          |"LEGACY" store assignment policy is disallowed in Spark data source V2.
          |Please set the configuration $configKey to other values.""".stripMargin)
      }
-      policy
    }
 
    private def commonNaturalJoinProcessing(
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala

Lines changed: 4 additions & 7 deletions
```diff
@@ -32,8 +32,7 @@ object TableOutputResolver {
       expected: Seq[Attribute],
       query: LogicalPlan,
       byName: Boolean,
-      conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value): LogicalPlan = {
+      conf: SQLConf): LogicalPlan = {
 
     if (expected.size < query.output.size) {
       throw new AnalysisException(
@@ -47,8 +46,7 @@ object TableOutputResolver {
       expected.flatMap { tableAttr =>
         query.resolve(Seq(tableAttr.name), conf.resolver) match {
           case Some(queryExpr) =>
-            checkField(
-              tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+            checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
           case None =>
             errors += s"Cannot find data for output column '${tableAttr.name}'"
             None
@@ -66,8 +64,7 @@ object TableOutputResolver {
 
       query.output.zip(expected).flatMap {
         case (queryExpr, tableAttr) =>
-          checkField(
-            tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+          checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
       }
     }
 
@@ -88,9 +85,9 @@ object TableOutputResolver {
       queryExpr: NamedExpression,
       byName: Boolean,
       conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value,
       addError: String => Unit): Option[NamedExpression] = {
 
+    val storeAssignmentPolicy = conf.storeAssignmentPolicy
     lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) &&
       tableAttr.name == queryExpr.name &&
       tableAttr.metadata == queryExpr.metadata) {
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -165,6 +165,7 @@ object Cast {
    */
   def canANSIStoreAssign(from: DataType, to: DataType): Boolean = (from, to) match {
     case _ if from == to => true
+    case (NullType, _) => true
     case (_: NumericType, _: NumericType) => true
     case (_: AtomicType, StringType) => true
     case (_: CalendarIntervalType, StringType) => true
```
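One effect of the new `NullType` case, together with the matching case added to `DataType.canWrite` below: under the ANSI policy an untyped `NULL` literal can be written to a column of any type, while the strict policy keeps rejecting it, as the updated test suites show. A hedged sketch of what that looks like from SQL (hypothetical table name; assumes a build with this patch):

```scala
// Hypothetical illustration of the NullType store-assignment rule under ANSI.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("CREATE TABLE events (ts TIMESTAMP) USING parquet")

// The VALUES literal is NullType; the write-compatibility check now accepts
// NullType -> TimestampType under ANSI, so the insert passes analysis and
// stores a null row.
spark.sql("INSERT INTO events VALUES (NULL)")
```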

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -1740,7 +1740,7 @@ object SQLConf {
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(StoreAssignmentPolicy.values.map(_.toString))
-    .createOptional
+    .createWithDefault(StoreAssignmentPolicy.ANSI.toString)
 
   val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
     .doc("When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will " +
@@ -2473,8 +2473,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
-  def storeAssignmentPolicy: Option[StoreAssignmentPolicy.Value] =
-    getConf(STORE_ASSIGNMENT_POLICY).map(StoreAssignmentPolicy.withName)
+  def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
+    StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))
 
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
 
```
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -456,6 +456,8 @@ object DataType {
         true
       }
 
+    case (_: NullType, _) if storeAssignmentPolicy == ANSI => true
+
     case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
       if (!Cast.canANSIStoreAssign(w, r)) {
         addError(s"Cannot safely cast '$context': $w to $r")
```

sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala

Lines changed: 15 additions & 9 deletions
```diff
@@ -76,6 +76,14 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa
       assert(err.contains("Cannot safely cast"))
     }
   }
+
+  test("Check NullType is incompatible with all other types") {
+    allNonNullTypes.foreach { t =>
+      assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
+        assert(err.contains(s"incompatible with $t"))
+      }
+    }
+  }
 }
 
 class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBaseSuite {
@@ -145,6 +153,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase
       assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType"))
     }
   }
+
+  test("Check NullType is compatible with all other types") {
+    allNonNullTypes.foreach { t =>
+      assertAllowed(NullType, t, "nulls", s"Should allow writing None to type $t")
+    }
+  }
 }
 
 abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
@@ -175,17 +189,9 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
   private val nestedContainerTypes = Seq(ArrayType(point2, containsNull = false),
     MapType(StringType, point3, valueContainsNull = false))
 
-  private val allNonNullTypes = Seq(
+  protected val allNonNullTypes = Seq(
     atomicTypes, simpleContainerTypes, nestedContainerTypes, Seq(CalendarIntervalType)).flatten
 
-  test("Check NullType is incompatible with all other types") {
-    allNonNullTypes.foreach { t =>
-      assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
-        assert(err.contains(s"incompatible with $t"))
-      }
-    }
-  }
-
   test("Check each type with itself") {
     allNonNullTypes.foreach { t =>
       assertAllowed(t, t, "t", s"Should allow writing type to itself $t")
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 8 deletions
```diff
@@ -189,14 +189,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
         query
       }
 
-      // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-      // TODO: use ANSI store assignment policy by default in SPARK-28495.
-      val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
       c.copy(
         tableDesc = existingTable,
         query = Some(TableOutputResolver.resolveOutputColumns(
           tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery,
-          byName = true, conf, storeAssignmentPolicy)))
+          byName = true, conf)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
@@ -402,11 +399,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
         s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
-    // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-    // TODO: use ANSI store assignment policy by default in SPARK-28495.
-    val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
     val newQuery = TableOutputResolver.resolveOutputColumns(
-      tblName, expectedColumns, insert.query, byName = false, conf, storeAssignmentPolicy)
+      tblName, expectedColumns, insert.query, byName = false, conf)
     if (normalizedPartSpec.nonEmpty) {
       if (normalizedPartSpec.size != partColNames.length) {
         throw new AnalysisException(
```

sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala

Lines changed: 13 additions & 1 deletion
```diff
@@ -142,7 +142,19 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
 
   /** List of test cases to ignore, in lower cases. */
   protected def blackList: Set[String] = Set(
-    "blacklist.sql"  // Do NOT remove this one. It is here to test the blacklist functionality.
+    "blacklist.sql",  // Do NOT remove this one. It is here to test the blacklist functionality.
+    // SPARK-28885 String value is not allowed to be stored as numeric type with
+    // ANSI store assignment policy.
+    "postgreSQL/numeric.sql",
+    "postgreSQL/int2.sql",
+    "postgreSQL/int4.sql",
+    "postgreSQL/int8.sql",
+    "postgreSQL/float4.sql",
+    "postgreSQL/float8.sql",
+    // SPARK-28885 String value is not allowed to be stored as date/timestamp type with
+    // ANSI store assignment policy.
+    "postgreSQL/date.sql",
+    "postgreSQL/timestamp.sql"
   )
 
   // Create all the test cases.
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -150,9 +150,9 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
       Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
 
-      Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
+      Seq(3.5 -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
-        :: Row(null, "3") :: Nil)
+        :: Row(3, "3") :: Nil)
 
       Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
```
