Skip to content

Commit cc592b9

Browse files
belieferchenzhx
authored and committed
[SPARK-39157][SQL] H2Dialect should override getJDBCType so that the data type is correct
### What changes were proposed in this pull request? Currently, `H2Dialect` does not implement `getJDBCType` of `JdbcDialect`, so DS V2 push-down throws the exception shown below: ``` Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver): org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement: SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee" WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00) [50004-210] ``` `H2Dialect` should implement `getJDBCType` of `JdbcDialect`. ### Why are the changes needed? To make the H2 data types correct. ### Does this PR introduce _any_ user-facing change? 'Yes'. It fixes a bug in `H2Dialect`. ### How was this patch tested? New tests. Closes apache#36516 from beliefer/SPARK-39157. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 7fa4ccc commit cc592b9

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.jdbc
1919

20-
import java.sql.SQLException
20+
import java.sql.{SQLException, Types}
2121
import java.util.Locale
2222

2323
import scala.util.control.NonFatal
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
2727
import org.apache.spark.sql.connector.expressions.Expression
2828
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
2929
import org.apache.spark.sql.errors.QueryCompilationErrors
30+
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
31+
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
3032

3133
private object H2Dialect extends JdbcDialect {
3234
override def canHandle(url: String): Boolean =
@@ -90,6 +92,15 @@ private object H2Dialect extends JdbcDialect {
9092
)
9193
}
9294

95+
/**
 * Maps a Spark SQL data type to the JDBC type name and `java.sql.Types` code used for H2.
 *
 * Per the commit description, without this override a pushed-down cast is emitted with
 * Spark's own type name (e.g. `CAST("BONUS" AS string)`), which H2 rejects with
 * `Unknown data type: "STRING"`.
 *
 * Mapping:
 *  - `StringType` -> `CLOB`
 *  - `BooleanType` -> `BOOLEAN`
 *  - `ShortType` and `ByteType` -> `SMALLINT`
 *  - `DecimalType` -> `NUMERIC(precision,scale)`, taken from the type's own precision and scale
 *  - anything else is delegated to `JdbcUtils.getCommonJDBCType`
 *
 * @param dt the Spark SQL data type to translate
 * @return the matching [[JdbcType]], or whatever `JdbcUtils.getCommonJDBCType` returns for
 *         types not handled explicitly here
 */
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
96+
case StringType => Option(JdbcType("CLOB", Types.CLOB))
97+
case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
98+
case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
99+
case t: DecimalType => Some(
100+
JdbcType(s"NUMERIC(${t.precision},${t.scale})", Types.NUMERIC))
101+
case _ => JdbcUtils.getCommonJDBCType(dt)
102+
}
103+
93104
override def classifyException(message: String, e: Throwable): AnalysisException = {
94105
if (e.isInstanceOf[SQLException]) {
95106
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -465,6 +465,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
465465
checkFiltersRemoved(df7, false)
466466
checkPushedInfo(df7, "PushedFilters: [DEPT IS NOT NULL]")
467467
checkAnswer(df7, Seq(Row(6, "jen", 12000, 1200, true)))
468+
469+
val df8 = sql(
470+
"""
471+
|SELECT * FROM h2.test.employee
472+
|WHERE cast(bonus as string) like '%30%'
473+
|AND cast(dept as byte) > 1
474+
|AND cast(dept as short) > 1
475+
|AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin)
476+
checkFiltersRemoved(df8, ansiMode)
477+
val expectedPlanFragment8 = if (ansiMode) {
478+
"PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
479+
"CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
480+
} else {
481+
"PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
482+
}
483+
checkPushedInfo(df8, expectedPlanFragment8)
484+
checkAnswer(df8, Seq(Row(2, "david", 10000, 1300, true)))
468485
}
469486
}
470487
}

0 commit comments

Comments
 (0)