From aa2684fdb2b2ee89f876f5fd55ad885b3c6ffa49 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:28:55 +1000 Subject: [PATCH 1/3] fix: Prune partitions when no filters are defined --- datafusion/catalog-listing/src/helpers.rs | 22 +++++++--- .../core/src/datasource/listing/table.rs | 43 +++++++++++++++++++ 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 7ac13b99cce6..a5f0796af270 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -44,7 +44,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchema, DataFusionError}; use datafusion_expr::{Expr, Volatility}; use datafusion_physical_expr::create_physical_expr; -use object_store::path::Path; +use object_store::path::{Path}; use object_store::{ObjectMeta, ObjectStore}; /// Check whether the given expression can be resolved using only the columns `col_names`. 
@@ -158,6 +158,7 @@ pub fn split_files( chunks } +#[derive(Debug)] pub struct Partition { /// The path to the partition, including the table prefix path: Path, @@ -247,7 +248,12 @@ async fn prune_partitions( partition_cols: &[(String, DataType)], ) -> Result<Vec<Partition>> { if filters.is_empty() { - return Ok(partitions); + // prune partitions which don't contain the partition columns + return Ok(partitions.into_iter().filter(|p| { + let cols = partition_cols.iter().map(|x| x.0.as_str()); + !parse_partitions_for_path(table_path, &p.path, cols) + .unwrap_or_default().is_empty() + }).collect()); } let mut builders: Vec<_> = (0..partition_cols.len()) @@ -458,10 +464,10 @@ pub async fn pruned_partition_list<'a>( } let partition_prefix = evaluate_partition_prefix(partition_cols, filters); + let partitions = list_partitions(store, table_path, partition_cols.len(), partition_prefix) .await?; - debug!("Listed {} partitions", partitions.len()); let pruned = prune_partitions(table_path, partitions, filters, partition_cols).await?; @@ -528,12 +534,12 @@ where let subpath = table_path.strip_prefix(file_path)?; let mut part_values = vec![]; - for (part, pn) in subpath.zip(table_partition_cols) { + for (part, expected_partition) in subpath.zip(table_partition_cols) { match part.split_once('=') { - Some((name, val)) if name == pn => part_values.push(val), + Some((name, val)) if name == expected_partition => part_values.push(val), _ => { debug!( - "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{pn}'", + "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'", ); return None; } @@ -649,6 +655,8 @@ mod tests { ("tablepath/mypartition=val1/notparquetfile", 100), ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/file.parquet", 100), + ("tablepath/notapartition/file.parquet", 100), + ("tablepath/notmypartition=val1/file.parquet", 100), ]); let filter = 
Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( @@ -674,6 +682,8 @@ mod tests { ("tablepath/mypartition=val2/file.parquet", 100), ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/mypartition=val1/other=val3/file.parquet", 100), + ("tablepath/notapartition/file.parquet", 100), + ("tablepath/notmypartition=val1/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 51da8e770e65..8dd921ab2cb9 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -2801,6 +2801,48 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { + let files = vec![ + "bucket/test/pid=1/file1", + "bucket/test/pid=1/file2", + "bucket/test/pid=2/file3", + "bucket/test/pid=2/file4", + "bucket/test/other/file5", + ]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>()); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![ + ("pid".to_string(), DataType::Int32) + ]); + + let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap(); + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(Arc::new(schema)); + + let table = ListingTable::try_new(config)?; + + let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + assert_eq!(file_list.len(), 1); + + let files = file_list[0].clone(); + + assert_eq!(files.iter().map(|f| f.path().to_string()).collect::<Vec<_>>(), vec![ + "bucket/test/pid=1/file1", + "bucket/test/pid=1/file2", + "bucket/test/pid=2/file3", + 
"bucket/test/pid=2/file4", + ]); + + Ok(()) + } + #[cfg(feature = "parquet")] #[tokio::test] async fn test_table_stats_behaviors() -> Result<()> { @@ -2819,6 +2861,7 @@ mod tests { let config_default = ListingTableConfig::new(table_path.clone()) .with_listing_options(opt_default) .with_schema(schema_default); + let table_default = ListingTable::try_new(config_default)?; let exec_default = table_default.scan(&state, None, &[], None).await?; From 51770c5c74a1012f58f34a1e795e56f4479c24a6 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:31:11 +1000 Subject: [PATCH 2/3] fix: Backport for DF49: --- datafusion/core/src/datasource/listing/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8dd921ab2cb9..445f36bb0d54 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -2828,7 +2828,7 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], &[], None).await?; assert_eq!(file_list.len(), 1); let files = file_list[0].clone(); From 4e4bf66d4287e821ec9c5d14c0cd49f92108e6fc Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 8 Oct 2025 17:18:16 +1000 Subject: [PATCH 3/3] review: Address comments --- datafusion/catalog-listing/src/helpers.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index a5f0796af270..ceed25ccc7ac 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -44,7 +44,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, 
DFSchema, DataFusionError}; use datafusion_expr::{Expr, Volatility}; use datafusion_physical_expr::create_physical_expr; -use object_store::path::{Path}; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; /// Check whether the given expression can be resolved using only the columns `col_names`. @@ -468,6 +468,7 @@ pub async fn pruned_partition_list<'a>( let partitions = list_partitions(store, table_path, partition_cols.len(), partition_prefix) .await?; + debug!("Listed {} partitions", partitions.len()); let pruned = prune_partitions(table_path, partitions, filters, partition_cols).await?;