Commit f2e5d0a

Merge branch 'main' into optimize_projected_union
2 parents: c516035 + 1dddf03

92 files changed: +1550, -1733 lines


.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ jobs:
     steps:
       - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
       - name: Install cargo-audit
-        uses: taiki-e/install-action@6cc14f7f2f4b3129aff07a8b071d2d4f2733465d # v2.62.50
+        uses: taiki-e/install-action@0be4756f42223b67aa4b7df5effad59010cbf4b9 # v2.62.51
         with:
           tool: cargo-audit
       - name: Run audit check

.github/workflows/rust.yml

Lines changed: 3 additions & 3 deletions
@@ -434,7 +434,7 @@ jobs:
           sudo apt-get update -qq
           sudo apt-get install -y -qq clang
       - name: Setup wasm-pack
-        uses: taiki-e/install-action@6cc14f7f2f4b3129aff07a8b071d2d4f2733465d # v2.62.50
+        uses: taiki-e/install-action@0be4756f42223b67aa4b7df5effad59010cbf4b9 # v2.62.51
         with:
           tool: wasm-pack
       - name: Run tests with headless mode
@@ -761,7 +761,7 @@ jobs:
       - name: Setup Rust toolchain
         uses: ./.github/actions/setup-builder
       - name: Install cargo-msrv
-        uses: taiki-e/install-action@6cc14f7f2f4b3129aff07a8b071d2d4f2733465d # v2.62.50
+        uses: taiki-e/install-action@0be4756f42223b67aa4b7df5effad59010cbf4b9 # v2.62.51
         with:
           tool: cargo-msrv

@@ -806,4 +806,4 @@ jobs:
       - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
         with:
           persist-credentials: false
-      - uses: crate-ci/typos@1af53e3774f068183ffd0c7193eb061a2b65a531 # v1.39.1
+      - uses: crate-ci/typos@626c4bedb751ce0b7f03262ca97ddda9a076ae1c # v1.39.2

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -2392,6 +2392,7 @@ impl DataFrame {
     /// # Ok(())
     /// # }
     /// ```
+    #[expect(clippy::needless_pass_by_value)]
     pub fn fill_null(
         &self,
         value: ScalarValue,
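A note on the `#[expect]` attribute used here and elsewhere in this commit: unlike `#[allow]`, an `#[expect]` is itself reported by clippy if the suppressed lint ever stops firing, so the opt-out cannot silently outlive the code it excuses. A minimal sketch of the behavior under the crate-wide deny added in lib.rs below (illustrative function, not from this commit):

```rust
#![deny(clippy::needless_pass_by_value)]

// `value` is only read, never consumed, so clippy::needless_pass_by_value
// would fire here. `expect` suppresses it while keeping the by-value
// signature, and clippy warns if the expectation becomes unfulfilled.
#[expect(clippy::needless_pass_by_value)]
fn label(value: String) -> String {
    format!("value = {value}")
}

fn main() {
    println!("{}", label(String::from("hello")));
}
```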

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 13 additions & 6 deletions
@@ -33,6 +33,7 @@ mod tests {
     use datafusion_datasource_csv::CsvFormat;
     use object_store::ObjectStore;

+    use crate::datasource::file_format::FileFormat;
     use crate::prelude::CsvReadOptions;
     use crate::prelude::SessionContext;
     use crate::test::partitioned_file_groups;
@@ -104,12 +105,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -176,12 +178,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -247,12 +250,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -317,12 +321,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -378,12 +383,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let mut file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -489,12 +495,13 @@
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
         let tmp_dir = TempDir::new()?;
+        let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());

         let file_groups = partitioned_file_groups(
             path.as_str(),
             filename,
             1,
-            Arc::new(CsvFormat::default()),
+            &csv_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )
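The recurring change in these tests (and in json.rs below) hoists a single `Arc<dyn FileFormat>` binding and lends it to `partitioned_file_groups` by reference: the helper only needs to borrow the format, so taking the `Arc` by value would trip the newly denied `clippy::needless_pass_by_value` lint. A self-contained sketch of the pattern, with a toy trait and function standing in for DataFusion's:

```rust
use std::sync::Arc;

trait FileFormat {
    fn name(&self) -> &str;
}

struct CsvFormat;

impl FileFormat for CsvFormat {
    fn name(&self) -> &str {
        "csv"
    }
}

// Borrowing the Arc keeps ownership with the caller; the function can still
// call trait methods through the shared handle, and would use Arc::clone
// internally only if it actually needed to store one.
fn describe_groups(format: &Arc<dyn FileFormat>, partitions: usize) -> Vec<String> {
    (0..partitions)
        .map(|i| format!("group {i} ({})", format.name()))
        .collect()
}

fn main() {
    // One binding, reused across calls, mirroring the test setups above.
    let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat);
    println!("{:?}", describe_groups(&csv_format, 2));
    println!("{:?}", describe_groups(&csv_format, 3));
}
```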

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 6 additions & 2 deletions
@@ -69,11 +69,13 @@
         let store = state.runtime_env().object_store(&store_url).unwrap();

         let filename = "1.json";
+        let json_format: Arc<dyn FileFormat> = Arc::new(JsonFormat::default());
+
         let file_groups = partitioned_file_groups(
             TEST_DATA_BASE,
             filename,
             1,
-            Arc::new(JsonFormat::default()),
+            &json_format,
             file_compression_type.to_owned(),
             work_dir,
         )
@@ -104,11 +106,13 @@
         ctx.register_object_store(&url, store.clone());
         let filename = "1.json";
         let tmp_dir = TempDir::new()?;
+        let json_format: Arc<dyn FileFormat> = Arc::new(JsonFormat::default());
+
         let file_groups = partitioned_file_groups(
             TEST_DATA_BASE,
             filename,
             1,
-            Arc::new(JsonFormat::default()),
+            &json_format,
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )

datafusion/core/src/execution/context/mod.rs

Lines changed: 3 additions & 3 deletions
@@ -1054,18 +1054,18 @@ impl SessionContext {
         } else if allow_missing {
             return self.return_empty_dataframe();
         } else {
-            return self.schema_doesnt_exist_err(name);
+            return self.schema_doesnt_exist_err(&name);
         }
         };
         let dereg = catalog.deregister_schema(name.schema_name(), cascade)?;
         match (dereg, allow_missing) {
             (None, true) => self.return_empty_dataframe(),
-            (None, false) => self.schema_doesnt_exist_err(name),
+            (None, false) => self.schema_doesnt_exist_err(&name),
             (Some(_), _) => self.return_empty_dataframe(),
         }
     }

-    fn schema_doesnt_exist_err(&self, schemaref: SchemaReference) -> Result<DataFrame> {
+    fn schema_doesnt_exist_err(&self, schemaref: &SchemaReference) -> Result<DataFrame> {
         exec_err!("Schema '{schemaref}' doesn't exist.")
     }
datafusion/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@
     )
 )]
 #![warn(missing_docs, clippy::needless_borrow)]
+#![deny(clippy::needless_pass_by_value)]
+#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

 //! [DataFusion] is an extensible query engine written in Rust that
 //! uses [Apache Arrow] as its in-memory format. DataFusion's target users are
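This is the change the rest of the commit revolves around: the lint is denied crate-wide, relaxed for test code, and escaped case-by-case with `#[expect]` where a by-value signature is deliberate. A rough standalone sketch of how the attribute pair behaves (illustrative code, not DataFusion's):

```rust
#![deny(clippy::needless_pass_by_value)]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

// Borrowed parameter: satisfies the deny in normal builds.
fn shout(message: &str) -> String {
    message.to_uppercase()
}

fn main() {
    println!("{}", shout("hello"));
}

#[cfg(test)]
mod tests {
    // Under cfg(test) the allow applies, so by-value helpers like this one
    // do not fail `cargo clippy --tests`.
    fn takes_owned(s: String) -> usize {
        s.len()
    }

    #[test]
    fn by_value_tolerated_in_tests() {
        assert_eq!(takes_owned(String::from("ok")), 2);
    }
}
```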

datafusion/core/src/physical_planner.rs

Lines changed: 10 additions & 9 deletions
@@ -498,7 +498,7 @@ impl DefaultPhysicalPlanner {
                 output_schema,
             }) => {
                 let output_schema = Arc::clone(output_schema.inner());
-                self.plan_describe(Arc::clone(schema), output_schema)?
+                self.plan_describe(&Arc::clone(schema), output_schema)?
             }

             // 1 Child
@@ -2246,6 +2246,7 @@

     /// Optimize a physical plan by applying each physical optimizer,
     /// calling observer(plan, optimizer after each one)
+    #[expect(clippy::needless_pass_by_value)]
     pub fn optimize_physical_plan<F>(
         &self,
         plan: Arc<dyn ExecutionPlan>,
@@ -2280,7 +2281,7 @@

                 // This only checks the schema in release build, and performs additional checks in debug mode.
                 OptimizationInvariantChecker::new(optimizer)
-                    .check(&new_plan, before_schema)?;
+                    .check(&new_plan, &before_schema)?;

                 debug!(
                     "Optimized physical plan by {}:\n{}\n",
@@ -2313,7 +2314,7 @@
     // return an record_batch which describes a table's schema.
     fn plan_describe(
         &self,
-        table_schema: Arc<Schema>,
+        table_schema: &Arc<Schema>,
         output_schema: Arc<Schema>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut column_names = StringBuilder::new();
@@ -2516,10 +2517,10 @@ impl<'a> OptimizationInvariantChecker<'a> {
     pub fn check(
         &mut self,
         plan: &Arc<dyn ExecutionPlan>,
-        previous_schema: Arc<Schema>,
+        previous_schema: &Arc<Schema>,
     ) -> Result<()> {
         // if the rule is not permitted to change the schema, confirm that it did not change.
-        if self.rule.schema_check() && plan.schema() != previous_schema {
+        if self.rule.schema_check() && plan.schema() != *previous_schema {
             internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
                 self.rule.name(),
                 previous_schema,
@@ -3709,20 +3710,20 @@ digraph {

         // Test: check should pass with same schema
         let equal_schema = ok_plan.schema();
-        OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;
+        OptimizationInvariantChecker::new(&rule).check(&ok_plan, &equal_schema)?;

         // Test: should fail with schema changed
         let different_schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
         let expected_err = OptimizationInvariantChecker::new(&rule)
-            .check(&ok_plan, different_schema)
+            .check(&ok_plan, &different_schema)
             .unwrap_err();
         assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema"));

         // Test: should fail when extension node fails it's own invariant check
         let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
         let expected_err = OptimizationInvariantChecker::new(&rule)
-            .check(&failing_node, ok_plan.schema())
+            .check(&failing_node, &ok_plan.schema())
             .unwrap_err();
         assert!(expected_err
             .to_string()
@@ -3735,7 +3736,7 @@ digraph {
             Arc::clone(&child),
         ])?;
         let expected_err = OptimizationInvariantChecker::new(&rule)
-            .check(&invalid_plan, ok_plan.schema())
+            .check(&invalid_plan, &ok_plan.schema())
             .unwrap_err();
         assert!(expected_err
             .to_string()
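`check` now borrows the previous schema instead of consuming an `Arc<Schema>`, and the comparison dereferences the borrow so `Arc`'s `PartialEq` (which compares the pointed-to values) applies exactly as before. A reduced sketch of that call shape, with a toy `Schema` standing in for arrow's:

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

// `current` stays by value because it mirrors `plan.schema()` returning a
// fresh Arc at the call site; `previous` is borrowed to satisfy
// clippy::needless_pass_by_value. `*previous` dereferences back to
// Arc<Schema>, and Arc's PartialEq compares the underlying schemas.
fn schema_unchanged(current: Arc<Schema>, previous: &Arc<Schema>) -> bool {
    // Mirrors `plan.schema() != *previous_schema` in the checker above.
    current == *previous
}

fn main() {
    let original = Arc::new(Schema { fields: vec!["id".to_string()] });
    let after_rule = Arc::clone(&original); // the rule kept the schema intact
    assert!(schema_unchanged(after_rule, &original));
    println!("schema unchanged");
}
```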

datafusion/core/src/test/mod.rs

Lines changed: 5 additions & 3 deletions
@@ -87,11 +87,13 @@ pub fn scan_partitioned_csv(
     let schema = aggr_test_schema();
     let filename = "aggregate_test_100.csv";
     let path = format!("{}/csv", arrow_test_data());
+    let csv_format: Arc<dyn FileFormat> = Arc::new(CsvFormat::default());
+
     let file_groups = partitioned_file_groups(
         path.as_str(),
         filename,
         partitions,
-        Arc::new(CsvFormat::default()),
+        &csv_format,
         FileCompressionType::UNCOMPRESSED,
         work_dir,
     )?;
@@ -114,7 +116,7 @@ pub fn partitioned_file_groups(
     path: &str,
     filename: &str,
     partitions: usize,
-    file_format: Arc<dyn FileFormat>,
+    file_format: &Arc<dyn FileFormat>,
     file_compression_type: FileCompressionType,
     work_dir: &Path,
 ) -> Result<Vec<FileGroup>> {
@@ -198,7 +200,7 @@
         .collect::<Vec<_>>())
 }

-pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
+pub fn assert_fields_eq(plan: &LogicalPlan, expected: &[&str]) {
     let actual: Vec<String> = plan
         .schema()
         .fields()
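`assert_fields_eq` switching from `Vec<&str>` to `&[&str]` is the same lint at work on a non-`Arc` type: a slice parameter borrows, so callers can pass a `&["a", "b"]` literal or an existing `Vec` without giving up ownership. A toy version of the helper (not DataFusion's implementation):

```rust
// `&[&str]` accepts array literals and borrowed Vecs alike, where the old
// `Vec<&str>` parameter forced callers to build and move an owned Vec.
fn assert_fields_eq(actual_fields: &[String], expected: &[&str]) {
    let actual: Vec<&str> = actual_fields.iter().map(String::as_str).collect();
    assert_eq!(actual, expected);
}

fn main() {
    let fields = vec!["c1".to_string(), "c2".to_string()];
    assert_fields_eq(&fields, &["c1", "c2"]); // slice literal, no Vec needed

    let expected = vec!["c1", "c2"];
    assert_fields_eq(&fields, &expected); // an existing Vec coerces via &
}
```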
