Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions crates/polars-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH: &str =
"POLARS_PARQUET_BINARY_STATISTICS_TRUNCATE_LEN";
const DEFAULT_PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH: u64 = 64;

// Option key for the parquet-metadata pruning flag. A supplied value is parsed as a
// bool; a missing or unparsable value falls back to the default below.
const PRUNE_PARQUET_METADATA: &str = "POLARS_PRUNE_PARQUET_METADATA";
// Pruning is disabled unless the option above is explicitly set to a true value.
const DEFAULT_PRUNE_PARQUET_METADATA: bool = false;

// Private.
const VERBOSE_SENSITIVE: &str = "POLARS_VERBOSE_SENSITIVE";
const DEFAULT_VERBOSE_SENSITIVE: bool = false;
Expand Down Expand Up @@ -71,6 +74,7 @@ static KNOWN_OPTIONS: &[&str] = &[
STREAMING_CHUNK_SIZE,
ENGINE_AFFINITY,
PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH,
PRUNE_PARQUET_METADATA,
/*
Not yet supported public options:

Expand Down Expand Up @@ -113,6 +117,7 @@ pub struct Config {
ideal_morsel_size: AtomicU64,
engine_affinity: AtomicU8,
parquet_binary_statistics_truncate_length: AtomicU64,
prune_parquet_metadata: AtomicBool,

// Private.
verbose_sensitive: AtomicBool,
Expand All @@ -137,6 +142,7 @@ impl Config {
parquet_binary_statistics_truncate_length: AtomicU64::new(
DEFAULT_PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH,
),
prune_parquet_metadata: AtomicBool::new(DEFAULT_PRUNE_PARQUET_METADATA),

// Private.
verbose_sensitive: AtomicBool::new(DEFAULT_VERBOSE_SENSITIVE),
Expand Down Expand Up @@ -204,6 +210,11 @@ impl Config {
Ordering::Relaxed,
)
},
PRUNE_PARQUET_METADATA => self.prune_parquet_metadata.store(
val.and_then(|x| parse::parse_bool(var, x))
.unwrap_or(DEFAULT_PRUNE_PARQUET_METADATA),
Ordering::Relaxed,
),

// Private flags.
VERBOSE_SENSITIVE => self.verbose_sensitive.store(
Expand Down Expand Up @@ -290,6 +301,12 @@ impl Config {
.load(Ordering::Relaxed)
}

/// Whether the optimizer should prune parquet metadata to projected/predicate columns
/// before serializing the IR plan. See `parquet_metadata_prune` in `polars-plan`.
///
/// Returns the current value of the flag. It is initialised to
/// `DEFAULT_PRUNE_PARQUET_METADATA` and overwritten whenever the
/// `PRUNE_PARQUET_METADATA` option is (re)loaded.
pub fn prune_parquet_metadata(&self) -> bool {
// `Relaxed` mirrors the ordering used when the flag is stored.
self.prune_parquet_metadata.load(Ordering::Relaxed)
}

/// Whether we should do verbose printing on sensitive information.
pub fn verbose_sensitive(&self) -> bool {
self.verbose_sensitive.load(Ordering::Relaxed)
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/read/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use arrow::datatypes::{ArrowDataType, Field, IntegerType, IntervalUnit, TimeUnit
use arrow::types::{days_ms, i256};
use ethnum::I256;
use num_traits::{AsPrimitive, FromBytes};
use polars_buffer::Buffer;
use polars_utils::IdxSize;
use polars_utils::float16::pf16;
use polars_utils::pl_str::PlSmallStr;
Expand Down Expand Up @@ -293,7 +292,7 @@ pub fn deserialize_all(
field: &Field,
row_groups: &[RowGroupMetadata],
field_idx: usize,
footer_buf: &Buffer<u8>,
footer_buf: &[u8],
) -> ParquetResult<Option<ArrowColumnStatisticsArrays>> {
assert!(!row_groups.is_empty());
use ArrowDataType as D;
Expand Down Expand Up @@ -553,7 +552,7 @@ pub fn deserialize_all(
pub fn deserialize<'a>(
field: &Field,
columns: &mut impl ExactSizeIterator<Item = &'a ColumnChunkMetadata>,
footer_buf: &Buffer<u8>,
footer_buf: &[u8],
) -> ParquetResult<Option<Statistics>> {
use ArrowDataType as D;
match field.dtype() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use polars_buffer::Buffer;
use polars_parquet_format::Statistics as ParquetStatistics;

use super::column_descriptor::{ColumnDescriptor, ColumnDescriptorRef};
Expand Down Expand Up @@ -39,6 +38,14 @@ impl ColumnChunkMetadata {
&self.column_chunk.meta_data
}

/// The full `CompactColumnChunk` wrapper. Used by the prune pass
/// (`FileMetadata::pruned`) to rebuild a smaller `FileMetadata` for
/// serialisation.
///
/// Returns a borrow of the chunk stored in this metadata; the prune pass
/// clones it before clearing statistics, so the original is left untouched.
/// Crate-private: only code inside this crate can reach the raw chunk.
#[inline]
pub(crate) fn compact_column_chunk(&self) -> &CompactColumnChunk {
&self.column_chunk
}

/// The [`ColumnDescriptor`] for this column. This descriptor contains
/// the physical and logical type of the pages.
pub fn descriptor(&self) -> &ColumnDescriptor {
Expand All @@ -59,7 +66,7 @@ impl ColumnChunkMetadata {
///
/// `footer_buf` must be the same buffer this chunk was decoded from,
/// typically [`super::FileMetadata::footer_buf`].
pub fn statistics(&self, footer_buf: &Buffer<u8>) -> Option<ParquetResult<Statistics>> {
pub fn statistics(&self, footer_buf: &[u8]) -> Option<ParquetResult<Statistics>> {
let stats = self.compact_metadata().statistics.as_ref()?;
let parquet_stats = compact_stats_to_parquet(stats, footer_buf);
Some(Statistics::deserialize(
Expand Down
92 changes: 92 additions & 0 deletions crates/polars-parquet/src/parquet/metadata/file_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub use crate::parquet::thrift_format::KeyValue;
// per-row-group structures, and the footer buffer that backs lazily-resolved
// column-chunk statistics. Built from `CompactFileMetaData` (the hand-written
// decoder's output) via `Self::from_compact`.
//
// Custom `Serialize`/`Deserialize` (in `file_metadata_serde`) emit a pruned
// wire form: schema without `leaves`, stats materialised to owned bytes,
// `footer_buf` reconstructed on deserialize. Cheap enough to ship in IR
// plans for distributed execution.
#[derive(Debug, Clone)]
pub struct FileMetadata {
/// version of this file.
Expand Down Expand Up @@ -74,6 +79,93 @@ impl FileMetadata {
.unwrap_or(ColumnOrder::Undefined)
}

/// Prune to projected columns, keeping statistics only for predicate
/// columns.
///
/// Returns a new [`FileMetadata`] containing only:
/// - top-level schema fields whose name is in `keep_top_level_names`,
/// - row-group chunks corresponding to those fields' leaves,
/// - statistics on chunks whose column is in `predicate_top_level_names`.
///
/// `predicate_top_level_names` is treated as a subset of
/// `keep_top_level_names`; pass `&[]` to drop all stats. `created_by`,
/// `key_value_metadata`, and `column_orders` are also dropped (not
/// needed by the read hot path).
///
/// Returns `Err` only when [`RowGroupMetadata::from_compact`] rejects
/// the rebuilt row group (chunks-vs-leaves desync). Callers can fall
/// back to unpruned metadata; the unpruned form is always valid.
///
/// TODO: a planner-side pass could pre-evaluate static predicates
/// against stats and drop fully-skipped row groups, removing stats
/// from the wire for those cases.
pub fn pruned(
&self,
keep_top_level_names: &[polars_utils::pl_str::PlSmallStr],
predicate_top_level_names: &[polars_utils::pl_str::PlSmallStr],
Copy link
Copy Markdown
Collaborator

@nameexhaustion nameexhaustion May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking:

Could we make a lookup for 1 side (either the kept names or the parquet schema names)? We've had cases in the past where users had >=10,000 columns and saw quadratic slowdowns on lookups.
If we do kept names, I'm thinking maybe HashMap<PlSmallStr, bool>, where the bool indicates true if we want to keep statistics and false otherwise.

) -> ParquetResult<Self> {
    // One lookup table for both questions asked below:
    //   key present  -> the top-level column is kept,
    //   value `true` -> its statistics are kept as well.
    // Building it once replaces a linear scan of the name slices per schema
    // field and per column chunk, which is quadratic on very wide schemas.
    let mut keep_stats_by_name: std::collections::HashMap<&str, bool> =
        std::collections::HashMap::with_capacity(keep_top_level_names.len());
    for name in keep_top_level_names {
        keep_stats_by_name.insert(name.as_str(), false);
    }
    // Predicate names are only honoured for columns that are kept; a predicate
    // name that is not in `keep_top_level_names` has no effect.
    for name in predicate_top_level_names {
        if let Some(keep_stats) = keep_stats_by_name.get_mut(name.as_str()) {
            *keep_stats = true;
        }
    }

    // 1. Filter top-level fields, preserving order from the source schema.
    let pruned_fields: Vec<crate::parquet::schema::types::ParquetType> = self
        .schema_descr
        .fields()
        .iter()
        .filter(|f| keep_stats_by_name.contains_key(f.get_field_info().name.as_str()))
        .cloned()
        .collect();

    // 2. Build the pruned SchemaDescriptor (DFS derives leaves Arc).
    let pruned_schema = SchemaDescriptor::new(self.schema_descr.name().into(), pruned_fields);

    // 3. Per row group: pick chunks whose top-level field is kept and drop
    //    stats from non-predicate columns. Chunk order is preserved.
    let row_groups: Vec<RowGroupMetadata> = self
        .row_groups
        .iter()
        .map(|rg| {
            let kept_chunks: Vec<crate::parquet::metadata::compact::CompactColumnChunk> = rg
                .parquet_columns()
                .iter()
                .filter_map(|c| {
                    // The first path element is the top-level field name. A miss
                    // means the column is not projected, so the chunk is skipped.
                    let keep_stats =
                        *keep_stats_by_name.get(c.descriptor().path_in_schema[0].as_str())?;
                    let mut chunk = c.compact_column_chunk().clone();
                    if !keep_stats {
                        chunk.meta_data.statistics = None;
                    }
                    Some(chunk)
                })
                .collect();

            let compact_rg = crate::parquet::metadata::compact::CompactRowGroup {
                columns: kept_chunks,
                total_byte_size: rg.total_byte_size() as i64,
                num_rows: rg.num_rows() as i64,
                sorting_columns: rg.sorting_columns().map(|sc| sc.to_vec()),
            };

            // Fails only if the rebuilt chunks no longer line up with the
            // pruned schema's leaves; the first error aborts the whole prune.
            let md = RowGroupMetadata::from_compact(&pruned_schema, compact_rg)?;
            Ok(md)
        })
        .collect::<ParquetResult<_>>()?;

    // Tallest rebuilt row group; 0 when the file has no row groups.
    let max_row_group_height = row_groups
        .iter()
        .map(|rg| rg.num_rows())
        .max()
        .unwrap_or(0);

    Ok(FileMetadata {
        version: self.version,
        num_rows: self.num_rows,
        max_row_group_height,
        // Dropped on purpose: not needed by the read hot path.
        created_by: None,
        row_groups,
        key_value_metadata: None,
        schema_descr: pruned_schema,
        column_orders: None,
        // The footer buffer is carried over unchanged alongside the pruned chunks.
        footer_buf: self.footer_buf.clone(),
    })
}

/// Build a `FileMetadata` from a [`CompactFileMetaData`], the output of
/// the hand-written Thrift decoder. Parses the schema, attaches each
/// row group's chunks to the schema's descriptors, and stores the
Expand Down
Loading
Loading