diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 172bdaac9eb6..64767e31cf23 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -506,6 +506,11 @@ impl ArrayDataBuilder {
         self
     }
 
+    pub fn null_count(mut self, null_count: usize) -> Self {
+        self.null_count = Some(null_count);
+        self
+    }
+
     pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
         self.null_bit_buffer = Some(buf);
         self
diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs
index 3fae2205ad15..0194b9387c47 100644
--- a/arrow/src/array/transform/mod.rs
+++ b/arrow/src/array/transform/mod.rs
@@ -25,7 +25,7 @@ use std::mem;
 
 use super::{
     data::{into_buffers, new_buffers},
-    ArrayData,
+    ArrayData, ArrayDataBuilder,
 };
 use crate::array::StringOffsetSizeTrait;
@@ -63,7 +63,7 @@ struct _MutableArrayData<'a> {
 }
 
 impl<'a> _MutableArrayData<'a> {
-    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
+    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
         let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2);
 
         let child_data = match self.data_type {
@@ -76,19 +76,19 @@ impl<'a> _MutableArrayData<'a> {
                 child_data
             }
         };
-        ArrayData::new(
-            self.data_type,
-            self.len,
-            Some(self.null_count),
-            if self.null_count > 0 {
-                Some(self.null_buffer.into())
-            } else {
-                None
-            },
-            0,
-            buffers,
-            child_data,
-        )
+
+        let mut array_data_builder = ArrayDataBuilder::new(self.data_type)
+            .offset(0)
+            .len(self.len)
+            .null_count(self.null_count)
+            .buffers(buffers)
+            .child_data(child_data);
+        if self.null_count > 0 {
+            array_data_builder =
+                array_data_builder.null_bit_buffer(self.null_buffer.into());
+        }
+
+        array_data_builder
     }
 }
@@ -552,8 +552,13 @@ impl<'a> MutableArrayData<'a> {
             .map(|array| build_extend_null_bits(array, use_nulls))
             .collect();
 
-        let null_bytes = bit_util::ceil(array_capacity, 8);
-        let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
+        let null_buffer = if use_nulls {
+            let null_bytes = bit_util::ceil(array_capacity, 8);
+            MutableBuffer::from_len_zeroed(null_bytes)
+        } else {
+            // create a 0-capacity mutable buffer; it is never used when use_nulls is false
+            MutableBuffer::with_capacity(0)
+        };
 
         let extend_values = match &data_type {
             DataType::Dictionary(_, _) => {
@@ -605,13 +610,40 @@ impl<'a> MutableArrayData<'a> {
 
     /// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
     pub fn extend_nulls(&mut self, len: usize) {
+        // TODO: null_buffer should probably be extended here as well,
+        // otherwise is_valid() could later panic;
+        // add a test to confirm
         self.data.null_count += len;
         (self.extend_nulls)(&mut self.data, len);
         self.data.len += len;
     }
 
+    /// Returns the current length
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.data.len
+    }
+
+    /// Returns `true` if the current length is 0
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.data.len == 0
+    }
+
+    /// Returns the current null count
+    #[inline]
+    pub fn null_count(&self) -> usize {
+        self.data.null_count
+    }
+
     /// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
     pub fn freeze(self) -> ArrayData {
+        self.data.freeze(self.dictionary).build()
+    }
+
+    /// Creates an [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`.
+    /// This is useful for extending the default behavior of MutableArrayData.
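+    /// For example, a separately computed null bitmap can be attached to the
+    /// returned builder via `null_bit_buffer` before calling `build()`.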
+    pub fn into_builder(self) -> ArrayDataBuilder {
         self.data.freeze(self.dictionary)
     }
 }
diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs
index 35b123d1fc44..7d336e0a938f 100644
--- a/arrow/src/buffer/mutable.rs
+++ b/arrow/src/buffer/mutable.rs
@@ -327,10 +327,10 @@ impl MutableBuffer {
     }
 
     /// Extends the buffer with a new item, without checking for sufficient capacity
-    /// Safety
+    /// # Safety
    /// Caller must ensure that capacity()-len()>=size_of<T>()
    #[inline]
-    unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
+    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
         let additional = std::mem::size_of::<T>();
         let dst = self.data.as_ptr().add(self.len) as *mut T;
         std::ptr::write(dst, item);
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index b15692e90f2f..c5efaddd971b 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -44,10 +44,10 @@ enum State {
 /// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
-pub(crate) struct SlicesIterator<'a> {
+pub struct SlicesIterator<'a> {
     iter: Enumerate<BitChunkIterator<'a>>,
     state: State,
-    filter_count: usize,
+    filter: &'a BooleanArray,
     remainder_mask: u64,
     remainder_len: usize,
     chunk_len: usize,
@@ -59,19 +59,14 @@
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
-
         let chunks = values.bit_chunks(filter.offset(), filter.len());
 
         Self {
             iter: chunks.iter().enumerate(),
             state: State::Chunks,
-            filter_count,
+            filter,
             remainder_len: chunks.remainder_len(),
             chunk_len: chunks.chunk_len(),
             remainder_mask: chunks.remainder_bits(),
@@ -83,6 +78,12 @@ impl<'a> SlicesIterator<'a> {
         }
     }
 
+    /// Counts the number of set bits in the filter array.
+    fn filter_count(&self) -> usize {
+        let values = self.filter.values();
+        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
+    }
+
     #[inline]
     fn current_start(&self) -> usize {
         self.current_chunk * 64 + self.current_bit
@@ -193,7 +194,7 @@ impl<'a> Iterator for SlicesIterator<'a> {
 /// Therefore, it is considered undefined behavior to pass `filter` with null values.
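+/// # Example (sketch)
+/// ```ignore
+/// // assumes a null-free `predicate: BooleanArray` and an `array: ArrayRef`
+/// let op = build_filter(&predicate)?;
+/// let filtered = make_array(op(array.data_ref()));
+/// ```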
 pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
     let iter = SlicesIterator::new(filter);
-    let filter_count = iter.filter_count;
+    let filter_count = iter.filter_count();
     let chunks = iter.collect::<Vec<_>>();
 
     Ok(Box::new(move |array: &ArrayData| {
@@ -253,7 +254,8 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
     }
 
     let iter = SlicesIterator::new(predicate);
-    match iter.filter_count {
+    let filter_count = iter.filter_count();
+    match filter_count {
         0 => {
             // return empty
             Ok(new_empty_array(array.data_type()))
@@ -266,7 +268,7 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
         _ => {
             // actually filter
             let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
+                MutableArrayData::new(vec![array.data_ref()], false, filter_count);
             iter.for_each(|(start, end)| mutable.extend(0, start, end));
             let data = mutable.freeze();
             Ok(make_array(data))
@@ -599,7 +601,7 @@ mod tests {
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 2)]);
@@ -612,7 +614,7 @@ mod tests {
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(0, 1), (2, 64)]);
@@ -625,7 +627,7 @@ mod tests {
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 1e54047bff0c..d66ee237eeca 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"
 
 [dev-dependencies]
 criterion = "0.3"
@@ -76,3 +77,7 @@ required-features = ["cli"]
 [[bench]]
 name = "arrow_writer"
 harness = false
+
+[[bench]]
+name = "arrow_array_reader"
+harness = false
\ No newline at end of file
diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs
new file mode 100644
index 000000000000..6e875128c2a8
--- /dev/null
+++ b/parquet/benches/arrow_array_reader.rs
@@ -0,0 +1,766 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
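+
+// This benchmark compares the existing parquet array readers ("old") against
+// the new ArrowArrayReader ("new") when decoding Int32 and String columns,
+// covering plain and dictionary encodings, mandatory and optional columns,
+// and different NULL densities.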
+ +use criterion::{criterion_group, criterion_main, Criterion}; +use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator}; +use parquet::{ + arrow::array_reader::ArrayReader, + basic::Encoding, + column::page::PageIterator, + data_type::{ByteArrayType, Int32Type}, + schema::types::{ColumnDescPtr, SchemaDescPtr}, +}; +use std::{collections::VecDeque, sync::Arc}; + +fn build_test_schema() -> SchemaDescPtr { + use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor}; + let message_type = " + message test_schema { + REQUIRED INT32 mandatory_int32_leaf; + REPEATED Group test_mid_int32 { + OPTIONAL INT32 optional_int32_leaf; + } + REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8); + REPEATED Group test_mid_string { + OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8); + } + } + "; + parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap() +} + +// test data params +const NUM_ROW_GROUPS: usize = 1; +const PAGES_PER_GROUP: usize = 2; +const VALUES_PER_PAGE: usize = 10_000; +const BATCH_SIZE: usize = 8192; + +use rand::{rngs::StdRng, Rng, SeedableRng}; + +pub fn seedable_rng() -> StdRng { + StdRng::seed_from_u64(42) +} + +fn build_plain_encoded_int32_page_iterator( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + null_density: f32, +) -> impl PageIterator + Clone { + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + let mut int32_value = 0; + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + int32_value += 1; + values.push(int32_value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(Encoding::PLAIN, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + + InMemoryPageIterator::new(schema, column_desc, pages) +} + +fn build_dictionary_encoded_int32_page_iterator( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + null_density: f32, +) -> impl PageIterator + Clone { + use parquet::encoding::{DictEncoder, Encoder}; + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + // generate 1% unique values + const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100; + let unique_values = (0..NUM_UNIQUE_VALUES) + .map(|x| (x + 1) as i32) + .collect::>(); + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = VecDeque::new(); + let mem_tracker = Arc::new(parquet::memory::MemTracker::new()); + let mut dict_encoder = + DictEncoder::::new(column_desc.clone(), mem_tracker); + // add data pages + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); 
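+            // sample a definition level for each slot: a value is generated only
+            // when the def level reaches max_def_level; lower levels become NULLs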
+ for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // select random value from list of unique values + let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)]; + values.push(int32_value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + let _ = dict_encoder.put(&values); + let indices = dict_encoder + .write_indices() + .expect("write_indices() should be OK"); + page_builder.add_indices(indices); + column_chunk_pages.push_back(page_builder.consume()); + } + // add dictionary page + let dict = dict_encoder + .write_dict() + .expect("write_dict() should be OK"); + let dict_page = parquet::column::page::Page::DictionaryPage { + buf: dict, + num_values: dict_encoder.num_entries() as u32, + encoding: Encoding::RLE_DICTIONARY, + is_sorted: false, + }; + column_chunk_pages.push_front(dict_page); + pages.push(column_chunk_pages.into()); + } + + InMemoryPageIterator::new(schema, column_desc, pages) +} + +fn build_plain_encoded_string_page_iterator( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + null_density: f32, +) -> impl PageIterator + Clone { + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + let string_value = + format!("Test value {}, row group: {}, page: {}", k, i, j); + values + .push(parquet::data_type::ByteArray::from(string_value.as_str())); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(Encoding::PLAIN, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + + InMemoryPageIterator::new(schema, column_desc, pages) +} + +fn build_dictionary_encoded_string_page_iterator( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + null_density: f32, +) -> impl PageIterator + Clone { + use parquet::encoding::{DictEncoder, Encoder}; + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + // generate 1% unique values + const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100; + let unique_values = (0..NUM_UNIQUE_VALUES) + .map(|x| format!("Dictionary value {}", x)) + .collect::>(); + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = VecDeque::new(); + let mem_tracker = Arc::new(parquet::memory::MemTracker::new()); + let mut dict_encoder = + DictEncoder::::new(column_desc.clone(), mem_tracker); + // add data pages + for _j in 
0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // select random value from list of unique values + let string_value = + unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str(); + values.push(parquet::data_type::ByteArray::from(string_value)); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + let _ = dict_encoder.put(&values); + let indices = dict_encoder + .write_indices() + .expect("write_indices() should be OK"); + page_builder.add_indices(indices); + column_chunk_pages.push_back(page_builder.consume()); + } + // add dictionary page + let dict = dict_encoder + .write_dict() + .expect("write_dict() should be OK"); + let dict_page = parquet::column::page::Page::DictionaryPage { + buf: dict, + num_values: dict_encoder.num_entries() as u32, + encoding: Encoding::RLE_DICTIONARY, + is_sorted: false, + }; + column_chunk_pages.push_front(dict_page); + pages.push(column_chunk_pages.into()); + } + + InMemoryPageIterator::new(schema, column_desc, pages) +} + +fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize { + // test procedure: read data in batches of 8192 until no more data + let mut total_count = 0; + loop { + let array = array_reader.next_batch(BATCH_SIZE); + let array_len = array.unwrap().len(); + total_count += array_len; + if array_len < BATCH_SIZE { + break; + } + } + total_count +} + +fn create_int32_arrow_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> impl ArrayReader { + use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter}; + let converter = PrimitiveArrayConverter::::new(); + ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap() +} + +fn create_int32_primitive_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> impl ArrayReader { + use parquet::arrow::array_reader::PrimitiveArrayReader; + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap() +} + +fn create_string_arrow_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> impl ArrayReader { + use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter}; + let converter = StringArrayConverter::new(); + ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap() +} + +fn create_string_complex_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> impl ArrayReader { + use parquet::arrow::array_reader::ComplexObjectArrayReader; + use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + ComplexObjectArrayReader::::new( + Box::new(page_iterator), + column_desc, + converter, + None, + ) + .unwrap() +} + +fn add_benches(c: &mut Criterion) { + const EXPECTED_VALUE_COUNT: usize = + NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE; + let mut group = c.benchmark_group("arrow_array_reader"); + + let mut count: usize = 0; + + let schema = build_test_schema(); + let 
mandatory_int32_column_desc = schema.column(0); + let optional_int32_column_desc = schema.column(1); + let mandatory_string_column_desc = schema.column(2); + // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc); + let optional_string_column_desc = schema.column(3); + // println!("optional_string_column_desc: {:?}", optional_string_column_desc); + + // primitive / int32 benchmarks + // ============================= + + // int32, plain encoded, no NULLs + let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator( + schema.clone(), + mandatory_int32_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read Int32Array, plain encoded, mandatory, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + plain_int32_no_null_data.clone(), + mandatory_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, plain encoded, mandatory, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + plain_int32_no_null_data.clone(), + mandatory_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator( + schema.clone(), + optional_int32_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read Int32Array, plain encoded, optional, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + plain_int32_no_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, plain encoded, optional, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + plain_int32_no_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // int32, plain encoded, half NULLs + let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator( + schema.clone(), + optional_int32_column_desc.clone(), + 0.5, + ); + group.bench_function( + "read Int32Array, plain encoded, optional, half NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + plain_int32_half_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, plain encoded, optional, half NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + plain_int32_half_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // int32, dictionary encoded, no NULLs + let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( + schema.clone(), + mandatory_int32_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read Int32Array, dictionary encoded, mandatory, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + dictionary_int32_no_null_data.clone(), + mandatory_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); 
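+    // each assert_eq! below verifies that the benchmarked reader decoded every generated value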
+ assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, dictionary encoded, mandatory, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + dictionary_int32_no_null_data.clone(), + mandatory_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( + schema.clone(), + optional_int32_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read Int32Array, dictionary encoded, optional, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + dictionary_int32_no_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, dictionary encoded, optional, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + dictionary_int32_no_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // int32, dictionary encoded, half NULLs + let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator( + schema.clone(), + optional_int32_column_desc.clone(), + 0.5, + ); + group.bench_function( + "read Int32Array, dictionary encoded, optional, half NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + dictionary_int32_half_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read Int32Array, dictionary encoded, optional, half NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_int32_arrow_array_reader( + dictionary_int32_half_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // string benchmarks + //============================== + + // string, plain encoded, no NULLs + let plain_string_no_null_data = build_plain_encoded_string_page_iterator( + schema.clone(), + mandatory_string_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read StringArray, plain encoded, mandatory, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + plain_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, plain encoded, mandatory, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + plain_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + let plain_string_no_null_data = build_plain_encoded_string_page_iterator( + schema.clone(), + optional_string_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read StringArray, plain encoded, optional, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + plain_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = 
bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, plain encoded, optional, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + plain_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // string, plain encoded, half NULLs + let plain_string_half_null_data = build_plain_encoded_string_page_iterator( + schema.clone(), + optional_string_column_desc.clone(), + 0.5, + ); + group.bench_function( + "read StringArray, plain encoded, optional, half NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + plain_string_half_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, plain encoded, optional, half NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + plain_string_half_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // string, dictionary encoded, no NULLs + let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( + schema.clone(), + mandatory_string_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read StringArray, dictionary encoded, mandatory, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + dictionary_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, dictionary encoded, mandatory, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + dictionary_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( + schema.clone(), + optional_string_column_desc.clone(), + 0.0, + ); + group.bench_function( + "read StringArray, dictionary encoded, optional, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + dictionary_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, dictionary encoded, optional, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + dictionary_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + // string, dictionary encoded, half NULLs + let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator( + schema, + optional_string_column_desc.clone(), + 0.5, + ); + group.bench_function( + "read StringArray, dictionary encoded, optional, half NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_string_complex_array_reader( + dictionary_string_half_null_data.clone(), + 
optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.bench_function( + "read StringArray, dictionary encoded, optional, half NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_arrow_array_reader( + dictionary_string_half_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }) + }, + ); + assert_eq!(count, EXPECTED_VALUE_COUNT); + + group.finish(); +} + +criterion_group!(benches, add_benches); +criterion_main!(benches); diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f54e446192b1..bd57cf316b04 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -60,7 +60,7 @@ use crate::arrow::converter::{ Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter, - LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, + LargeUtf8ArrayConverter, LargeUtf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -570,7 +570,7 @@ where T: DataType, C: Converter>, ArrayRef> + 'static, { - fn new( + pub fn new( pages: Box, column_desc: ColumnDescPtr, converter: C, @@ -1499,12 +1499,12 @@ impl<'a> ArrayReaderBuilder { arrow_type, )?)) } else { - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - Utf8Converter, - >::new( - page_iterator, + use crate::arrow::arrow_array_reader::{ + ArrowArrayReader, StringArrayConverter, + }; + let converter = StringArrayConverter::new(); + Ok(Box::new(ArrowArrayReader::try_new( + *page_iterator, column_desc, converter, arrow_type, @@ -1728,7 +1728,7 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { use super::*; - use crate::arrow::converter::Utf8Converter; + use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::{Page, PageReader}; diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs new file mode 100644 index 000000000000..c06d872533ac --- /dev/null +++ b/parquet/src/arrow/arrow_array_reader.rs @@ -0,0 +1,1562 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
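+
+// `ArrowArrayReader` splits each page into three lazy streams (value bytes,
+// definition levels, repetition levels) using `UnzipIter`, and an
+// `ArrayConverter` then assembles Arrow array data directly from the raw value
+// bytes; NULL slots are spliced in afterwards with `MutableArrayData`.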
+ +use super::array_reader::ArrayReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::Encoding; +use crate::errors::{ParquetError, Result}; +use crate::{ + column::page::{Page, PageIterator}, + memory::ByteBufferPtr, + schema::types::{ColumnDescPtr, ColumnDescriptor}, +}; +use arrow::{ + array::{ArrayRef, Int16Array}, + buffer::MutableBuffer, + datatypes::{DataType as ArrowType, ToByteSlice}, +}; +use std::{any::Any, collections::VecDeque, marker::PhantomData}; +use std::{cell::RefCell, rc::Rc}; + +struct UnzipIter { + shared_state: Rc>, + select_item_buffer: fn(&mut State) -> &mut VecDeque, + consume_source_item: fn(source_item: Source, state: &mut State) -> Target, +} + +impl UnzipIter { + fn new( + shared_state: Rc>, + item_buffer_selector: fn(&mut State) -> &mut VecDeque, + source_item_consumer: fn(source_item: Source, state: &mut State) -> Target, + ) -> Self { + Self { + shared_state, + select_item_buffer: item_buffer_selector, + consume_source_item: source_item_consumer, + } + } +} + +trait UnzipIterState { + type SourceIter: Iterator; + fn source_iter(&mut self) -> &mut Self::SourceIter; +} + +impl> Iterator + for UnzipIter +{ + type Item = Target; + + fn next(&mut self) -> Option { + let mut inner = self.shared_state.borrow_mut(); + // try to get one from the stored data + (self.select_item_buffer)(&mut *inner) + .pop_front() + .or_else(|| + // nothing stored, we need a new element. + inner.source_iter().next().map(|s| { + (self.consume_source_item)(s, &mut inner) + })) + } +} + +struct PageBufferUnzipIterState { + iter: It, + value_iter_buffer: VecDeque, + def_level_iter_buffer: VecDeque, + rep_level_iter_buffer: VecDeque, +} + +impl> UnzipIterState<(V, L, L)> + for PageBufferUnzipIterState +{ + type SourceIter = It; + + #[inline] + fn source_iter(&mut self) -> &mut Self::SourceIter { + &mut self.iter + } +} + +type ValueUnzipIter = + UnzipIter<(V, L, L), V, PageBufferUnzipIterState>; +type LevelUnzipIter = + UnzipIter<(V, L, L), L, PageBufferUnzipIterState>; +type PageUnzipResult = ( + ValueUnzipIter, + LevelUnzipIter, + LevelUnzipIter, +); + +fn unzip_iter>(it: It) -> PageUnzipResult { + let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState { + iter: it, + value_iter_buffer: VecDeque::new(), + def_level_iter_buffer: VecDeque::new(), + rep_level_iter_buffer: VecDeque::new(), + })); + + let value_iter = UnzipIter::new( + shared_data.clone(), + |state| &mut state.value_iter_buffer, + |(v, d, r), state| { + state.def_level_iter_buffer.push_back(d); + state.rep_level_iter_buffer.push_back(r); + v + }, + ); + + let def_level_iter = UnzipIter::new( + shared_data.clone(), + |state| &mut state.def_level_iter_buffer, + |(v, d, r), state| { + state.value_iter_buffer.push_back(v); + state.rep_level_iter_buffer.push_back(r); + d + }, + ); + + let rep_level_iter = UnzipIter::new( + shared_data, + |state| &mut state.rep_level_iter_buffer, + |(v, d, r), state| { + state.value_iter_buffer.push_back(v); + state.def_level_iter_buffer.push_back(d); + r + }, + ); + + (value_iter, def_level_iter, rep_level_iter) +} + +pub trait ArrayConverter { + fn convert_value_bytes( + &self, + value_decoder: &mut impl ValueDecoder, + num_values: usize, + ) -> Result; +} + +pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> { + column_desc: ColumnDescPtr, + data_type: ArrowType, + def_level_decoder: Box, + rep_level_decoder: Box, + value_decoder: Box, + last_def_levels: Option, + last_rep_levels: Option, + array_converter: C, +} + +pub(crate) struct ColumnChunkContext { + 
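+    /// Dictionary values for the current column chunk, decoded once from the
+    /// chunk's dictionary page and shared by all of its data pages.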
dictionary_values: Option>, +} + +impl ColumnChunkContext { + fn new() -> Self { + Self { + dictionary_values: None, + } + } + + fn set_dictionary(&mut self, dictionary_values: Vec) { + self.dictionary_values = Some(dictionary_values); + } +} + +type PageDecoderTuple = ( + Box, + Box, + Box, +); + +impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> { + pub fn try_new( + column_chunk_iterator: P, + column_desc: ColumnDescPtr, + array_converter: C, + arrow_type: Option, + ) -> Result { + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? + .data_type() + .clone(), + }; + type PageIteratorItem = Result<(Page, Rc>)>; + let page_iter = column_chunk_iterator + // build iterator of pages across column chunks + .flat_map(|x| -> Box> { + // attach column chunk context + let context = Rc::new(RefCell::new(ColumnChunkContext::new())); + match x { + Ok(page_reader) => Box::new( + page_reader.map(move |pr| pr.map(|p| (p, context.clone()))), + ), + // errors from reading column chunks / row groups are propagated to page level + Err(e) => Box::new(std::iter::once(Err(e))), + } + }); + // capture a clone of column_desc in closure so that it can outlive current function + let map_page_fn_factory = |column_desc: ColumnDescPtr| { + move |x: Result<(Page, Rc>)>| { + x.and_then(|(page, context)| { + Self::map_page(page, context, column_desc.as_ref()) + }) + } + }; + let map_page_fn = map_page_fn_factory(column_desc.clone()); + // map page iterator into tuple of buffer iterators for (values, def levels, rep levels) + // errors from lower levels are surfaced through the value decoder iterator + let decoder_iter = page_iter.map(map_page_fn).map(|x| match x { + Ok(iter_tuple) => iter_tuple, + // errors from reading pages are propagated to decoder iterator level + Err(e) => Self::map_page_error(e), + }); + // split tuple iterator into separate iterators for (values, def levels, rep levels) + let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter); + + Ok(Self { + column_desc, + data_type, + def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)), + rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)), + value_decoder: Box::new(CompositeValueDecoder::new(value_iter)), + last_def_levels: None, + last_rep_levels: None, + array_converter, + }) + } + + #[inline] + fn def_levels_available(column_desc: &ColumnDescriptor) -> bool { + column_desc.max_def_level() > 0 + } + + #[inline] + fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool { + column_desc.max_rep_level() > 0 + } + + fn map_page_error(err: ParquetError) -> PageDecoderTuple { + ( + Box::new(::once(Err(err.clone()))), + Box::new(::once(Err(err.clone()))), + Box::new(::once(Err(err))), + ) + } + + // Split Result into Result<(Iterator, Iterator, Iterator)> + // this method could fail, e.g. if the page encoding is not supported + fn map_page( + page: Page, + column_chunk_context: Rc>, + column_desc: &ColumnDescriptor, + ) -> Result { + use crate::encodings::levels::LevelDecoder; + match page { + Page::DictionaryPage { + buf, + num_values, + encoding, + .. 
+ } => { + let mut column_chunk_context = column_chunk_context.borrow_mut(); + if column_chunk_context.dictionary_values.is_some() { + return Err(general_err!( + "Column chunk cannot have more than one dictionary" + )); + } + // create plain decoder for dictionary values + let mut dict_decoder = Self::get_dictionary_page_decoder( + buf, + num_values as usize, + encoding, + column_desc, + )?; + // decode and cache dictionary values + let dictionary_values = dict_decoder.read_dictionary_values()?; + column_chunk_context.set_dictionary(dictionary_values); + + // a dictionary page doesn't return any values + Ok(( + Box::new(::empty()), + Box::new(::empty()), + Box::new(::empty()), + )) + } + Page::DataPage { + buf, + num_values, + encoding, + def_level_encoding, + rep_level_encoding, + statistics: _, + } => { + let mut buffer_ptr = buf; + // create rep level decoder iterator + let rep_level_iter: Box = + if Self::rep_levels_available(&column_desc) { + let mut rep_decoder = LevelDecoder::v1( + rep_level_encoding, + column_desc.max_rep_level(), + ); + let rep_level_byte_len = + rep_decoder.set_data(num_values as usize, buffer_ptr.all()); + // advance buffer pointer + buffer_ptr = buffer_ptr.start_from(rep_level_byte_len); + Box::new(LevelValueDecoder::new(rep_decoder)) + } else { + Box::new(::once(Err(ParquetError::General( + "rep levels are not available".to_string(), + )))) + }; + // create def level decoder iterator + let def_level_iter: Box = + if Self::def_levels_available(&column_desc) { + let mut def_decoder = LevelDecoder::v1( + def_level_encoding, + column_desc.max_def_level(), + ); + let def_levels_byte_len = + def_decoder.set_data(num_values as usize, buffer_ptr.all()); + // advance buffer pointer + buffer_ptr = buffer_ptr.start_from(def_levels_byte_len); + Box::new(LevelValueDecoder::new(def_decoder)) + } else { + Box::new(::once(Err(ParquetError::General( + "def levels are not available".to_string(), + )))) + }; + // create value decoder iterator + let value_iter = Self::get_value_decoder( + buffer_ptr, + num_values as usize, + encoding, + column_desc, + column_chunk_context, + )?; + Ok((value_iter, def_level_iter, rep_level_iter)) + } + Page::DataPageV2 { + buf, + num_values, + encoding, + num_nulls: _, + num_rows: _, + def_levels_byte_len, + rep_levels_byte_len, + is_compressed: _, + statistics: _, + } => { + let mut offset = 0; + // create rep level decoder iterator + let rep_level_iter: Box = + if Self::rep_levels_available(&column_desc) { + let rep_levels_byte_len = rep_levels_byte_len as usize; + let mut rep_decoder = + LevelDecoder::v2(column_desc.max_rep_level()); + rep_decoder.set_data_range( + num_values as usize, + &buf, + offset, + rep_levels_byte_len, + ); + offset += rep_levels_byte_len; + Box::new(LevelValueDecoder::new(rep_decoder)) + } else { + Box::new(::once(Err(ParquetError::General( + "rep levels are not available".to_string(), + )))) + }; + // create def level decoder iterator + let def_level_iter: Box = + if Self::def_levels_available(&column_desc) { + let def_levels_byte_len = def_levels_byte_len as usize; + let mut def_decoder = + LevelDecoder::v2(column_desc.max_def_level()); + def_decoder.set_data_range( + num_values as usize, + &buf, + offset, + def_levels_byte_len, + ); + offset += def_levels_byte_len; + Box::new(LevelValueDecoder::new(def_decoder)) + } else { + Box::new(::once(Err(ParquetError::General( + "def levels are not available".to_string(), + )))) + }; + + // create value decoder iterator + let values_buffer = buf.start_from(offset); + let 
value_iter = Self::get_value_decoder( + values_buffer, + num_values as usize, + encoding, + column_desc, + column_chunk_context, + )?; + Ok((value_iter, def_level_iter, rep_level_iter)) + } + } + } + + fn get_dictionary_page_decoder( + values_buffer: ByteBufferPtr, + num_values: usize, + mut encoding: Encoding, + column_desc: &ColumnDescriptor, + ) -> Result> { + if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY + } + + if encoding == Encoding::RLE_DICTIONARY { + Ok( + Self::get_plain_value_decoder(values_buffer, num_values, column_desc) + .into_dictionary_decoder(), + ) + } else { + Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )) + } + } + + fn get_value_decoder( + values_buffer: ByteBufferPtr, + num_values: usize, + mut encoding: Encoding, + column_desc: &ColumnDescriptor, + column_chunk_context: Rc>, + ) -> Result> { + if encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY; + } + + match encoding { + Encoding::PLAIN => { + Ok( + Self::get_plain_value_decoder(values_buffer, num_values, column_desc) + .into_value_decoder(), + ) + } + Encoding::RLE_DICTIONARY => { + if column_chunk_context.borrow().dictionary_values.is_some() { + let value_bit_len = Self::get_column_physical_bit_len(column_desc); + let dictionary_decoder: Box = if value_bit_len == 0 + { + Box::new(VariableLenDictionaryDecoder::new( + column_chunk_context, + values_buffer, + num_values, + )) + } else { + Box::new(FixedLenDictionaryDecoder::new( + column_chunk_context, + values_buffer, + num_values, + value_bit_len, + )) + }; + Ok(dictionary_decoder) + } else { + Err(general_err!("Dictionary values have not been initialized.")) + } + } + // Encoding::RLE => Box::new(RleValueDecoder::new()), + // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()), + // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()), + // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()), + e => return Err(nyi_err!("Encoding {} is not supported", e)), + } + } + + fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize { + use crate::basic::Type as PhysicalType; + // parquet only supports a limited number of physical types + // later converters cast to a more specific arrow / logical type if necessary + match column_desc.physical_type() { + PhysicalType::BOOLEAN => 1, + PhysicalType::INT32 | PhysicalType::FLOAT => 32, + PhysicalType::INT64 | PhysicalType::DOUBLE => 64, + PhysicalType::INT96 => 96, + PhysicalType::BYTE_ARRAY => 0, + PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8, + } + } + + fn get_plain_value_decoder( + values_buffer: ByteBufferPtr, + num_values: usize, + column_desc: &ColumnDescriptor, + ) -> Box { + let value_bit_len = Self::get_column_physical_bit_len(column_desc); + if value_bit_len == 0 { + Box::new(VariableLenPlainDecoder::new(values_buffer, num_values)) + } else { + Box::new(FixedLenPlainDecoder::new( + values_buffer, + num_values, + value_bit_len, + )) + } + } + + fn build_level_array( + level_decoder: &mut impl ValueDecoder, + batch_size: usize, + ) -> Result { + use arrow::datatypes::Int16Type; + let level_converter = PrimitiveArrayConverter::::new(); + let array_data = + level_converter.convert_value_bytes(level_decoder, batch_size)?; + Ok(Int16Array::from(array_data)) + } +} + +impl ArrayReader for ArrowArrayReader<'static, C> { + fn as_any(&self) -> &dyn Any { + self + } + + fn 
get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + if Self::rep_levels_available(&self.column_desc) { + // read rep levels if available + let rep_level_array = + Self::build_level_array(&mut self.rep_level_decoder, batch_size)?; + self.last_rep_levels = Some(rep_level_array); + } + + // check if def levels are available + let (values_to_read, null_bitmap_array) = + if !Self::def_levels_available(&self.column_desc) { + // if no def levels - just read (up to) batch_size values + (batch_size, None) + } else { + // if def levels are available - they determine how many values will be read + // decode def levels, return first error if any + let def_level_array = + Self::build_level_array(&mut self.def_level_decoder, batch_size)?; + let def_level_count = def_level_array.len(); + // use eq_scalar to efficiently build null bitmap array from def levels + let null_bitmap_array = arrow::compute::eq_scalar( + &def_level_array, + self.column_desc.max_def_level(), + )?; + self.last_def_levels = Some(def_level_array); + // efficiently calculate values to read + let values_to_read = null_bitmap_array + .values() + .count_set_bits_offset(0, def_level_count); + let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() { + Some(null_bitmap_array) + } else { + // shortcut if no NULLs + None + }; + (values_to_read, maybe_null_bitmap) + }; + + // read a batch of values + // converter only creates a no-null / all value array data + let mut value_array_data = self + .array_converter + .convert_value_bytes(&mut self.value_decoder, values_to_read)?; + + if let Some(null_bitmap_array) = null_bitmap_array { + // Only if def levels are available - insert null values efficiently using MutableArrayData. + // This will require value bytes to be copied again, but converter requirements are reduced. + // With a small number of NULLs, this will only be a few copies of large byte sequences. + let actual_batch_size = null_bitmap_array.len(); + // use_nulls is false, because null_bitmap_array is already calculated and re-used + let mut mutable = arrow::array::MutableArrayData::new( + vec![&value_array_data], + false, + actual_batch_size, + ); + // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps + arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each( + |(start, end)| { + // the gap needs to be filled with NULLs + if start > mutable.len() { + let nulls_to_add = start - mutable.len(); + mutable.extend_nulls(nulls_to_add); + } + // fill values, adjust start and end with NULL count so far + let nulls_added = mutable.null_count(); + mutable.extend(0, start - nulls_added, end - nulls_added); + }, + ); + // any remaining part is NULLs + if mutable.len() < actual_batch_size { + let nulls_to_add = actual_batch_size - mutable.len(); + mutable.extend_nulls(nulls_to_add); + } + + value_array_data = mutable + .into_builder() + .null_bit_buffer(null_bitmap_array.values().clone()) + .build(); + } + let mut array = arrow::array::make_array(value_array_data); + if array.data_type() != &self.data_type { + // cast array to self.data_type if necessary + array = arrow::compute::cast(&array, &self.data_type)? 
+ } + Ok(array) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.last_def_levels.as_ref().map(|x| x.values()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.last_rep_levels.as_ref().map(|x| x.values()) + } +} + +use crate::encodings::rle::RleDecoder; + +pub trait ValueDecoder { + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result; +} + +trait DictionaryValueDecoder { + fn read_dictionary_values(&mut self) -> Result>; +} + +trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder { + fn into_value_decoder(self: Box) -> Box; + fn into_dictionary_decoder(self: Box) -> Box; +} + +impl PlainValueDecoder for T +where + T: ValueDecoder + DictionaryValueDecoder + 'static, +{ + fn into_value_decoder(self: Box) -> Box { + self + } + + fn into_dictionary_decoder(self: Box) -> Box { + self + } +} + +impl dyn ValueDecoder { + fn empty() -> impl ValueDecoder { + SingleValueDecoder::new(Ok(0)) + } + + fn once(value: Result) -> impl ValueDecoder { + SingleValueDecoder::new(value) + } +} + +impl ValueDecoder for Box { + #[inline] + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + self.as_mut().read_value_bytes(num_values, read_bytes) + } +} + +struct SingleValueDecoder { + value: Result, +} + +impl SingleValueDecoder { + fn new(value: Result) -> Self { + Self { value } + } +} + +impl ValueDecoder for SingleValueDecoder { + fn read_value_bytes( + &mut self, + _num_values: usize, + _read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + self.value.clone() + } +} + +struct CompositeValueDecoder>> { + current_decoder: Option>, + decoder_iter: I, +} + +impl>> CompositeValueDecoder { + fn new(mut decoder_iter: I) -> Self { + let current_decoder = decoder_iter.next(); + Self { + current_decoder, + decoder_iter, + } + } +} + +impl>> ValueDecoder + for CompositeValueDecoder +{ + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + let mut values_to_read = num_values; + while values_to_read > 0 { + let value_decoder = match self.current_decoder.as_mut() { + Some(d) => d, + // no more decoders + None => break, + }; + while values_to_read > 0 { + let values_read = + value_decoder.read_value_bytes(values_to_read, read_bytes)?; + if values_read > 0 { + values_to_read -= values_read; + } else { + // no more values in current decoder + self.current_decoder = self.decoder_iter.next(); + break; + } + } + } + + Ok(num_values - values_to_read) + } +} + +struct LevelValueDecoder { + level_decoder: crate::encodings::levels::LevelDecoder, + level_value_buffer: Vec, +} + +impl LevelValueDecoder { + fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self { + Self { + level_decoder, + level_value_buffer: vec![0i16; 2048], + } + } +} + +impl ValueDecoder for LevelValueDecoder { + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + let value_size = std::mem::size_of::(); + let mut total_values_read = 0; + while total_values_read < num_values { + let values_to_read = std::cmp::min( + num_values - total_values_read, + self.level_value_buffer.len(), + ); + let values_read = match self + .level_decoder + .get(&mut self.level_value_buffer[..values_to_read]) + { + Ok(values_read) => values_read, + Err(e) => return Err(e), + }; + if values_read > 0 { + let level_value_bytes = + &self.level_value_buffer.to_byte_slice()[..values_read * 
value_size]; + read_bytes(level_value_bytes, values_read); + total_values_read += values_read; + } else { + break; + } + } + Ok(total_values_read) + } +} + +pub(crate) struct FixedLenPlainDecoder { + data: ByteBufferPtr, + num_values: usize, + value_bit_len: usize, +} + +impl FixedLenPlainDecoder { + pub(crate) fn new( + data: ByteBufferPtr, + num_values: usize, + value_bit_len: usize, + ) -> Self { + Self { + data, + num_values, + value_bit_len, + } + } +} + +impl DictionaryValueDecoder for FixedLenPlainDecoder { + fn read_dictionary_values(&mut self) -> Result> { + let value_byte_len = self.value_bit_len / 8; + let available_values = self.data.len() / value_byte_len; + let values_to_read = std::cmp::min(available_values, self.num_values); + let byte_len = values_to_read * value_byte_len; + let values = vec![self.data.range(0, byte_len)]; + self.num_values = 0; + self.data.set_range(self.data.start(), 0); + Ok(values) + } +} + +impl ValueDecoder for FixedLenPlainDecoder { + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + let available_values = self.data.len() * 8 / self.value_bit_len; + if available_values > 0 { + let values_to_read = std::cmp::min(available_values, num_values); + let byte_len = values_to_read * self.value_bit_len / 8; + read_bytes(&self.data.data()[..byte_len], values_to_read); + self.data + .set_range(self.data.start() + byte_len, self.data.len() - byte_len); + Ok(values_to_read) + } else { + Ok(0) + } + } +} + +pub(crate) struct VariableLenPlainDecoder { + data: ByteBufferPtr, + num_values: usize, + position: usize, +} + +impl VariableLenPlainDecoder { + pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self { + Self { + data, + num_values, + position: 0, + } + } +} + +impl DictionaryValueDecoder for VariableLenPlainDecoder { + fn read_dictionary_values(&mut self) -> Result> { + const LEN_SIZE: usize = std::mem::size_of::(); + let data = self.data.data(); + let data_len = data.len(); + let values_to_read = self.num_values; + let mut values = Vec::with_capacity(values_to_read); + let mut values_read = 0; + while self.position < data_len && values_read < values_to_read { + let len: usize = + read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize; + self.position += LEN_SIZE; + if data_len < self.position + len { + return Err(eof_err!("Not enough bytes to decode")); + } + values.push(self.data.range(self.position, len)); + self.position += len; + values_read += 1; + } + self.num_values -= values_read; + Ok(values) + } +} + +impl ValueDecoder for VariableLenPlainDecoder { + fn read_value_bytes( + &mut self, + num_values: usize, + read_bytes: &mut dyn FnMut(&[u8], usize), + ) -> Result { + const LEN_SIZE: usize = std::mem::size_of::(); + let data = self.data.data(); + let data_len = data.len(); + let values_to_read = std::cmp::min(self.num_values, num_values); + let mut values_read = 0; + while self.position < data_len && values_read < values_to_read { + let len: usize = + read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize; + self.position += LEN_SIZE; + if data_len < self.position + len { + return Err(eof_err!("Not enough bytes to decode")); + } + read_bytes(&data[self.position..][..len], 1); + self.position += len; + values_read += 1; + } + self.num_values -= values_read; + Ok(values_read) + } +} + +pub(crate) struct FixedLenDictionaryDecoder { + context_ref: Rc>, + key_data_bufer: ByteBufferPtr, + num_values: usize, + rle_decoder: RleDecoder, + value_byte_len: usize, + 
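+    // scratch buffer of dictionary keys, decoded in batches from the RLE stream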
+
+pub(crate) struct FixedLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_bufer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    value_byte_len: usize,
+    keys_buffer: Vec<i32>,
+}
+
+impl FixedLenDictionaryDecoder {
+    pub(crate) fn new(
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+        key_data_bufer: ByteBufferPtr,
+        num_values: usize,
+        value_bit_len: usize,
+    ) -> Self {
+        assert!(
+            value_bit_len % 8 == 0,
+            "value_bit_len must be a multiple of 8"
+        );
+        // First byte in `data` is bit width
+        let bit_width = key_data_bufer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_bufer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_bufer,
+            num_values,
+            rle_decoder,
+            value_byte_len: value_bit_len / 8,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for FixedLenDictionaryDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let input_value_bytes = values[0].data();
+        // read no more than available values or requested values
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read =
+                std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self
+                .rle_decoder
+                .get_batch(&mut self.keys_buffer[..keys_to_read])
+            {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(
+                    &input_value_bytes[key * self.value_byte_len..]
+                        [..self.value_byte_len],
+                    1,
+                );
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+pub(crate) struct VariableLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_bufer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    keys_buffer: Vec<i32>,
+}
+
+impl VariableLenDictionaryDecoder {
+    pub(crate) fn new(
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+        key_data_bufer: ByteBufferPtr,
+        num_values: usize,
+    ) -> Self {
+        // First byte in `data` is bit width
+        let bit_width = key_data_bufer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_bufer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_bufer,
+            num_values,
+            rle_decoder,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for VariableLenDictionaryDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read =
+                std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self
+                .rle_decoder
+                .get_batch(&mut self.keys_buffer[..keys_to_read])
+            {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(values[key].data(), 1);
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
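Both dictionary decoders assume the standard dictionary-encoded data page layout: the first byte holds the bit width of the keys, and the rest is RLE/bit-packed dictionary indices, pulled in batches of up to 2048 through `RleDecoder::get_batch` and then resolved against the dictionary values produced earlier by `read_dictionary_values`. The lookup step in isolation (a toy sketch, not part of this diff):

```rust
/// Resolves RLE-decoded dictionary keys against dictionary values, as the
/// loops above do before handing each value to the read_bytes callback.
fn resolve_keys<'a>(keys: &[i32], dict: &'a [Vec<u8>]) -> Vec<&'a [u8]> {
    keys.iter().map(|&key| dict[key as usize].as_slice()).collect()
}
```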
+
+use arrow::datatypes::ArrowPrimitiveType;
+
+pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
+    _phantom_data: PhantomData<T>,
+}
+
+impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
+    pub fn new() -> Self {
+        Self {
+            _phantom_data: PhantomData,
+        }
+    }
+}
+
+impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
+    fn convert_value_bytes(
+        &self,
+        value_decoder: &mut impl ValueDecoder,
+        num_values: usize,
+    ) -> Result<arrow::array::ArrayData> {
+        let value_size = T::get_byte_width();
+        let values_byte_capacity = num_values * value_size;
+        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+        value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
+            values_buffer.extend_from_slice(value_bytes);
+        })?;
+
+        // calculate actual data_len, which may be different from the iterator's upper bound
+        let value_count = values_buffer.len() / value_size;
+        let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
+            .len(value_count)
+            .add_buffer(values_buffer.into())
+            .build();
+        Ok(array_data)
+    }
+}
+
+pub struct StringArrayConverter {}
+
+impl StringArrayConverter {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl ArrayConverter for StringArrayConverter {
+    fn convert_value_bytes(
+        &self,
+        value_decoder: &mut impl ValueDecoder,
+        num_values: usize,
+    ) -> Result<arrow::array::ArrayData> {
+        use arrow::datatypes::ArrowNativeType;
+        let offset_size = std::mem::size_of::<i32>();
+        let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size);
+        // allocate initial capacity of 1 byte for each item
+        let values_byte_capacity = num_values;
+        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+        let mut length_so_far = i32::default();
+        offsets_buffer.push(length_so_far);
+
+        value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| {
+            debug_assert_eq!(
+                values_read, 1,
+                "offset length value buffers can only contain bytes for a single value"
+            );
+            length_so_far +=
+                <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
+            // this should be safe because a ValueDecoder should not read more than num_values
+            unsafe {
+                offsets_buffer.push_unchecked(length_so_far);
+            }
+            values_buffer.extend_from_slice(value_bytes);
+        })?;
+        // calculate actual data_len, which may be different from the iterator's upper bound
+        let data_len = (offsets_buffer.len() / offset_size) - 1;
+        let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
+            .len(data_len)
+            .add_buffer(offsets_buffer.into())
+            .add_buffer(values_buffer.into())
+            .build();
+        Ok(array_data)
+    }
+}
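`StringArrayConverter` emits the standard Arrow variable-length layout: `num_values + 1` `i32` offsets, where entry `i` is the cumulative byte length of all values before index `i`, plus one contiguous values buffer. The same bookkeeping without the decoder machinery (a minimal sketch under those assumptions):

```rust
/// Builds Utf8-style (offsets, values) buffers from plain strings.
fn utf8_buffers(values: &[&str]) -> (Vec<i32>, Vec<u8>) {
    let mut offsets = Vec::with_capacity(values.len() + 1);
    let mut bytes = Vec::new();
    let mut length_so_far = 0i32;
    offsets.push(length_so_far);
    for value in values {
        length_so_far += value.len() as i32;
        offsets.push(length_so_far);
        bytes.extend_from_slice(value.as_bytes());
    }
    (offsets, bytes)
}

// utf8_buffers(&["hi", "", "abc"]) == (vec![0, 2, 2, 5], b"hiabc".to_vec())
```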
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::column::page::Page;
+    use crate::data_type::ByteArray;
+    use crate::data_type::ByteArrayType;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::page_util::{
+        DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+    };
+    use crate::{
+        basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr,
+    };
+    use arrow::array::{PrimitiveArray, StringArray};
+    use arrow::datatypes::Int32Type as ArrowInt32;
+    use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
+    use std::sync::Arc;
+
+    /// Iterator for testing reading empty columns
+    struct EmptyPageIterator {
+        schema: SchemaDescPtr,
+    }
+
+    impl EmptyPageIterator {
+        fn new(schema: SchemaDescPtr) -> Self {
+            EmptyPageIterator { schema }
+        }
+    }
+
+    impl Iterator for EmptyPageIterator {
+        type Item = Result<Box<dyn PageReader>>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            None
+        }
+    }
+
+    impl PageIterator for EmptyPageIterator {
+        fn schema(&mut self) -> Result<SchemaDescPtr> {
+            Ok(self.schema.clone())
+        }
+
+        fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+            Ok(self.schema.column(0))
+        }
+    }
+
+    #[test]
+    fn test_array_reader_empty_pages() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+        let page_iterator = EmptyPageIterator::new(schema);
+
+        let converter = PrimitiveArrayConverter::<ArrowInt32>::new();
+        let mut array_reader =
+            ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                .unwrap();
+
+        // expect no values to be read
+        let array = array_reader.next_batch(50).unwrap();
+        assert!(array.is_empty());
+    }
+
+    fn make_column_chunks<T: DataType>(
+        column_desc: ColumnDescPtr,
+        encoding: Encoding,
+        num_levels: usize,
+        min_value: T::T,
+        max_value: T::T,
+        def_levels: &mut Vec<i16>,
+        rep_levels: &mut Vec<i16>,
+        values: &mut Vec<T::T>,
+        page_lists: &mut Vec<Vec<Page>>,
+        use_v2: bool,
+        num_chunks: usize,
+    ) where
+        T::T: PartialOrd + SampleUniform + Copy,
+    {
+        for _i in 0..num_chunks {
+            let mut pages = VecDeque::new();
+            let mut data = Vec::new();
+            let mut page_def_levels = Vec::new();
+            let mut page_rep_levels = Vec::new();
+
+            crate::util::test_common::make_pages::<T>(
+                column_desc.clone(),
+                encoding,
+                1,
+                num_levels,
+                min_value,
+                max_value,
+                &mut page_def_levels,
+                &mut page_rep_levels,
+                &mut data,
+                &mut pages,
+                use_v2,
+            );
+
+            def_levels.append(&mut page_def_levels);
+            rep_levels.append(&mut page_rep_levels);
+            values.append(&mut data);
+            page_lists.push(Vec::from(pages));
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_data() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+            let converter = PrimitiveArrayConverter::<ArrowInt32>::new();
+            let mut array_reader =
+                ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                    .unwrap();
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
+                array
+            );
+
+            // Read next 100 values, the first 50 ones are from the first column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
+                array
+            );
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
+                array
+            );
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_def_and_rep_levels() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL INT32 leaf;
+            }
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut def_levels = Vec::new();
+            let mut rep_levels = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut def_levels,
+                &mut rep_levels,
+                &mut Vec::new(),
+                &mut page_lists,
+                true,
+                2,
+            );
+
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+            let converter = PrimitiveArrayConverter::<ArrowInt32>::new();
+            let mut array_reader =
+                ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                    .unwrap();
+
+            let mut accu_len: usize = 0;
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+            accu_len += array.len();
+
+            // Read next 100 values, the first 50 ones are from the first column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+            accu_len += array.len();
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+
+            assert_eq!(accu_len + array.len(), 200);
+        }
+    }
+
+    #[test]
+    fn test_arrow_array_reader_string() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
+
+            let data_page = pb.consume();
+            pages.push(vec![data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+        let converter = StringArrayConverter::new();
+        let mut array_reader =
+            ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index b1aa39ebafa3..afb1fdced5c9 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -49,10 +49,11 @@
 //!}
 //! ```
 
-pub(in crate::arrow) mod array_reader;
+pub mod array_reader;
+pub mod arrow_array_reader;
 pub mod arrow_reader;
 pub mod arrow_writer;
-pub(in crate::arrow) mod converter;
+pub mod converter;
 pub(in crate::arrow) mod levels;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 7e3b6a847e7c..4dd7da910fd0 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -464,6 +464,14 @@ mod tests {
         }
     }
 
+    impl Iterator for TestPageReader {
+        type Item = Result<Page>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            self.get_next_page().transpose()
+        }
+    }
+
     #[test]
     fn test_read_required_records() {
         // Construct column schema
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index b3515780884d..b75d3b5028bb 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -28,6 +28,7 @@ use crate::util::memory::ByteBufferPtr;
 /// List of supported pages.
 /// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
 /// is used to store uncompressed bytes of the page.
+#[derive(Clone)]
 pub enum Page {
     DataPage {
         buf: ByteBufferPtr,
@@ -188,7 +189,7 @@ impl PageWriteSpec {
 
 /// API for reading pages from a column chunk.
 /// This offers an iterator-like API to get the next page.
-pub trait PageReader {
+pub trait PageReader: Iterator<Item = Result<Page>> {
     /// Gets the next page in the column chunk associated with this reader.
     /// Returns `None` if there are no pages left.
     fn get_next_page(&mut self) -> Result<Option<Page>>;
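Because `Iterator<Item = Result<Page>>` is now a supertrait of `PageReader`, page readers compose with ordinary iterator adapters. A usage sketch (mine, not from this diff; assumes the crate's `Result` alias):

```rust
/// Counts the pages in a column chunk; any read error short-circuits
/// through the Result-aware collect().
fn count_pages(reader: impl PageReader) -> Result<usize> {
    let pages = reader.collect::<Result<Vec<Page>>>()?;
    Ok(pages.len())
}
```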
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1181565bdcf4..63be17b7dd1f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1353,4 +1353,12 @@ mod tests {
             Ok(self.pages.next())
         }
     }
+
+    impl Iterator for TestPageReader {
+        type Item = Result<Page>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            self.get_next_page().transpose()
+        }
+    }
 }
diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs
index 33b1e233d893..6046ddaec805 100644
--- a/parquet/src/encodings/mod.rs
+++ b/parquet/src/encodings/mod.rs
@@ -18,4 +18,4 @@
 pub mod decoding;
 pub mod encoding;
 pub mod levels;
-mod rle;
+pub(crate) mod rle;
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 021c1f063f89..be1a22192954 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -22,7 +22,7 @@ use std::{cell, convert, io, result, str};
 #[cfg(any(feature = "arrow", test))]
 use arrow::error::ArrowError;
 
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 pub enum ParquetError {
     /// General Parquet error.
     /// Returned when code violates normal workflow of working with Parquet files.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 0877e622ce4b..a4d79a3bc5b7 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -271,6 +271,14 @@ impl<T: Read> SerializedPageReader<T> {
     }
 }
 
+impl<T: Read> Iterator for SerializedPageReader<T> {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
 impl<T: Read> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.total_num_values {
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index a931b95622df..900e2b5c2b61 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -46,7 +46,7 @@ pub use self::encodings::{decoding, encoding};
 pub use self::util::memory;
 
 #[macro_use]
-mod util;
+pub mod util;
 #[cfg(any(feature = "arrow", test))]
 pub mod arrow;
 pub mod column;
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 03b2500a3cd8..1aa8c266175e 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -757,11 +757,13 @@ impl ColumnDescriptor {
     }
 
     /// Returns maximum definition level for this column.
+    #[inline]
     pub fn max_def_level(&self) -> i16 {
         self.max_def_level
     }
 
     /// Returns maximum repetition level for this column.
+    #[inline]
     pub fn max_rep_level(&self) -> i16 {
         self.max_rep_level
     }
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 57d0c243fe65..1642a4b12473 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -292,6 +292,7 @@ impl<T> BufferPtr<T> {
     }
 
     /// Returns slice of data in this buffer.
+    #[inline]
     pub fn data(&self) -> &[T] {
         &self.data[self.start..self.start + self.len]
     }
@@ -299,12 +300,20 @@ impl<T> BufferPtr<T> {
     /// Updates this buffer with new `start` position and length `len`.
     ///
     /// Range should be within current start position and length.
+    #[inline]
     pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        assert!(start <= self.len);
-        assert!(start + len <= self.len);
+        self.set_range(start, len);
+        self
+    }
+
+    /// Updates this buffer with new `start` position and length `len`.
+    ///
+    /// Range should be within current start position and length.
+    #[inline]
+    pub fn set_range(&mut self, start: usize, len: usize) {
+        assert!(self.start <= start && start + len <= self.start + self.len);
         self.start = start;
         self.len = len;
-        self
     }
 
     /// Adds memory tracker to this buffer.
@@ -314,16 +323,19 @@ impl<T> BufferPtr<T> {
     }
 
     /// Returns start position of this buffer.
+    #[inline]
     pub fn start(&self) -> usize {
         self.start
     }
 
     /// Returns length of this buffer
+    #[inline]
     pub fn len(&self) -> usize {
         self.len
     }
 
     /// Returns whether this buffer is empty
+    #[inline]
     pub fn is_empty(&self) -> bool {
         self.len == 0
     }
@@ -393,6 +405,7 @@ impl<T> Drop for BufferPtr<T> {
 }
 
 impl AsRef<[u8]> for BufferPtr<u8> {
+    #[inline]
     fn as_ref(&self) -> &[u8] {
         &self.data[self.start..self.start + self.len]
     }
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index af9a1aa1ebac..8f6d85d469e5 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -22,6 +22,7 @@ pub mod bit_util;
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]
-pub mod test_common;
+pub(crate) mod test_common;
+pub use self::test_common::page_util::{
+    DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+};
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 2e0e8e926bc7..581845a3c1cf 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -32,7 +32,6 @@ use rand::distributions::uniform::SampleUniform;
 use std::collections::VecDeque;
 use std::mem;
 use std::sync::Arc;
-use std::vec::IntoIter;
 
 pub trait DataPageBuilder {
     fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -78,6 +77,9 @@ impl DataPageBuilderImpl {
     // Adds levels to the buffer and returns the number of encoded bytes
     fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
+        if max_level <= 0 {
+            return 0;
+        }
         let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
         let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
         level_encoder.put(levels).expect("put() should be OK");
@@ -163,60 +165,65 @@ impl DataPageBuilder for DataPageBuilderImpl {
 }
 
 /// A utility page reader which stores pages in memory.
-pub struct InMemoryPageReader {
-    pages: Box<dyn Iterator<Item = Page>>,
+pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
+    page_iter: P,
 }
 
-impl InMemoryPageReader {
-    pub fn new(pages: Vec<Page>) -> Self {
+impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
+    pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
         Self {
-            pages: Box::new(pages.into_iter()),
+            page_iter: pages.into_iter(),
         }
     }
 }
 
-impl PageReader for InMemoryPageReader {
+impl<P: Iterator<Item = Page>> PageReader for InMemoryPageReader<P> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        Ok(self.pages.next())
+        Ok(self.page_iter.next())
+    }
+}
+
+impl<P: Iterator<Item = Page>> Iterator for InMemoryPageReader<P> {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
     }
 }
 
 /// A utility page iterator which stores page readers in memory, used for tests.
-pub struct InMemoryPageIterator {
+#[derive(Clone)]
+pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
     schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
-    page_readers: IntoIter<Box<dyn PageReader>>,
+    page_reader_iter: I,
 }
 
-impl InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
     pub fn new(
         schema: SchemaDescPtr,
         column_desc: ColumnDescPtr,
-        pages: Vec<Vec<Page>>,
+        pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
     ) -> Self {
-        let page_readers = pages
-            .into_iter()
-            .map(|pages| Box::new(InMemoryPageReader::new(pages)) as Box<dyn PageReader>)
-            .collect::<Vec<Box<dyn PageReader>>>()
-            .into_iter();
-
         Self {
             schema,
             column_desc,
-            page_readers,
+            page_reader_iter: pages.into_iter(),
        }
    }
 }
 
-impl Iterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
     type Item = Result<Box<dyn PageReader>>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.page_readers.next().map(Ok)
+        self.page_reader_iter
+            .next()
+            .map(|x| Ok(Box::new(InMemoryPageReader::new(x)) as Box<dyn PageReader>))
     }
 }
 
-impl PageIterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> PageIterator for InMemoryPageIterator<I> {
     fn schema(&mut self) -> Result<SchemaDescPtr> {
         Ok(self.schema.clone())
     }
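With the generic rewrite, `InMemoryPageIterator` no longer boxes every reader eagerly; it boxes one `InMemoryPageReader` per column chunk lazily in `next()`. A hypothetical test-side construction (names and page contents assumed, following the tests above):

```rust
/// Builds a two-chunk in-memory page iterator; the chunk vectors would come
/// from a DataPageBuilderImpl as in the tests above (hypothetical setup).
fn two_chunk_iterator(
    schema: SchemaDescPtr,
    column_desc: ColumnDescPtr,
    chunk0: Vec<Page>,
    chunk1: Vec<Page>,
) -> impl Iterator<Item = Result<Box<dyn PageReader>>> {
    InMemoryPageIterator::new(schema, column_desc, vec![chunk0, chunk1])
}
```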