Skip to content

Commit f8f8911

Browse files
For primitive rows fall back to more efficient converter, code reorg
1 parent 6dbc9b7 commit f8f8911

File tree

4 files changed

+464
-353
lines changed

4 files changed

+464
-353
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 100 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ package org.apache.spark.sql.parquet
2020
import scala.collection.mutable.{Buffer, ArrayBuffer}
2121

2222
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
23+
import parquet.schema.MessageType
2324

25+
import org.apache.spark.Logging
2426
import org.apache.spark.sql.catalyst.types._
2527
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
2628
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
@@ -65,9 +67,19 @@ private[parquet] object CatalystConverter {
6567
s"unable to convert datatype ${field.dataType.toString} in CatalystConverter")
6668
}
6769
}
70+
71+
/**
 * Builds the root [[CatalystConverter]] for the given Parquet schema.
 * Flat schemas (every column primitive) take the mutable-row fast path;
 * anything nested falls back to the generic group converter.
 */
protected[parquet] def createRootConverter(parquetSchema: MessageType): CatalystConverter = {
  val attributes = ParquetTypesConverter.convertToAttributes(parquetSchema)
  val allPrimitive =
    attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
  if (allPrimitive) new MutableRowGroupConverter(attributes)
  else new CatalystGroupConverter(attributes)
}
6880
}
6981

70-
trait CatalystConverter {
82+
private[parquet] trait CatalystConverter {
7183
// the number of fields this group has
7284
protected[parquet] val size: Int
7385

@@ -80,7 +92,6 @@ trait CatalystConverter {
8092
// for child converters to update upstream values
8193
protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit
8294

83-
// TODO: in the future consider using specific methods to avoid autoboxing
8495
protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
8596
updateField(fieldIndex, value)
8697

@@ -105,6 +116,9 @@ trait CatalystConverter {
105116
protected[parquet] def isRootConverter: Boolean = parent == null
106117

107118
protected[parquet] def clearBuffer(): Unit
119+
120+
// Should be only called in root group converter!
121+
def getCurrentRecord: Row
108122
}
109123

110124
/**
@@ -113,7 +127,7 @@ trait CatalystConverter {
113127
*
114128
* @param schema The corresponding Catalyst schema in the form of a list of attributes.
115129
*/
116-
class CatalystGroupConverter(
130+
private[parquet] class CatalystGroupConverter(
117131
protected[parquet] val schema: Seq[FieldType],
118132
protected[parquet] val index: Int,
119133
protected[parquet] val parent: CatalystConverter,
@@ -182,13 +196,86 @@ class CatalystGroupConverter(
182196
}
183197
}
184198

199+
/**
 * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
 * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that this
 * converter is optimized for rows of primitive types (non-nested records).
 *
 * @param schema  The Catalyst field types of the (flat) record.
 * @param current The mutable row that child converters write into; reused
 *                across records, so callers must consume it before `start()`
 *                is invoked for the next record.
 */
private[parquet] class MutableRowGroupConverter(
    protected[parquet] val schema: Seq[FieldType],
    protected[parquet] var current: ParquetRelation.RowType)
  extends GroupConverter with CatalystConverter {

  // This constructor is used for the root converter only
  def this(attributes: Seq[Attribute]) =
    this(
      attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
      new ParquetRelation.RowType(attributes.length))

  // One child converter per field, keyed by position. Use zipWithIndex rather
  // than schema.indexOf(field): indexOf is O(n) per field (O(n^2) overall) and
  // returns the index of the FIRST equal element, so two fields with an equal
  // FieldType would both be wired to the same (wrong) slot.
  protected[parquet] val converters: Array[Converter] =
    schema.zipWithIndex.map {
      case (field, fieldIndex) =>
        CatalystConverter.createConverter(field, fieldIndex, this)
    }.toArray

  override val size = schema.size

  override val index = 0

  // This is always a root converter.
  override val parent = null

  // Should be only called in root group converter!
  def getCurrentRecord: ParquetRelation.RowType = current

  override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)

  // Generic update is deliberately unsupported: child converters must use the
  // type-specific update methods below to avoid auto-boxing primitive values.
  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
    throw new UnsupportedOperationException(
      "MutableRowGroupConverter only supports primitive updates")

  // Nothing to clear: this converter has no nested buffers.
  override protected[parquet] def clearBuffer(): Unit = {}

  // Reset every column to null before the record's values are written, so
  // columns absent from the record read back as null.
  override def start(): Unit = {
    var i = 0
    while (i < schema.length) {
      current.setNullAt(i)
      i = i + 1
    }
  }

  override def end(): Unit = {}

  // Overridden here to avoid auto-boxing for primitive types
  override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
    current.setBoolean(fieldIndex, value)

  override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
    current.setInt(fieldIndex, value)

  override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
    current.setLong(fieldIndex, value)

  override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
    current.setDouble(fieldIndex, value)

  override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
    current.setFloat(fieldIndex, value)

  override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
    current.update(fieldIndex, value.getBytes)

  override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
    current.setString(fieldIndex, value.toStringUsingUTF8)
}
271+
185272
/**
186273
* A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
187274
*
188275
* @param parent The parent group converter.
189276
* @param fieldIndex The index inside the record.
190277
*/
191-
class CatalystPrimitiveConverter(
278+
private[parquet] class CatalystPrimitiveConverter(
192279
parent: CatalystConverter,
193280
fieldIndex: Int) extends PrimitiveConverter {
194281
// TODO: consider refactoring these together with ParquetTypesConverter
@@ -218,7 +305,7 @@ class CatalystPrimitiveConverter(
218305
* @param parent The parent group converter.
219306
* @param fieldIndex The index inside the record.
220307
*/
221-
class CatalystPrimitiveStringConverter(
308+
private[parquet] class CatalystPrimitiveStringConverter(
222309
parent: CatalystConverter,
223310
fieldIndex: Int)
224311
extends CatalystPrimitiveConverter(parent, fieldIndex) {
@@ -232,7 +319,7 @@ object CatalystArrayConverter {
232319

233320
// this is for single-element groups of primitive or complex types
234321
// Note: AvroParquet only uses arrays for primitive types (?)
235-
class CatalystArrayConverter(
322+
private[parquet] class CatalystArrayConverter(
236323
val elementType: DataType,
237324
val index: Int,
238325
protected[parquet] val parent: CatalystConverter,
@@ -283,11 +370,14 @@ class CatalystArrayConverter(
283370
parent.updateField(index, new GenericRow(buffer.toArray))
284371
clearBuffer()
285372
}
373+
374+
// Should be only called in root group converter!
375+
override def getCurrentRecord: Row = throw new UnsupportedOperationException
286376
}
287377

288378
// this is for multi-element groups of primitive or complex types
289379
// that have repetition level optional or required (so struct fields)
290-
class CatalystStructConverter(
380+
private[parquet] class CatalystStructConverter(
291381
override protected[parquet] val schema: Seq[FieldType],
292382
override protected[parquet] val index: Int,
293383
override protected[parquet] val parent: CatalystConverter)
@@ -301,6 +391,9 @@ class CatalystStructConverter(
301391
// TODO: use iterators if possible, avoid Row wrapping!
302392
parent.updateField(index, new GenericRow(current.toArray))
303393
}
394+
395+
// Should be only called in root group converter!
396+
override def getCurrentRecord: Row = throw new UnsupportedOperationException
304397
}
305398

306399
// TODO: add MapConverter

0 commit comments

Comments
 (0)