[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support #35262
File: SpecificParquetRecordReaderBase.java

@@ -29,6 +29,8 @@
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.page.PageReadStore;
import scala.Option;
@@ -69,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
  protected MessageType fileSchema;
  protected MessageType requestedSchema;
  protected StructType sparkSchema;
  protected ParsedVersion writerVersion;

  /**
   * The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +96,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
      HadoopInputFile.fromPath(file, configuration), options);
    this.reader = new ParquetRowGroupReaderImpl(fileReader);
    this.fileSchema = fileReader.getFileMetaData().getSchema();
    try {
      this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
    } catch (Exception e) {
Contributor: Will other types of exceptions be thrown here, except
Contributor (author): Well yes. I encountered at least one case where the version information was empty and the version check threw an NPE.
      // Swallow any exception: if we cannot parse the version, we will revert to a sequential
      // read if the column uses a delta byte array encoding (due to PARQUET-246).
    }
    Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
    ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
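The writer version is parsed from the footer's `created_by` string. As a rough, standalone sketch of why the catch above is deliberately broad (the helper class and method here are made up for illustration): an empty or malformed `created_by` can make `VersionParser.parse` fail, and an unknown writer version simply means the reader later falls back to sequential reads for delta byte array columns.

```java
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;

// Hypothetical helper, illustration only; not part of the PR.
public final class WriterVersionProbe {
  public static ParsedVersion parseOrNull(String createdBy) {
    try {
      // A well-formed value looks like "parquet-mr version 1.8.1 (build abcd)".
      // Empty or unexpected strings can fail to parse (VersionParseException, or
      // even an NPE for some inputs), so any failure means "writer version unknown".
      return VersionParser.parse(createdBy);
    } catch (Exception e) {
      return null;
    }
  }
}
```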
File: VectorizedColumnReader.java

@@ -21,20 +21,24 @@
import java.time.ZoneId;
import java.util.PrimitiveIterator;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.Decimal;
@@ -86,6 +90,8 @@ public class VectorizedColumnReader {
  private final ColumnDescriptor descriptor;
  private final LogicalTypeAnnotation logicalTypeAnnotation;
  private final String datetimeRebaseMode;
  private final ParsedVersion writerVersion;
  private final MemoryMode memoryMode;

  public VectorizedColumnReader(
      ColumnDescriptor descriptor,
@@ -96,7 +102,9 @@ public VectorizedColumnReader(
      String datetimeRebaseMode,
      String datetimeRebaseTz,
      String int96RebaseMode,
      String int96RebaseTz) throws IOException {
      String int96RebaseTz,
      ParsedVersion writerVersion,
      MemoryMode memoryMode) throws IOException {
    this.descriptor = descriptor;
    this.pageReader = pageReader;
    this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +137,8 @@ public VectorizedColumnReader(
    this.datetimeRebaseMode = datetimeRebaseMode;
    assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
      "CORRECTED".equals(int96RebaseMode);
    this.writerVersion = writerVersion;
    this.memoryMode = memoryMode;
  }

  private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -174,7 +184,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
      readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
    }
    PrimitiveType.PrimitiveTypeName typeName =
      descriptor.getPrimitiveType().getPrimitiveTypeName();

    if (isCurrentPageDictionaryEncoded) {
      // Save starting offset in case we need to decode dictionary IDs.
      int startOffset = readState.offset;
@@ -259,6 +269,7 @@ private void initDataReader(
      int pageValueCount,
      Encoding dataEncoding,
      ByteBufferInputStream in) throws IOException {
    ValuesReader previousReader = this.dataColumn;
    if (dataEncoding.usesDictionary()) {
      this.dataColumn = null;
      if (dictionary == null) {

@@ -283,25 +294,30 @@
    } catch (IOException e) {
      throw new IOException("could not read page in col " + descriptor, e);
    }
    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
Contributor: When does this happen? Can you add a comment on why we need this?
Contributor (author): Added a comment. The detailed explanation is in the comment in
        previousReader instanceof RequiresPreviousReader) {
      // previous reader can only be set if reading sequentially
      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
    }
  }

  private ValuesReader getValuesReader(Encoding encoding) {
    switch (encoding) {
      case PLAIN:
        return new VectorizedPlainValuesReader();
      case DELTA_BYTE_ARRAY:
        return new VectorizedDeltaByteArrayReader();
        return new VectorizedDeltaByteArrayReader(memoryMode);
      case DELTA_BINARY_PACKED:
        return new VectorizedDeltaBinaryPackedReader();
      case RLE:
        PrimitiveType.PrimitiveTypeName typeName =
          this.descriptor.getPrimitiveType().getPrimitiveTypeName();
        // RLE encoding only supports boolean type `Values`, and `bitwidth` is always 1.
        if (typeName == BOOLEAN) {
          return new VectorizedRleValuesReader(1);
        } else {
          throw new UnsupportedOperationException(
            "RLE encoding is not supported for values of type: " + typeName);
        }
      default:
        throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
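To make the question in the thread above concrete: in DELTA_BYTE_ARRAY encoding each value is stored as a prefix length (shared with the previous value) plus a suffix, and with PARQUET-246 the first value of a page may be encoded against the last value of the previous page. The following small, self-contained sketch uses invented data (not code from the PR) to show the reconstruction rule and why a new page's reader may need the previous page's last value:

```java
// Illustrative only; the data is made up.
public class DeltaByteArrayExample {
  public static void main(String[] args) {
    // value[i] = first prefixLength[i] characters of value[i-1], followed by suffix[i]
    int[] prefixLengths = {0, 5, 4};
    String[] suffixes = {"apple", "sauce", "ication"};
    String previous = "";
    for (int i = 0; i < prefixLengths.length; i++) {
      previous = previous.substring(0, prefixLengths[i]) + suffixes[i];
      System.out.println(previous);  // apple, applesauce, application
    }
    // With PARQUET-246, the third entry ("application") could be the *first* value of a
    // new page, encoded against "applesauce" from the previous page. Decoding that page
    // in isolation would fail, which is why setPreviousReader hands the new page's reader
    // the last value produced by the previous page's reader.
  }
}
```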
File: VectorizedDeltaBinaryPackedReader.java

@@ -90,13 +90,18 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
    Preconditions.checkArgument(miniSize % 8 == 0,
      "miniBlockSize must be multiple of 8, but it's " + miniSize);
    this.miniBlockSizeInValues = (int) miniSize;
    // True value count. May be less than valueCount because of nulls
Contributor: I think it would be more useful to annotate the method
Contributor (author): Added the comment to
    this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
    this.bitWidths = new int[miniBlockNumInABlock];
    this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
    // read the first value
    firstValue = BytesUtils.readZigZagVarLong(in);
  }

  int getTotalValueCount() {
    return totalValueCount;
  }

  @Override
  public byte readByte() {
    readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);
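For context on the totalValueCount discussion above: a DELTA_BINARY_PACKED page starts with a small header, and the count stored there covers only the values that are actually encoded, so it can be smaller than the `valueCount` passed to `initFromPage` when the column contains nulls. A minimal sketch of reading that header with the same parquet utilities used in the diff (layout per the Parquet format spec; this is not the Spark reader itself, and the names are local to the sketch):

```java
import java.io.IOException;
import java.io.InputStream;
import org.apache.parquet.bytes.BytesUtils;

// Illustration only: the DELTA_BINARY_PACKED page header.
public class DeltaBinaryPackedHeader {
  public static void read(InputStream in) throws IOException {
    int blockSizeInValues = BytesUtils.readUnsignedVarInt(in);   // values per block
    int miniBlocksPerBlock = BytesUtils.readUnsignedVarInt(in);  // miniblocks per block
    int totalValueCount = BytesUtils.readUnsignedVarInt(in);     // encoded (non-null) values
    long firstValue = BytesUtils.readZigZagVarLong(in);          // first value, zigzag varlong
    System.out.printf("block=%d miniblocks=%d count=%d first=%d%n",
        blockSizeInValues, miniBlocksPerBlock, totalValueCount, firstValue);
  }
}
```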
File: VectorizedDeltaByteArrayReader.java

@@ -16,51 +16,131 @@
 */
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
Contributor: Do we have a clear import order definition for static imports? @sunchao @dongjoon-hyun
import static org.apache.spark.sql.types.DataTypes.IntegerType;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
 * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
 * interface.
 */
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
    implements VectorizedValuesReader, RequiresPreviousReader {

  private final MemoryMode memoryMode;
  private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
    new VectorizedDeltaBinaryPackedReader();
  private final VectorizedDeltaLengthByteArrayReader suffixReader;
  private WritableColumnVector prefixLengthVector;
  private WritableColumnVector suffixVector;
  private byte[] previous = new byte[0];
  private int currentRow = 0;

  // temporary variable used by getBinary
  private Binary binaryVal;

  VectorizedDeltaByteArrayReader(MemoryMode memoryMode) {
    this.memoryMode = memoryMode;
    this.suffixReader = new VectorizedDeltaLengthByteArrayReader(memoryMode);
  }

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    deltaByteArrayReader.initFromPage(valueCount, in);
    if (memoryMode == MemoryMode.OFF_HEAP) {
      prefixLengthVector = new OffHeapColumnVector(valueCount, IntegerType);
      suffixVector = new OffHeapColumnVector(valueCount, BinaryType);
    } else {
      prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
      suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
    }
    prefixLengthReader.initFromPage(valueCount, in);
    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
      prefixLengthVector, 0);
    suffixReader.initFromPage(valueCount, in);
    suffixReader.readBinary(valueCount, suffixVector, 0);
  }

  @Override
  public Binary readBinary(int len) {
    return deltaByteArrayReader.readBytes();
    readValues(1, null, 0,
      (w, r, v, l) ->
        binaryVal = Binary.fromConstantByteArray(v.array(), v.arrayOffset() + v.position(), l));
    return binaryVal;
  }

  @Override
  public void readBinary(int total, WritableColumnVector c, int rowId) {
  public void readValues(int total, WritableColumnVector c, int rowId,
      ByteBufferOutputWriter outputWriter) {
    if (total == 0) {
      return;
    }

    for (int i = 0; i < total; i++) {
      Binary binary = deltaByteArrayReader.readBytes();
      ByteBuffer buffer = binary.toByteBuffer();
      if (buffer.hasArray()) {
        c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
          binary.length());
      int prefixLength = prefixLengthVector.getInt(currentRow);
      byte[] suffix = suffixVector.getBinary(currentRow);
      // This does not copy bytes
      int length = prefixLength + suffix.length;

      // NOTE: due to PARQUET-246, it is important that we
      // respect prefixLength which was read from prefixLengthReader,
      // even for the *first* value of a page. Even though the first
      // value of the page should have an empty prefix, it may not
      // because of PARQUET-246.

      // We have to do this to materialize the output
      if (prefixLength != 0) {
        // We could do
        //   c.putByteArray(rowId + i, previous, 0, prefixLength);
        //   c.putByteArray(rowId + i, suffix, prefixLength, suffix.length);
        //   previous = c.getBinary(rowId + i);
        // but it incurs the same cost of copying the values twice _and_ c.getBinary
        // is a _slow_ byte-by-byte copy.
        // The following always uses the faster System.arraycopy method.
        byte[] out = new byte[length];
        System.arraycopy(previous, 0, out, 0, prefixLength);
        System.arraycopy(suffix, 0, out, prefixLength, suffix.length);
        previous = out;
      } else {
        byte[] bytes = new byte[binary.length()];
        buffer.get(bytes);
        c.putByteArray(rowId + i, bytes);
        previous = suffix;
      }
      outputWriter.write(c, rowId + i, ByteBuffer.wrap(previous), previous.length);
      currentRow++;
    }
  }
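`ByteBufferOutputWriter` itself is not shown in this view. Judging only from the call sites (`outputWriter.write(c, rowId + i, ByteBuffer.wrap(previous), previous.length)` above and the `writeArrayByteBuffer` / `skipWrite` method references below), it is presumably a small functional interface along these lines, which lets `readBinary`, `readBinary(int len)`, and `skipBinary` all reuse the same decoding loop while differing only in what they do with each decoded value:

```java
import java.nio.ByteBuffer;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

// Presumed shape, reconstructed from the call sites; the real definition may differ in details.
@FunctionalInterface
public interface ByteBufferOutputWriter {
  void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

  // Copy the decoded bytes into the output vector.
  static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val, int length) {
    c.putByteArray(rowId, val.array(), val.arrayOffset() + val.position(), length);
  }

  // Decode but discard, so skipBinary still advances the 'previous' state.
  static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }
}
```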
  @Override
  public void skipBinary(int total) {
    for (int i = 0; i < total; i++) {
      deltaByteArrayReader.skip();
  public void readBinary(int total, WritableColumnVector c, int rowId) {
    readValues(total, c, rowId, ByteBufferOutputWriter::writeArrayByteBuffer);
  }

  /**
   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
   * previous value state that it tracks internally. This resulted in the first value of all pages
   * (except for the first page) being a delta from the last value of the previous page. In order to
   * read corrupted files written with this bug, when reading a new page we need to recover the
   * previous page's last value and use it (if needed) to read the first value.
   */
  public void setPreviousReader(ValuesReader reader) {
    if (reader != null) {
      this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
    }
  }

  @Override
  public void skipBinary(int total) {
    // We have to read all the values so that we always have the correct 'previous';
    // we just don't write them to the output vector.
    readValues(total, null, currentRow, ByteBufferOutputWriter::skipWrite);
  }
}
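The suffixes consumed by this reader come from VectorizedDeltaLengthByteArrayReader, another reader added by this PR whose diff is not shown in this view. Its underlying DELTA_LENGTH_BYTE_ARRAY encoding is simple: all value lengths are written first as a DELTA_BINARY_PACKED run, followed by the values' bytes concatenated back to back. A small self-contained sketch of the decode rule on already-materialized arrays (illustrative data, not Spark code):

```java
import java.nio.charset.StandardCharsets;

// Illustration only: splitting a DELTA_LENGTH_BYTE_ARRAY payload once the lengths have been
// decoded. In the real reader the lengths come from a VectorizedDeltaBinaryPackedReader and
// the bytes stay in column vectors.
public class DeltaLengthByteArrayExample {
  public static void main(String[] args) {
    int[] lengths = {5, 5, 11};  // decoded with DELTA_BINARY_PACKED
    byte[] data = "applesauceapplication".getBytes(StandardCharsets.UTF_8);
    int offset = 0;
    for (int length : lengths) {
      System.out.println(new String(data, offset, length, StandardCharsets.UTF_8));
      offset += length;
    }
    // Prints: apple, sauce, application
  }
}
```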