From 205e9f2928daf95bbea5c0b20f0eaf9d060100a3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 15 Jul 2024 13:54:27 -0400 Subject: [PATCH 1/8] add a knob to force string view in benchmark --- benchmarks/src/clickbench.rs | 4 ++- benchmarks/src/tpch/run.rs | 1 + benchmarks/src/util/options.rs | 4 +++ datafusion/common/src/config.rs | 4 +++ .../core/src/datasource/listing/table.rs | 27 ++++++++++++++++++- 5 files changed, 38 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 41dffc55f371e..34004d9fb1c80 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -116,7 +116,9 @@ impl RunOpt { None => queries.min_query_id()..=queries.max_query_id(), }; - let config = self.common.config(); + let mut config = self.common.config(); + config.options_mut().execution.schema_force_string_view = self.common.string_view; + let ctx = SessionContext::new_with_config(config); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index f2a93d2ea5495..afaf5f2c5f375 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -120,6 +120,7 @@ impl RunOpt { .config() .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + config.options_mut().execution.schema_force_string_view = self.common.string_view; let ctx = SessionContext::new_with_config(config); // register tables diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b9398e5b522f2..b4d4ac47ca635 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -37,6 +37,10 @@ pub struct CommonOpt { /// Activate debug mode to see more details #[structopt(short, long)] pub debug: bool, + + /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray. + #[structopt(long)] + pub string_view: bool, } impl CommonOpt { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 880f0119ce0da..fb2b0a431a0ab 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -311,6 +311,10 @@ config_namespace! { /// Should DataFusion keep the columns used for partition_by in the output RecordBatches pub keep_partition_by_columns: bool, default = false + + /// If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, + /// and `Binary/BinaryLarge` with `BinaryView`. + pub schema_force_string_view: bool, default = false } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ea4d396a14cb3..0142d42767721 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -410,7 +410,32 @@ impl ListingOptions { .try_collect() .await?; - self.format.infer_schema(state, &store, &files).await + let mut schema = self.format.infer_schema(state, &store, &files).await?; + + if state.config_options().execution.schema_force_string_view { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new( + field.name(), + DataType::Utf8View, + field.is_nullable(), + )), + DataType::Binary | DataType::LargeBinary => Arc::new(Field::new( + field.name(), + DataType::BinaryView, + field.is_nullable(), + )), + _ => field.clone(), + }) + .collect(); + schema = Arc::new(Schema::new_with_metadata( + transformed_fields, + schema.metadata.clone(), + )); + } + Ok(schema) } /// Infers the partition columns stored in `LOCATION` and compares From 7326f1fd3e47b4a64ae9730f5c101e9fd85519b1 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 15 Jul 2024 14:47:00 -0400 Subject: [PATCH 2/8] fix sql logic test --- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 95bea1223a9ce..a4ae927db6afc 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -205,6 +205,7 @@ datafusion.execution.parquet.statistics_enabled NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 +datafusion.execution.schema_force_string_view false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 @@ -289,6 +290,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enab datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system +datafusion.execution.schema_force_string_view false If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). From c066d9bc68598d7ab9d0d3861083adf829718613 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 15 Jul 2024 15:11:01 -0400 Subject: [PATCH 3/8] update doc --- docs/source/user-guide/configs.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5130b0a56d0e9..a2bccd1b4e33c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -86,7 +86,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | -| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.keep_partition_by_columns | false | Should Datafusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.schema_force_string_view | false | If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From aeeb77ea651631a5e4400fa36be62659946f45c7 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 09:41:51 -0400 Subject: [PATCH 4/8] fix ci --- Cargo.toml | 1 - docs/source/user-guide/configs.md | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2c19d422d1861..b1f07aa531df3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,7 +154,6 @@ large_futures = "warn" [workspace.lints.rust] unused_imports = "deny" - ## Temporary arrow-rs patch until 52.2.0 is released [patch.crates-io] diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index a2bccd1b4e33c..ae30204bbe8f1 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -86,7 +86,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | -| datafusion.execution.keep_partition_by_columns | false | Should Datafusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.execution.schema_force_string_view | false | If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | From 9e2476a0851c585227ef88e9e80ac00cc5e50e7b Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 10:08:46 -0400 Subject: [PATCH 5/8] fix ci only test --- benchmarks/src/tpch/run.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index afaf5f2c5f375..3a31f48a4bd43 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -340,6 +340,7 @@ mod tests { partitions: Some(2), batch_size: 8192, debug: false, + string_view: false, }; let opt = RunOpt { query: Some(query), @@ -373,6 +374,7 @@ mod tests { partitions: Some(2), batch_size: 8192, debug: false, + string_view: false, }; let opt = RunOpt { query: Some(query), From 9c11a27b3834f9b1a8fcd132a22fd4eedee97e68 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 18 Jul 2024 16:37:07 -0400 Subject: [PATCH 6/8] Update benchmarks/src/util/options.rs Co-authored-by: Andrew Lamb --- benchmarks/src/util/options.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b4d4ac47ca635..02591e293272e 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -38,7 +38,8 @@ pub struct CommonOpt { #[structopt(short, long)] pub debug: bool, - /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray. + /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray + /// when reading ParquetFiles #[structopt(long)] pub string_view: bool, } From f980fc64479c9ae60bab1ecc1caa0d211b3ceefc Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 18 Jul 2024 17:00:43 -0400 Subject: [PATCH 7/8] Update datafusion/common/src/config.rs Co-authored-by: Andrew Lamb --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index fb2b0a431a0ab..5f5b441bd6066 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -312,7 +312,7 @@ config_namespace! { /// Should DataFusion keep the columns used for partition_by in the output RecordBatches pub keep_partition_by_columns: bool, default = false - /// If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, + /// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`. pub schema_force_string_view: bool, default = false } From 38633d2eab4a403c7c6935f6888b317e4c76e283 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Jul 2024 17:55:07 -0400 Subject: [PATCH 8/8] update tests --- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- docs/source/user-guide/configs.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a4ae927db6afc..9be3e9fd4146d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -290,7 +290,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enab datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system -datafusion.execution.schema_force_string_view false If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.schema_force_string_view false If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ae30204bbe8f1..106f4369f970e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -87,7 +87,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | -| datafusion.execution.schema_force_string_view | false | If true, the parquet reader will replace `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.schema_force_string_view | false | If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |