From 50e1be98f5673eb762a09dde311d7e1c807f74f2 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Fri, 14 Mar 2025 22:10:03 -0700 Subject: [PATCH 1/9] fix: Remove incorrect predicate to skip input wrapping when rewriting union inputs --- datafusion/expr/src/logical_plan/plan.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0dbce941a8d4..f9617beddd75 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2679,24 +2679,16 @@ impl Union { Ok(Union { inputs, schema }) } - /// When constructing a `UNION BY NAME`, we may need to wrap inputs + /// When constructing a `UNION BY NAME`, we need to wrap inputs /// in an additional `Projection` to account for absence of columns - /// in input schemas. + /// in input schemas or differing projection orders. fn rewrite_inputs_from_schema( - schema: &DFSchema, + schema: &Arc, inputs: Vec>, ) -> Result>> { let schema_width = schema.iter().count(); let mut wrapped_inputs = Vec::with_capacity(inputs.len()); for input in inputs { - // If the input plan's schema contains the same number of fields - // as the derived schema, then it does not to be wrapped in an - // additional `Projection`. - if input.schema().iter().count() == schema_width { - wrapped_inputs.push(input); - continue; - } - // Any columns that exist within the derived schema but do not exist // within an input's schema should be replaced with `NULL` aliased // to the appropriate column in the derived schema. @@ -2711,9 +2703,9 @@ impl Union { expr.push(Expr::Literal(ScalarValue::Null).alias(column.name())); } } - wrapped_inputs.push(Arc::new(LogicalPlan::Projection(Projection::try_new( - expr, input, - )?))); + wrapped_inputs.push(Arc::new(LogicalPlan::Projection( + Projection::try_new_with_schema(expr, input, Arc::clone(schema))?, + ))); } Ok(wrapped_inputs) From 5d9cca60d65e5f988c3c383c781de6903d190bf1 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Fri, 14 Mar 2025 22:10:13 -0700 Subject: [PATCH 2/9] chore: Add/update tests --- .../sqllogictest/test_files/union_by_name.slt | 36 ++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 63a43a36ff16..06494ea6b664 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -54,13 +54,13 @@ INSERT INTO t2 VALUES (2, 2), (4, 4); # Test binding query I -SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 3 query I -SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 1 @@ -70,13 +70,13 @@ SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; 3 query I -SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 3 query I -SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 1 @@ -287,3 +287,31 @@ SELECT '0' as c UNION ALL BY NAME SELECT 0 as c; ---- 0 0 + +# Regression tests for https://github.com/apache/datafusion/issues/15236 +# Ensure that the correct output is produced even if the width of an input node's +# schema is the same as the resulting schema width after the union is applied. + +statement ok +create table t3 (x varchar(255), y varchar(255), z varchar(255)); + +statement ok +create table t4 (x varchar(255), y varchar(255), z varchar(255)); + +statement ok +insert into t3 values ('a', 'b', 'c'); + +statement ok +insert into t4 values ('a', 'b', 'c'); + +query TTTT rowsort +select t3.x, t3.y, t3.z from t3 union by name select t3.z, t3.y, t3.x, 'd' as zz from t3; +---- +a b c NULL +a b c d + +query TTTT rowsort +select t3.x, t3.y, t3.z from t3 union by name select t4.z, t4.y, t4.x, 'd' as zz from t4; +---- +a b c NULL +a b c d From 3a7e3daa6ef6d10eccb7b6ff50345dcc15a490a4 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 17 Mar 2025 15:54:04 -0700 Subject: [PATCH 3/9] fix: SQL integration tests --- datafusion/sql/tests/sql_integration.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 023ea88cb55f..96670aea01f9 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1901,8 +1901,10 @@ fn union_by_name_different_columns() { \n Projection: NULL AS Int64(1), order_id\ \n Projection: orders.order_id\ \n TableScan: orders\ - \n Projection: orders.order_id, Int64(1)\ - \n TableScan: orders"; + \n Projection: Int64(1), order_id\ + \n Projection: orders.order_id, Int64(1)\ + \n TableScan: orders"; + quick_test(sql, expected); } @@ -1939,19 +1941,23 @@ fn union_all_by_name_different_columns() { \n Projection: NULL AS Int64(1), order_id\ \n Projection: orders.order_id\ \n TableScan: orders\ - \n Projection: orders.order_id, Int64(1)\ - \n TableScan: orders"; + \n Projection: Int64(1), order_id\ + \n Projection: orders.order_id, Int64(1)\ + \n TableScan: orders"; quick_test(sql, expected); } #[test] fn union_all_by_name_same_column_names() { let sql = "SELECT order_id from orders UNION ALL BY NAME SELECT order_id FROM orders"; - let expected = "Union\ - \n Projection: orders.order_id\ - \n TableScan: orders\ - \n Projection: orders.order_id\ - \n TableScan: orders"; + let expected = "\ + Union\ + \n Projection: order_id\ + \n Projection: orders.order_id\ + \n TableScan: orders\ + \n Projection: order_id\ + \n Projection: orders.order_id\ + \n TableScan: orders"; quick_test(sql, expected); } From 9c94a486ef755cdf12876e5f2ca5752716ebf912 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 17 Mar 2025 15:55:26 -0700 Subject: [PATCH 4/9] test: Add union all by name SLT tests --- .../sqllogictest/test_files/union_by_name.slt | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 06494ea6b664..2181734c374e 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -315,3 +315,21 @@ select t3.x, t3.y, t3.z from t3 union by name select t4.z, t4.y, t4.x, 'd' as zz ---- a b c NULL a b c d + +query TTT rowsort +select x, y, z from t3 union all by name select z, y, x from t3; +---- +a b c +a b c + +query TTT rowsort +select x, y, z from t3 union all by name select z, y, x from t4; +---- +a b c +a b c + +query TTT +select x, y, z from t3 union all by name select z, y, x from t4 order by x; +---- +a b c +a b c From c3cdcc7fcee5c9ba07f9d85cf197a5483019d37b Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 17 Mar 2025 15:58:49 -0700 Subject: [PATCH 5/9] test: Add problematic union all by name SLT test --- .../sqllogictest/test_files/union_by_name.slt | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 2181734c374e..76b36c38f1a3 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -333,3 +333,91 @@ select x, y, z from t3 union all by name select z, y, x from t4 order by x; ---- a b c a b c + + +# FIXME: The following should pass without error, but currently it is failing +# due to differing record batch schemas when the SLT runner collects results in +# normalize::convert_batches. +# +# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2730402547 +query error +select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3; +---- +DataFusion error: Internal error: Schema mismatch. Previously had +Schema { + fields: [ + Field { + name: "x", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "y", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "z", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "zz", + data_type: Utf8, + nullable: false, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + ], + metadata: {}, +} + +Got: +Schema { + fields: [ + Field { + name: "x", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "y", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "z", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + Field { + name: "zz", + data_type: Utf8, + nullable: true, + dict_id: 0, + dict_is_ordered: false, + metadata: {}, + }, + ], + metadata: {}, +}. +This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker From 2f4e79617d0b3d2ccac6997ce8a9f75decbc1d02 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 17 Mar 2025 16:01:16 -0700 Subject: [PATCH 6/9] chore: styling nits --- datafusion/sql/tests/sql_integration.rs | 1 - datafusion/sqllogictest/test_files/union_by_name.slt | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 96670aea01f9..7cf1cdb0bbc0 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1904,7 +1904,6 @@ fn union_by_name_different_columns() { \n Projection: Int64(1), order_id\ \n Projection: orders.order_id, Int64(1)\ \n TableScan: orders"; - quick_test(sql, expected); } diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 76b36c38f1a3..93ece329bd8e 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -337,7 +337,7 @@ a b c # FIXME: The following should pass without error, but currently it is failing # due to differing record batch schemas when the SLT runner collects results in -# normalize::convert_batches. +# `normalize::convert_batches`. # # More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2730402547 query error From 2eb3c7982776ce34c79de01d727e661f16246e1e Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 23 Mar 2025 16:32:49 -0700 Subject: [PATCH 7/9] fix: Correct handling of nullability when field is not present in all inputs --- datafusion/expr/src/logical_plan/plan.rs | 30 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index f9617beddd75..3368c736844d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2739,14 +2739,16 @@ impl Union { inputs: &[Arc], loose_types: bool, ) -> Result { - type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap>); + type FieldData<'a> = + (&'a DataType, bool, Vec<&'a HashMap>, usize); // Prefer `BTreeMap` as it produces items in order by key when iterated over let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new(); for input in inputs.iter() { for field in input.schema().fields() { match cols.entry(field.name()) { std::collections::btree_map::Entry::Occupied(mut occupied) => { - let (data_type, is_nullable, metadata) = occupied.get_mut(); + let (data_type, is_nullable, metadata, occurrences) = + occupied.get_mut(); if !loose_types && *data_type != field.data_type() { return plan_err!( "Found different types for field {}", @@ -2758,12 +2760,14 @@ impl Union { // If the field is nullable in any one of the inputs, // then the field in the final schema is also nullable. *is_nullable |= field.is_nullable(); + *occurrences += 1; } std::collections::btree_map::Entry::Vacant(vacant) => { vacant.insert(( field.data_type(), field.is_nullable(), vec![field.metadata()], + 1, )); } } @@ -2772,12 +2776,24 @@ impl Union { let union_fields = cols .into_iter() - .map(|(name, (data_type, is_nullable, unmerged_metadata))| { - let mut field = Field::new(name, data_type.clone(), is_nullable); - field.set_metadata(intersect_maps(unmerged_metadata)); + .map( + |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| { + // If the final number of occurrences of the field is less + // than the number of inputs (i.e. the field is missing from + // one or more inputs), then it must be treated as nullable. + let final_is_nullable = if occurrences == inputs.len() { + is_nullable + } else { + true + }; - (None, Arc::new(field)) - }) + let mut field = + Field::new(name, data_type.clone(), final_is_nullable); + field.set_metadata(intersect_maps(unmerged_metadata)); + + (None, Arc::new(field)) + }, + ) .collect::, _)>>(); let union_schema_metadata = From 31fae11f7a8aca265f97d88ab4733e4dfdf7aa60 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 24 Mar 2025 09:31:51 -0700 Subject: [PATCH 8/9] chore: Update fixme comment --- datafusion/sqllogictest/test_files/union_by_name.slt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 93ece329bd8e..fdd73710e50d 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -336,10 +336,10 @@ a b c # FIXME: The following should pass without error, but currently it is failing -# due to differing record batch schemas when the SLT runner collects results in -# `normalize::convert_batches`. +# due to differing record batch schemas when the SLT runner collects results. +# This is due to the following issue: https://github.com/apache/datafusion/issues/15394#issue-2943811768 # -# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2730402547 +# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2746563234 query error select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3; ---- From 0204f359e4fd545da029dfe992853cc62c776a37 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Mon, 24 Mar 2025 20:14:08 -0700 Subject: [PATCH 9/9] fix: handle ordering by order of inputs --- datafusion/expr/src/logical_plan/plan.rs | 45 +++++++------- datafusion/sql/tests/sql_integration.rs | 8 +-- .../sqllogictest/test_files/union_by_name.slt | 62 +++++++++---------- 3 files changed, 57 insertions(+), 58 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3368c736844d..004ae7ce315e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -18,7 +18,7 @@ //! Logical plan types use std::cmp::Ordering; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::{Arc, LazyLock}; @@ -2741,35 +2741,34 @@ impl Union { ) -> Result { type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap>, usize); - // Prefer `BTreeMap` as it produces items in order by key when iterated over - let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new(); + let mut cols: Vec<(&str, FieldData)> = Vec::new(); for input in inputs.iter() { for field in input.schema().fields() { - match cols.entry(field.name()) { - std::collections::btree_map::Entry::Occupied(mut occupied) => { - let (data_type, is_nullable, metadata, occurrences) = - occupied.get_mut(); - if !loose_types && *data_type != field.data_type() { - return plan_err!( - "Found different types for field {}", - field.name() - ); - } - - metadata.push(field.metadata()); - // If the field is nullable in any one of the inputs, - // then the field in the final schema is also nullable. - *is_nullable |= field.is_nullable(); - *occurrences += 1; + if let Some((_, (data_type, is_nullable, metadata, occurrences))) = + cols.iter_mut().find(|(name, _)| name == field.name()) + { + if !loose_types && *data_type != field.data_type() { + return plan_err!( + "Found different types for field {}", + field.name() + ); } - std::collections::btree_map::Entry::Vacant(vacant) => { - vacant.insert(( + + metadata.push(field.metadata()); + // If the field is nullable in any one of the inputs, + // then the field in the final schema is also nullable. + *is_nullable |= field.is_nullable(); + *occurrences += 1; + } else { + cols.push(( + field.name(), + ( field.data_type(), field.is_nullable(), vec![field.metadata()], 1, - )); - } + ), + )); } } } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 7cf1cdb0bbc0..5fd1f7362a18 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1898,10 +1898,10 @@ fn union_by_name_different_columns() { let expected = "\ Distinct:\ \n Union\ - \n Projection: NULL AS Int64(1), order_id\ + \n Projection: order_id, NULL AS Int64(1)\ \n Projection: orders.order_id\ \n TableScan: orders\ - \n Projection: Int64(1), order_id\ + \n Projection: order_id, Int64(1)\ \n Projection: orders.order_id, Int64(1)\ \n TableScan: orders"; quick_test(sql, expected); @@ -1937,10 +1937,10 @@ fn union_all_by_name_different_columns() { "SELECT order_id from orders UNION ALL BY NAME SELECT order_id, 1 FROM orders"; let expected = "\ Union\ - \n Projection: NULL AS Int64(1), order_id\ + \n Projection: order_id, NULL AS Int64(1)\ \n Projection: orders.order_id\ \n TableScan: orders\ - \n Projection: Int64(1), order_id\ + \n Projection: order_id, Int64(1)\ \n Projection: orders.order_id, Int64(1)\ \n TableScan: orders"; quick_test(sql, expected); diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index fdd73710e50d..4e0a54718082 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -88,38 +88,38 @@ SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x; query II (SELECT x FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME SELECT 5 ORDER BY x; ---- -NULL 1 -NULL 3 -5 NULL +1 NULL +3 NULL +NULL 5 query II (SELECT x FROM t1 UNION ALL SELECT x FROM t1) UNION ALL BY NAME SELECT 5 ORDER BY x; ---- -NULL 1 -NULL 1 -NULL 3 -NULL 3 -NULL 3 -NULL 3 -5 NULL +1 NULL +1 NULL +3 NULL +3 NULL +3 NULL +3 NULL +NULL 5 query II (SELECT x FROM t1 UNION ALL SELECT y FROM t1) UNION BY NAME SELECT 5 ORDER BY x; ---- -NULL 1 -NULL 3 -5 NULL +1 NULL +3 NULL +NULL 5 query II (SELECT x FROM t1 UNION ALL SELECT y FROM t1) UNION ALL BY NAME SELECT 5 ORDER BY x; ---- -NULL 1 -NULL 1 -NULL 3 -NULL 3 -NULL 3 -NULL 3 -5 NULL +1 NULL +1 NULL +3 NULL +3 NULL +3 NULL +3 NULL +NULL 5 # Ambiguous name @@ -152,22 +152,22 @@ NULL 4 # Limit query III -SELECT 1 UNION BY NAME SELECT * FROM unnest(range(2, 100)) UNION BY NAME SELECT 999 ORDER BY 3, 1 LIMIT 5; +SELECT 1 UNION BY NAME SELECT * FROM unnest(range(2, 100)) UNION BY NAME SELECT 999 ORDER BY 3, 1, 2 LIMIT 5; ---- -NULL NULL 2 -NULL NULL 3 -NULL NULL 4 -NULL NULL 5 -NULL NULL 6 +NULL NULL 999 +1 NULL NULL +NULL 2 NULL +NULL 3 NULL +NULL 4 NULL query III SELECT 1 UNION ALL BY NAME SELECT * FROM unnest(range(2, 100)) UNION ALL BY NAME SELECT 999 ORDER BY 3, 1 LIMIT 5; ---- -NULL NULL 2 -NULL NULL 3 -NULL NULL 4 -NULL NULL 5 -NULL NULL 6 +NULL NULL 999 +1 NULL NULL +NULL 2 NULL +NULL 3 NULL +NULL 4 NULL # Order by