Skip to content

Commit 83ce363

Browse files
committed
Merge branch 'main' of https://github.com/apache/datafusion into schema-fix
2 parents e45d1bb + 67cf1d6 commit 83ce363

84 files changed

Lines changed: 2083 additions & 1195 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ rand = "0.8"
135135
regex = "1.8"
136136
rstest = "0.22.0"
137137
serde_json = "1"
138-
sqlparser = { version = "0.49", features = ["visitor"] }
138+
sqlparser = { version = "0.50.0", features = ["visitor"] }
139139
tempfile = "3"
140140
thiserror = "1.0.44"
141141
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }

datafusion-cli/Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ readme = "README.md"
3131

3232
[dependencies]
3333
arrow = { version = "52.2.0" }
34-
async-trait = "0.1.41"
34+
async-trait = "0.1.73"
3535
aws-config = "0.55"
3636
aws-credential-types = "0.55"
3737
clap = { version = "3", features = ["derive", "cargo"] }

datafusion-cli/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
FROM rust:1.78-bookworm as builder
18+
FROM rust:1.78-bookworm AS builder
1919

2020
COPY . /usr/src/datafusion
2121
COPY ./datafusion /usr/src/datafusion/datafusion

datafusion-examples/examples/catalog.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ async fn main() -> Result<()> {
4646

4747
let ctx = SessionContext::new();
4848
let state = ctx.state();
49-
let catlist = Arc::new(CustomCatalogProviderList::new());
49+
let cataloglist = Arc::new(CustomCatalogProviderList::new());
5050

5151
// use our custom catalog list for context. each context has a single catalog list.
5252
// context will by default have [`MemoryCatalogProviderList`]
53-
ctx.register_catalog_list(catlist.clone());
53+
ctx.register_catalog_list(cataloglist.clone());
5454

5555
// initialize our catalog and schemas
5656
let catalog = DirCatalog::new();
@@ -81,7 +81,7 @@ async fn main() -> Result<()> {
8181
ctx.register_catalog("dircat", Arc::new(catalog));
8282
{
8383
// catalog was passed down into our custom catalog list since we override the ctx's default
84-
let catalogs = catlist.catalogs.read().unwrap();
84+
let catalogs = cataloglist.catalogs.read().unwrap();
8585
assert!(catalogs.contains_key("dircat"));
8686
};
8787

@@ -143,8 +143,8 @@ impl DirSchema {
143143
async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> {
144144
let DirSchemaOpts { ext, dir, format } = opts;
145145
let mut tables = HashMap::new();
146-
let listdir = std::fs::read_dir(dir).unwrap();
147-
for res in listdir {
146+
let direntries = std::fs::read_dir(dir).unwrap();
147+
for res in direntries {
148148
let entry = res.unwrap();
149149
let filename = entry.file_name().to_str().unwrap().to_string();
150150
if !filename.ends_with(ext) {

datafusion/catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ version.workspace = true
2929

3030
[dependencies]
3131
arrow-schema = { workspace = true }
32-
async-trait = "0.1.41"
32+
async-trait = { workspace = true }
3333
datafusion-common = { workspace = true }
3434
datafusion-execution = { workspace = true }
3535
datafusion-expr = { workspace = true }

datafusion/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
mod column;
2121
mod dfschema;
22-
mod error;
2322
mod functional_dependencies;
2423
mod join_type;
2524
mod param_value;
@@ -33,6 +32,7 @@ pub mod alias;
3332
pub mod cast;
3433
pub mod config;
3534
pub mod display;
35+
pub mod error;
3636
pub mod file_options;
3737
pub mod format;
3838
pub mod hash_utils;

datafusion/core/example.parquet

-976 Bytes
Binary file not shown.

datafusion/core/src/dataframe/mod.rs

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,21 +1441,27 @@ impl DataFrame {
14411441
/// ```
14421442
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
14431443
let window_func_exprs = find_window_exprs(&[expr.clone()]);
1444-
let plan = if window_func_exprs.is_empty() {
1445-
self.plan
1444+
1445+
let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() {
1446+
(self.plan, false, false)
14461447
} else {
1447-
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
1448+
(
1449+
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
1450+
true,
1451+
true,
1452+
)
14481453
};
14491454

14501455
let new_column = expr.alias(name);
1451-
let mut col_exists = false;
14521456
let mut fields: Vec<Expr> = plan
14531457
.schema()
14541458
.iter()
14551459
.map(|(qualifier, field)| {
14561460
if field.name() == name {
14571461
col_exists = true;
14581462
new_column.clone()
1463+
} else if window_func && qualifier.is_none() {
1464+
col(Column::from((qualifier, field))).alias(name)
14591465
} else {
14601466
col(Column::from((qualifier, field)))
14611467
}
@@ -1710,6 +1716,7 @@ mod tests {
17101716
WindowFrameUnits, WindowFunctionDefinition,
17111717
};
17121718
use datafusion_functions_aggregate::expr_fn::{array_agg, count_distinct};
1719+
use datafusion_functions_window::expr_fn::row_number;
17131720
use datafusion_physical_expr::expressions::Column;
17141721
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
17151722
use sqlparser::ast::NullTreatment;
@@ -2956,6 +2963,35 @@ mod tests {
29562963
Ok(())
29572964
}
29582965

2966+
// Test issue: https://github.com/apache/datafusion/issues/11982
2967+
// Window function was creating unwanted projection when using with_column() method.
2968+
#[tokio::test]
2969+
async fn test_window_function_with_column() -> Result<()> {
2970+
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
2971+
let ctx = SessionContext::new();
2972+
let df_impl = DataFrame::new(ctx.state(), df.plan.clone());
2973+
let func = row_number().alias("row_num");
2974+
2975+
// Should create an additional column with alias 'r' that has window func results
2976+
let df = df_impl.with_column("r", func)?.limit(0, Some(2))?;
2977+
assert_eq!(4, df.schema().fields().len());
2978+
2979+
let df_results = df.clone().collect().await?;
2980+
assert_batches_sorted_eq!(
2981+
[
2982+
"+----+----+-----+---+",
2983+
"| c1 | c2 | c3 | r |",
2984+
"+----+----+-----+---+",
2985+
"| c | 2 | 1 | 1 |",
2986+
"| d | 5 | -40 | 2 |",
2987+
"+----+----+-----+---+",
2988+
],
2989+
&df_results
2990+
);
2991+
2992+
Ok(())
2993+
}
2994+
29592995
// Test issue: https://github.com/apache/datafusion/issues/7790
29602996
// The join operation outputs two identical column names, but they belong to different relations.
29612997
#[tokio::test]
@@ -3010,13 +3046,12 @@ mod tests {
30103046
assert_eq!(
30113047
"\
30123048
Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
3013-
\n Limit: skip=0, fetch=1\
3014-
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
3015-
\n Inner Join: t1.c1 = t2.c1\
3016-
\n SubqueryAlias: t1\
3017-
\n TableScan: aggregate_test_100 projection=[c1]\
3018-
\n SubqueryAlias: t2\
3019-
\n TableScan: aggregate_test_100 projection=[c1]",
3049+
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
3050+
\n Inner Join: t1.c1 = t2.c1\
3051+
\n SubqueryAlias: t1\
3052+
\n TableScan: aggregate_test_100 projection=[c1]\
3053+
\n SubqueryAlias: t2\
3054+
\n TableScan: aggregate_test_100 projection=[c1]",
30203055
format!("{}", df_with_column.clone().into_optimized_plan()?)
30213056
);
30223057

@@ -3204,13 +3239,12 @@ mod tests {
32043239

32053240
assert_eq!("\
32063241
Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3\
3207-
\n Limit: skip=0, fetch=1\
3208-
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
3209-
\n Inner Join: t1.c1 = t2.c1\
3210-
\n SubqueryAlias: t1\
3211-
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
3212-
\n SubqueryAlias: t2\
3213-
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
3242+
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
3243+
\n Inner Join: t1.c1 = t2.c1\
3244+
\n SubqueryAlias: t1\
3245+
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
3246+
\n SubqueryAlias: t2\
3247+
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
32143248
format!("{}", df_renamed.clone().into_optimized_plan()?)
32153249
);
32163250

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ impl ExecutionPlan for ArrowExec {
197197
Ok(self.projected_statistics.clone())
198198
}
199199

200+
fn fetch(&self) -> Option<usize> {
201+
self.base_config.limit
202+
}
203+
200204
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
201205
let new_config = self.base_config.clone().with_limit(limit);
202206

0 commit comments

Comments
 (0)