Commit b7fcc35

Documenting conversions, bugfix, wrappers of Rows
1 parent ee70125 commit b7fcc35

3 files changed: +152 −38 lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 19 additions & 7 deletions
@@ -115,7 +115,7 @@ class CatalystGroupConverter(
     protected[parquet] val index: Int,
     protected[parquet] val parent: CatalystConverter,
     protected[parquet] var current: ArrayBuffer[Any],
-    protected[parquet] var buffer: ArrayBuffer[ArrayBuffer[Any]])
+    protected[parquet] var buffer: ArrayBuffer[Row])
   extends GroupConverter with CatalystConverter {
 
   def this(schema: Seq[FieldType], index: Int, parent: CatalystConverter) =
@@ -124,7 +124,7 @@ class CatalystGroupConverter(
       index,
       parent,
       current=null,
-      buffer=new ArrayBuffer[ArrayBuffer[Any]](
+      buffer=new ArrayBuffer[Row](
         CatalystArrayConverter.INITIAL_ARRAY_SIZE))
 
   // This constructor is used for the root converter only
@@ -141,6 +141,7 @@ class CatalystGroupConverter(
   // Should be only called in root group converter!
   def getCurrentRecord: Row = {
     assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
+    // TODO: use iterators if possible
     new GenericRow {
       override val values: Array[Any] = current.toArray
     }
@@ -155,7 +156,7 @@ class CatalystGroupConverter(
 
   override protected[parquet] def clearBuffer(): Unit = {
     // TODO: reuse buffer?
-    buffer = new ArrayBuffer[ArrayBuffer[Any]](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
+    buffer = new ArrayBuffer[Row](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
   }
 
   override def start(): Unit = {
@@ -173,8 +174,13 @@ class CatalystGroupConverter(
   override def end(): Unit = {
     if (!isRootConverter) {
       assert(current!=null) // there should be no empty groups
-      buffer.append(current)
-      parent.updateField(index, buffer)
+      buffer.append(new GenericRow {
+        override val values: Array[Any] = current.toArray
+      })
+      // TODO: use iterators if possible, avoid Row wrapping
+      parent.updateField(index, new GenericRow {
+        override val values: Array[Any] = buffer.toArray
+      })
     }
   }
 }
@@ -276,7 +282,10 @@ class CatalystArrayConverter(
   // TODO: think about reusing the buffer
   override def end(): Unit = {
     assert(parent != null)
-    parent.updateField(index, buffer)
+    // TODO: use iterators if possible, avoid Row wrapping
+    parent.updateField(index, new GenericRow {
+      override val values: Array[Any] = buffer.toArray
+    })
     clearBuffer()
   }
 }
@@ -294,7 +303,10 @@ class CatalystStructConverter(
   // TODO: think about reusing the buffer
   override def end(): Unit = {
     assert(!isRootConverter)
-    parent.updateField(index, current)
+    // TODO: use iterators if possible, avoid Row wrapping!
+    parent.updateField(index, new GenericRow {
+      override val values: Array[Any] = current.toArray
+    })
   }
 }
 
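The net effect of these converter changes: completed groups now flow upward as Rows instead of raw ArrayBuffers. A minimal self-contained sketch of the wrapping pattern (not part of the diff); Row and GenericRow here are stand-ins for Spark's catalyst classes, mirroring the anonymous-subclass shape used above where GenericRow exposes an overridable `values` member:

import scala.collection.mutable.ArrayBuffer

trait Row { def apply(i: Int): Any; def size: Int }

class GenericRow extends Row {
  val values: Array[Any] = Array.empty
  def apply(i: Int): Any = values(i)
  def size: Int = values.length
}

object RowWrapping extends App {
  val current = ArrayBuffer[Any]("Julien Le Dem", "555 123 4567")
  val buffer = new ArrayBuffer[Row]()

  // A completed group is wrapped into a Row before being handed to the
  // parent converter, so nested fields are uniformly Row-valued:
  buffer.append(new GenericRow {
    override val values: Array[Any] = current.toArray
  })

  assert(buffer(0)(0) == "Julien Le Dem")
}
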
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 78 additions & 6 deletions
@@ -193,6 +193,31 @@ private[parquet] object ParquetTypesConverter {
       s"Unsupported parquet datatype $parquetType")
   }
 
+  /**
+   * Converts a given Parquet `Type` into the corresponding
+   * [[org.apache.spark.sql.catalyst.types.DataType]].
+   *
+   * Note that we apply the following conversion rules:
+   * <ul>
+   *   <li> Primitive types are converted to the corresponding primitive type.</li>
+   *   <li> Group types that have a single field with repetition `REPEATED` or themselves
+   *        have repetition level `REPEATED` are converted to an [[ArrayType]] with the
+   *        corresponding field type (possibly primitive) as element type.</li>
+   *   <li> Other group types are converted as follows:<ul>
+   *     <li> If they have a single field, they are converted into a [[StructType]] with
+   *          the corresponding field type.</li>
+   *     <li> If they have more than one field and repetition level `REPEATED` they are
+   *          converted into an [[ArrayType]] with the corresponding [[StructType]] as complex
+   *          element type.</li>
+   *     <li> Otherwise they are converted into a [[StructType]] with the corresponding
+   *          field types.</li></ul></li>
+   * </ul>
+   * Note that fields are determined to be `nullable` if and only if their Parquet repetition
+   * level is not `REQUIRED`.
+   *
+   * @param parquetType The type to convert.
+   * @return The corresponding Catalyst type.
+   */
   def toDataType(parquetType: ParquetType): DataType = {
     if (parquetType.isPrimitive) {
       toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
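
To make the documented rules concrete, a minimal sketch (not part of the diff) exercising the single-repeated-field rule. It assumes the pre-Apache parquet-mr package layout this file uses (parquet.schema.*) and that the caller sits inside org.apache.spark.sql.parquet, since ParquetTypesConverter is private[parquet]:

import parquet.schema.MessageTypeParser

val phoneNumbers = MessageTypeParser.parseMessageType(
  """message m {
    |  optional group phoneNumbers {
    |    repeated binary phone;
    |  }
    |}""".stripMargin).getFields.get(0)

// A group with a single REPEATED field becomes an ArrayType whose element
// type is the converted field type, here ArrayType(StringType):
println(ParquetTypesConverter.toDataType(phoneNumbers))
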
@@ -215,7 +240,9 @@ private[parquet] object ParquetTypesConverter {
       case _ => { // everything else nested becomes a Struct, unless it has a single repeated field
         // in which case it becomes an array (this should correspond to the inverse operation of
         // parquet.schema.ConversionPatterns.listType)
-        if (groupType.getFieldCount == 1 && groupType.getFields.apply(0).getRepetition == Repetition.REPEATED) {
+        if (groupType.getFieldCount == 1 &&
+          (groupType.getFields.apply(0).getRepetition == Repetition.REPEATED ||
+            groupType.getRepetition == Repetition.REPEATED)) {
           val elementType = toDataType(groupType.getFields.apply(0))
           new ArrayType(elementType)
         } else {
@@ -225,9 +252,10 @@ private[parquet] object ParquetTypesConverter {
           ptype.getName,
           toDataType(ptype),
           ptype.getRepetition != Repetition.REQUIRED))
-      if (groupType.getFieldCount == 1) { // single field, either optional or required
+
+      if (groupType.getFieldCount == 1) {
         new StructType(fields)
-      } else { // multi field repeated group, which we map into an array of structs
+      } else {
         if (parquetType.getRepetition == Repetition.REPEATED) {
           new ArrayType(StructType(fields))
         } else {
@@ -240,6 +268,14 @@ private[parquet] object ParquetTypesConverter {
       }
     }
   }
 
+  /**
+   * For a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] return
+   * the name of the corresponding Parquet primitive type or None if the given type
+   * is not primitive.
+   *
+   * @param ctype The type to convert.
+   * @return The name of the corresponding Parquet primitive type.
+   */
   def fromPrimitiveDataType(ctype: DataType): Option[ParquetPrimitiveTypeName] = ctype match {
     case StringType => Some(ParquetPrimitiveTypeName.BINARY)
     case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
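
For illustration (again not part of the diff), the mapping in action for the two cases shown plus the fallback, e.g. in a REPL opened inside org.apache.spark.sql.parquet so the private[parquet] object is in scope:

import org.apache.spark.sql.catalyst.types._

ParquetTypesConverter.fromPrimitiveDataType(StringType)       // Some(BINARY)
ParquetTypesConverter.fromPrimitiveDataType(BooleanType)      // Some(BOOLEAN)
ParquetTypesConverter.fromPrimitiveDataType(StructType(Nil))  // None: not a primitive type
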
@@ -251,6 +287,41 @@ private[parquet] object ParquetTypesConverter {
     case _ => None
   }
 
+  /**
+   * Converts a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] into
+   * the corresponding Parquet `Type`.
+   *
+   * The conversion follows the rules below:
+   * <ul>
+   *   <li> Primitive types are converted into Parquet's primitive types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
+   *        into Parquet's `GroupType` with the corresponding field types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are handled as follows:<ul>
+   *     <li> If their element is complex, that is of type
+   *          [[org.apache.spark.sql.catalyst.types.StructType]], they are converted
+   *          into a `GroupType` with the corresponding field types of the struct and
+   *          the original type of the `GroupType` is set to `LIST`.</li>
+   *     <li> Otherwise, that is if they contain a primitive type, they are converted into
+   *          a `GroupType` that is also a list but has only a single field of the type
+   *          corresponding to the element type.</li></ul></li>
+   * </ul>
+   * Parquet's repetition level is set according to the following rule:
+   * <ul>
+   *   <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType`, then
+   *        the repetition level is set to `REPEATED`.</li>
+   *   <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
+   *        type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
+   * </ul>
+   * The single exception to this rule is an [[org.apache.spark.sql.catalyst.types.ArrayType]]
+   * that contains a [[org.apache.spark.sql.catalyst.types.StructType]], whose repetition level
+   * is always set to `REPEATED`.
+   *
+   * @param ctype The type to convert.
+   * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]] whose type is converted.
+   * @param nullable When true indicates that the attribute is nullable.
+   * @param inArray When true indicates that this is a nested attribute inside an array.
+   * @return The corresponding Parquet type.
+   */
   def fromDataType(
       ctype: DataType,
       name: String,
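
A hedged sketch (not part of the diff) of the array-of-structs exception described above: even for a nullable attribute, the resulting group is REPEATED with original type LIST. It assumes the same in-package visibility as the earlier sketches:

import org.apache.spark.sql.catalyst.types._

val contacts = ArrayType(StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("phoneNumber", StringType, nullable = true))))

ParquetTypesConverter.fromDataType(contacts, "contacts", nullable = true, inArray = false)
// -> e.g. repeated group contacts (LIST) { optional binary name; optional binary phoneNumber; }
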
@@ -271,8 +342,9 @@ private[parquet] object ParquetTypesConverter {
       elementType match {
         case StructType(fields) => { // first case: array of structs
           val parquetFieldTypes = fields.map(
-            f => fromDataType(f.dataType, f.name, f.nullable, false))
-          new ParquetGroupType(repetition, name, ParquetOriginalType.LIST, parquetFieldTypes)
+            f => fromDataType(f.dataType, f.name, f.nullable, inArray = false))
+          assert(fields.size > 1, "Found struct inside array with a single field: error parsing Catalyst schema")
+          new ParquetGroupType(Repetition.REPEATED, name, ParquetOriginalType.LIST, parquetFieldTypes)
           //ConversionPatterns.listType(Repetition.REPEATED, name, parquetFieldTypes)
         }
         case _ => { // second case: array of primitive types
@@ -288,7 +360,7 @@ private[parquet] object ParquetTypesConverter {
       // TODO: test structs inside arrays
       case StructType(structFields) => {
         val fields = structFields.map {
-          field => fromDataType(field.dataType, field.name, field.nullable)
+          field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
         }
         new ParquetGroupType(repetition, name, fields)
       }
 
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 55 additions & 25 deletions
@@ -54,6 +54,10 @@ case class OptionalReflectData(
     doubleField: Option[Double],
     booleanField: Option[Boolean])
 
+case class Nested(i: Int, s: String)
+
+case class Data(array: Seq[Int], nested: Nested)
+
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   import TestData._
   TestData // Load test data tables.
@@ -366,6 +370,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("Importing nested Parquet file (Addressbook)") {
+    implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
     ParquetTestData.readNestedFile(
       ParquetTestData.testNestedFile1,
       ParquetTestData.testNestedSchema1)
@@ -374,22 +379,23 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     assert(result.size === 2)
     val first_record = result(0)
     val second_record = result(1)
-    val first_owner_numbers = result(0).apply(1).asInstanceOf[ArrayBuffer[Any]]
-    val first_contacts = result(0).apply(2).asInstanceOf[ArrayBuffer[ArrayBuffer[Any]]]
+    val first_owner_numbers = result(0)(1)
+    val first_contacts = result(0)(2)
     assert(first_record.size === 3)
-    assert(second_record.apply(1) === null)
-    assert(second_record.apply(2) === null)
-    assert(second_record.apply(0) === "A. Nonymous")
-    assert(first_record.apply(0) === "Julien Le Dem")
-    assert(first_owner_numbers.apply(0) === "555 123 4567")
-    assert(first_owner_numbers.apply(2) === "XXX XXX XXXX")
-    assert(first_contacts.apply(0).size === 2)
-    assert(first_contacts.apply(0).apply(0) === "Dmitriy Ryaboy")
-    assert(first_contacts.apply(0).apply(1) === "555 987 6543")
-    assert(first_contacts.apply(1).apply(0) === "Chris Aniszczyk")
+    assert(second_record(1) === null)
+    assert(second_record(2) === null)
+    assert(second_record(0) === "A. Nonymous")
+    assert(first_record(0) === "Julien Le Dem")
+    assert(first_owner_numbers(0) === "555 123 4567")
+    assert(first_owner_numbers(2) === "XXX XXX XXXX")
+    assert(first_contacts(0).size === 2)
+    assert(first_contacts(0)(0) === "Dmitriy Ryaboy")
+    assert(first_contacts(0)(1) === "555 987 6543")
+    assert(first_contacts(1)(0) === "Chris Aniszczyk")
   }
 
   test("Importing nested Parquet file (nested numbers)") {
+    implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
     ParquetTestData.readNestedFile(
       ParquetTestData.testNestedFile2,
       ParquetTestData.testNestedSchema2)
@@ -398,19 +404,43 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     assert(result(0).size === 5, "number of fields in row incorrect")
     assert(result(0)(0) === 1)
     assert(result(0)(1) === 7)
-    assert(result(0)(2).asInstanceOf[ArrayBuffer[Any]].size === 3)
-    assert(result(0)(2).asInstanceOf[ArrayBuffer[Any]].apply(0) === (1.toLong << 32))
-    assert(result(0)(2).asInstanceOf[ArrayBuffer[Any]].apply(1) === (1.toLong << 33))
-    assert(result(0)(2).asInstanceOf[ArrayBuffer[Any]].apply(2) === (1.toLong << 34))
-    assert(result(0)(3).asInstanceOf[ArrayBuffer[Any]].size === 2)
-    assert(result(0)(3).asInstanceOf[ArrayBuffer[Any]].apply(0) === 2.5)
-    assert(result(0)(3).asInstanceOf[ArrayBuffer[Any]].apply(1) === false)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].size === 2)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].apply(0).asInstanceOf[ArrayBuffer[Any]].size === 2)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].apply(1).asInstanceOf[ArrayBuffer[Any]].size === 1)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].apply(0).asInstanceOf[ArrayBuffer[ArrayBuffer[Any]]].apply(0).apply(0) === 7)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].apply(0).asInstanceOf[ArrayBuffer[ArrayBuffer[Any]]].apply(1).apply(0) === 8)
-    assert(result(0)(4).asInstanceOf[ArrayBuffer[Any]].apply(1).asInstanceOf[ArrayBuffer[ArrayBuffer[Any]]].apply(0).apply(0) === 9)
+    assert(result(0)(2).size === 3)
+    assert(result(0)(2)(0) === (1.toLong << 32))
+    assert(result(0)(2)(1) === (1.toLong << 33))
+    assert(result(0)(2)(2) === (1.toLong << 34))
+    assert(result(0)(3).size === 2)
+    assert(result(0)(3)(0) === 2.5)
+    assert(result(0)(3)(1) === false)
+    assert(result(0)(4).size === 2)
+    assert(result(0)(4)(0).size === 2)
+    assert(result(0)(4)(1).size === 1)
+    assert(result(0)(4)(0)(0)(0) === 7)
+    assert(result(0)(4)(0)(1)(0) === 8)
+    assert(result(0)(4)(1)(0)(0) === 9)
+  }
+
+  test("Simple query on addressbook") {
+    val data = TestSQLContext.parquetFile(ParquetTestData.testNestedFile1.toString).toSchemaRDD
+    val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect()
+    assert(tmp.size === 1)
+    assert(tmp(0)(0) === "Julien Le Dem")
+  }
+
+  test("Simple query on nested int data") {
+    implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
+    val data = TestSQLContext.parquetFile(ParquetTestData.testNestedFile2.toString).toSchemaRDD
+    data.registerAsTable("data")
+    val tmp = sql("SELECT booleanNumberPairs.value, booleanNumberPairs.truth FROM data").collect()
+    assert(tmp(0)(0) === 2.5)
+    assert(tmp(0)(1) === false)
+    val result = sql("SELECT outerouter FROM data").collect()
+    // TODO: why does this not work?
+    // val result = sql("SELECT outerouter.values FROM data").collect()
+    // TODO: ... or this:
+    // val result = sql("SELECT outerouter[0] FROM data").collect()
+    assert(result(0)(0)(0)(0)(0) === 7)
+    assert(result(0)(0)(0)(1)(0) === 8)
+    assert(result(0)(0)(1)(0)(0) === 9)
   }
 
   /**
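
The tests above each declare a local implicit anyToRow. The reason: Row.apply returns Any, so chained indexing such as result(0)(4)(0)(0)(0) needs every intermediate value viewed as a Row again. A self-contained sketch (not part of the diff), with the same stand-in Row types as in the earlier sketch rather than Spark's actual classes:

import scala.language.implicitConversions

trait Row { def apply(i: Int): Any }

class GenericRow extends Row {
  val values: Array[Any] = Array.empty
  def apply(i: Int): Any = values(i)
}

object NestedAccess extends App {
  implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]

  val inner = new GenericRow { override val values: Array[Any] = Array(7) }
  val outer = new GenericRow { override val values: Array[Any] = Array(inner) }

  // outer(0) is typed Any; the implicit view casts it back to Row so the
  // second application compiles, mirroring the chained lookups in the tests.
  assert(outer(0)(0) == 7)
}
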
