From 0ef56fa1f65835df6bbdc8931076829aa991f7bb Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 30 Sep 2024 22:47:14 +0800 Subject: [PATCH 1/3] support to query partitioned table for dynamic file catalog --- .../core/src/datasource/listing/table.rs | 33 +++- .../sqllogictest/test_files/dynamic_file.slt | 167 +++++++++++++++++- 2 files changed, 195 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2a35fddeb0337..a34bff36cc8f3 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,7 +32,7 @@ use crate::datasource::{ physical_plan::{FileScanConfig, FileSinkConfig}, }; use crate::execution::context::SessionState; -use datafusion_catalog::TableProvider; +use datafusion_catalog::{DynamicFileCatalog, TableProvider}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; @@ -53,6 +53,7 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use datafusion_catalog::Session; +use datafusion_optimizer::OptimizerConfig; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -159,6 +160,26 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(file_format) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()); + // If the catalog is a `DynamicFileCatalog`, infer the partition columns from the table path + let listing_options = if Self::is_enable_url_table(state) { + let partitions = listing_options + .infer_partitions(state, self.table_paths.first().unwrap()) + .await? + .into_iter() + .map(|col_name| { + ( + col_name, + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ), + ) + }) + .collect::>(); + listing_options.with_table_partition_cols(partitions) + } else { + listing_options + }; Ok(Self { table_paths: self.table_paths, @@ -167,6 +188,14 @@ impl ListingTableConfig { }) } + fn is_enable_url_table(session: &SessionState) -> bool { + session + .catalog_list() + .as_any() + .downcast_ref::() + .is_some() + } + /// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using. pub async fn infer_schema(self, state: &SessionState) -> Result { match self.options { @@ -504,7 +533,7 @@ impl ListingOptions { /// Infer the partitioning at the given path on the provided object store. /// For performance reasons, it doesn't read all the files on disk /// and therefore may fail to detect invalid partitioning. - async fn infer_partitions( + pub(crate) async fn infer_partitions( &self, state: &SessionState, table_path: &ListingTableUrl, diff --git a/datafusion/sqllogictest/test_files/dynamic_file.slt b/datafusion/sqllogictest/test_files/dynamic_file.slt index e177fd3de2437..69f9a43ad4077 100644 --- a/datafusion/sqllogictest/test_files/dynamic_file.slt +++ b/datafusion/sqllogictest/test_files/dynamic_file.slt @@ -25,9 +25,170 @@ SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0; 1 foo true 2 bar false -# dynamic file query doesn't support partitioned table -statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/partitioned_table_arrow' not found -SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0; +# Read partitioned file +statement ok +CREATE TABLE src_table_1 ( + int_col INT, + string_col TEXT, + bigint_col BIGINT, + partition_col INT +) AS VALUES +(1, 'aaa', 100, 1), +(2, 'bbb', 200, 1), +(3, 'ccc', 300, 1), +(4, 'ddd', 400, 1); + +statement ok +CREATE TABLE src_table_2 ( + int_col INT, + string_col TEXT, + bigint_col BIGINT, + partition_col INT +) AS VALUES +(5, 'eee', 500, 2), +(6, 'fff', 600, 2), +(7, 'ggg', 700, 2), +(8, 'hhh', 800, 2); + +# Read partitioned csv file + +query I +COPY src_table_1 TO 'test_files/scratch/dynamic_file/csv_partitions' +STORED AS CSV +PARTITIONED BY (partition_col); +---- +4 + +query I +COPY src_table_2 TO 'test_files/scratch/dynamic_file/csv_partitions' +STORED AS CSV +PARTITIONED BY (partition_col); +---- +4 + +query ITIT rowsort +SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/csv_partitions'; +---- +1 aaa 100 1 +2 bbb 200 1 +3 ccc 300 1 +4 ddd 400 1 +5 eee 500 2 +6 fff 600 2 +7 ggg 700 2 +8 hhh 800 2 + +# Read partitioned json file + +query I +COPY src_table_1 TO 'test_files/scratch/dynamic_file/json_partitions' +STORED AS JSON +PARTITIONED BY (partition_col); +---- +4 + +query I +COPY src_table_2 TO 'test_files/scratch/dynamic_file/json_partitions' +STORED AS JSON +PARTITIONED BY (partition_col); +---- +4 + +query ITIT rowsort +SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/json_partitions'; +---- +1 aaa 100 1 +2 bbb 200 1 +3 ccc 300 1 +4 ddd 400 1 +5 eee 500 2 +6 fff 600 2 +7 ggg 700 2 +8 hhh 800 2 + +# Read partitioned arrow file + +query I +COPY src_table_1 TO 'test_files/scratch/dynamic_file/arrow_partitions' +STORED AS ARROW +PARTITIONED BY (partition_col); +---- +4 + +query I +COPY src_table_2 TO 'test_files/scratch/dynamic_file/arrow_partitions' +STORED AS ARROW +PARTITIONED BY (partition_col); +---- +4 + +query ITIT rowsort +SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/arrow_partitions'; +---- +1 aaa 100 1 +2 bbb 200 1 +3 ccc 300 1 +4 ddd 400 1 +5 eee 500 2 +6 fff 600 2 +7 ggg 700 2 +8 hhh 800 2 + +# Read partitioned parquet file + +query I +COPY src_table_1 TO 'test_files/scratch/dynamic_file/parquet_partitions' +STORED AS PARQUET +PARTITIONED BY (partition_col); +---- +4 + +query I +COPY src_table_2 TO 'test_files/scratch/dynamic_file/parquet_partitions' +STORED AS PARQUET +PARTITIONED BY (partition_col); +---- +4 + +query ITIT rowsort +select * from 'test_files/scratch/dynamic_file/parquet_partitions'; +---- +1 aaa 100 1 +2 bbb 200 1 +3 ccc 300 1 +4 ddd 400 1 +5 eee 500 2 +6 fff 600 2 +7 ggg 700 2 +8 hhh 800 2 + +# Read partitioned parquet file with multiple partition columns + +query I +COPY src_table_1 TO 'test_files/scratch/dynamic_file/nested_partition' +STORED AS PARQUET +PARTITIONED BY (partition_col, string_col); +---- +4 + +query I +COPY src_table_2 TO 'test_files/scratch/dynamic_file/nested_partition' +STORED AS PARQUET +PARTITIONED BY (partition_col, string_col); +---- +4 + +query IITT rowsort +select * from 'test_files/scratch/dynamic_file/nested_partition'; +---- +1 100 1 aaa +2 200 1 bbb +3 300 1 ccc +4 400 1 ddd +5 500 2 eee +6 600 2 fff +7 700 2 ggg +8 800 2 hhh # read avro file query IT From e88a5a7c688d15b4d518a57bde9e2e816d2387d5 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 30 Sep 2024 23:44:41 +0800 Subject: [PATCH 2/3] cargo clippy --- datafusion/core/src/datasource/listing/table.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a34bff36cc8f3..85eb587bea7ba 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -53,7 +53,6 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use datafusion_catalog::Session; -use datafusion_optimizer::OptimizerConfig; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; From 3600f077bc74a0a07d79dd769f52f827eaa57ab4 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 2 Oct 2024 00:04:12 +0800 Subject: [PATCH 3/3] split partitions inferring to another function --- .../core/src/datasource/dynamic_file.rs | 13 +++- .../core/src/datasource/listing/table.rs | 64 ++++++++++--------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs index 3c409af29703e..6654d0871c3f6 100644 --- a/datafusion/core/src/datasource/dynamic_file.rs +++ b/datafusion/core/src/datasource/dynamic_file.rs @@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory { .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?; match ListingTableConfig::new(table_url.clone()) - .infer(state) + .infer_options(state) .await { - Ok(cfg) => ListingTable::try_new(cfg) - .map(|table| Some(Arc::new(table) as Arc)), + Ok(cfg) => { + let cfg = cfg + .infer_partitions_from_path(state) + .await? + .infer_schema(state) + .await?; + ListingTable::try_new(cfg) + .map(|table| Some(Arc::new(table) as Arc)) + } Err(_) => Ok(None), } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 85eb587bea7ba..59ecdb695ae25 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,8 +32,8 @@ use crate::datasource::{ physical_plan::{FileScanConfig, FileSinkConfig}, }; use crate::execution::context::SessionState; -use datafusion_catalog::{DynamicFileCatalog, TableProvider}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_catalog::TableProvider; +use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; @@ -159,26 +159,6 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(file_format) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()); - // If the catalog is a `DynamicFileCatalog`, infer the partition columns from the table path - let listing_options = if Self::is_enable_url_table(state) { - let partitions = listing_options - .infer_partitions(state, self.table_paths.first().unwrap()) - .await? - .into_iter() - .map(|col_name| { - ( - col_name, - DataType::Dictionary( - Box::new(DataType::UInt16), - Box::new(DataType::Utf8), - ), - ) - }) - .collect::>(); - listing_options.with_table_partition_cols(partitions) - } else { - listing_options - }; Ok(Self { table_paths: self.table_paths, @@ -187,14 +167,6 @@ impl ListingTableConfig { }) } - fn is_enable_url_table(session: &SessionState) -> bool { - session - .catalog_list() - .as_any() - .downcast_ref::() - .is_some() - } - /// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using. pub async fn infer_schema(self, state: &SessionState) -> Result { match self.options { @@ -219,6 +191,38 @@ impl ListingTableConfig { pub async fn infer(self, state: &SessionState) -> Result { self.infer_options(state).await?.infer_schema(state).await } + + /// Infer the partition columns from the path. Requires `self.options` to be set prior to using. + pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result { + match self.options { + Some(options) => { + let Some(url) = self.table_paths.first() else { + return config_err!("No table path found"); + }; + let partitions = options + .infer_partitions(state, url) + .await? + .into_iter() + .map(|col_name| { + ( + col_name, + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ), + ) + }) + .collect::>(); + let options = options.with_table_partition_cols(partitions); + Ok(Self { + table_paths: self.table_paths, + file_schema: self.file_schema, + options: Some(options), + }) + } + None => config_err!("No `ListingOptions` set for inferring schema"), + } + } } /// Options for creating a [`ListingTable`]