Skip to content

Commit 9d75f87

Browse files
authored
[Parquet] Optimize the performance in record reader (#8607)
# Which issue does this PR close? Related to: - #7456 - #8565 # Rationale for this change Improve the performance in `ParquetRecordBatchReader`, especially when the `RowSelector` is short. - By changing a hash map to an enum array # What changes are included in this PR? For `parquet/src/arrow/array_reader/cached_array_reader.rs`, update the hash function # Are these changes tested? The hashmaps are already covered by existing tests. Also tested by manually reading Parquet files. # Are there any user-facing changes? No # Performance results in arrow_reader_row_filter.rs on my 3950X Benchmark | Change | Verdict -- | -- | -- int64 == 9999 / all_columns / async | 🟢 -1.61% | Improved int64 == 9999 / all_columns / sync | 🔴 +1.56% | Regressed int64 == 9999 / exclude_filter_column / async | 🟢 -1.11% | Improved int64 == 9999 / exclude_filter_column / sync | ⚪ -0.97% | Within noise float64 > 99.0 / all_columns / async | 🟢 -6.25% | Improved float64 > 99.0 / all_columns / sync | 🟢 -11.24% | Improved float64 > 99.0 / exclude_filter_column / async | 🟢 -11.10% | Improved float64 > 99.0 / exclude_filter_column / sync | 🟢 -3.31% | Improved ts ≥ 9000 / all_columns / async | 🔴 +2.77% | Regressed ts ≥ 9000 / all_columns / sync | ⚪ -0.06% | Within noise ts ≥ 9000 / exclude_filter_column / async | 🟢 -2.54% | Improved ts ≥ 9000 / exclude_filter_column / sync | ⚪ +0.28% | Within noise int64 > 90 / all_columns / async | 🟢 -14.68% | Improved int64 > 90 / all_columns / sync | 🟢 -21.00% | Improved int64 > 90 / exclude_filter_column / async | 🟢 -17.66% | Improved int64 > 90 / exclude_filter_column / sync | 🟢 -14.53% | Improved float64 ≤ 99.0 / all_columns / async | 🟢 -9.20% | Improved float64 ≤ 99.0 / all_columns / sync | 🟢 -11.07% | Improved float64 ≤ 99.0 / exclude_filter_column / async | 🟢 -10.01% | Improved float64 ≤ 99.0 / exclude_filter_column / sync | 🟢 -11.80% | Improved ts < 9000 / all_columns / async | 🟢 -3.43% | Improved ts < 9000 / all_columns / sync | 🟢 -6.23% | Improved ts < 9000 / 
exclude_filter_column / async | 🟢 -4.00% | Improved ts < 9000 / exclude_filter_column / sync | 🟢 -3.91% | Improved utf8View <> '' / all_columns / async | 🟢 -16.56% | Improved utf8View <> '' / all_columns / sync | 🟢 -12.10% | Improved utf8View <> '' / exclude_filter_column / async | 🟢 -13.00% | Improved utf8View <> '' / exclude_filter_column / sync | 🟢 -17.29% | Improved float64 > 99.0 AND ts ≥ 9000 / all_columns / async | 🔴 +3.51% | Regressed float64 > 99.0 AND ts ≥ 9000 / all_columns / sync | 🟢 -2.19% | Improved float64 > 99.0 AND ts ≥ 9000 / exclude_filter_column / async | 🟢 -2.63% | Improved float64 > 99.0 AND ts ≥ 9000 / exclude_filter_column / sync | 🟢 -2.72% | Improved
1 parent 4fc9302 commit 9d75f87

File tree

2 files changed

+39
-29
lines changed

2 files changed

+39
-29
lines changed

parquet/src/basic.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,11 @@ impl EncodingMask {
771771
Self(mask)
772772
}
773773

774+
/// Mark the given [`Encoding`] as present in this mask.
775+
pub fn insert(&mut self, val: Encoding) {
776+
self.0 |= 1 << (val as i32);
777+
}
778+
774779
/// Test if a given [`Encoding`] is present in this mask.
775780
pub fn is_set(&self, val: Encoding) -> bool {
776781
self.0 & (1 << (val as i32)) != 0

parquet/src/column/reader/decoder.rs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::collections::HashMap;
19-
2018
use bytes::Bytes;
2119

22-
use crate::basic::Encoding;
20+
use crate::basic::{Encoding, EncodingMask};
2321
use crate::data_type::DataType;
2422
use crate::encodings::{
2523
decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
@@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
6866
}
6967

7068
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
71-
/// Read up to `num_levels` definition levels into `out`
69+
/// Read up to `num_levels` definition levels into `out`.
7270
///
73-
/// Returns the number of values skipped, and the number of levels skipped
71+
/// Returns the number of values read, and the number of levels read.
7472
///
7573
/// # Panics
7674
///
@@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
8179
num_levels: usize,
8280
) -> Result<(usize, usize)>;
8381

84-
/// Skips over `num_levels` definition levels
82+
/// Skips over `num_levels` definition levels.
8583
///
86-
/// Returns the number of values skipped, and the number of levels skipped
84+
/// Returns the number of values skipped, and the number of levels skipped.
8785
fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
8886
}
8987

