@@ -39,16 +39,18 @@ public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
3939 private final VectorizedDeltaBinaryPackedReader prefixLengthReader ;
4040 private final VectorizedDeltaLengthByteArrayReader suffixReader ;
4141 private WritableColumnVector prefixLengthVector ;
42- private ByteBuffer previous = null ;
42+ private ByteBuffer previous ;
4343 private int currentRow = 0 ;
4444
4545 // temporary variable used by getBinary
4646 private final WritableColumnVector binaryValVector ;
47+ private final WritableColumnVector tempBinaryValVector ;
4748
4849 VectorizedDeltaByteArrayReader () {
4950 this .prefixLengthReader = new VectorizedDeltaBinaryPackedReader ();
5051 this .suffixReader = new VectorizedDeltaLengthByteArrayReader ();
5152 binaryValVector = new OnHeapColumnVector (1 , BinaryType );
53+ tempBinaryValVector = new OnHeapColumnVector (1 , BinaryType );
5254 }
5355
5456 @ Override
@@ -62,12 +64,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
6264
6365 @ Override
6466 public Binary readBinary (int len ) {
65- readValues (1 , binaryValVector , 0 , ByteBufferOutputWriter :: writeArrayByteBuffer );
67+ readValues (1 , binaryValVector , 0 );
6668 return Binary .fromConstantByteArray (binaryValVector .getBinary (0 ));
6769 }
6870
69- private void readValues (int total , WritableColumnVector c , int rowId ,
70- ByteBufferOutputWriter outputWriter ) {
71+ private void readValues (int total , WritableColumnVector c , int rowId ) {
7172 for (int i = 0 ; i < total ; i ++) {
7273 // NOTE: due to PARQUET-246, it is important that we
7374 // respect prefixLength which was read from prefixLengthReader,
@@ -81,29 +82,21 @@ private void readValues(int total, WritableColumnVector c, int rowId,
8182 int length = prefixLength + suffixLength ;
8283
8384 // We have to do this to materialize the output
85+ WritableColumnVector arrayData = c .arrayData ();
86+ int offset = arrayData .getElementsAppended ();
8487 if (prefixLength != 0 ) {
85- // We could do
86- // c.putByteArray(rowId + i, previous, 0, prefixLength);
87- // c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
88- // previous = c.getBinary(rowId+1);
89- // but it incurs the same cost of copying the values twice _and_ c.getBinary
90- // is a _slow_ byte by byte copy
91- // The following always uses the faster system arraycopy method
92- byte [] out = new byte [length ];
93- System .arraycopy (previous .array (), previous .position (), out , 0 , prefixLength );
94- System .arraycopy (suffixArray , suffix .position (), out , prefixLength , suffixLength );
95- previous = ByteBuffer .wrap (out );
96- } else {
97- previous = suffix ;
88+ arrayData .appendBytes (prefixLength , previous .array (), previous .position ());
9889 }
99- outputWriter .write (c , rowId + i , previous , previous .limit () - previous .position ());
90+ arrayData .appendBytes (suffixLength , suffixArray , suffix .position ());
91+ c .putArray (rowId + i , offset , length );
92+ previous = arrayData .getBytesUnsafe (offset , length );
10093 currentRow ++;
10194 }
10295 }
10396
10497 @ Override
10598 public void readBinary (int total , WritableColumnVector c , int rowId ) {
106- readValues (total , c , rowId , ByteBufferOutputWriter :: writeArrayByteBuffer );
99+ readValues (total , c , rowId );
107100 }
108101
109102 /**
@@ -121,9 +114,29 @@ public void setPreviousReader(ValuesReader reader) {
121114
122115 @ Override
123116 public void skipBinary (int total ) {
124- // we have to read all the values so that we always have the correct 'previous'
125- // we just don't write it to the output vector
126- readValues (total , null , currentRow , ByteBufferOutputWriter ::skipWrite );
117+ WritableColumnVector c1 = tempBinaryValVector ;
118+ WritableColumnVector c2 = binaryValVector ;
119+
120+ for (int i = 0 ; i < total ; i ++) {
121+ int prefixLength = prefixLengthVector .getInt (currentRow );
122+ ByteBuffer suffix = suffixReader .getBytes (currentRow );
123+ byte [] suffixArray = suffix .array ();
124+ int suffixLength = suffix .limit () - suffix .position ();
125+ int length = prefixLength + suffixLength ;
126+
127+ WritableColumnVector arrayData = c1 .arrayData ();
128+ c1 .reset ();
129+ if (prefixLength != 0 ) {
130+ arrayData .appendBytes (prefixLength , previous .array (), previous .position ());
131+ }
132+ arrayData .appendBytes (suffixLength , suffixArray , suffix .position ());
133+ previous = arrayData .getBytesUnsafe (0 , length );
134+ currentRow ++;
135+
136+ WritableColumnVector tmp = c1 ;
137+ c1 = c2 ;
138+ c2 = tmp ;
139+ }
127140 }
128141
129142}
0 commit comments