Skip to content

Commit e08d06c

Browse files
committed
Address comments
1 parent 43f809f commit e08d06c

1 file changed

Lines changed: 10 additions & 15 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -395,19 +395,21 @@ class ParquetFileFormat
395395
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
396396
}
397397
val taskContext = Option(TaskContext.get())
398-
val iter = if (enableVectorizedReader) {
398+
if (enableVectorizedReader) {
399399
val vectorizedReader = new VectorizedParquetRecordReader(
400400
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
401-
val recordReaderIterator = new RecordReaderIterator(vectorizedReader)
402-
// Register a task completion lister before `initalization`.
403-
taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
401+
val iter = new RecordReaderIterator(vectorizedReader)
402+
// Register a task completion listener before `initialization`.
403+
taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
404404
vectorizedReader.initialize(split, hadoopAttemptContext)
405405
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
406406
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
407407
if (returningBatch) {
408408
vectorizedReader.enableReturningBatches()
409409
}
410-
recordReaderIterator
410+
411+
// VectorizedParquetRecordReader appends the columns internally to avoid another copy.
412+
iter.asInstanceOf[Iterator[InternalRow]]
411413
} else {
412414
logDebug(s"Falling back to parquet-mr")
413415
// ParquetRecordReader returns UnsafeRow
@@ -417,18 +419,11 @@ class ParquetFileFormat
417419
} else {
418420
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
419421
}
420-
val recordReaderIterator = new RecordReaderIterator(reader)
421-
// Register a task completion lister before `initalization`.
422-
taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
422+
val iter = new RecordReaderIterator(reader)
423+
// Register a task completion listener before `initialization`.
424+
taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
423425
reader.initialize(split, hadoopAttemptContext)
424-
recordReaderIterator
425-
}
426426

427-
428-
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
429-
if (enableVectorizedReader) {
430-
iter.asInstanceOf[Iterator[InternalRow]]
431-
} else {
432427
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
433428
val joinedRow = new JoinedRow()
434429
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

0 commit comments

Comments
 (0)