@@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
4646
4747 this : NativeColumnBuilder [T ] with WithCompressionSchemes =>
4848
49- import CompressionScheme ._
50-
5149 var compressionEncoders : Seq [Encoder [T ]] = _
5250
5351 abstract override def initialize (initialSize : Int , columnName : String , useCompression : Boolean ) {
@@ -81,28 +79,32 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
8179 }
8280 }
8381
84- abstract override def build () = {
85- val rawBuffer = super .build()
82+ override def build () = {
83+ val nonNullBuffer = buildNonNulls()
84+ val typeId = nonNullBuffer.getInt()
8685 val encoder : Encoder [T ] = {
8786 val candidate = compressionEncoders.minBy(_.compressionRatio)
8887 if (isWorthCompressing(candidate)) candidate else PassThrough .encoder
8988 }
9089
91- val headerSize = columnHeaderSize(rawBuffer)
90+ // Header = column type ID + null count + null positions
91+ val headerSize = 4 + 4 + nulls.limit()
9292 val compressedSize = if (encoder.compressedSize == 0 ) {
93- rawBuffer.limit - headerSize
93+ nonNullBuffer.remaining()
9494 } else {
9595 encoder.compressedSize
9696 }
9797
98- // Reserves 4 bytes for compression scheme ID
9998 val compressedBuffer = ByteBuffer
99+ // Reserves 4 bytes for compression scheme ID
100100 .allocate(headerSize + 4 + compressedSize)
101101 .order(ByteOrder .nativeOrder)
102-
103- copyColumnHeader(rawBuffer, compressedBuffer)
102+ // Write the header
103+ .putInt(typeId)
104+ .putInt(nullCount)
105+ .put(nulls)
104106
105107 logInfo(s " Compressor for [ $columnName]: $encoder, ratio: ${encoder.compressionRatio}" )
106- encoder.compress(rawBuffer , compressedBuffer, columnType)
108+ encoder.compress(nonNullBuffer , compressedBuffer, columnType)
107109 }
108110}
0 commit comments