diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 5a495d2c6085..0acbe6501924 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -274,7 +274,7 @@ impl ByteArrayDecoder { validate_utf8, )), Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary( - ByteArrayDecoderDictionary::new(data, num_levels, num_values), + ByteArrayDecoderDictionary::new(data, num_levels, num_values)?, ), Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength( ByteArrayDecoderDeltaLength::new(data, validate_utf8)?, @@ -563,10 +563,10 @@ pub struct ByteArrayDecoderDictionary { } impl ByteArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Result { + Ok(Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, + }) } fn read( diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 4afe4264cb41..09de37a80ed9 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -293,7 +293,7 @@ where Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data.slice(1..)); + decoder.set_data(data.slice(1..))?; MaybeDictionaryDecoder::Dict { decoder, max_remaining_values: num_values.unwrap_or(num_levels), diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index cc71647f4bc9..f881690f805f 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -236,7 +236,7 @@ impl ByteViewArrayDecoder { Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( data, num_levels, num_values, - )) + )?) } Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, @@ -426,10 +426,10 @@ pub struct ByteViewArrayDecoderDictionary { } impl ByteViewArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Result { + Ok(Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, + }) } /// Reads the next indexes from self.decoder diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index a37bef568d1c..2297926add5f 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -381,7 +381,7 @@ impl ColumnValueDecoder for ValueDecoder { offset: 0, }, Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict { - decoder: DictIndexDecoder::new(data, num_levels, num_values), + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, }, Encoding::DELTA_BYTE_ARRAY => Decoder::Delta { decoder: DeltaByteArrayDecoder::new(data)?, diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs index 38f2b058360c..bb96f4bf98d6 100644 --- a/parquet/src/arrow/decoder/dictionary_index.rs +++ b/parquet/src/arrow/decoder/dictionary_index.rs @@ -42,18 +42,18 @@ pub struct DictIndexDecoder { impl DictIndexDecoder { /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels /// associated with this data page, and the number of non-null values (if known) - pub fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { + pub fn new(data: Bytes, num_levels: usize, num_values: Option) -> Result { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data.slice(1..)); + decoder.set_data(data.slice(1..))?; - Self { + Ok(Self { decoder, index_buf: Box::new([0; 1024]), index_buf_len: 0, index_offset: 0, max_remaining_values: num_values.unwrap_or(num_levels), - } + }) } /// Read up to `len` values, returning the number of values read diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 34b728d6fa1e..8fe26a9b5234 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -131,11 +131,12 @@ impl DefinitionLevelBufferDecoder { impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { type Buffer = DefinitionLevelBuffer; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { match &mut self.decoder { MaybePacked::Packed(d) => d.set_data(encoding, data), - MaybePacked::Fallback(d) => d.set_data(encoding, data), - } + MaybePacked::Fallback(d) => d.set_data(encoding, data)?, + }; + Ok(()) } } diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ebde79e6a7f2..387a0602a60d 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -451,7 +451,7 @@ where self.rep_level_decoder .as_mut() .unwrap() - .set_data(rep_level_encoding, level_data); + .set_data(rep_level_encoding, level_data)?; } if max_def_level > 0 { @@ -466,7 +466,7 @@ where self.def_level_decoder .as_mut() .unwrap() - .set_data(def_level_encoding, level_data); + .set_data(def_level_encoding, level_data)?; } self.values_decoder.set_data( @@ -512,7 +512,7 @@ where self.rep_level_decoder.as_mut().unwrap().set_data( Encoding::RLE, buf.slice(..rep_levels_byte_len as usize), - ); + )?; } // DataPage v2 only supports RLE encoding for definition @@ -524,7 +524,7 @@ where rep_levels_byte_len as usize ..(rep_levels_byte_len + def_levels_byte_len) as usize, ), - ); + )?; } self.values_decoder.set_data( diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index e49906207577..053db813ce5d 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -32,7 +32,7 @@ pub trait ColumnLevelDecoder { type Buffer; /// Set data for this [`ColumnLevelDecoder`] - fn set_data(&mut self, encoding: Encoding, data: Bytes); + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>; } pub trait RepetitionLevelDecoder: ColumnLevelDecoder { @@ -266,15 +266,15 @@ enum LevelDecoder { } impl LevelDecoder { - fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self { + fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result { match encoding { Encoding::RLE => { let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data); - Self::Rle(decoder) + decoder.set_data(data)?; + Ok(Self::Rle(decoder)) } #[allow(deprecated)] - Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width), + Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)), _ => unreachable!("invalid level encoding: {}", encoding), } } @@ -310,8 +310,9 @@ impl DefinitionLevelDecoderImpl { impl ColumnLevelDecoder for DefinitionLevelDecoderImpl { type Buffer = Vec; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { - self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)) + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { + self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?); + Ok(()) } } @@ -413,10 +414,11 @@ impl RepetitionLevelDecoderImpl { impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { type Buffer = Vec; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { - self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)); + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { + self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?); self.buffer_len = 0; self.buffer_offset = 0; + Ok(()) } } @@ -499,14 +501,14 @@ mod tests { let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(1); - decoder.set_data(Encoding::RLE, data.clone()); + decoder.set_data(Encoding::RLE, data.clone()).unwrap(); let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap(); assert_eq!(levels, 4); // The length of the final bit packed run is ambiguous, so without the correct // levels limit, it will decode zero padding let mut decoder = RepetitionLevelDecoderImpl::new(1); - decoder.set_data(Encoding::RLE, data); + decoder.set_data(Encoding::RLE, data).unwrap(); let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap(); assert_eq!(levels, 6); } @@ -525,7 +527,7 @@ mod tests { let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(5); - decoder.set_data(Encoding::RLE, data); + decoder.set_data(Encoding::RLE, data).unwrap(); let total_records = encoded.iter().filter(|x| **x == 0).count(); let mut remaining_records = total_records; diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index f5336ca7c09a..8201b38753c6 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -393,7 +393,7 @@ impl Decoder for DictDecoder { )); } let mut rle_decoder = RleDecoder::new(bit_width); - rle_decoder.set_data(data.slice(1..)); + rle_decoder.set_data(data.slice(1..))?; self.num_values = num_values; self.rle_decoder = Some(rle_decoder); Ok(()) @@ -473,7 +473,7 @@ impl Decoder for RleValueDecoder { self.decoder = RleDecoder::new(1); self.decoder - .set_data(data.slice(I32_SIZE..I32_SIZE + data_size)); + .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?; self.values_left = num_values; Ok(()) } diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 41c050132064..c95a46c634d2 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -321,14 +321,18 @@ impl RleDecoder { } #[inline] - pub fn set_data(&mut self, data: Bytes) { + pub fn set_data(&mut self, data: Bytes) -> Result<()> { if let Some(ref mut bit_reader) = self.bit_reader { bit_reader.reset(data); } else { self.bit_reader = Some(BitReader::new(data)); } - let _ = self.reload(); + // Initialize decoder state. The boolean only reports whether the first run contained data, + // and `get`/`get_batch` already interpret that result to drive iteration. We only need + // errors propagated here, so the flag returned is intentionally ignored. + let _ = self.reload()?; + Ok(()) } // These functions inline badly, they tend to inline and then create very large loop unrolls @@ -339,7 +343,7 @@ impl RleDecoder { assert!(size_of::() <= 8); while self.rle_left == 0 && self.bit_packed_left == 0 { - if !self.reload() { + if !self.reload()? { return Ok(None); } } @@ -349,14 +353,17 @@ impl RleDecoder { &self .current_value .as_mut() - .expect("current_value should be Some") + .ok_or_else(|| general_err!("current_value should be Some"))? .to_ne_bytes(), )?; self.rle_left -= 1; rle_value } else { // self.bit_packed_left > 0 - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be Some"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be Some"))?; let bit_packed_value = bit_reader .get_value(self.bit_width as usize) .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?; @@ -383,7 +390,10 @@ impl RleDecoder { } else if self.bit_packed_left > 0 { let mut num_values = cmp::min(buffer.len() - values_read, self.bit_packed_left as usize); - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| ParquetError::General("bit_reader should be set".into()))?; num_values = bit_reader.get_batch::( &mut buffer[values_read..values_read + num_values], @@ -396,7 +406,7 @@ impl RleDecoder { } self.bit_packed_left -= num_values as u32; values_read += num_values; - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -415,7 +425,10 @@ impl RleDecoder { } else if self.bit_packed_left > 0 { let mut num_values = cmp::min(num_values - values_skipped, self.bit_packed_left as usize); - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; num_values = bit_reader.skip(num_values, self.bit_width as usize); if num_values == 0 { @@ -425,7 +438,7 @@ impl RleDecoder { } self.bit_packed_left -= num_values as u32; values_skipped += num_values; - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -459,7 +472,10 @@ impl RleDecoder { self.rle_left -= num_values as u32; values_read += num_values; } else if self.bit_packed_left > 0 { - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; loop { let to_read = index_buf @@ -488,7 +504,7 @@ impl RleDecoder { break; } } - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -497,15 +513,18 @@ impl RleDecoder { } #[inline] - fn reload(&mut self) -> bool { - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + fn reload(&mut self) -> Result { + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; if let Some(indicator_value) = bit_reader.get_vlq_int() { // fastparquet adds padding to the end of pages. This is not spec-compliant // but is handled by the C++ implementation // if indicator_value == 0 { - return false; + return Ok(false); } if indicator_value & 1 == 1 { self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; @@ -513,14 +532,13 @@ impl RleDecoder { self.rle_left = (indicator_value >> 1) as u32; let value_width = bit_util::ceil(self.bit_width as usize, 8); self.current_value = bit_reader.get_aligned::(value_width); - assert!( - self.current_value.is_some(), - "parquet_data_error: not enough data for RLE decoding" - ); + self.current_value.ok_or_else(|| { + general_err!("parquet_data_error: not enough data for RLE decoding") + })?; } - true + Ok(true) } else { - false + Ok(false) } } } @@ -540,7 +558,7 @@ mod tests { // 00000011 10001000 11000110 11111010 let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 8]; let expected = vec![0, 1, 2, 3, 4, 5, 6, 7]; let result = decoder.get_batch::(&mut buffer); @@ -554,7 +572,7 @@ mod tests { // 00000011 10001000 11000110 11111010 let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let expected = vec![2, 3, 4, 5, 6, 7]; let skipped = decoder.skip(2).expect("skipping values"); assert_eq!(skipped, 2); @@ -595,7 +613,7 @@ mod tests { ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1.into()); + decoder.set_data(data1.into()).unwrap(); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -609,7 +627,7 @@ mod tests { assert!(result.is_ok()); assert_eq!(buffer, expected); - decoder.set_data(data2.into()); + decoder.set_data(data2.into()).unwrap(); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -638,7 +656,7 @@ mod tests { ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1.into()); + decoder.set_data(data1.into()).unwrap(); let mut buffer = vec![true; 50]; let expected = vec![false; 50]; @@ -650,7 +668,7 @@ mod tests { assert_eq!(remainder, 50); assert_eq!(buffer, expected); - decoder.set_data(data2.into()); + decoder.set_data(data2.into()).unwrap(); let mut buffer = vec![false; 50]; let mut expected = vec![]; for i in 0..50 { @@ -676,7 +694,7 @@ mod tests { let dict = vec![10, 20, 30]; let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 12]; let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let result = decoder.get_batch_with_dict::(&dict, &mut buffer, 12); @@ -689,7 +707,7 @@ mod tests { let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![""; 12]; let expected = vec![ "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff", @@ -707,7 +725,7 @@ mod tests { let dict = vec![10, 20, 30]; let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 10]; let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let skipped = decoder.skip(2).expect("skipping two values"); @@ -724,7 +742,7 @@ mod tests { let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![""; 8]; let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]; let skipped = decoder.skip(4).expect("skipping four values"); @@ -757,7 +775,7 @@ mod tests { // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); for v in values { let val: i64 = decoder .get() @@ -767,7 +785,7 @@ mod tests { } // Verify batch read - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let mut values_read: Vec = vec![0; values.len()]; decoder .get_batch(&mut values_read[..]) @@ -872,7 +890,7 @@ mod tests { let data: Bytes = data.into(); let mut decoder = RleDecoder::new(8); - decoder.set_data(data.clone()); + decoder.set_data(data.clone()).unwrap(); let mut output = vec![0_u16; 100]; let read = decoder.get_batch(&mut output).unwrap(); @@ -881,7 +899,7 @@ mod tests { assert!(output.iter().take(20).all(|x| *x == 255)); // Reset decoder - decoder.set_data(data); + decoder.set_data(data).unwrap(); let dict: Vec = (0..256).collect(); let mut output = vec![0_u16; 100]; @@ -907,7 +925,7 @@ mod tests { buffer.push(0); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.into()); + decoder.set_data(buffer.into()).unwrap(); // We don't always reliably know how many non-null values are contained in a page // and so the decoder must work correctly without a precise value count @@ -947,14 +965,14 @@ mod tests { let buffer: Bytes = writer.consume().into(); let mut decoder = RleDecoder::new(1); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); let mut decoded: Vec = vec![0; num_values]; let r = decoder.get_batch(&mut decoded).unwrap(); assert_eq!(r, num_values); assert_eq!(vec![1; num_values], decoded); - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let r = decoder .get_batch_with_dict(&[0, 23], &mut decoded, num_values) .unwrap(); @@ -973,7 +991,7 @@ mod tests { } let buffer = encoder.consume(); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(Bytes::from(buffer)); + decoder.set_data(Bytes::from(buffer)).unwrap(); let mut actual_values: Vec = vec![0; values.len()]; decoder .get_batch(&mut actual_values) @@ -992,7 +1010,7 @@ mod tests { // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); for v in values { let val = decoder .get::() @@ -1003,7 +1021,7 @@ mod tests { // Verify batch read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let mut values_read: Vec = vec![0; values.len()]; decoder .get_batch(&mut values_read[..])