diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index baabebf30ec5..d8a7f07fba25 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -20,7 +20,12 @@ use arrow::datatypes::DataType; use criterion::measurement::WallTime; use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion}; use num::FromPrimitive; +use num_bigint::BigInt; +use parquet::arrow::array_reader::{ + make_byte_array_reader, make_fixed_len_byte_array_reader, +}; use parquet::basic::Type; +use parquet::data_type::FixedLenByteArrayType; use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator}; use parquet::{ arrow::array_reader::ArrayReader, @@ -47,6 +52,10 @@ fn build_test_schema() -> SchemaDescPtr { OPTIONAL INT32 optional_decimal1_leaf (DECIMAL(8,2)); REQUIRED INT64 mandatory_decimal2_leaf (DECIMAL(16,2)); OPTIONAL INT64 optional_decimal2_leaf (DECIMAL(16,2)); + REQUIRED BYTE_ARRAY mandatory_decimal3_leaf (DECIMAL(16,2)); + OPTIONAL BYTE_ARRAY optional_decimal3_leaf (DECIMAL(16,2)); + REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_decimal4_leaf (DECIMAL(16,2)); + OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_decimal4_leaf (DECIMAL(16,2)); } "; parse_message_type(message_type) @@ -65,6 +74,71 @@ pub fn seedable_rng() -> StdRng { StdRng::seed_from_u64(42) } +// support byte array for decimal +fn build_encoded_decimal_bytes_page_iterator( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + null_density: f32, + encoding: Encoding, + min: i128, + max: i128, +) -> impl PageIterator + Clone +where + T: parquet::data_type::DataType, + T::T: From>, +{ + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // create the decimal value + let value = rng.gen_range(min..max); + // decimal of parquet use the big-endian to store + let bytes = match column_desc.physical_type() { + Type::BYTE_ARRAY => { + // byte array use the unfixed size + let big_int = BigInt::from(value); + big_int.to_signed_bytes_be() + } + Type::FIXED_LEN_BYTE_ARRAY => { + assert_eq!(column_desc.type_length(), 16); + // fixed length byte array use the fixed size + // the size is 16 + value.to_be_bytes().to_vec() + } + _ => unimplemented!(), + }; + let value = T::T::from(bytes); + values.push(value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(encoding, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + InMemoryPageIterator::new(schema, column_desc, pages) +} + fn build_encoded_primitive_page_iterator( schema: SchemaDescPtr, column_desc: ColumnDescPtr, @@ -326,6 +400,7 @@ fn bench_array_reader_skip(mut array_reader: Box) -> usize { } total_count } + fn create_primitive_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -354,11 +429,27 @@ fn create_primitive_array_reader( } } +fn create_decimal_by_bytes_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + let physical_type = column_desc.physical_type(); + match physical_type { + Type::BYTE_ARRAY => { + make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + } + Type::FIXED_LEN_BYTE_ARRAY => { + make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None) + .unwrap() + } + _ => unimplemented!(), + } +} + fn create_string_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - use parquet::arrow::array_reader::make_byte_array_reader; make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() } @@ -378,6 +469,80 @@ fn create_string_byte_array_dictionary_reader( .unwrap() } +fn bench_byte_decimal( + group: &mut BenchmarkGroup, + schema: &SchemaDescPtr, + mandatory_column_desc: &ColumnDescPtr, + optional_column_desc: &ColumnDescPtr, + min: i128, + max: i128, +) where + T: parquet::data_type::DataType, + T::T: From>, +{ + // all are plain encoding + let mut count: usize = 0; + + // plain encoded, no NULLs + let data = build_encoded_decimal_bytes_page_iterator::( + schema.clone(), + mandatory_column_desc.clone(), + 0.0, + Encoding::PLAIN, + min, + max, + ); + group.bench_function("plain encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_decimal_by_bytes_reader( + data.clone(), + mandatory_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_decimal_bytes_page_iterator::( + schema.clone(), + optional_column_desc.clone(), + 0.0, + Encoding::PLAIN, + min, + max, + ); + group.bench_function("plain encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_decimal_by_bytes_reader( + data.clone(), + optional_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // half null + let data = build_encoded_decimal_bytes_page_iterator::( + schema.clone(), + optional_column_desc.clone(), + 0.5, + Encoding::PLAIN, + min, + max, + ); + group.bench_function("plain encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = create_decimal_by_bytes_reader( + data.clone(), + optional_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); +} + fn bench_primitive( group: &mut BenchmarkGroup, schema: &SchemaDescPtr, @@ -611,9 +776,39 @@ fn decimal_benches(c: &mut Criterion) { &schema, &mandatory_decimal2_leaf_desc, &optional_decimal2_leaf_desc, - // precision is 18: the max is 999999999999999999 - 999999999999000, - 999999999999999, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, + ); + group.finish(); + + // parquet BYTE_ARRAY, logical type decimal(16,2) + let mut group = c.benchmark_group("arrow_array_reader/BYTE_ARRAY/Decimal128Array"); + let mandatory_decimal3_leaf_desc = schema.column(10); + let optional_decimal3_leaf_desc = schema.column(11); + bench_byte_decimal::( + &mut group, + &schema, + &mandatory_decimal3_leaf_desc, + &optional_decimal3_leaf_desc, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, + ); + group.finish(); + + let mut group = + c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array"); + let mandatory_decimal4_leaf_desc = schema.column(12); + let optional_decimal4_leaf_desc = schema.column(13); + bench_byte_decimal::( + &mut group, + &schema, + &mandatory_decimal4_leaf_desc, + &optional_decimal4_leaf_desc, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, ); group.finish(); } diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 440160039521..9cd36cf43dc8 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -313,6 +313,12 @@ impl From for FixedLenByteArray { } } +impl From> for FixedLenByteArray { + fn from(buf: Vec) -> FixedLenByteArray { + FixedLenByteArray(ByteArray::from(buf)) + } +} + impl From for ByteArray { fn from(other: FixedLenByteArray) -> Self { other.0 @@ -1141,9 +1147,9 @@ macro_rules! make_type { } fn get_column_reader( - column_writer: ColumnReader, + column_reader: ColumnReader, ) -> Option> { - match column_writer { + match column_reader { ColumnReader::$reader_ident(w) => Some(w), _ => None, } @@ -1248,29 +1254,29 @@ impl FromBytes for Int96 { // FIXME Needed to satisfy the constraint of many decoding functions but ByteArray does not // appear to actual be converted directly from bytes impl FromBytes for ByteArray { - type Buffer = [u8; 8]; + type Buffer = Vec; fn from_le_bytes(bs: Self::Buffer) -> Self { - ByteArray::from(bs.to_vec()) + ByteArray::from(bs) } fn from_be_bytes(_bs: Self::Buffer) -> Self { unreachable!() } fn from_ne_bytes(bs: Self::Buffer) -> Self { - ByteArray::from(bs.to_vec()) + ByteArray::from(bs) } } impl FromBytes for FixedLenByteArray { - type Buffer = [u8; 8]; + type Buffer = Vec; fn from_le_bytes(bs: Self::Buffer) -> Self { - Self(ByteArray::from(bs.to_vec())) + Self(ByteArray::from(bs)) } fn from_be_bytes(_bs: Self::Buffer) -> Self { unreachable!() } fn from_ne_bytes(bs: Self::Buffer) -> Self { - Self(ByteArray::from(bs.to_vec())) + Self(ByteArray::from(bs)) } }