Skip to content

Commit 4d4892a

Browse files
First commit nested Parquet read converters
1 parent aa688fe commit 4d4892a

File tree

5 files changed

+258
-80
lines changed

5 files changed

+258
-80
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,23 @@ abstract class DataType {
3333

3434
case object NullType extends DataType
3535

36+
trait PrimitiveType
37+
3638
abstract class NativeType extends DataType {
3739
type JvmType
3840
@transient val tag: TypeTag[JvmType]
3941
val ordering: Ordering[JvmType]
4042
}
4143

42-
case object StringType extends NativeType {
44+
case object StringType extends NativeType with PrimitiveType {
4345
type JvmType = String
4446
@transient lazy val tag = typeTag[JvmType]
4547
val ordering = implicitly[Ordering[JvmType]]
4648
}
47-
case object BinaryType extends DataType {
49+
case object BinaryType extends DataType with PrimitiveType {
4850
type JvmType = Array[Byte]
4951
}
50-
case object BooleanType extends NativeType {
52+
case object BooleanType extends NativeType with PrimitiveType {
5153
type JvmType = Boolean
5254
@transient lazy val tag = typeTag[JvmType]
5355
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +65,7 @@ case object TimestampType extends NativeType {
6365
}
6466
}
6567

66-
abstract class NumericType extends NativeType {
68+
abstract class NumericType extends NativeType with PrimitiveType {
6769
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
6870
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
6971
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private[sql] object ParquetRelation {
161161
}
162162

163163
if (fs.exists(path) &&
164-
!fs.getFileStatus(path)
164+
!fs.getFileStatus(path)
165165
.getPermission
166166
.getUserAction
167167
.implies(FsAction.READ_WRITE)) {
@@ -173,6 +173,8 @@ private[sql] object ParquetRelation {
173173
}
174174

175175
private[parquet] object ParquetTypesConverter {
176+
def isPrimitiveType(ctype: DataType): Boolean = classOf[PrimitiveType] isAssignableFrom ctype.getClass
177+
176178
def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
177179
// for now map binary to string type
178180
// TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
@@ -192,24 +194,36 @@ private[parquet] object ParquetTypesConverter {
192194
}
193195

194196
def toDataType(parquetType: ParquetType): DataType = {
195-
if (parquetType.isPrimitive) toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
197+
if (parquetType.isPrimitive) {
198+
toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
199+
}
196200
else {
197201
val groupType = parquetType.asGroupType()
198202
parquetType.getOriginalType match {
199-
case ParquetOriginalType.LIST | ParquetOriginalType.ENUM => {
200-
val fields = groupType.getFields.map(toDataType(_))
203+
// if the schema was constructed programmatically there may be hints how to convert
204+
// it inside the metadata via the OriginalType field
205+
case ParquetOriginalType.LIST | ParquetOriginalType.ENUM => { // TODO: check enums!
206+
val fields = groupType.getFields.map(toDataType(_))
201207
new ArrayType(fields.apply(0)) // array fields should have the same type
202208
}
203-
case _ => { // everything else nested becomes a Struct
204-
val fields = groupType
205-
.getFields
206-
.map(ptype => new StructField(
207-
ptype.getName,
208-
toDataType(ptype),
209-
ptype.getRepetition != Repetition.REQUIRED))
210-
new StructType(fields)
209+
case _ => { // everything else nested becomes a Struct, unless it has a single repeated field
210+
// in which case it becomes an array (this should correspond to the inverse operation of
211+
// parquet.schema.ConversionPatterns.listType)
212+
if (groupType.getFieldCount == 1 && groupType.getFields.apply(0).getRepetition == Repetition.REPEATED) {
213+
val elementType = toDataType(groupType.getFields.apply(0))
214+
new ArrayType(elementType)
215+
} else {
216+
val fields = groupType
217+
.getFields
218+
.map(ptype => new StructField(
219+
ptype.getName,
220+
toDataType(ptype),
221+
ptype.getRepetition != Repetition.REQUIRED))
222+
new StructType(fields)
223+
}
211224
}
212225
}
226+
//}
213227
}
214228
}
215229

@@ -224,24 +238,32 @@ private[parquet] object ParquetTypesConverter {
224238
case _ => None
225239
}
226240

227-
def fromComplexDataType(ctype: DataType, name: String, nullable: Boolean = true): ParquetType = {
241+
def fromDataType(ctype: DataType, name: String, nullable: Boolean = true, inArray: Boolean = false): ParquetType = {
228242
val repetition =
229-
if (nullable) Repetition.OPTIONAL
230-
else Repetition.REQUIRED
243+
if (inArray) Repetition.REPEATED
244+
else {
245+
if (nullable) Repetition.OPTIONAL
246+
else Repetition.REQUIRED
247+
}
231248
val primitiveType = fromPrimitiveDataType(ctype)
232249
if (primitiveType.isDefined) {
233250
new ParquetPrimitiveType(repetition, primitiveType.get, name)
234251
} else {
235252
ctype match {
236253
case ArrayType(elementType: DataType) => {
237-
val parquetElementType = fromComplexDataType(elementType, name + "_values", false)
238-
new ParquetGroupType(repetition, name, parquetElementType)
254+
// TODO: "values" is a generic name but without it the Parquet column path would
255+
// be incomplete and values may be silently dropped; better would be to give
256+
// Array elements a name of some sort (and specify whether they are nullable),
257+
// as in StructField
258+
val parquetElementType = fromDataType(elementType, "values", nullable=false, inArray=true)
259+
ConversionPatterns.listType(repetition, name, parquetElementType)
239260
}
261+
// TODO: test structs inside arrays
240262
case StructType(structFields) => {
241263
val fields = structFields.map {
242-
field => fromComplexDataType(field.dataType, field.name, false)
264+
field => fromDataType(field.dataType, field.name, field.nullable)
243265
}
244-
new ParquetGroupType(repetition, name, fields)
266+
new ParquetGroupType(Repetition.REPEATED, name, fields)
245267
}
246268
case _ => sys.error(s"Unsupported datatype $ctype")
247269
}
@@ -275,7 +297,7 @@ private[parquet] object ParquetTypesConverter {
275297
}
276298

277299
def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
278-
val fields = attributes.map(attribute => fromComplexDataType(attribute.dataType, attribute.name, attribute.nullable))
300+
val fields = attributes.map(attribute => fromDataType(attribute.dataType, attribute.name, attribute.nullable))
279301
new MessageType("root", fields)
280302
}
281303

0 commit comments

Comments (0)