@@ -395,16 +395,19 @@ class ParquetFileFormat
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
- val parquetReader = if (enableVectorizedReader) {
+ val iter = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+ val recordReaderIterator = new RecordReaderIterator(vectorizedReader)
+ // Register a task completion listener before `initialization`.
Member: could new VectorizedParquetRecordReader or new RecordReaderIterator fail?

Member Author: Those constructors didn't look heavy to me.

Member: ok

+ taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
- vectorizedReader
+ recordReaderIterator
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
@@ -414,16 +417,16 @@ class ParquetFileFormat
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
}
+ val recordReaderIterator = new RecordReaderIterator(reader)
+ // Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
reader.initialize(split, hadoopAttemptContext)
- reader
+ recordReaderIterator
}

- val iter = new RecordReaderIterator(parquetReader)
- taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
Member Author: According to the reported leakage, this is too late.

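The "too late" concern can be illustrated with a small self-contained sketch (plain Scala, no Spark classes; `Resource`, `listeners`, and `runTask` are hypothetical stand-ins for the reader, the TaskContext completion-listener list, and task execution): only cleanup actions registered before `initialize` throws get a chance to run, so the close callback must be registered first.

```scala
import scala.collection.mutable.ListBuffer

object ListenerOrdering {
  // Hypothetical stand-in for a closable reader whose initialize can fail.
  class Resource {
    var closed = false
    def initialize(): Unit = throw new RuntimeException("init failed")
    def close(): Unit = closed = true
  }

  // Hypothetical stand-in for the task's completion-listener list.
  val listeners = ListBuffer.empty[() => Unit]

  // Runs a task body; on completion (success or failure) fires all
  // registered listeners, mimicking task-completion callbacks.
  def runTask(body: => Unit): Unit =
    try body
    catch { case _: RuntimeException => () }
    finally listeners.foreach(_.apply())

  def main(args: Array[String]): Unit = {
    // Listener registered BEFORE initialize: the resource is closed
    // even though initialize throws.
    val before = new Resource
    listeners.clear()
    runTask {
      listeners += (() => before.close())
      before.initialize()
    }
    assert(before.closed)

    // Listener registered AFTER initialize: the registration line is
    // never reached, so the resource leaks.
    val after = new Resource
    listeners.clear()
    runTask {
      after.initialize()
      listeners += (() => after.close())
    }
    assert(!after.closed)
    println("both assertions passed")
  }
}
```

This is why the patch moves the `addTaskCompletionListener` call ahead of `initialize` in both branches.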
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
- if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-     enableVectorizedReader) {
+ if (enableVectorizedReader) {
Member: Would it be possible to merge this if-statement into the above if-statement?

Member Author: Yep. It looks possible. I'll update together after getting more reviews. Thanks, @kiszk .

Contributor: yea it seems more reasonable to merge this if-else now.

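A rough illustration of the merge being suggested (hypothetical stand-in types, not Spark's actual classes): once each branch of the first `if (enableVectorizedReader)` builds its own iterator, the trailing test only chooses between the cast and the row path, so each branch could return its final `Iterator[InternalRow]` directly and the second test disappears.

```scala
object MergedBranchSketch {
  // Hypothetical stand-ins for Spark's row/batch types.
  trait InternalRow
  case class Batch(rows: Int) extends InternalRow
  case class Row(value: Int) extends InternalRow

  // Each branch finishes its own iterator, so no second
  // if (enableVectorizedReader) test is needed afterwards.
  def buildIterator(enableVectorizedReader: Boolean): Iterator[InternalRow] =
    if (enableVectorizedReader) {
      // vectorized path: iterator of batches, cast once here
      val iter: Iterator[AnyRef] = Iterator(Batch(100), Batch(100))
      iter.asInstanceOf[Iterator[InternalRow]]
    } else {
      // parquet-mr fallback: per-row iterator, projection would go here
      Iterator(Row(1), Row(2), Row(3))
    }
}
```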
iter.asInstanceOf[Iterator[InternalRow]]
} else {
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes