@@ -59,7 +59,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
"""
|INSERT INTO numbers VALUES (
|0,
-|255, 32767, 2147483647, 9223372036854775807,
+|127, 32767, 2147483647, 9223372036854775807,
Member:
Ditto - #26549 (comment). Can anyone explain why this was possible, and how do we handle the unsigned cases?

Member:
Continuing from #26549 (comment): TINYINT does look like a single byte, so using ByteType is reasonable. However, it looks like it's treated as signed by some but not all DBMSes. Is it unsigned in SQL Server?

Just checking: types like TINYINT and SMALLINT are not standard types, although they are widely supported, right? Should these types be used by default for all JDBC sources?

Yeah, I have some more doubts now that the TINYINT issue has been pointed out. @shivsood

Member:
Got it, @gatorsmile, @HyukjinKwon, @srowen. I overlooked the mismatch between TINYINT and Byte in this PR.

Member:
SMALLINT is widely supported because it's SQL-92. For TINYINT, I agree that we need to partially revert for that type.

shivsood (Contributor, Author), Nov 19, 2019:
Thanks all for pointing out these issues. I had overlooked the handling of unsigned cases and the fact that each database may define this on its own. I think the problem exists for both SMALLINT and TINYINT.

  • TINYINT can range anywhere from -128 to +127 signed, or 0 to 255 unsigned, in both SQL Server and MySQL.
  • SMALLINT can take unsigned values 0 to 65535 per MySQL.

My understanding is that there are no unsigned types in Spark, cf. https://spark.apache.org/docs/latest/sql-reference.html. Is that assertion right?

Do we have a test for an integer column holding the value 4294967295? Per the MySQL documentation, an unsigned INT can take that value.
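
To make the wrap-around concrete, here is a minimal Scala sketch (illustrative only, not code from this PR) showing that 255 fits an unsigned TINYINT but not a signed JVM Byte, while a widening conversion preserves it:

  val v = 255
  assert(v.toByte == -1)    // 255 wraps around in a signed 8-bit Byte
  assert(v.toShort == 255)  // widening to a 16-bit Short preserves the value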

shivsood (Contributor, Author):
The behavior would be as follows.

Overwrite scenario

  1. (ShortType -> INT) If a Spark DataFrame has a ShortType column, an overwrite creates a DBMS table with an INT column. Because the Spark ShortType range is -32768 to +32767, only those values can be written.
  2. (ByteType -> SMALLINT) If a Spark DataFrame has a ByteType column, an overwrite creates a DBMS table with a SMALLINT column. Because the Spark ByteType range is -128 to +127, only those values can be written.

Read scenario

  1. If an existing table in the DBMS has type TINYINT, a read in Spark results in a ShortType column. Because the ShortType range in Spark is -32768 to +32767, both the DBMS signed range of -128 to +127 and the unsigned range of 0 to 255 are handled.

  2. If an existing table in the DBMS has type SMALLINT, a read results in a Spark DataFrame with an Integer column. Because the Integer range in Spark is -2147483648 to +2147483647, both the DBMS signed range of -32768 to +32767 and the unsigned range of 0 to 65535 are handled. (A minimal sketch of these widening mappings follows below.)
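
A minimal Scala sketch of the widening read mappings described above (an illustration of the proposal only, simplified from Spark's actual JdbcUtils.getCatalystType):

  import java.sql.Types
  import org.apache.spark.sql.types._

  // Widen each integral JDBC type by one step on read, so that both the
  // signed and unsigned DBMS ranges fit into the Spark type.
  def widenedCatalystType(jdbcType: Int): DataType = jdbcType match {
    case Types.TINYINT  => ShortType   // signed -128..127 and unsigned 0..255
    case Types.SMALLINT => IntegerType // signed -32768..32767 and unsigned 0..65535
    case Types.INTEGER  => LongType    // would also cover unsigned 0..4294967295
    case other => throw new IllegalArgumentException(s"Unhandled JDBC type: $other")
  }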

Member:
So we have to use a widening conversion both ways. That's the safest thing to do, I guess, and it's less of a change from the current behavior, where bytes go all the way to ints.

Member:
Ah, I see. Right.

Member:
Anyway, thanks for the explanation, @shivsood

shivsood (Contributor, Author), Jan 10, 2020:
Raised PR #27172 with the proposed fix. ShortType is unchanged; only the ByteType mapping is modified to map to ShortType on the read path, to enable support for the 0 to 255 range.

@maropu @srowen @HyukjinKwon, as an FYI. Thanks.
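
For reference, a sketch of the read-path mapping as described for #27172 (an assumption based only on the comment above; SMALLINT handling is unchanged from this PR):

  import java.sql.Types
  import org.apache.spark.sql.types.{DataType, ShortType}

  // After the follow-up fix, TINYINT is widened to ShortType on read so the
  // unsigned 0..255 range survives; SMALLINT continues to map to ShortType.
  def readPathType(jdbcType: Int): DataType = jdbcType match {
    case Types.TINYINT  => ShortType
    case Types.SMALLINT => ShortType
    case other => throw new IllegalArgumentException(s"Unhandled JDBC type: $other")
  }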

|123456789012345.123456789012345, 123456789012345.123456789012345,
|123456789012345.123456789012345,
|123, 12345.12,
@@ -119,7 +119,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
val types = row.toSeq.map(x => x.getClass.toString)
assert(types.length == 12)
assert(types(0).equals("class java.lang.Boolean"))
-assert(types(1).equals("class java.lang.Integer"))
+assert(types(1).equals("class java.lang.Byte"))
assert(types(2).equals("class java.lang.Short"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Long"))
@@ -131,7 +131,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(10).equals("class java.math.BigDecimal"))
assert(types(11).equals("class java.math.BigDecimal"))
assert(row.getBoolean(0) == false)
-assert(row.getInt(1) == 255)
+assert(row.getByte(1) == 127)
assert(row.getShort(2) == 32767)
assert(row.getInt(3) == 2147483647)
assert(row.getLong(4) == 9223372036854775807L)
@@ -202,4 +202,46 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}

test("SPARK-29644: Write tables with ShortType") {
import testImplicits._
val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32767.toShort).toDF("a")
val tablename = "shorttable"
df.write
.format("jdbc")
.mode("overwrite")
.option("url", jdbcUrl)
.option("dbtable", tablename)
.save()
val df2 = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", tablename)
.load()
assert(df.count == df2.count)
val rows = df2.collect()
val colType = rows(0).toSeq.map(x => x.getClass.toString)
assert(colType(0) == "class java.lang.Short")
}

test("SPARK-29644: Write tables with ByteType") {
import testImplicits._
val df = Seq(-128.toByte, 0.toByte, 1.toByte, 38.toByte, 127.toByte).toDF("a")
val tablename = "bytetable"
df.write
.format("jdbc")
.mode("overwrite")
.option("url", jdbcUrl)
.option("dbtable", tablename)
.save()
val df2 = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", tablename)
.load()
assert(df.count == df2.count)
val rows = df2.collect()
val colType = rows(0).toSeq.map(x => x.getClass.toString)
assert(colType(0) == "class java.lang.Byte")
}
}
@@ -84,7 +84,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types.length == 9)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
-assert(types(2).equals("class java.lang.Integer"))
+assert(types(2).equals("class java.lang.Short"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Integer"))
assert(types(5).equals("class java.lang.Long"))
@@ -93,7 +93,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(8).equals("class java.lang.Double"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
-assert(rows(0).getInt(2) == 17)
+assert(rows(0).getShort(2) == 17)
assert(rows(0).getInt(3) == 77777)
assert(rows(0).getInt(4) == 123456789)
assert(rows(0).getLong(5) == 123456789012345L)
@@ -170,8 +170,8 @@ object JdbcUtils extends Logging {
case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
-case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
-case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
+case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
@@ -235,7 +235,7 @@ object JdbcUtils extends Logging {
case java.sql.Types.REF => StringType
case java.sql.Types.REF_CURSOR => null
case java.sql.Types.ROWID => LongType
-case java.sql.Types.SMALLINT => IntegerType
+case java.sql.Types.SMALLINT => ShortType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
@@ -244,7 +244,7 @@ object JdbcUtils extends Logging {
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
=> null
-case java.sql.Types.TINYINT => IntegerType
+case java.sql.Types.TINYINT => ByteType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ =>
@@ -546,11 +546,11 @@ object JdbcUtils extends Logging {

case ShortType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
-stmt.setInt(pos + 1, row.getShort(pos))
+stmt.setShort(pos + 1, row.getShort(pos))

case ByteType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
-stmt.setInt(pos + 1, row.getByte(pos))
+stmt.setByte(pos + 1, row.getByte(pos))

case BooleanType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
@@ -578,8 +578,8 @@ class JDBCSuite extends QueryTest
assert(rows.length === 1)
assert(rows(0).getInt(0) === 1)
assert(rows(0).getBoolean(1) === false)
-assert(rows(0).getInt(2) === 3)
-assert(rows(0).getInt(3) === 4)
+assert(rows(0).getByte(2) === 3.toByte)
+assert(rows(0).getShort(3) === 4.toShort)
assert(rows(0).getLong(4) === 1234567890123L)
}

@@ -574,6 +574,48 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
}
}

test("SPARK-29644: Write tables with ShortType") {
import testImplicits._
val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32767.toShort).toDF("a")
val tablename = "shorttable"
df.write
.format("jdbc")
.mode("overwrite")
.option("url", url)
.option("dbtable", tablename)
.save()
val df2 = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", tablename)
.load()
assert(df.count == df2.count)
val rows = df2.collect()
val colType = rows(0).toSeq.map(x => x.getClass.toString)
assert(colType(0) == "class java.lang.Short")
}

test("SPARK-29644: Write tables with ByteType") {
import testImplicits._
val df = Seq(-128.toByte, 0.toByte, 1.toByte, 38.toByte, 127.toByte).toDF("a")
val tablename = "bytetable"
df.write
.format("jdbc")
.mode("overwrite")
.option("url", url)
.option("dbtable", tablename)
.save()
val df2 = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", tablename)
.load()
assert(df.count == df2.count)
val rows = df2.collect()
val colType = rows(0).toSeq.map(x => x.getClass.toString)
assert(colType(0) == "class java.lang.Byte")
}

private def runAndVerifyRecordsWritten(expected: Long)(job: => Unit): Unit = {
assert(expected === runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten))
}