68 changes: 26 additions & 42 deletions parquet/src/arrow/array_reader.rs
@@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

#[cfg(test)]
mod test_util;

pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
@@ -271,7 +276,8 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
let struct_array = arr
.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call remove indices on each of the structs fields
let new_columns = fields.into_iter()
let new_columns = fields
.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
106 changes: 36 additions & 70 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -203,11 +203,12 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
}

fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
self.decoder.as_mut().expect("decoder set").read(
out,
range.end - range.start,
self.dict.as_ref(),
)
let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;

decoder.read(out, range.end - range.start, self.dict.as_ref())
}
}

@@ -266,7 +267,9 @@ impl ByteArrayDecoder {
match self {
ByteArrayDecoder::Plain(d) => d.read(out, len),
ByteArrayDecoder::Dictionary(d) => {
let dict = dict.expect("dictionary set");
let dict = dict
.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.read(out, dict, len)
}
ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
@@ -546,6 +549,10 @@ impl ByteArrayDecoderDictionary {
dict: &OffsetBuffer<I>,
len: usize,
) -> Result<usize> {
if dict.is_empty() {
return Ok(0); // All data must be NULL
}

let mut values_read = 0;

while values_read != len && self.max_remaining_values != 0 {
@@ -579,69 +586,16 @@
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, ByteArrayType};
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
use crate::util::memory::MemTracker;
use arrow::array::{Array, StringArray};
use std::sync::Arc;

Contributor Author: This code is extracted out into test_utils

fn column() -> ColumnDescPtr {
let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
.with_converted_type(ConvertedType::UTF8)
.build()
.unwrap();

Arc::new(ColumnDescriptor::new(
Arc::new(t),
1,
0,
ColumnPath::new(vec![]),
))
}

fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
let descriptor = column();
let mem_tracker = Arc::new(MemTracker::new());
let mut encoder =
get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();

encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
}

#[test]
fn test_byte_array_decoder() {
Contributor Author: This code is extracted out into test_utils so that it can be reused

let data: Vec<_> = vec!["hello", "world", "a", "b"]
.into_iter()
.map(ByteArray::from)
.collect();

let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));

dict_encoder.put(&data).unwrap();
let encoded_rle = dict_encoder.flush_buffer().unwrap();
let encoded_dictionary = dict_encoder.write_dict().unwrap();

// A column chunk with all the encodings!
let pages = vec![
(Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
(
Encoding::DELTA_BYTE_ARRAY,
get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
),
(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
),
(Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
(Encoding::RLE_DICTIONARY, encoded_rle),
];
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = column();
let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
Expand All @@ -668,15 +622,9 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

let valid = vec![false, false, true, true, false, true, true, false, false];
Contributor Author: This is an example of how changing pad_nulls to take a packed bitmask makes the tests less verbose

let rev_position_iter = valid
    .iter()
    .enumerate()
    .rev()
    .filter_map(|(i, valid)| valid.then(|| i));

let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), rev_position_iter);
output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
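// Illustration: under Arrow's LSB-first bitmap layout, the `valid` vector above
// packs into the bytes [0b0110_1100, 0b0000_0000]; `pad_nulls` now consumes that
// packed bitmask directly instead of a reversed iterator of valid positions.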
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

Expand All @@ -696,4 +644,22 @@ mod tests {
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
}
}
}
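The updated tests call `utf8_column()` and `byte_array_all_encodings()` from the new `test_util` module, which is not included in this excerpt of the diff. A minimal sketch of what those helpers might look like, reconstructed from the `column()` and `get_encoded()` code removed above (the exact names and signatures in `test_util` are assumptions):

// Hypothetical reconstruction of parquet/src/arrow/array_reader/test_util.rs,
// based on the test helpers removed from byte_array.rs in this diff.
use std::sync::Arc;

use crate::basic::{ConvertedType, Encoding, Type as PhysicalType};
use crate::data_type::{ByteArray, ByteArrayType};
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
use crate::util::memory::{ByteBufferPtr, MemTracker};

/// Returns a descriptor for a single optional UTF8 byte array column.
pub fn utf8_column() -> ColumnDescPtr {
    let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
        .with_converted_type(ConvertedType::UTF8)
        .build()
        .unwrap();

    Arc::new(ColumnDescriptor::new(
        Arc::new(t),
        1,
        0,
        ColumnPath::new(vec![]),
    ))
}

/// Encodes `data` with every supported byte array encoding, returning the
/// encoded pages together with the encoded dictionary page.
pub fn byte_array_all_encodings(
    data: Vec<impl Into<ByteArray>>,
) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) {
    let data: Vec<ByteArray> = data.into_iter().map(Into::into).collect();

    // Encode the values with a single non-dictionary encoding.
    let encode = |encoding: Encoding| {
        let mem_tracker = Arc::new(MemTracker::new());
        let mut encoder =
            get_encoder::<ByteArrayType>(utf8_column(), encoding, mem_tracker).unwrap();
        encoder.put(&data).unwrap();
        encoder.flush_buffer().unwrap()
    };

    // Dictionary encoding produces both an RLE-encoded data page and a dictionary page.
    let mut dict_encoder =
        DictEncoder::<ByteArrayType>::new(utf8_column(), Arc::new(MemTracker::new()));
    dict_encoder.put(&data).unwrap();
    let encoded_rle = dict_encoder.flush_buffer().unwrap();
    let encoded_dictionary = dict_encoder.write_dict().unwrap();

    // A column chunk with all the encodings!
    let pages = vec![
        (Encoding::PLAIN, encode(Encoding::PLAIN)),
        (Encoding::DELTA_BYTE_ARRAY, encode(Encoding::DELTA_BYTE_ARRAY)),
        (
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            encode(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        ),
        (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
        (Encoding::RLE_DICTIONARY, encoded_rle),
    ];

    (pages, encoded_dictionary)
}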