@@ -17,8 +17,10 @@
package org.apache.spark.sql.errors

import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException}
+ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext}
import org.apache.spark.sql.catalyst.util.QuotingUtils
+ import org.apache.spark.sql.catalyst.util.QuotingUtils.toSQLSchema
import org.apache.spark.sql.types.{DataType, Decimal, StringType}
import org.apache.spark.unsafe.types.UTF8String

@@ -97,49 +99,45 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
}

def schemaFailToParseError(schema: String, e: Throwable): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "INVALID_SCHEMA.PARSE_ERROR",
messageParameters = Map(
"inputSchema" -> QuotingUtils.toSQLSchema(schema),
"inputSchema" -> toSQLSchema(schema),
"reason" -> e.getMessage
),
- cause = e)
+ cause = Some(e))
}

def invalidDayTimeIntervalType(startFieldName: String, endFieldName: String): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1224",
messageParameters = Map(
"startFieldName" -> startFieldName,
"endFieldName" -> endFieldName),
cause = null)
"endFieldName" -> endFieldName))
}

def invalidDayTimeField(field: Byte, supportedIds: Seq[String]): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1223",
messageParameters = Map(
"field" -> field.toString,
"supportedIds" -> supportedIds.mkString(", ")),
cause = null)
"supportedIds" -> supportedIds.mkString(", ")))
}

def invalidYearMonthField(field: Byte, supportedIds: Seq[String]): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1225",
messageParameters = Map(
"field" -> field.toString,
"supportedIds" -> supportedIds.mkString(", ")),
cause = null)
"supportedIds" -> supportedIds.mkString(", ")))
}

def decimalCannotGreaterThanPrecisionError(scale: Int, precision: Int): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1228",
messageParameters = Map(
"scale" -> scale.toString,
"precision" -> precision.toString),
cause = null)
"precision" -> precision.toString))
}

def negativeScaleNotAllowedError(scale: Int): Throwable = {
@@ -150,10 +148,9 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
}

def attributeNameSyntaxError(name: String): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1049",
messageParameters = Map("name" -> name),
cause = null)
messageParameters = Map("name" -> name))
}

def cannotMergeIncompatibleDataTypesError(left: DataType, right: DataType): Throwable = {
@@ -182,13 +179,12 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
}

def invalidFieldName(fieldName: Seq[String], path: Seq[String], context: Origin): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "INVALID_FIELD_NAME",
messageParameters = Map(
"fieldName" -> toSQLId(fieldName),
"path" -> toSQLId(path)),
- cause = null,
- context = context.getQueryContext)
+ origin = context)
}

def unscaledValueTooLargeForPrecisionError(
@@ -241,13 +237,12 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {

def ambiguousColumnOrFieldError(
name: Seq[String], numMatches: Int, context: Origin): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "AMBIGUOUS_COLUMN_OR_FIELD",
messageParameters = Map(
"name" -> toSQLId(name),
"n" -> numMatches.toString),
- cause = null,
- context = context.getQueryContext)
+ origin = context)
}

def castingCauseOverflowError(t: String, from: DataType, to: DataType): ArithmeticException = {
@@ -283,17 +278,15 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
}

def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
- messageParameters = Map.empty,
- cause = null)
+ messageParameters = Map.empty)
}

def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
- new SparkException(
+ new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1189",
messageParameters = Map("operation" -> operation),
cause = null)
messageParameters = Map("operation" -> operation))
}

