Changes from 13 commits
Commits (20 total):
7a8b41c [SPARK-37974][SQL] Vectorized implementation of DeltaLengthByteArray … (parthchandra, Jan 13, 2022)
2c73794 [SPARK-37974][SQL] Vectorized implementation of DeltaByteArray reader (parthchandra, Jan 14, 2022)
52df517 Addressing review comments (parthchandra, Jan 22, 2022)
0011bab More review comments addressed (parthchandra, Jan 24, 2022)
50ed815 One more review comment (parthchandra, Jan 25, 2022)
3dc340a Updated JDK 8 benchmark (parthchandra, Jan 26, 2022)
ca20068 Fix for off heap memory not being initialized. Added off heap mode to… (parthchandra, Jan 28, 2022)
6ad1dbe Remove use of OffHeap vectors for internal buffers. Skip writing to o… (parthchandra, Jan 31, 2022)
6f364d9 more review comments addressed (parthchandra, Feb 2, 2022)
80a4ceb Still more review comments addressed (parthchandra, Feb 9, 2022)
406d176 Remove unnecessary check for 'total' parameter in 'readValues' (parthchandra, Feb 11, 2022)
be62ad6 Remove check for zero length in DeltaLengthByteArrayReader, and add u… (parthchandra, Feb 14, 2022)
166afe1 Update benchmark (parthchandra, Feb 16, 2022)
0583e2f Evaluate suffix array lazily in VectorizedDeltaLengthByteArrayReader (sunchao, Mar 5, 2022)
31eee9f In DeltaLengthByteArrayReader avoid extra copy if memory mode is on_h… (sunchao, Mar 5, 2022)
9eaf387 Avoid unnecessary check for parameter in skipBytes (parthchandra, Mar 7, 2022)
1fc0060 Update benchmark results (parthchandra, Mar 7, 2022)
d95100a More review comments (parthchandra, Mar 15, 2022)
6d273f0 More review comments addressed (parthchandra, Mar 16, 2022)
1d15022 Cleaner naming for WritableColumnVector.getBytesUnsafe (parthchandra, Mar 16, 2022)
470 changes: 235 additions & 235 deletions sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt

Large diffs are not rendered by default.

470 changes: 235 additions & 235 deletions sql/core/benchmarks/DataSourceReadBenchmark-jdk17-results.txt

Large diffs are not rendered by default.

470 changes: 235 additions & 235 deletions sql/core/benchmarks/DataSourceReadBenchmark-results.txt

Large diffs are not rendered by default.

@@ -29,6 +29,8 @@
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.page.PageReadStore;
import scala.Option;

@@ -69,6 +71,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
protected MessageType fileSchema;
protected MessageType requestedSchema;
protected StructType sparkSchema;
// Keep track of the version of the parquet writer. An older version wrote
// corrupt delta byte arrays, and the version check is needed to detect that.
protected ParsedVersion writerVersion;

/**
* The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +98,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
HadoopInputFile.fromPath(file, configuration), options);
this.reader = new ParquetRowGroupReaderImpl(fileReader);
this.fileSchema = fileReader.getFileMetaData().getSchema();
try {
this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
} catch (Exception e) {
Review comment (Contributor): Will other types of exceptions be thrown here, except VersionParseException?
Reply (Contributor Author): Well yes. I encountered at least one case where the version information was empty and the version check threw an NPE.

// Swallow any exception, if we cannot parse the version we will revert to a sequential read
// if the column is a delta byte array encoding (due to PARQUET-246).
}
Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
@@ -21,13 +21,16 @@
import java.time.ZoneId;
import java.util.PrimitiveIterator;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
@@ -86,6 +89,7 @@ public class VectorizedColumnReader {
private final ColumnDescriptor descriptor;
private final LogicalTypeAnnotation logicalTypeAnnotation;
private final String datetimeRebaseMode;
private final ParsedVersion writerVersion;

public VectorizedColumnReader(
ColumnDescriptor descriptor,
@@ -96,7 +100,8 @@ public VectorizedColumnReader(
String datetimeRebaseMode,
String datetimeRebaseTz,
String int96RebaseMode,
String int96RebaseTz) throws IOException {
String int96RebaseTz,
ParsedVersion writerVersion) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +134,7 @@ public VectorizedColumnReader(
this.datetimeRebaseMode = datetimeRebaseMode;
assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
"CORRECTED".equals(int96RebaseMode);
this.writerVersion = writerVersion;
}

private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -259,6 +265,7 @@ private void initDataReader(
int pageValueCount,
Encoding dataEncoding,
ByteBufferInputStream in) throws IOException {
ValuesReader previousReader = this.dataColumn;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
if (dictionary == null) {
Expand All @@ -283,6 +290,11 @@ private void initDataReader(
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
Review comment (Contributor): When does this happen? Can you add a comment on why we need this?
Reply (Contributor Author): Added comment. Detailed explanation is in the comment in VectorizedDeltaByteArrayReader.setPreviousValue

previousReader instanceof RequiresPreviousReader) {
// previous reader can only be set if reading sequentially
Review comment (Contributor): nit: [P]revious.

((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
}
}

private ValuesReader getValuesReader(Encoding encoding) {
@@ -90,13 +90,18 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
Preconditions.checkArgument(miniSize % 8 == 0,
"miniBlockSize must be multiple of 8, but it's " + miniSize);
this.miniBlockSizeInValues = (int) miniSize;
// True value count. May be less than valueCount because of nulls
Review comment (Contributor): I think it would be more useful to annotate the method getTotalValueCount instead of here.
Reply (Contributor Author): Added the comment to getTotalValueCount as well.

this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
this.bitWidths = new int[miniBlockNumInABlock];
this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
// read the first value
firstValue = BytesUtils.readZigZagVarLong(in);
}

int getTotalValueCount() {
return totalValueCount;
}

@Override
public byte readByte() {
readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);
@@ -16,51 +16,115 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
Review comment (Contributor): Do we have a clear import order definition for static import? @sunchao @dongjoon-hyun

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
* An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
* interface.
*/
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
implements VectorizedValuesReader, RequiresPreviousReader {

private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
private final VectorizedDeltaLengthByteArrayReader suffixReader;
private WritableColumnVector prefixLengthVector;
private WritableColumnVector suffixVector;
private byte[] previous = new byte[0];
private int currentRow = 0;

// temporary variable used by getBinary
Review comment (Member): nit: getBinary -> readBinary. Also can we add some comments on what tempBinaryValVector is for?

private final WritableColumnVector binaryValVector;

VectorizedDeltaByteArrayReader() {
this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
binaryValVector = new OnHeapColumnVector(1, BinaryType);
}

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
deltaByteArrayReader.initFromPage(valueCount, in);
prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
prefixLengthReader.initFromPage(valueCount, in);
prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
prefixLengthVector, 0);
suffixReader.initFromPage(valueCount, in);
suffixReader.readBinary(prefixLengthReader.getTotalValueCount(), suffixVector, 0);
Review comment (Member): Instead of eagerly reading the suffixes, we can have a method in VectorizedDeltaLengthByteArrayReader that just returns the suffix at rowId:

  public ByteBuffer getBytes(int rowId) {
    int length = lengthsVector.getInt(rowId);
    try {
      return in.slice(length);
    } catch (EOFException e) {
      throw new ParquetDecodingException("Failed to read " + length + " bytes");
    }
  }

I tried this approach here, and it can improve the benchmark.

}

@Override
public Binary readBinary(int len) {
return deltaByteArrayReader.readBytes();
readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
private void readValues(int total, WritableColumnVector c, int rowId,
ByteBufferOutputWriter outputWriter) {
for (int i = 0; i < total; i++) {
Binary binary = deltaByteArrayReader.readBytes();
ByteBuffer buffer = binary.toByteBuffer();
if (buffer.hasArray()) {
c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
binary.length());
// NOTE: due to PARQUET-246, it is important that we
// respect prefixLength which was read from prefixLengthReader,
// even for the *first* value of a page. Even though the first
// value of the page should have an empty prefix, it may not
// because of PARQUET-246.
int prefixLength = prefixLengthVector.getInt(currentRow);
byte[] suffix = suffixVector.getBinary(currentRow);
int length = prefixLength + suffix.length;

// We have to do this to materialize the output
if (prefixLength != 0) {
// We could do
// c.putByteArray(rowId + i, previous, 0, prefixLength);
// c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
// previous = c.getBinary(rowId+1);
// but it incurs the same cost of copying the values twice _and_ c.getBinary
// is a _slow_ byte by byte copy
// The following always uses the faster system arraycopy method
byte[] out = new byte[length];
Review comment (Member): We can also potentially skip this copying at least for OnHeapColumnVector. I tried it and it gives some extra performance improvements.

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            5721           5727           8          1.8         545.6       1.0X
[info] SQL Json                                           6289           6295           9          1.7         599.7       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  700            800          87         15.0          66.7       8.2X
[info] SQL Parquet Vectorized: DataPageV2                  994           1031          52         10.5          94.8       5.8X
[info] SQL Parquet MR: DataPageV1                         2035           2051          23          5.2         194.1       2.8X
[info] SQL Parquet MR: DataPageV2                         2289           2454         232          4.6         218.3       2.5X
[info] ParquetReader Vectorized: DataPageV1                472            482          15         22.2          45.0      12.1X
[info] ParquetReader Vectorized: DataPageV2                640            645           4         16.4          61.0       8.9X
[info] SQL ORC Vectorized                                  670            694          35         15.7          63.9       8.5X
[info] SQL ORC MR                                         1846           2047         284          5.7         176.0       3.1X

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            4825           4890          91          2.2         460.2       1.0X
[info] SQL Json                                           5298           7385        2951          2.0         505.3       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  701            889         169         14.9          66.9       6.9X
[info] SQL Parquet Vectorized: DataPageV2                  684            737          58         15.3          65.2       7.1X
[info] SQL Parquet MR: DataPageV1                         1857           1869          17          5.6         177.1       2.6X
[info] SQL Parquet MR: DataPageV2                         2034           2146         159          5.2         193.9       2.4X
[info] ParquetReader Vectorized: DataPageV1                474            493          11         22.1          45.2      10.2X
[info] ParquetReader Vectorized: DataPageV2                585            586           1         17.9          55.8       8.2X
[info] SQL ORC Vectorized                                  810            845          53         12.9          77.3       6.0X
[info] SQL ORC MR                                         1854           1935         114          5.7         176.8       2.6X

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            3212           3256          63          3.3         306.3       1.0X
[info] SQL Json                                           3693           3695           3          2.8         352.2       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  147            203          46         71.2          14.0      21.8X
[info] SQL Parquet Vectorized: DataPageV2                  160            286         144         65.4          15.3      20.0X
[info] SQL Parquet MR: DataPageV1                         1229           1351         172          8.5         117.2       2.6X
[info] SQL Parquet MR: DataPageV2                         1074           1099          36          9.8         102.4       3.0X
[info] ParquetReader Vectorized: DataPageV1                107            109           2         97.9          10.2      30.0X
[info] ParquetReader Vectorized: DataPageV2                124            127           2         84.7          11.8      25.9X
[info] SQL ORC Vectorized                                  262            308          86         40.0          25.0      12.3X
[info] SQL ORC MR                                         1002           1070          96         10.5          95.5       3.2X

System.arraycopy(previous, 0, out, 0, prefixLength);
System.arraycopy(suffix, 0, out, prefixLength, suffix.length);
previous = out;
} else {
byte[] bytes = new byte[binary.length()];
buffer.get(bytes);
c.putByteArray(rowId + i, bytes);
previous = suffix;
}
outputWriter.write(c, rowId + i, ByteBuffer.wrap(previous), previous.length);
currentRow++;
}
}

@Override
public void skipBinary(int total) {
for (int i = 0; i < total; i++) {
deltaByteArrayReader.skip();
public void readBinary(int total, WritableColumnVector c, int rowId) {
readValues(total, c, rowId, ByteBufferOutputWriter::writeArrayByteBuffer);
}

/**
* There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
* previous value state that it tracks internally. This resulted in the first value of all pages
* (except for the first page) to be a delta from the last value of the previous page. In order to
* read corrupted files written with this bug, when reading a new page we need to recover the
* previous page's last value to use it (if needed) to read the first value.
*/
public void setPreviousReader(ValuesReader reader) {
if (reader != null) {
this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
}
}

@Override
public void skipBinary(int total) {
// we have to read all the values so that we always have the correct 'previous'
// we just don't write it to the output vector
readValues(total, null, currentRow, ByteBufferOutputWriter::skipWrite);
}

}
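Editor's note: the readValues loop above rebuilds each value from a prefix of the previous value plus a suffix read from the page (DELTA_BYTE_ARRAY encoding). The following is a standalone illustrative sketch of that reconstruction with made-up data and class names; it is not code from this PR.

  // Illustrative sketch only: recombine prefix lengths and suffixes the way
  // readValues() does, using System.arraycopy for the two copies.
  public class DeltaByteArrayDecodeSketch {
    public static void main(String[] args) {
      int[] prefixLengths = {0, 5, 4};  // bytes shared with the previous value
      byte[][] suffixes = {"apple".getBytes(), "t".getBytes(), "y".getBytes()};

      byte[] previous = new byte[0];
      for (int i = 0; i < prefixLengths.length; i++) {
        byte[] out = new byte[prefixLengths[i] + suffixes[i].length];
        // prefix comes from the previously decoded value, suffix from the page
        System.arraycopy(previous, 0, out, 0, prefixLengths[i]);
        System.arraycopy(suffixes[i], 0, out, prefixLengths[i], suffixes[i].length);
        previous = out;
        System.out.println(new String(out));  // prints: apple, applet, apply
      }
    }
  }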
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.IntegerType;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
* An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
* interface.
*/
public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase implements
VectorizedValuesReader {

private final VectorizedDeltaBinaryPackedReader lengthReader;
private ByteBufferInputStream in;
private WritableColumnVector lengthsVector;
private int currentRow = 0;

VectorizedDeltaLengthByteArrayReader() {
lengthReader = new VectorizedDeltaBinaryPackedReader();
}

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
lengthReader.initFromPage(valueCount, in);
lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
this.in = in.remainingStream();
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
ByteBuffer buffer;
ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
int length;
for (int i = 0; i < total; i++) {
length = lengthsVector.getInt(rowId + i);
try {
buffer = in.slice(length);
} catch (EOFException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes");
}
outputWriter.write(c, rowId + i, buffer, length);
}
currentRow += total;
}

@Override
public void skipBinary(int total) {
if (total == 0) {
Review comment (Member): We can remove this too
Reply (Contributor Author): removed

return;
}
int length;
for (int i = 0; i < total; i++) {
length = lengthsVector.getInt(currentRow + i);
int remaining = length;
while (remaining > 0) {
remaining -= in.skip(remaining);
Review comment (Contributor): Did I miss anything? Do we really need length here?

}
}
currentRow += total;
}

Review comment (Contributor): nit: new line.
Reply (Contributor Author): OK

}
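Editor's note: for context on the layout this new class consumes, DELTA_LENGTH_BYTE_ARRAY stores a delta-binary-packed block of value lengths followed by the concatenated value bytes; each value is recovered by reading the next 'length' bytes in order, which is what in.slice(length) does above. A standalone sketch with made-up data (not code from this PR):

  // Illustrative sketch only: slice the concatenated byte stream using the
  // decoded lengths block, one value at a time.
  import java.nio.ByteBuffer;
  import java.nio.charset.StandardCharsets;

  public class DeltaLengthByteArrayDecodeSketch {
    public static void main(String[] args) {
      int[] lengths = {3, 5, 2};  // pretend this is the decoded lengths block
      ByteBuffer data = ByteBuffer.wrap("foohello!!".getBytes(StandardCharsets.UTF_8));

      for (int length : lengths) {
        byte[] value = new byte[length];
        data.get(value);  // sequential slice, analogous to in.slice(length)
        System.out.println(new String(value, StandardCharsets.UTF_8));  // foo, hello, !!
      }
    }
  }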
@@ -367,7 +367,9 @@ private void checkEndOfRowGroup() throws IOException {
datetimeRebaseMode,
datetimeRebaseTz,
int96RebaseMode,
int96RebaseTz);
int96RebaseTz,
writerVersion
);
Review comment (Contributor): nit: should be on the previous line.
Reply (Contributor Author): OK

}
totalCountLoadedSoFar += pages.getRowCount();
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.nio.ByteBuffer;
Review comment (Contributor): nit: new line after the import.
Reply (Contributor Author): OK

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import org.apache.parquet.io.api.Binary;
@@ -86,4 +87,20 @@ interface IntegerOutputWriter {
void write(WritableColumnVector outputColumnVector, int rowId, long val);
}

@FunctionalInterface
interface ByteBufferOutputWriter {
void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
Review comment (Contributor, LuciferYang, Jan 21, 2022): Is it a good practice to add static methods to an interface? I'm not sure.
Reply (Contributor Author): I don't know if it is frowned upon. In this case, not including it in the interface only leads to some code bloat.

int length) {
c.putByteArray(rowId,
val.array(),
val.arrayOffset() + val.position(),
length);
}

static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }

}

Review comment (Contributor): nit: new line.
Reply (Contributor Author): Ok

}
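Editor's note on the static-methods-on-an-interface question above: the point of the static helpers is that a single read loop can either materialize or skip values simply by passing a different method reference. The following standalone sketch shows the pattern with simplified, hypothetical names; it is not the PR's actual classes or signatures.

  // Minimal sketch of the pattern: static helpers on a functional interface,
  // selected at the call site via method references. Names are stand-ins only.
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;

  public class OutputWriterPatternSketch {
    @FunctionalInterface
    interface Output {
      void write(List<byte[]> sink, ByteBuffer val, int length);

      static void materialize(List<byte[]> sink, ByteBuffer val, int length) {
        byte[] copy = new byte[length];
        val.get(copy);
        sink.add(copy);  // write the value out
      }

      static void skip(List<byte[]> sink, ByteBuffer val, int length) {
        val.position(val.position() + length);  // consume without writing
      }
    }

    static void readValues(ByteBuffer data, int[] lengths, List<byte[]> sink, Output out) {
      for (int length : lengths) {
        out.write(sink, data, length);  // same loop body for both behaviors
      }
    }

    public static void main(String[] args) {
      int[] lengths = {3, 3};
      List<byte[]> sink = new ArrayList<>();
      readValues(ByteBuffer.wrap("foobar".getBytes()), lengths, sink, Output::materialize);
      readValues(ByteBuffer.wrap("bazqux".getBytes()), lengths, sink, Output::skip);
      System.out.println(sink.size());  // 2: only the materialized values were kept
    }
  }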