diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 3f95ea9d4982..b53a469e3d13 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -389,31 +389,15 @@ pub(crate) fn decode_page( // TODO: page header could be huge because of statistics. We should set a // maximum page header size and abort if that is exceeded. - let buffer = match decompressor { - Some(decompressor) if can_decompress => { - let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?; - if offset > buffer.len() || offset > uncompressed_page_size { - return Err(general_err!("Invalid page header")); - } - let decompressed_size = uncompressed_page_size - offset; - let mut decompressed = Vec::with_capacity(uncompressed_page_size); - decompressed.extend_from_slice(&buffer.as_ref()[..offset]); - if decompressed_size > 0 { - let compressed = &buffer.as_ref()[offset..]; - decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?; - } - - if decompressed.len() != uncompressed_page_size { - return Err(general_err!( - "Actual decompressed size doesn't match the expected one ({} vs {})", - decompressed.len(), - uncompressed_page_size - )); - } - - Bytes::from(decompressed) - } - _ => buffer, + let buffer = if can_decompress { + decompress_buffer( + buffer, + offset, + decompressor, + page_header.uncompressed_page_size, + )? + } else { + buffer }; let result = match page_header.r#type { @@ -471,6 +455,47 @@ pub(crate) fn decode_page( Ok(result) } +/// Decompressed the specified buffer, starting from the specified offset, using +/// the provided decompressor if available and applicable. If the buffer is not +/// compressed, it will be returned as is. +fn decompress_buffer( + buffer: Bytes, + offset: usize, + mut decompressor: Option<&mut Box>, + uncompressed_page_size: i32, +) -> Result { + let Some(decompressor) = decompressor.as_mut() else { + return Ok(buffer); + }; + + let uncompressed_page_size = usize::try_from(uncompressed_page_size)?; + if offset > buffer.len() || offset > uncompressed_page_size { + return Err(general_err!("Invalid page header")); + } + + let decompressed_size = uncompressed_page_size - offset; + + if decompressed_size == 0 { + // Not compressed, return buffer as is + return Ok(buffer.slice(..offset)); + } + + let mut decompressed = Vec::with_capacity(uncompressed_page_size); + decompressed.extend_from_slice(&buffer[..offset]); + let compressed = &buffer.as_ref()[offset..]; + decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?; + + if decompressed.len() != uncompressed_page_size { + return Err(general_err!( + "Actual decompressed size doesn't match the expected one ({} vs {})", + decompressed.len(), + uncompressed_page_size + )); + } + + Ok(Bytes::from(decompressed)) +} + enum SerializedPageReaderState { Values { /// The current byte offset in the reader