From 1abf3fc60d1108885df44ae5a39dd8dc83f3be30 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Fri, 2 Aug 2019 18:28:31 -0700
Subject: [PATCH 1/4] Fix for ByteType. Mapping of ByteType is changed to
 TinyInt in MsSqlServerDialect.scala.

The fix fails with the following error when df.write is called on a
DataFrame that contains a ByteType column:

19/08/02 18:25:44 INFO Executor: Finished task 0.0 in stage 7.0 (TID 7). 1197 bytes result sent to driver
19/08/02 18:25:44 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 7) in 43 ms on localhost (executor driver) (1/2)
19/08/02 18:25:44 INFO CodeGenerator: Code generated in 14.586963 ms
19/08/02 18:25:44 ERROR Executor: Exception in task 1.0 in stage 7.0 (TID 8)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of tinyint
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, serialNum), ByteType) AS serialNum#231
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:344)
    at org.apache.spark.sql.SparkSession.$anonfun$createDataFrame$1(SparkSession.scala:367)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:662)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:845)
---
 .../jdbc/MsSqlServerIntegrationSuite.scala  | 31 +++++++++++++++++++++++++++++++
 .../datasources/jdbc/JdbcUtils.scala        |  2 +-
 .../spark/sql/jdbc/MsSqlServerDialect.scala |  2 ++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index efd7ca74c796..937bddf76c52 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -23,6 +23,9 @@ import java.util.Properties
 
 import org.apache.spark.tags.DockerTest
 
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
+
 @DockerTest
 class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
   override val db = new DatabaseOnDocker {
@@ -202,4 +205,32 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  def create_df() : DataFrame = {
+    val tableSchema = StructType(Seq(
+      StructField("serialNum",ByteType,true)
+    ))
+
+    val tableData = Seq (
+      Row(10)
+    )
+
+    spark.createDataFrame(spark.sparkContext.parallelize(tableData),tableSchema)
+  }
+
+  test("SPARK-28151 Test write table with BYTETYPE") {
+    val df1 = create_df()
+    df1.write
+      .format("jdbc")
.mode("overwrite") + .option("url",jdbcUrl) + .option("dbtable","testTable") + .save() + val df2 = spark.read + .format("jdbc") + .option("url",jdbcUrl) + .option("dbtable","byteTable") + .load() + df2.show() + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 86a27b5afc25..5c4a38641f62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -550,7 +550,7 @@ object JdbcUtils extends Logging { case ByteType => (stmt: PreparedStatement, row: Row, pos: Int) => - stmt.setInt(pos + 1, row.getByte(pos)) + stmt.setByte(pos + 1, row.getByte(pos)) case BooleanType => (stmt: PreparedStatement, row: Row, pos: Int) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 805f73dee141..d770cb19fb49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -33,6 +33,7 @@ private object MsSqlServerDialect extends JdbcDialect { sqlType match { case java.sql.Types.SMALLINT => Some(ShortType) case java.sql.Types.REAL => Some(FloatType) + case java.sql.Types.TINYINT => Some(ByteType) case _ => None } } @@ -44,6 +45,7 @@ private object MsSqlServerDialect extends JdbcDialect { case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT)) case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY)) case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT)) case _ => None } From 3a5a62146217811b011946b342b3b0041960e47a Mon Sep 17 00:00:00 2001 From: shivsood Date: Mon, 5 Aug 2019 14:00:34 -0700 Subject: [PATCH 2/4] fixed Numeric types test. tinyint should be compared with java.lang.byte and not java.lang.interger. 
Fixed styling issues noted in the review comments
---
 .../jdbc/MsSqlServerIntegrationSuite.scala | 36 ++++++++++++++----------------------
 1 file changed, 14 insertions(+), 22 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 937bddf76c52..bbf6b424d2d3 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -21,10 +21,9 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
-import org.apache.spark.tags.DockerTest
-
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.DockerTest
 
 @DockerTest
 class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
@@ -122,7 +121,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     val types = row.toSeq.map(x => x.getClass.toString)
     assert(types.length == 12)
     assert(types(0).equals("class java.lang.Boolean"))
-    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(1).equals("class java.lang.Byte"))
     assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Long"))
@@ -134,7 +133,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types(10).equals("class java.math.BigDecimal"))
     assert(types(11).equals("class java.math.BigDecimal"))
     assert(row.getBoolean(0) == false)
-    assert(row.getInt(1) == 255)
+    assert(row.getByte(1) == 255.toByte)
     assert(row.getShort(2) == 32767)
     assert(row.getInt(3) == 2147483647)
     assert(row.getLong(4) == 9223372036854775807L)
@@ -206,30 +205,23 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
 
-  def create_df() : DataFrame = {
-    val tableSchema = StructType(Seq(
-      StructField("serialNum",ByteType,true)
-    ))
-
-    val tableData = Seq (
-      Row(10)
-    )
-
-    spark.createDataFrame(spark.sparkContext.parallelize(tableData),tableSchema)
-  }
-
   test("SPARK-28151 Test write table with BYTETYPE") {
-    val df1 = create_df()
+    val tableSchema = StructType(Seq(StructField("serialNum", ByteType, true)))
+    val tableData = Seq(Row(10))
+    val df1 = spark.createDataFrame(
+      spark.sparkContext.parallelize(tableData),
+      tableSchema)
+
     df1.write
       .format("jdbc")
       .mode("overwrite")
-      .option("url",jdbcUrl)
-      .option("dbtable","testTable")
+      .option("url", jdbcUrl)
+      .option("dbtable", "testTable")
       .save()
     val df2 = spark.read
       .format("jdbc")
-      .option("url",jdbcUrl)
-      .option("dbtable","byteTable")
+      .option("url", jdbcUrl)
+      .option("dbtable", "byteTable")
       .load()
     df2.show()
   }

From 7658bbc55f6c658e799eb64b3d5648a3c5581da0 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Mon, 14 Oct 2019 17:21:33 -0700
Subject: [PATCH 3/4] Fixed test.

Added a test to validate a broader value range and the column data type
after read.
---
 .../jdbc/MsSqlServerIntegrationSuite.scala | 33 ++++++++++++++++++++++++-----------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index bbf6b424d2d3..e355403a1e69 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -206,23 +206,34 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
   }
 
   test("SPARK-28151 Test write table with BYTETYPE") {
-    val tableSchema = StructType(Seq(StructField("serialNum", ByteType, true)))
-    val tableData = Seq(Row(10))
-    val df1 = spark.createDataFrame(
-      spark.sparkContext.parallelize(tableData),
-      tableSchema)
-
-    df1.write
+    val df : DataFrame = {
+      val schema = StructType(Seq(
+        StructField("a", ByteType, true)
+      ))
+      val data = Seq(
+        Row(-127.toByte),
+        Row(0.toByte),
+        Row(1.toByte),
+        Row(38.toByte),
+        Row(128.toByte)
+      )
+      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    }
+    val tablename = "bytetable"
+    df.write
       .format("jdbc")
       .mode("overwrite")
       .option("url", jdbcUrl)
-      .option("dbtable", "testTable")
+      .option("dbtable", tablename)
       .save()
-    val df2 = spark.read
+    val df_copy = spark.read
       .format("jdbc")
       .option("url", jdbcUrl)
-      .option("dbtable", "byteTable")
+      .option("dbtable", tablename)
       .load()
-    df2.show()
+    assert(df.count == df_copy.count)
+    val rows = df_copy.collect()
+    val colType = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(colType(0) == "class java.lang.Byte")
   }
 }

From ec5315d9ec600547cadc30861636159f25e33afc Mon Sep 17 00:00:00 2001
From: shivsood
Date: Tue, 29 Oct 2019 17:31:42 -0700
Subject: [PATCH 4/4] Test fix to use testImplicits for simplicity
---
 .../jdbc/MsSqlServerIntegrationSuite.scala | 25 +++++++------------------
 1 file changed, 6 insertions(+), 19 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index e355403a1e69..604d2ce55d62 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -21,8 +21,6 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
@@ -205,20 +203,9 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
 
-  test("SPARK-28151 Test write table with BYTETYPE") {
-    val df : DataFrame = {
-      val schema = StructType(Seq(
-        StructField("a", ByteType, true)
-      ))
-      val data = Seq(
-        Row(-127.toByte),
-        Row(0.toByte),
-        Row(1.toByte),
-        Row(38.toByte),
-        Row(128.toByte)
-      )
-      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
-    }
+  test("Write tables with BYTETYPE") {
+    import testImplicits._
+    val df = Seq(-127.toByte, 0.toByte, 1.toByte, 38.toByte, 128.toByte).toDF("a")
     val tablename = "bytetable"
     df.write
       .format("jdbc")
@@ -226,13 +213,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
       .option("url", jdbcUrl)
       .option("dbtable", tablename)
       .save()
-    val df_copy = spark.read
+    val df2 = spark.read
       .format("jdbc")
       .option("url", jdbcUrl)
       .option("dbtable", tablename)
       .load()
-    assert(df.count == df_copy.count)
-    val rows = df_copy.collect()
+    assert(df.count == df2.count)
+    val rows = df2.collect()
     val colType = rows(0).toSeq.map(x => x.getClass.toString)
     assert(colType(0) == "class java.lang.Byte")
   }
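A side note for readers on Spark builds that predate these patches: the dialect
half of the fix (the TINYINT <-> ByteType mapping) can be approximated from user
code by registering a custom dialect. The sketch below is illustrative only and
is not part of the patches above; the object name MsSqlServerByteDialect is made
up here. The JdbcUtils.scala change (stmt.setInt -> stmt.setByte) has no
user-side equivalent, since value setters are not dialect-pluggable.

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

object MsSqlServerByteDialect extends JdbcDialect {
  // Handle only SQL Server JDBC URLs, like the built-in MsSqlServerDialect.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  // Read side: surface SQL Server TINYINT columns as Catalyst ByteType.
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.TINYINT) Some(ByteType) else None
  }

  // Write side: create TINYINT columns for ByteType fields.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case ByteType => Some(JdbcType("TINYINT", Types.TINYINT))
    case _ => None
  }
}

// Register once at startup (e.g. from spark-shell or application init);
// registered dialects are consulted before the built-in ones for the
// cases they return Some for.
JdbcDialects.registerDialect(MsSqlServerByteDialect)

One caveat applies either way: SQL Server's TINYINT is unsigned (0 to 255) while
Catalyst's ByteType is signed (-128 to 127), so negative byte values do not
round-trip cleanly through a TINYINT column; the -127.toByte and 128.toByte rows
in the test above exercise exactly that edge.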