diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 0d832bb3062dd..05e1f284c5606 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -19,7 +19,7 @@ use super::super::conversion::*; use super::error::{DFSqlLogicTestError, Result}; use crate::engines::output::DFColumnType; use arrow::array::{Array, AsArray}; -use arrow::datatypes::Fields; +use arrow::datatypes::{Fields, Schema}; use arrow::util::display::ArrayFormatter; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; use datafusion::common::DataFusionError; @@ -29,43 +29,39 @@ use std::sync::LazyLock; /// Converts `batches` to a result as expected by sqllogictest. pub fn convert_batches( + schema: &Schema, batches: Vec, is_spark_path: bool, ) -> Result>> { - if batches.is_empty() { - Ok(vec![]) - } else { - let schema = batches[0].schema(); - let mut rows = vec![]; - for batch in batches { - // Verify schema - if !schema.contains(&batch.schema()) { - return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal( - format!( - "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}", - &schema, - batch.schema() - ), - ))); - } - - // Convert a single batch to a `Vec>` for comparison, flatten expanded rows, and normalize each. - let new_rows = (0..batch.num_rows()) - .map(|row| { - batch - .columns() - .iter() - .map(|col| cell_to_string(col, row, is_spark_path)) - .collect::>>() - }) - .collect::>>>()? - .into_iter() - .flat_map(expand_row) - .map(normalize_paths); - rows.extend(new_rows); + let mut rows = vec![]; + for batch in batches { + // Verify schema + if !schema.contains(&batch.schema()) { + return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal( + format!( + "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}", + &schema, + batch.schema() + ), + ))); } - Ok(rows) + + // Convert a single batch to a `Vec>` for comparison, flatten expanded rows, and normalize each. + let new_rows = (0..batch.num_rows()) + .map(|row| { + batch + .columns() + .iter() + .map(|col| cell_to_string(col, row, is_spark_path)) + .collect::>>() + }) + .collect::>>>()? + .into_iter() + .flat_map(expand_row) + .map(normalize_paths); + rows.extend(new_rows); } + Ok(rows) } /// special case rows that have newlines in them (like explain plans) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index a01ac7e2f9855..45deefdc9bbdf 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -124,11 +124,12 @@ async fn run_query( let df = ctx.sql(sql.into().as_str()).await?; let task_ctx = Arc::new(df.task_ctx()); let plan = df.create_physical_plan().await?; + let schema = plan.schema(); let stream = execute_stream(plan, task_ctx)?; let types = normalize::convert_schema_to_types(stream.schema().fields()); let results: Vec = collect(stream).await?; - let rows = normalize::convert_batches(results, is_spark_path)?; + let rows = normalize::convert_batches(&schema, results, is_spark_path)?; if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) diff --git a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs index 9d38547553529..2df93f0dede33 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs @@ -141,10 +141,11 @@ async fn run_query_substrait_round_trip( }; let physical_plan = state.create_physical_plan(&round_tripped_plan).await?; + let schema = physical_plan.schema(); let stream = execute_stream(physical_plan, task_ctx)?; let types = convert_schema_to_types(stream.schema().fields()); let results: Vec = collect(stream).await?; - let rows = convert_batches(results, false)?; + let rows = convert_batches(&schema, results, false)?; if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 233885618f832..6a1608d5d1348 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -334,90 +334,8 @@ select x, y, z from t3 union all by name select z, y, x from t4 order by x; a b c a b c - -# FIXME: The following should pass without error, but currently it is failing -# due to differing record batch schemas when the SLT runner collects results. -# This is due to the following issue: https://github.com/apache/datafusion/issues/15394#issue-2943811768 -# -# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2746563234 -query error +query TTTT rowsort select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3; ---- -DataFusion error: Internal error: Schema mismatch. Previously had -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: false, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -} - -Got: -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8View, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -}. -This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +a b c NULL +a b c d