diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala index 1704be7fcbd30..98137ba96c79d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala @@ -73,6 +73,22 @@ private[sql] class PostgresQuirks extends DriverQuirks { StringType } else if (sqlType == Types.OTHER && typeName.equals("inet")) { StringType + } else if (sqlType == Types.OTHER && typeName.equals("uuid")) { + StringType + } else if (sqlType == Types.OTHER && typeName.equals("hstore")) { + MapType(keyType = StringType, valueType = StringType) + } else if (sqlType == Types.ARRAY) { + ArrayType(elementType = typeName match { + case "_varchar" => StringType + case "_text" => StringType + case "_name" => StringType + case "_int2" => IntegerType + case "_int4" => IntegerType + case "_int8" => LongType + case "_float4" => FloatType + case "_float8" => DoubleType + case _ => StringType + }) } else null } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 87304ce2496b4..c7c90f16897f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ +import scala.collection.JavaConverters._ + private[sql] object JDBCRDD extends Logging { /** * Maps a JDBC type to a Catalyst type. This function is called only when @@ -281,6 +283,8 @@ private[sql] class JDBCRDD( case object StringConversion extends JDBCConversion case object TimestampConversion extends JDBCConversion case object BinaryConversion extends JDBCConversion + case object ArrayConversion extends JDBCConversion + case object MapConversion extends JDBCConversion /** * Maps a StructType to a type tag list. @@ -298,6 +302,8 @@ private[sql] class JDBCRDD( case StringType => StringConversion case TimestampType => TimestampConversion case BinaryType => BinaryConversion + case ArrayType(_,_) => ArrayConversion + case MapType(_,_,_) => MapConversion case _ => throw new IllegalArgumentException(s"Unsupported field $sf") }).toArray } @@ -356,6 +362,24 @@ private[sql] class JDBCRDD( } mutableRow.setLong(i, ans) } + case ArrayConversion => { + val sqlArray = rs.getArray(pos) + val array = if (sqlArray == null) { + null + } else { + sqlArray.getArray.asInstanceOf[Array[_]].toSeq + } + mutableRow.update(i, array) + } + case MapConversion => { + val sqlMap = rs.getObject(pos) + val map = if (sqlMap == null) { + null + } else { + sqlMap.asInstanceOf[java.util.Map[String,String]].asScala + } + mutableRow.update(i, map) + } } if (rs.wasNull) mutableRow.setNullAt(i) i = i + 1