def unsupportedDataTypeError(typeName: DataType): SparkUnsupportedOperationException = {
@@ -1921,12 +1921,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {

def ambiguousColumnOrFieldError(
name: Seq[String], numMatches: Int, context: Origin): Throwable = {
- new AnalysisException(
- errorClass = "AMBIGUOUS_COLUMN_OR_FIELD",
- messageParameters = Map(
- "name" -> toSQLId(name),
- "n" -> numMatches.toString),
- origin = context)
+ DataTypeErrors.ambiguousColumnOrFieldError(name, numMatches, context)
}

def ambiguousColumnOrFieldError(
@@ -324,7 +324,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
check(Seq("s1", "S12"), Some(Seq("s1") -> StructField("s12", IntegerType)))
caseSensitiveCheck(Seq("s1", "S12"), None)
check(Seq("S1.non_exist"), None)
- var e = intercept[SparkException] {
+ var e = intercept[AnalysisException] {
check(Seq("S1", "S12", "S123"), None)
}
checkError(
@@ -335,17 +335,17 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
"path" -> "`s1`.`s12`"))

// ambiguous name
- var e2 = intercept[SparkException] {
+ e = intercept[AnalysisException] {
check(Seq("S2", "x"), None)
}
checkError(
- exception = e2,
+ exception = e,
errorClass = "AMBIGUOUS_COLUMN_OR_FIELD",
parameters = Map("name" -> "`S2`.`x`", "n" -> "2"))
caseSensitiveCheck(Seq("s2", "x"), Some(Seq("s2") -> StructField("x", IntegerType)))

// simple map type
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
check(Seq("m1", "key"), None)
}
checkError(
@@ -356,7 +356,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
"path" -> "`m1`"))
checkCollection(Seq("m1", "key"), Some(Seq("m1") -> StructField("key", IntegerType, false)))
checkCollection(Seq("M1", "value"), Some(Seq("m1") -> StructField("value", IntegerType)))
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("M1", "key", "name"), None)
}
checkError(
@@ -365,7 +365,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
parameters = Map(
"fieldName" -> "`M1`.`key`.`name`",
"path" -> "`m1`.`key`"))
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("M1", "value", "name"), None)
}
checkError(
@@ -382,7 +382,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
checkCollection(Seq("M2", "value", "b"),
Some(Seq("m2", "value") -> StructField("b", IntegerType)))
checkCollection(Seq("M2", "value", "non_exist"), None)
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("m2", "key", "A", "name"), None)
}
checkError(
@@ -391,7 +391,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
parameters = Map(
"fieldName" -> "`m2`.`key`.`A`.`name`",
"path" -> "`m2`.`key`.`a`"))
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("M2", "value", "b", "name"), None)
}
checkError(
@@ -401,7 +401,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
"fieldName" -> "`M2`.`value`.`b`.`name`",
"path" -> "`m2`.`value`.`b`"))
// simple array type
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
check(Seq("A1", "element"), None)
}
checkError(
@@ -411,7 +411,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
"fieldName" -> "`A1`.`element`",
"path" -> "`a1`"))
checkCollection(Seq("A1", "element"), Some(Seq("a1") -> StructField("element", IntegerType)))
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("A1", "element", "name"), None)
}
checkError(
@@ -425,7 +425,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
checkCollection(Seq("A2", "element", "C"),
Some(Seq("a2", "element") -> StructField("c", IntegerType)))
checkCollection(Seq("A2", "element", "non_exist"), None)
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("a2", "element", "C", "name"), None)
}
checkError(
@@ -439,7 +439,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
checkCollection(Seq("M3", "value", "value", "MA"),
Some(Seq("m3", "value", "value") -> StructField("ma", IntegerType)))
checkCollection(Seq("M3", "value", "value", "non_exist"), None)
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("M3", "value", "value", "MA", "name"), None)
}
checkError(
@@ -453,7 +453,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
checkCollection(Seq("A3", "element", "element", "D"),
Some(Seq("a3", "element", "element") -> StructField("d", IntegerType)))
checkCollection(Seq("A3", "element", "element", "non_exist"), None)
- e = intercept[SparkException] {
+ e = intercept[AnalysisException] {
checkCollection(Seq("A3", "element", "element", "D", "name"), None)
}
checkError(
@@ -794,11 +794,15 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {

test("invalidate char/varchar in functions") {
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
- parameters = Map.empty
+ parameters = Map.empty,
+ context = ExpectedContext(
+ fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
+ start = 7,
+ stop = 44)
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
@@ -812,19 +816,19 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
val df = spark.range(10).map(_.toString).toDF()
val schema = new StructType().add("id", CharType(5))
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.createDataFrame(df.collectAsList(), schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.createDataFrame(df.rdd, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.createDataFrame(df.toJavaRDD, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -838,12 +842,12 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {

test("invalidate char/varchar in spark.read.schema") {
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.read.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.read.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -880,13 +884,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {

test("invalidate char/varchar in udf's result type") {
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.udf.register("testchar", () => "B", VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -905,13 +909,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {

test("invalidate char/varchar in spark.readStream.schema") {
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.readStream.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
spark.readStream.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -934,7 +938,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
sql("CREATE TABLE t(c char(10), v varchar(255)) USING parquet")
sql("INSERT INTO t VALUES('spark', 'awesome')")
val df = sql("SELECT * FROM t")
- checkError(exception = intercept[SparkException] {
+ checkError(exception = intercept[AnalysisException] {
df.to(newSchema)
}, errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING", parameters = Map.empty)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
@@ -1721,7 +1721,7 @@ class DataFrameSuite extends QueryTest

def checkSyntaxError(name: String): Unit = {
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[org.apache.spark.sql.AnalysisException] {
df(name)
},
errorClass = "_LEGACY_ERROR_TEMP_1049",
@@ -1150,7 +1150,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
val invalidJsonSchema = """{"fields": [{"a":123}], "type": "struct"}"""
val invalidJsonSchemaReason = "Failed to convert the JSON string '{\"a\":123}' to a field."
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
df.select(from_json($"json", invalidJsonSchema, Map.empty[String, String])).collect()
},
errorClass = "INVALID_SCHEMA.PARSE_ERROR",
@@ -1165,7 +1165,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
"was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n " +
"at [Source: (String)\"MAP<INT, cow>\"; line: 1, column: 4]"
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
df.select(from_json($"json", invalidDataType, Map.empty[String, String])).collect()
},
errorClass = "INVALID_SCHEMA.PARSE_ERROR",
@@ -1180,7 +1180,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
"was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" +
" at [Source: (String)\"x INT, a cow\"; line: 1, column: 2]"
checkError(
- exception = intercept[SparkException] {
+ exception = intercept[AnalysisException] {
df.select(from_json($"json", invalidTableSchema, Map.empty[String, String])).collect()
},
errorClass = "INVALID_SCHEMA.PARSE_ERROR",