@@ -136,14 +134,22 @@ pub trait ColumnValueDecoder {
136134
fn skip_values(&mut self, num_values: usize) -> Result<usize>;
137135
}
138136

137+
/// Number of slots in the dense decoder table that is indexed directly by `Encoding` discriminant.
138+
///
139+
/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the
140+
/// hot decoding paths.
141+
const ENCODING_SLOTS: usize = Encoding::BYTE_STREAM_SPLIT as usize + 1;
142+
139143
/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
140144
pub struct ColumnValueDecoderImpl<T: DataType> {
141145
descr: ColumnDescPtr,
142146

143147
current_encoding: Option<Encoding>,
144148

145-
// Cache of decoders for existing encodings
146-
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
149+
/// Cache of decoders for existing encodings.
150+
/// Uses `EncodingMask` and dense storage keyed by encoding discriminant.
151+
decoder_mask: EncodingMask,
152+
decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
147153
}
148154

149155
impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
@@ -153,7 +159,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
153159
Self {
154160
descr: descr.clone(),
155161
current_encoding: None,
156-
decoders: Default::default(),
162+
decoder_mask: EncodingMask::default(),
163+
decoders: std::array::from_fn(|_| None),
157164
}
158165
}
159166

@@ -168,7 +175,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
168175
encoding = Encoding::RLE_DICTIONARY
169176
}
170177

171-
if self.decoders.contains_key(&encoding) {
178+
if self.decoder_mask.is_set(encoding) {
172179
return Err(general_err!("Column cannot have more than one dictionary"));
173180
}
174181

@@ -178,7 +185,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
178185

179186
let mut decoder = DictDecoder::new();
180187
decoder.set_dict(Box::new(dictionary))?;
181-
self.decoders.insert(encoding, Box::new(decoder));
188+
self.decoders[encoding as usize] = Some(Box::new(decoder));
189+
self.decoder_mask.insert(encoding);
182190
Ok(())
183191
} else {
184192
Err(nyi_err!(
@@ -195,25 +203,24 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
195203
num_levels: usize,
196204
num_values: Option<usize>,
197205
) -> Result<()> {
198-
use std::collections::hash_map::Entry;
199-
200206
if encoding == Encoding::PLAIN_DICTIONARY {
201207
encoding = Encoding::RLE_DICTIONARY;
202208
}
203209

204210
let decoder = if encoding == Encoding::RLE_DICTIONARY {
205-
self.decoders
206-
.get_mut(&encoding)
211+
self.decoders[encoding as usize]
212+
.as_mut()
207213
.expect("Decoder for dict should have been set")
208214
} else {
209-
// Search cache for data page decoder
210-
match self.decoders.entry(encoding) {
211-
Entry::Occupied(e) => e.into_mut(),
212-
Entry::Vacant(v) => {
213-
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
214-
v.insert(data_decoder)
215-
}
215+
let slot = encoding as usize;
216+
if self.decoders[slot].is_none() {
217+
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
218+
self.decoders[slot] = Some(data_decoder);
219+
self.decoder_mask.insert(encoding);
216220
}
221+
self.decoders[slot]
222+
.as_mut()
223+
.expect("decoder should have been inserted")
217224
};
218225

219226
decoder.set_data(data, num_values.unwrap_or(num_levels))?;
@@ -226,9 +233,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
226233
.current_encoding
227234
.expect("current_encoding should be set");
228235

229-
let current_decoder = self
230-
.decoders
231-
.get_mut(&encoding)
236+
let current_decoder = self.decoders[encoding as usize]
237+
.as_mut()
232238
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
233239

234240
// TODO: Push vec into decoder (#5177)
@@ -244,9 +250,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
244250
.current_encoding
245251
.expect("current_encoding should be set");
246252

247-
let current_decoder = self
248-
.decoders
249-
.get_mut(&encoding)
253+
let current_decoder = self.decoders[encoding as usize]
254+
.as_mut()
250255
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
251256

252257
current_decoder.skip(num_values)

0 commit comments

Comments
 (0)