Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c7abad3
Added stringDataType option to jdbc connection properties
rama-mullapudi Aug 22, 2015
f1d0b9e
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
faff507
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
c4b4477
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
35e61f3
Added method override for getJDBCType to take 2 parameters DataType a…
rama-mullapudi Sep 14, 2015
dd22b2f
Sync to master
rama-mullapudi Sep 14, 2015
cd809c5
Sync Master
rama-mullapudi Sep 26, 2015
0cfeefa
Sync Master
rama-mullapudi Sep 26, 2015
dddc137
Updated a call with added parameter
rama-mullapudi Sep 26, 2015
5f532e8
Fixed spark code style where comment exceeded 100 chars, and changed …
rama-mullapudi Sep 28, 2015
03f4d96
Merge remote-tracking branch 'upstream/master'
rama-mullapudi Sep 28, 2015
27f118b
Fixed sync with master issue
rama-mullapudi Sep 28, 2015
44e1978
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 28, 2015
e605a11
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 28, 2015
4cae11b
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 29, 2015
4c2a7a4
Updated JDBCDialects to save VARCHAR size to metadata
rama-mullapudi Sep 29, 2015
a0cb024
Updated JDBCDialects to save VARCHAR size to metadata
rama-mullapudi Sep 29, 2015
d50bdf7
Changed call for getJDBCType to call both methods with datatype and m…
rama-mullapudi Oct 4, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,15 @@ object JdbcUtils extends Logging {
/**
* Compute the schema string for this RDD.
*/
def schemaString(df: DataFrame, url: String): String = {
def schemaString(
df: DataFrame,
url: String): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field => {
val name = field.name
val typ: String =
dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
dialect.getJDBCType(field.dataType, field.metadata).map(_.databaseTypeDefinition).getOrElse(
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
Expand Down Expand Up @@ -199,7 +201,7 @@ object JdbcUtils extends Logging {
properties: Properties = new Properties()) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
dialect.getJDBCType(field.dataType, field.metadata).map(_.jdbcNullType).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ abstract class JdbcDialect {
/**
* Retrieve the jdbc / sql type for a given datatype.
* The default implementation returns None, meaning this dialect defines no override.
* @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
* @param md The metadata passed alongside the datatype (two-argument overload only)
* @return The new JdbcType if there is an override for this DataType, otherwise None
*/
def getJDBCType(dt: DataType): Option[JdbcType] = None
def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is a DeveloperAPI, it is public so it would be good to fix without breaking binary compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having getJDBCType(dt: DataType) call getJDBCType(dt, Metadata.empty) instead of returning None? Classes that extend JdbcDialect may implement only one of the two overloads, in which case the two functions would behave completely differently.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not understand the change you are suggesting. Do you mean calling getJDBCType(dt, Metadata.empty) from getJDBCType(dt: DataType)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Quotes the identifier. This is used to put quotes around the identifier in case the column
Expand Down Expand Up @@ -125,6 +126,9 @@ object JdbcDialects {

// Register the dialects that ship with this file: MySQL, Postgres, Oracle, DB2 and Netezza.
registerDialect(MySQLDialect)
registerDialect(PostgresDialect)
registerDialect(OracleDialect)
registerDialect(DB2Dialect)
registerDialect(NetezzaDialect)

/**
* Fetch the JdbcDialect class corresponding to a given database url.
Expand Down Expand Up @@ -159,8 +163,8 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
}

override def getJDBCType(dt: DataType): Option[JdbcType] = {
dialects.flatMap(_.getJDBCType(dt)).headOption
override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
dialects.flatMap(_.getJDBCType(dt, md)).headOption
}
}

Expand Down Expand Up @@ -191,7 +195,7 @@ case object PostgresDialect extends JdbcDialect {
} else None
}

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
Expand Down Expand Up @@ -222,3 +226,66 @@ case object MySQLDialect extends JdbcDialect {
s"`$colName`"
}
}

/**
 * :: DeveloperApi ::
 * Default DB2 dialect. Overrides the write-side mapping of string and boolean columns, which
 * by default are mapped to TEXT and BIT(1), types that are not valid in DB2.
 */
@DeveloperApi
case object DB2Dialect extends JdbcDialect {

  /** Accepts every JDBC url that starts with "jdbc:db2". */
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")

  /**
   * Maps Catalyst types to DB2 column types.
   *
   * @param dt The datatype being written
   * @param md The field metadata; a "maxlength" entry, when present, sizes the VARCHAR
   * @return The DB2-specific JdbcType for strings and booleans, None for every other type
   */
  override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
    case StringType if md.contains("maxlength") =>
      // NOTE(review): the null type paired with VARCHAR is java.sql.Types.CHAR - confirm
      // that CHAR rather than VARCHAR is intended here.
      val maxLength = md.getLong("maxlength")
      Some(JdbcType(s"VARCHAR($maxLength)", java.sql.Types.CHAR))
    case StringType =>
      // No declared length: fall back to CLOB.
      Some(JdbcType("CLOB", java.sql.Types.CLOB))
    case BooleanType =>
      Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    case _ =>
      None
  }
}

/**
* :: DeveloperApi ::
* Default Oracle dialect, mapping string/boolean on write to valid Oracle types.
*/
@DeveloperApi
case object OracleDialect extends JdbcDialect {

override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
if (dt == StringType && md.contains("maxlength")) {
Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR))
} else if (dt == StringType ) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove the trailing space before the closing parenthesis.

Some(JdbcType("CLOB", java.sql.Types.CLOB))
} else if (dt == BooleanType ) {
Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
} else None
}
}

/**
 * :: DeveloperApi ::
 * Default Netezza dialect. Overrides the write-side mapping of string, binary and boolean
 * columns so that they use Netezza column types.
 */
@DeveloperApi
case object NetezzaDialect extends JdbcDialect {

  /** Accepts every JDBC url that starts with "jdbc:netezza". */
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  /**
   * Maps Catalyst types to Netezza column types.
   *
   * @param dt The datatype being written
   * @param md The field metadata; a "maxlength" entry, when present, sizes the VARCHAR
   * @return The Netezza-specific JdbcType for strings, binaries and booleans, None otherwise
   */
  override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
    case StringType if md.contains("maxlength") =>
      val maxLength = md.getLong("maxlength")
      Some(JdbcType(s"VARCHAR($maxLength)", java.sql.Types.CHAR))
    case StringType =>
      // No declared length: use a fixed VARCHAR(255).
      Some(JdbcType("VARCHAR(255)", java.sql.Types.CHAR))
    case BinaryType =>
      // NOTE(review): BYTEINT is paired with java.sql.Types.BINARY - confirm this is the
      // intended Netezza column type for binary data.
      Some(JdbcType("BYTEINT", java.sql.Types.BINARY))
    case BooleanType =>
      Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    case _ =>
      None
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
test("Default jdbc dialect registration") {
  // Each url must resolve to the listed dialect; an unrecognised url resolves to NoopDialect.
  val expectedDialects = Seq(
    "jdbc:mysql://127.0.0.1/db" -> MySQLDialect,
    "jdbc:postgresql://127.0.0.1/db" -> PostgresDialect,
    "jdbc:db2://127.0.0.1/db" -> DB2Dialect,
    "jdbc:oracle://127.0.0.1/db" -> OracleDialect,
    "jdbc:netezza://127.0.0.1/db" -> NetezzaDialect,
    "test.invalid" -> NoopDialect)
  expectedDialects.foreach { case (url, expected) =>
    assert(JdbcDialects.get(url) == expected)
  }
}

Expand Down