Skip to content

Commit 0ae9f66

Browse files
etseidlalamb
andauthored
Truncate Parquet page data page statistics (#7555)
# Which issue does this PR close? Enables workaround for #7489 - Closes #7579 # Rationale for this change When `WriterProperties::statistics_truncate_length` is set, the column chunk statistics are truncated, but the page statistics are not. This can lead to very large page headers that blow up some readers. # What changes are included in this PR? Data Page Header statistics are now truncated as well. # Are there any user-facing changes? No --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3681540 commit 0ae9f66

File tree

3 files changed

+134
-51
lines changed

3 files changed

+134
-51
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,9 +1324,12 @@ mod tests {
13241324
use super::*;
13251325

13261326
use std::fs::File;
1327+
use std::io::Seek;
13271328

13281329
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
13291330
use crate::arrow::ARROW_SCHEMA_META_KEY;
1331+
use crate::format::PageHeader;
1332+
use crate::thrift::TCompactSliceInputProtocol;
13301333
use arrow::datatypes::ToByteSlice;
13311334
use arrow::datatypes::{DataType, Schema};
13321335
use arrow::error::Result as ArrowResult;
@@ -3766,4 +3769,68 @@ mod tests {
37663769
.unwrap();
37673770
assert_eq!(batches.len(), 0);
37683771
}
3772+
3773+
#[test]
3774+
fn test_page_stats_truncation() {
3775+
let string_field = Field::new("a", DataType::Utf8, false);
3776+
let binary_field = Field::new("b", DataType::Binary, false);
3777+
let schema = Schema::new(vec![string_field, binary_field]);
3778+
3779+
let raw_string_values = vec!["Blart Versenwald III"];
3780+
let raw_binary_values = [b"Blart Versenwald III".to_vec()];
3781+
let raw_binary_value_refs = raw_binary_values
3782+
.iter()
3783+
.map(|x| x.as_slice())
3784+
.collect::<Vec<_>>();
3785+
3786+
let string_values = StringArray::from(raw_string_values.clone());
3787+
let binary_values = BinaryArray::from(raw_binary_value_refs);
3788+
let batch = RecordBatch::try_new(
3789+
Arc::new(schema),
3790+
vec![Arc::new(string_values), Arc::new(binary_values)],
3791+
)
3792+
.unwrap();
3793+
3794+
let props = WriterProperties::builder()
3795+
.set_statistics_truncate_length(Some(2))
3796+
.set_dictionary_enabled(false)
3797+
.set_encoding(Encoding::PLAIN)
3798+
.set_compression(crate::basic::Compression::UNCOMPRESSED)
3799+
.build();
3800+
3801+
let mut file = roundtrip_opts(&batch, props);
3802+
3803+
// read file and decode page headers
3804+
// Note: use the thrift API as there is no Rust API to access the statistics in the page headers
3805+
let mut buf = vec![];
3806+
file.seek(std::io::SeekFrom::Start(0)).unwrap();
3807+
let read = file.read_to_end(&mut buf).unwrap();
3808+
assert!(read > 0);
3809+
3810+
// decode first page header
3811+
let first_page = &buf[4..];
3812+
let mut prot = TCompactSliceInputProtocol::new(first_page);
3813+
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3814+
let stats = hdr.data_page_header.unwrap().statistics;
3815+
assert!(stats.is_some());
3816+
let stats = stats.unwrap();
3817+
// check that min/max were properly truncated
3818+
assert!(!stats.is_max_value_exact.unwrap());
3819+
assert!(!stats.is_min_value_exact.unwrap());
3820+
assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3821+
assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3822+
3823+
// check second page now
3824+
let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
3825+
let mut prot = TCompactSliceInputProtocol::new(second_page);
3826+
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3827+
let stats = hdr.data_page_header.unwrap().statistics;
3828+
assert!(stats.is_some());
3829+
let stats = stats.unwrap();
3830+
// check that min/max were properly truncated
3831+
assert!(!stats.is_max_value_exact.unwrap());
3832+
assert!(!stats.is_min_value_exact.unwrap());
3833+
assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3834+
assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3835+
}
37693836
}

parquet/src/column/writer/mod.rs

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,59 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
949949
.unwrap_or_else(|| (data.to_vec(), false))
950950
}
951951

952+
/// Truncate the min and max values that will be written to a data page
953+
/// header or column chunk Statistics
954+
fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
955+
let backwards_compatible_min_max = self.descr.sort_order().is_signed();
956+
match statistics {
957+
Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
958+
let (min, did_truncate_min) = self.truncate_min_value(
959+
self.props.statistics_truncate_length(),
960+
stats.min_bytes_opt().unwrap(),
961+
);
962+
let (max, did_truncate_max) = self.truncate_max_value(
963+
self.props.statistics_truncate_length(),
964+
stats.max_bytes_opt().unwrap(),
965+
);
966+
Statistics::ByteArray(
967+
ValueStatistics::new(
968+
Some(min.into()),
969+
Some(max.into()),
970+
stats.distinct_count(),
971+
stats.null_count_opt(),
972+
backwards_compatible_min_max,
973+
)
974+
.with_max_is_exact(!did_truncate_max)
975+
.with_min_is_exact(!did_truncate_min),
976+
)
977+
}
978+
Statistics::FixedLenByteArray(stats)
979+
if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
980+
{
981+
let (min, did_truncate_min) = self.truncate_min_value(
982+
self.props.statistics_truncate_length(),
983+
stats.min_bytes_opt().unwrap(),
984+
);
985+
let (max, did_truncate_max) = self.truncate_max_value(
986+
self.props.statistics_truncate_length(),
987+
stats.max_bytes_opt().unwrap(),
988+
);
989+
Statistics::FixedLenByteArray(
990+
ValueStatistics::new(
991+
Some(min.into()),
992+
Some(max.into()),
993+
stats.distinct_count(),
994+
stats.null_count_opt(),
995+
backwards_compatible_min_max,
996+
)
997+
.with_max_is_exact(!did_truncate_max)
998+
.with_min_is_exact(!did_truncate_min),
999+
)
1000+
}
1001+
stats => stats,
1002+
}
1003+
}
1004+
9521005
/// Adds data page.
9531006
/// Data page is either buffered in case of dictionary encoding or written directly.
9541007
fn add_data_page(&mut self) -> Result<()> {
@@ -992,6 +1045,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
9921045
.update_variable_length_bytes(values_data.variable_length_bytes);
9931046

9941047
let page_statistics = page_statistics.map(Statistics::from);
1048+
let page_statistics = page_statistics.map(|stats| self.truncate_statistics(stats));
9951049

9961050
let compressed_page = match self.props.writer_version() {
9971051
WriterVersion::PARQUET_1_0 => {
@@ -1147,53 +1201,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11471201
.with_backwards_compatible_min_max(backwards_compatible_min_max)
11481202
.into();
11491203

1150-
let statistics = match statistics {
1151-
Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1152-
let (min, did_truncate_min) = self.truncate_min_value(
1153-
self.props.statistics_truncate_length(),
1154-
stats.min_bytes_opt().unwrap(),
1155-
);
1156-
let (max, did_truncate_max) = self.truncate_max_value(
1157-
self.props.statistics_truncate_length(),
1158-
stats.max_bytes_opt().unwrap(),
1159-
);
1160-
Statistics::ByteArray(
1161-
ValueStatistics::new(
1162-
Some(min.into()),
1163-
Some(max.into()),
1164-
stats.distinct_count(),
1165-
stats.null_count_opt(),
1166-
backwards_compatible_min_max,
1167-
)
1168-
.with_max_is_exact(!did_truncate_max)
1169-
.with_min_is_exact(!did_truncate_min),
1170-
)
1171-
}
1172-
Statistics::FixedLenByteArray(stats)
1173-
if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1174-
{
1175-
let (min, did_truncate_min) = self.truncate_min_value(
1176-
self.props.statistics_truncate_length(),
1177-
stats.min_bytes_opt().unwrap(),
1178-
);
1179-
let (max, did_truncate_max) = self.truncate_max_value(
1180-
self.props.statistics_truncate_length(),
1181-
stats.max_bytes_opt().unwrap(),
1182-
);
1183-
Statistics::FixedLenByteArray(
1184-
ValueStatistics::new(
1185-
Some(min.into()),
1186-
Some(max.into()),
1187-
stats.distinct_count(),
1188-
stats.null_count_opt(),
1189-
backwards_compatible_min_max,
1190-
)
1191-
.with_max_is_exact(!did_truncate_max)
1192-
.with_min_is_exact(!did_truncate_min),
1193-
)
1194-
}
1195-
stats => stats,
1196-
};
1204+
let statistics = self.truncate_statistics(statistics);
11971205

11981206
builder = builder
11991207
.set_statistics(statistics)

parquet/src/file/properties.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,13 @@ impl WriterProperties {
302302
self.column_index_truncate_length
303303
}
304304

305-
/// Returns the maximum length of truncated min/max values in statistics.
305+
/// Returns the maximum length of truncated min/max values in [`Statistics`].
306306
///
307307
/// `None` if truncation is disabled, must be greater than 0 otherwise.
308308
///
309309
/// For more details see [`WriterPropertiesBuilder::set_statistics_truncate_length`]
310+
///
311+
/// [`Statistics`]: crate::file::statistics::Statistics
310312
pub fn statistics_truncate_length(&self) -> Option<usize> {
311313
self.statistics_truncate_length
312314
}
@@ -646,16 +648,22 @@ impl WriterPropertiesBuilder {
646648
self
647649
}
648650

649-
/// Sets the max length of min/max value fields in row group level
651+
/// Sets the max length of min/max value fields in row group and data page header
650652
/// [`Statistics`] (defaults to `None` (no limit) via [`DEFAULT_STATISTICS_TRUNCATE_LENGTH`]).
651653
///
652654
/// # Notes
653-
/// Row group level [`Statistics`] are written when [`Self::set_statistics_enabled`] is
654-
/// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`].
655+
/// Row group [`Statistics`] are written when [`Self::set_statistics_enabled`] is
656+
/// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`]. Data page header
657+
/// [`Statistics`] are written when [`Self::set_statistics_enabled`] is set to
658+
/// [`EnabledStatistics::Page`].
655659
///
656660
/// * If `Some`, must be greater than 0, otherwise will panic
657661
/// * If `None`, there's no effective limit.
658662
///
663+
/// # See also
664+
/// Truncation of Page Index statistics is controlled separately via
665+
/// [`WriterPropertiesBuilder::set_column_index_truncate_length`]
666+
///
659667
/// [`Statistics`]: crate::file::statistics::Statistics
660668
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>) -> Self {
661669
if let Some(value) = max_length {

0 commit comments

Comments
 (0)