From e21dbcebb1dc149a06cbd5e2d75301575677e490 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 09:50:10 -0800 Subject: [PATCH 01/16] Support marking columns as system columns via metadata --- datafusion-examples/Cargo.toml | 1 + .../examples/system_columns.rs | 177 ++++++++++++++++++ datafusion/catalog/src/information_schema.rs | 5 + datafusion/core/tests/sql/select.rs | 155 +++++++++++++++ datafusion/expr/src/utils.rs | 75 +++++++- 5 files changed, 411 insertions(+), 2 deletions(-) create mode 100644 datafusion-examples/examples/system_columns.rs diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index d90ec3333cb9..b1430dd3afaf 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -75,6 +75,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tonic = "0.12.1" url = { workspace = true } uuid = "1.7" +itertools = { workspace = true } [target.'cfg(not(target_os = "windows"))'.dev-dependencies] nix = { version = "0.28.0", features = ["fs"] } diff --git a/datafusion-examples/examples/system_columns.rs b/datafusion-examples/examples/system_columns.rs new file mode 100644 index 000000000000..7d8610441273 --- /dev/null +++ b/datafusion-examples/examples/system_columns.rs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::array::record_batch; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; + +use datafusion::{assert_batches_eq, prelude::*}; + +/// This example shows how to mark fields as system columns. +/// System columns are columns which are meant to be semi-public stores of the internal details of the table. +/// For example, `ctid` in Postgres would be considered a metadata column +/// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples). +/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). +/// +/// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata +/// to `true`.
+/// +/// As an example of how this works in practice, if you have the following Postgres table: +/// +/// ```sql +/// CREATE TABLE t (x int); +/// INSERT INTO t VALUES (1); +/// ``` +/// +/// And you do a `SELECT * FROM t`, you would get the following schema: +/// +/// ```text +/// +---+ +/// | x | +/// +---+ +/// | 1 | +/// +---+ +/// ``` +/// +/// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): +/// +/// ```text +/// +-----+---+ +/// | ctid| x | +/// +-----+---+ +/// | 0 | 1 | +/// +-----+---+ +/// ``` +#[tokio::main] +async fn main() { + let batch = record_batch!( + ("a", Int32, [1, 2, 3]), + ("b", Utf8, ["foo", "bar", "baz"]), + ("_row_num", UInt32, [1, 2, 3]) + ).unwrap(); + let batch = batch.with_schema( + Arc::new( + Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + Field::new("_row_num", DataType::UInt32, true).with_metadata(HashMap::from_iter([ + ("datafusion.system_column".to_string(), "true".to_string()), + ])), + ]) + ) + ).unwrap(); + + let ctx = SessionContext::new(); + let _ = ctx.register_batch("t", batch); + + let res = ctx.sql("SELECT a, b FROM t").await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 | foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx.sql("SELECT _row_num FROM t").await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+----------+", + "| _row_num |", + "+----------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----------+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx.sql("SELECT * FROM t").await.unwrap().collect().await.unwrap(); + // does not include _row_num + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 
| foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx.sql("SELECT *, _row_num FROM t").await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+----------+", + "| a | b | _row_num |", + "+---+-----+----------+", + "| 1 | foo | 1 |", + "| 2 | bar | 2 |", + "| 3 | baz | 3 |", + "+---+-----+----------+", + ]; + assert_batches_eq!(expected, &res); + + + + let res = ctx.sql("SELECT t._row_num FROM t").await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+----------+", + "| _row_num |", + "+----------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----------+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx.sql("SELECT t.* FROM t").await.unwrap().collect().await.unwrap(); + // does not include _row_num + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 | foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx.sql("SELECT t.*, _row_num FROM t").await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+----------+", + "| a | b | _row_num |", + "+---+-----+----------+", + "| 1 | foo | 1 |", + "| 2 | bar | 2 |", + "| 3 | baz | 3 |", + "+---+-----+----------+", + ]; + assert_batches_eq!(expected, &res); +} diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index e68e636989f8..38766c64d8ca 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -190,6 +190,11 @@ impl InformationSchemaConfig { for (field_position, field) in table.schema().fields().iter().enumerate() { + if let Some(v) = field.metadata().get("datafusion.system_column") { + if v.to_lowercase().starts_with("t") { + continue; + } + } builder.add_column( &catalog_name, 
&schema_name, diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 6e81bf6410c1..0e8db9dcdc72 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use super::*; use datafusion_common::ScalarValue; @@ -350,3 +352,156 @@ async fn test_version_function() { assert_eq!(version.value(0), expected_version); } + +#[tokio::test] +async fn test_select_system_column() { + let batch = record_batch!( + ("id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9000, 100, 1000]), + ("_rowid", UInt32, [0, 1, 2]), + ("_file", Utf8, ["file-0", "file-1", "file-2"]) + ).unwrap(); + let batch = batch.with_schema( + Arc::new( + Schema::new(vec![ + Field::new("id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).with_metadata(HashMap::from_iter([ + ("datafusion.system_column".to_string(), "true".to_string()), + ])), + Field::new("_file", DataType::Utf8, true).with_metadata(HashMap::from_iter([ + ("datafusion.system_column".to_string(), "true".to_string()), + ])), + ]) + ) + ).unwrap(); + + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let _ = ctx.register_batch("test", batch); + + let select0 = "SELECT * FROM test order by id"; + let df = ctx.sql(select0).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+----+--------------+", + "| id | bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 100 |", + "| 3 | 1000 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select1 = "SELECT _rowid FROM test order by _rowid"; + let df = ctx.sql(select1).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + 
"+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select2 = "SELECT _rowid, id FROM test order by _rowid"; + let df = ctx.sql(select2).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0"; + let df = ctx.sql(select3).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select4 = "SELECT _rowid FROM test LIMIT 1"; + let df = ctx.sql(select4).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; + let df = ctx.sql(select5).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 1 | 2 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select6 = "SELECT _rowid, _file FROM test order by _rowid"; + let df = ctx.sql(select6).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+--------+--------+", + "| _rowid | _file |", + "+--------+--------+", + "| 0 | file-0 |", + "| 1 | file-1 |", + "| 2 | file-2 |", + "+--------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let select6 = "SELECT id FROM test order by _rowid desc"; + let df = ctx.sql(select6).await.unwrap(); + let batchs = df.collect().await.unwrap(); + let expected = [ + "+----+", + "| id |", + 
"+----+", + "| 3 |", + "| 2 |", + "| 1 |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + let show_columns = "show columns from test;"; + let df_columns = ctx.sql(show_columns).await.unwrap(); + let batchs = df_columns + .select(vec![col("column_name"), col("data_type")]) + .unwrap() + .collect() + .await + .unwrap(); + let expected = [ + "+--------------+-----------+", + "| column_name | data_type |", + "+--------------+-----------+", + "| id | UInt8 |", + "| bank_account | UInt64 |", + "+--------------+-----------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); +} diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 049926fb0bcd..faacfa17107e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -357,6 +357,72 @@ fn get_excluded_columns( Ok(result) } + +/// Find system columns in the schema, if any. +/// +/// System columns are columns which meant to be semi-public stores of the internal details of the table. +/// For example, `ctid` in Postgres would be considered a metadata column +/// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples. +/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). +/// +/// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata +/// to `true`. 
+/// +/// As an example of how this works in practice, if you have the following Postgres table: +/// +/// ```sql +/// CREATE TABLE t (x int); +/// INSERT INTO t VALUES (1); +/// ``` +/// +/// And you do a `SELECT * FROM t`, you would get the following schema: +/// +/// ```text +/// +---+ +/// | x | +/// +---+ +/// | 1 | +/// +---+ +/// ``` +/// +/// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): +/// +/// ```text +/// +-----+---+ +/// | ctid| x | +/// +-----+---+ +/// | 0 | 1 | +/// +-----+---+ +/// ``` +/// +/// Returns: A list of `Column`s that are system columns. +fn get_system_columns( + schema: &DFSchema, + qualifier: Option<&TableReference>, +) -> Result> { + let mut result = vec![]; + // exclude columns with `datafusion.system_column` metadata set to true + if let Some(qualifier) = qualifier { + for field in schema.fields_with_qualified(qualifier) { + if let Some(v) = field.metadata().get("datafusion.system_column") { + if !v.is_empty() && v.to_lowercase().starts_with("t") { + result.push(Column::new(Some(qualifier.clone()), field.name())); + } + } + } + } else { + for field in schema.fields() { + if let Some(v) = field.metadata().get("datafusion.system_column") { + if !v.is_empty() && v.to_lowercase().starts_with("t") { + let (qualifier, field) = schema.qualified_field_with_unqualified_name(field.name())?; + result.push(Column::new(qualifier.cloned(), field.name())); + } + } + } + } + Ok(result) +} + /// Returns all `Expr`s in the schema, except the `Column`s in the `columns_to_skip` fn get_exprs_except_skipped( schema: &DFSchema, @@ -413,6 +479,7 @@ pub fn expand_wildcard( wildcard_options: Option<&WildcardOptions>, ) -> Result> { let mut columns_to_skip = exclude_using_columns(plan)?; + columns_to_skip.extend(get_system_columns(schema, None)?); let excluded_columns = if let Some(WildcardOptions { exclude: opt_exclude, except: opt_except, @@ -467,6 +534,7 @@ 
pub fn expand_qualified_wildcard( }; // Add each excluded `Column` to columns_to_skip let mut columns_to_skip = HashSet::new(); + columns_to_skip.extend(get_system_columns(schema, Some(qualifier))?); columns_to_skip.extend(excluded_columns); Ok(get_exprs_except_skipped( &qualified_dfschema, @@ -706,6 +774,7 @@ pub fn exprlist_to_fields<'a>( // Look for exact match in plan's output schema let wildcard_schema = find_base_plan(plan).schema(); let input_schema = plan.schema(); + let exprs = exprs.into_iter().collect::>(); let result = exprs .into_iter() .map(|e| match e { @@ -718,6 +787,7 @@ pub fn exprlist_to_fields<'a>( wildcard_schema, None, )?); + excluded.extend(get_system_columns(wildcard_schema, None)?); Ok(wildcard_schema .iter() .filter(|(q, f)| { @@ -727,7 +797,7 @@ pub fn exprlist_to_fields<'a>( .collect::>()) } Some(qualifier) => { - let excluded: Vec = get_excluded_columns( + let mut excluded: Vec = get_excluded_columns( options.exclude.as_ref(), options.except.as_ref(), wildcard_schema, @@ -736,11 +806,12 @@ pub fn exprlist_to_fields<'a>( .into_iter() .map(|c| c.flat_name()) .collect(); + excluded.extend(get_system_columns(wildcard_schema, None)?.into_iter().map(|c| c.flat_name())); Ok(wildcard_schema .fields_with_qualified(qualifier) .into_iter() .filter_map(|field| { - let flat_name = format!("{}.{}", qualifier, field.name()); + let flat_name = Column::new(Some(qualifier.clone()), field.name()).flat_name(); if excluded.contains(&flat_name) { None } else { From 72c36a237bcd4f3ea2bdacfd224edf23c8f44154 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:08:22 -0800 Subject: [PATCH 02/16] fmt --- datafusion-examples/Cargo.toml | 2 +- .../examples/system_columns.rs | 93 +++++++++++++------ datafusion/catalog/src/information_schema.rs | 7 +- datafusion/common/src/dfschema.rs | 67 +++++++++++++ datafusion/common/src/lib.rs | 2 +- datafusion/core/tests/sql/select.rs | 40 ++++---- 
datafusion/expr/src/utils.rs | 44 ++++----- 7 files changed, 178 insertions(+), 77 deletions(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b1430dd3afaf..b34af6f05966 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -65,6 +65,7 @@ datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } @@ -75,7 +76,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tonic = "0.12.1" url = { workspace = true } uuid = "1.7" -itertools = { workspace = true } [target.'cfg(not(target_os = "windows"))'.dev-dependencies] nix = { version = "0.28.0", features = ["fs"] } diff --git a/datafusion-examples/examples/system_columns.rs b/datafusion-examples/examples/system_columns.rs index 7d8610441273..46ae8746daed 100644 --- a/datafusion-examples/examples/system_columns.rs +++ b/datafusion-examples/examples/system_columns.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::sync::Arc; use arrow::array::record_batch; use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::FieldExt; use datafusion::{assert_batches_eq, prelude::*}; /// This example shows how to mark fields as system columns. @@ -31,16 +31,16 @@ use datafusion::{assert_batches_eq, prelude::*}; /// /// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata /// to `true`. 
-/// +/// /// As an example of how this works in practice, if you have the following Postgres table: -/// +/// /// ```sql /// CREATE TABLE t (x int); /// INSERT INTO t VALUES (1); /// ``` -/// +/// /// And you do a `SELECT * FROM t`, you would get the following schema: -/// +/// /// ```text /// +---+ /// | x | @@ -48,9 +48,9 @@ use datafusion::{assert_batches_eq, prelude::*}; /// | 1 | /// +---+ /// ``` -/// +/// /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): -/// +/// /// ```text /// +-----+---+ /// | ctid| x | @@ -64,23 +64,26 @@ async fn main() { ("a", Int32, [1, 2, 3]), ("b", Utf8, ["foo", "bar", "baz"]), ("_row_num", UInt32, [1, 2, 3]) - ).unwrap(); - let batch = batch.with_schema( - Arc::new( - Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, true), - Field::new("_row_num", DataType::UInt32, true).with_metadata(HashMap::from_iter([ - ("datafusion.system_column".to_string(), "true".to_string()), - ])), - ]) - ) - ).unwrap(); + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + Field::new("_row_num", DataType::UInt32, true).as_system_column(), + ]))) + .unwrap(); let ctx = SessionContext::new(); let _ = ctx.register_batch("t", batch); - let res = ctx.sql("SELECT a, b FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT a, b FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); #[rustfmt::skip] let expected: Vec<&str> = vec![ "+---+-----+", @@ -93,7 +96,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - let res = ctx.sql("SELECT _row_num FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); #[rustfmt::skip] let expected: Vec<&str> = vec![ "+----------+", @@ 
-106,7 +115,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - let res = ctx.sql("SELECT * FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT * FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); // does not include _row_num #[rustfmt::skip] let expected: Vec<&str> = vec![ @@ -120,7 +135,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - let res = ctx.sql("SELECT *, _row_num FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT *, _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); #[rustfmt::skip] let expected: Vec<&str> = vec![ "+---+-----+----------+", @@ -133,9 +154,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - - - let res = ctx.sql("SELECT t._row_num FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT t._row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); #[rustfmt::skip] let expected: Vec<&str> = vec![ "+----------+", @@ -148,7 +173,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - let res = ctx.sql("SELECT t.* FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT t.* FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); // does not include _row_num #[rustfmt::skip] let expected: Vec<&str> = vec![ @@ -162,7 +193,13 @@ async fn main() { ]; assert_batches_eq!(expected, &res); - let res = ctx.sql("SELECT t.*, _row_num FROM t").await.unwrap().collect().await.unwrap(); + let res = ctx + .sql("SELECT t.*, _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); #[rustfmt::skip] let expected: Vec<&str> = vec![ "+---+-----+----------+", diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 38766c64d8ca..7b0329ec5fa2 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -32,6 +32,7 @@ use 
datafusion_common::config::{ConfigEntry, ConfigOptions}; use datafusion_common::error::Result; use datafusion_common::DataFusionError; use datafusion_execution::TaskContext; +use datafusion_common::FieldExt; use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; use datafusion_expr::{TableType, Volatility}; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -190,10 +191,8 @@ impl InformationSchemaConfig { for (field_position, field) in table.schema().fields().iter().enumerate() { - if let Some(v) = field.metadata().get("datafusion.system_column") { - if v.to_lowercase().starts_with("t") { - continue; - } + if field.is_system_column() { + continue; } builder.add_column( &catalog_name, diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 302d515e027e..d71470c0a7fe 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1056,6 +1056,73 @@ pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String } } +/// Extension trait to manage DataFusion specific metadata on Arrow fields. +pub trait FieldExt { + /// Check if this field is a system column. + /// + /// System columns are columns which are meant to be semi-public stores of the internal details of the table. + /// For example, `ctid` in Postgres would be considered a metadata column + /// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples). + /// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). + /// + /// DataFusion allows fields to be declared as system columns by setting the `datafusion.system_column` key in the field's metadata + /// to `true`.
+ /// + /// As an example of how this works in practice, if you have the following Postgres table: + /// + /// ```sql + /// CREATE TABLE t (x int); + /// INSERT INTO t VALUES (1); + /// ``` + /// + /// And you do a `SELECT * FROM t`, you would get the following schema: + /// + /// ```text + /// +---+ + /// | x | + /// +---+ + /// | 1 | + /// +---+ + /// ``` + /// + /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): + /// + /// ```text + /// +-----+---+ + /// | ctid| x | + /// +-----+---+ + /// | 0 | 1 | + /// +-----+---+ + /// ``` + fn is_system_column(&self) -> bool; + + /// Mark this field as a system column. + /// + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn as_system_column(self) -> Self; +} + +/// See [`FieldExt`]. +impl FieldExt for Field { + /// Check if this field is a system column. + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn is_system_column(&self) -> bool { + self.metadata() + .get("datafusion.system_column") + .map(|v| v.to_lowercase().starts_with("t")) + .unwrap_or(false) + } + + /// Mark this field as a system column. + /// See [`FieldExt::as_system_column`] for more information on what a system column is. 
+ fn as_system_column(mut self) -> Self { + let mut metadata = self.metadata().clone(); + metadata.insert("datafusion.system_column".to_string(), "true".to_string()); + self.set_metadata(metadata); + self + } +} + #[cfg(test)] mod tests { use crate::assert_contains; diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 77e8cd60ede2..f83807303d43 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -52,7 +52,7 @@ pub mod utils; pub use arrow; pub use column::Column; pub use dfschema::{ - qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, + qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldExt, SchemaExt, ToDFSchema, }; pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 0e8db9dcdc72..f2f48deccac5 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -360,21 +360,23 @@ async fn test_select_system_column() { ("bank_account", UInt64, [9000, 100, 1000]), ("_rowid", UInt32, [0, 1, 2]), ("_file", Utf8, ["file-0", "file-1", "file-2"]) - ).unwrap(); - let batch = batch.with_schema( - Arc::new( - Schema::new(vec![ - Field::new("id", DataType::UInt8, true), - Field::new("bank_account", DataType::UInt64, true), - Field::new("_rowid", DataType::UInt32, true).with_metadata(HashMap::from_iter([ - ("datafusion.system_column".to_string(), "true".to_string()), - ])), - Field::new("_file", DataType::Utf8, true).with_metadata(HashMap::from_iter([ - ("datafusion.system_column".to_string(), "true".to_string()), - ])), - ]) - ) - ).unwrap(); + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).with_metadata( + HashMap::from_iter([( + 
"datafusion.system_column".to_string(), + "true".to_string(), + )]), + ), + Field::new("_file", DataType::Utf8, true).with_metadata(HashMap::from_iter( + [("datafusion.system_column".to_string(), "true".to_string())], + )), + ]))) + .unwrap(); let ctx = SessionContext::new_with_config( SessionConfig::new().with_information_schema(true), @@ -477,13 +479,7 @@ async fn test_select_system_column() { let df = ctx.sql(select6).await.unwrap(); let batchs = df.collect().await.unwrap(); let expected = [ - "+----+", - "| id |", - "+----+", - "| 3 |", - "| 2 |", - "| 1 |", - "+----+", + "+----+", "| id |", "+----+", "| 3 |", "| 2 |", "| 1 |", "+----+", ]; assert_batches_sorted_eq!(expected, &batchs); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index faacfa17107e..be1a4f573bb8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ }; use datafusion_common::utils::get_at_indices; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap, - Result, TableReference, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, FieldExt, + HashMap, Result, TableReference, }; use indexmap::IndexSet; @@ -357,9 +357,8 @@ fn get_excluded_columns( Ok(result) } - /// Find system columns in the schema, if any. -/// +/// /// System columns are columns which meant to be semi-public stores of the internal details of the table. /// For example, `ctid` in Postgres would be considered a metadata column /// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples. @@ -367,16 +366,16 @@ fn get_excluded_columns( /// /// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata /// to `true`. 
-/// +/// /// As an example of how this works in practice, if you have the following Postgres table: -/// +/// /// ```sql /// CREATE TABLE t (x int); /// INSERT INTO t VALUES (1); /// ``` -/// +/// /// And you do a `SELECT * FROM t`, you would get the following schema: -/// +/// /// ```text /// +---+ /// | x | @@ -384,9 +383,9 @@ fn get_excluded_columns( /// | 1 | /// +---+ /// ``` -/// +/// /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): -/// +/// /// ```text /// +-----+---+ /// | ctid| x | @@ -404,19 +403,16 @@ fn get_system_columns( // exclude columns with `datafusion.system_column` metadata set to true if let Some(qualifier) = qualifier { for field in schema.fields_with_qualified(qualifier) { - if let Some(v) = field.metadata().get("datafusion.system_column") { - if !v.is_empty() && v.to_lowercase().starts_with("t") { - result.push(Column::new(Some(qualifier.clone()), field.name())); - } + if field.is_system_column() { + result.push(Column::new(Some(qualifier.clone()), field.name())); } } } else { for field in schema.fields() { - if let Some(v) = field.metadata().get("datafusion.system_column") { - if !v.is_empty() && v.to_lowercase().starts_with("t") { - let (qualifier, field) = schema.qualified_field_with_unqualified_name(field.name())?; - result.push(Column::new(qualifier.cloned(), field.name())); - } + if field.is_system_column() { + let (qualifier, field) = + schema.qualified_field_with_unqualified_name(field.name())?; + result.push(Column::new(qualifier.cloned(), field.name())); } } } @@ -806,12 +802,18 @@ pub fn exprlist_to_fields<'a>( .into_iter() .map(|c| c.flat_name()) .collect(); - excluded.extend(get_system_columns(wildcard_schema, None)?.into_iter().map(|c| c.flat_name())); + excluded.extend( + get_system_columns(wildcard_schema, None)? 
+ .into_iter() + .map(|c| c.flat_name()), + ); Ok(wildcard_schema .fields_with_qualified(qualifier) .into_iter() .filter_map(|field| { - let flat_name = Column::new(Some(qualifier.clone()), field.name()).flat_name(); + let flat_name = + Column::new(Some(qualifier.clone()), field.name()) + .flat_name(); if excluded.contains(&flat_name) { None } else { From 251b6b9466c0783aa1f50d132798d2364d25e597 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:09:28 -0800 Subject: [PATCH 03/16] use trait --- datafusion/core/tests/sql/select.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index f2f48deccac5..c6e006039f9b 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; - use super::*; use datafusion_common::ScalarValue; @@ -366,15 +364,8 @@ async fn test_select_system_column() { .with_schema(Arc::new(Schema::new(vec![ Field::new("id", DataType::UInt8, true), Field::new("bank_account", DataType::UInt64, true), - Field::new("_rowid", DataType::UInt32, true).with_metadata( - HashMap::from_iter([( - "datafusion.system_column".to_string(), - "true".to_string(), - )]), - ), - Field::new("_file", DataType::Utf8, true).with_metadata(HashMap::from_iter( - [("datafusion.system_column".to_string(), "true".to_string())], - )), + Field::new("_rowid", DataType::UInt32, true).as_system_column(), + Field::new("_file", DataType::Utf8, true).as_system_column(), ]))) .unwrap(); From 12c48479908a4f9642b18576c3fc8bdd1c8c95e2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:10:04 -0800 Subject: [PATCH 04/16] import --- datafusion/core/tests/sql/select.rs | 2 +- 1 file changed, 
1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index c6e006039f9b..0722a50f33d9 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -16,7 +16,7 @@ // under the License. use super::*; -use datafusion_common::ScalarValue; +use datafusion_common::{FieldExt, ScalarValue}; #[tokio::test] async fn test_list_query_parameters() -> Result<()> { From c82f55607d5880b713c8de17f55d00cad92fb229 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:12:42 -0600 Subject: [PATCH 05/16] remove unnecessary collect --- datafusion/expr/src/utils.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index be1a4f573bb8..907f77105fd2 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -770,7 +770,6 @@ pub fn exprlist_to_fields<'a>( // Look for exact match in plan's output schema let wildcard_schema = find_base_plan(plan).schema(); let input_schema = plan.schema(); - let exprs = exprs.into_iter().collect::>(); let result = exprs .into_iter() .map(|e| match e { From 76b23466a36ad8a1e15498716d40f8371e095c52 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:26:31 -0800 Subject: [PATCH 06/16] rename method --- datafusion-examples/examples/system_columns.rs | 2 +- datafusion/common/src/dfschema.rs | 6 +++--- datafusion/core/tests/sql/select.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion-examples/examples/system_columns.rs b/datafusion-examples/examples/system_columns.rs index 46ae8746daed..2385d1ae60cb 100644 --- a/datafusion-examples/examples/system_columns.rs +++ b/datafusion-examples/examples/system_columns.rs @@ -70,7 +70,7 @@ async fn main() { .with_schema(Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, 
true), Field::new("b", DataType::Utf8, true), - Field::new("_row_num", DataType::UInt32, true).as_system_column(), + Field::new("_row_num", DataType::UInt32, true).to_system_column(), ]))) .unwrap(); diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d71470c0a7fe..ff15ce86c428 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1099,7 +1099,7 @@ pub trait FieldExt { /// Mark this field as a system column. /// /// See [`FieldExt::is_system_column`] for more information on what a system column is. - fn as_system_column(self) -> Self; + fn to_system_column(self) -> Self; } /// See [`FieldExt`]. @@ -1114,8 +1114,8 @@ impl FieldExt for Field { } /// Mark this field as a system column. - /// See [`FieldExt::as_system_column`] for more information on what a system column is. - fn as_system_column(mut self) -> Self { + /// See [`FieldExt::to_system_column`] for more information on what a system column is. + fn to_system_column(mut self) -> Self { let mut metadata = self.metadata().clone(); metadata.insert("datafusion.system_column".to_string(), "true".to_string()); self.set_metadata(metadata); diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 0722a50f33d9..b76259a29d15 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -364,8 +364,8 @@ async fn test_select_system_column() { .with_schema(Arc::new(Schema::new(vec![ Field::new("id", DataType::UInt8, true), Field::new("bank_account", DataType::UInt64, true), - Field::new("_rowid", DataType::UInt32, true).as_system_column(), - Field::new("_file", DataType::Utf8, true).as_system_column(), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + Field::new("_file", DataType::Utf8, true).to_system_column(), ]))) .unwrap(); From ee88d1d5850e80ffb1f4694b8be2edba8f1e8da7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco 
<1755071+adriangb@users.noreply.github.com> Date: Thu, 30 Jan 2025 06:55:32 -0800 Subject: [PATCH 07/16] add test for conflicting column names --- datafusion/core/tests/sql/select.rs | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index b76259a29d15..8091e0cb6156 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -491,4 +491,35 @@ async fn test_select_system_column() { "+--------------+-----------+", ]; assert_batches_sorted_eq!(expected, &batchs); + + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_row_id", UInt32, [10, 11, 12]) // not a system column! + ) + .unwrap(); + let _ = ctx.register_batch("test2", batch); + + // Normally _row_id would be a name conflict + // But when it's a conflict between a metadata column and a non-metadata column, the non metadata column should be used + let select7 = + "SELECT id, other_id, _row_id FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select7).await.unwrap(); + let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _row_id |", + "+----+----------+---------+", + "| 1 | 1 | 10 |", + "| 2 | 2 | 11 |", + "| 3 | 3 | 12 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batchs); + + // Demonstrate that for other columns we get a conflict + let select7 = + "SELECT id, other_id, bank_account FROM test INNER JOIN test2 ON id = other_id"; + assert!(ctx.sql(select7).await.is_err()); } From 6d0d268184cc90b778c37cfcbf4b83c673e88208 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 30 Jan 2025 06:59:13 -0800 Subject: [PATCH 08/16] fmt --- datafusion/core/tests/sql/select.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 8091e0cb6156..48c0752e7a0f 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -495,7 +495,7 @@ async fn test_select_system_column() { let batch = record_batch!( ("other_id", UInt8, [1, 2, 3]), ("bank_account", UInt64, [9, 10, 11]), - ("_row_id", UInt32, [10, 11, 12]) // not a system column! + ("_row_id", UInt32, [10, 11, 12]) // not a system column! ) .unwrap(); let _ = ctx.register_batch("test2", batch); From 72f2065035fc55302bc9758b857143b8a23095c0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 30 Jan 2025 18:06:16 -0800 Subject: [PATCH 09/16] more tests --- datafusion/core/tests/sql/select.rs | 108 +++++++++++++++++++++++++--- 1 file changed, 100 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 48c0752e7a0f..50f90f92b476 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -377,6 +377,7 @@ async fn test_select_system_column() { let select0 = "SELECT * FROM test order by id"; let df = ctx.sql(select0).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+----+--------------+", "| id | bank_account |", @@ -391,6 +392,7 @@ async fn test_select_system_column() { let select1 = "SELECT _rowid FROM test order by _rowid"; let df = ctx.sql(select1).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+", "| _rowid |", @@ -405,6 +407,7 @@ async fn test_select_system_column() { let select2 = "SELECT _rowid, id FROM test order by _rowid"; let df = ctx.sql(select2).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+----+", "| _rowid | id |", @@ -419,6 +422,7 @@ async fn test_select_system_column() { let select3 = "SELECT _rowid, id 
FROM test WHERE _rowid = 0"; let df = ctx.sql(select3).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+----+", "| _rowid | id |", @@ -431,6 +435,7 @@ async fn test_select_system_column() { let select4 = "SELECT _rowid FROM test LIMIT 1"; let df = ctx.sql(select4).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+", "| _rowid |", @@ -443,6 +448,7 @@ async fn test_select_system_column() { let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; let df = ctx.sql(select5).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+----+", "| _rowid | id |", @@ -455,6 +461,7 @@ async fn test_select_system_column() { let select6 = "SELECT _rowid, _file FROM test order by _rowid"; let df = ctx.sql(select6).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ "+--------+--------+", "| _rowid | _file |", @@ -466,11 +473,18 @@ async fn test_select_system_column() { ]; assert_batches_sorted_eq!(expected, &batchs); - let select6 = "SELECT id FROM test order by _rowid desc"; + let select6 = "SELECT id FROM test order by _rowid asc"; let df = ctx.sql(select6).await.unwrap(); let batchs = df.collect().await.unwrap(); + #[rustfmt::skip] let expected = [ - "+----+", "| id |", "+----+", "| 3 |", "| 2 |", "| 1 |", "+----+", + "+----+", + "| id |", + "+----+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----+", ]; assert_batches_sorted_eq!(expected, &batchs); @@ -495,21 +509,22 @@ async fn test_select_system_column() { let batch = record_batch!( ("other_id", UInt8, [1, 2, 3]), ("bank_account", UInt64, [9, 10, 11]), - ("_row_id", UInt32, [10, 11, 12]) // not a system column! + ("_rowid", UInt32, [10, 11, 12]) // not a system column! 
) .unwrap(); let _ = ctx.register_batch("test2", batch); - // Normally _row_id would be a name conflict - // But when it's a conflict between a metadata column and a non-metadata column, the non metadata column should be used + // Normally _rowid would be a name conflict and throw an error during planning. + // But when it's a conflict between a system column and a non system column, + // the non system column should be used. let select7 = - "SELECT id, other_id, _row_id FROM test INNER JOIN test2 ON id = other_id"; + "SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id"; let df = ctx.sql(select7).await.unwrap(); let batchs = df.collect().await.unwrap(); #[rustfmt::skip] let expected = [ "+----+----------+---------+", - "| id | other_id | _row_id |", + "| id | other_id | _rowid |", "+----+----------+---------+", "| 1 | 1 | 10 |", "| 2 | 2 | 11 |", @@ -518,8 +533,85 @@ async fn test_select_system_column() { ]; assert_batches_sorted_eq!(expected, &batchs); - // Demonstrate that for other columns we get a conflict + // Sanity check: for other columns we do get a conflict let select7 = "SELECT id, other_id, bank_account FROM test INNER JOIN test2 ON id = other_id"; assert!(ctx.sql(select7).await.is_err()); + + // Demonstrate that we can join on _rowid + let batch = record_batch!( + ("other_id", UInt8, [2, 3, 4]), + ("_rowid", UInt32, [2, 3, 4]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("other_id", DataType::UInt8, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + ]))) + .unwrap(); + let _ = ctx.register_batch("test2", batch); + + let select8 = "SELECT id, other_id, _rowid FROM test JOIN test2 ON _rowid = _rowid"; + let df = ctx.sql(select8).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 2 | 2 | 2 |", + 
"+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Once passed through a projection, system columns are no longer available + let select9 = r" + WITH cte AS (SELECT * FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select9).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 2 | 2 | 2 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select10 = r" + WITH cte AS (SELECT * FROM test) + SELECT _rowid FROM cte + "; + let df = ctx.sql(select10).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 2 | 2 | 2 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // And if passed explicitly selected and passed through a projection + // they are no longer system columns. 
+ let select11 = r" + WITH cte AS (SELECT id, _rowid FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select11).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+---------+", + "| id | _rowid |", + "+----+---------+", + "| 2 | 2 |", + "+----+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); } From 0c40c90cfd1c6d2b0b3a7c7224c2b49430deb460 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 30 Jan 2025 18:09:14 -0800 Subject: [PATCH 10/16] fmt --- datafusion/catalog/src/information_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 7b0329ec5fa2..d46ea37d3943 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -31,8 +31,8 @@ use async_trait::async_trait; use datafusion_common::config::{ConfigEntry, ConfigOptions}; use datafusion_common::error::Result; use datafusion_common::DataFusionError; -use datafusion_execution::TaskContext; use datafusion_common::FieldExt; +use datafusion_execution::TaskContext; use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; use datafusion_expr::{TableType, Volatility}; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; From c472aeabd36535b9523f03209897b8172870119f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 09:34:15 -0800 Subject: [PATCH 11/16] split up tests --- datafusion/common/src/dfschema.rs | 34 +++ datafusion/core/tests/sql/mod.rs | 1 + datafusion/core/tests/sql/select.rs | 267 +------------------- datafusion/core/tests/sql/system_columns.rs | 263 +++++++++++++++++++ datafusion/expr/src/logical_plan/builder.rs | 10 +- datafusion/expr/src/utils.rs | 13 + 6 files changed, 321 insertions(+), 267 
deletions(-) create mode 100644 datafusion/core/tests/sql/system_columns.rs diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index ff15ce86c428..5865281655b3 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1100,6 +1100,11 @@ pub trait FieldExt { /// /// See [`FieldExt::is_system_column`] for more information on what a system column is. fn to_system_column(self) -> Self; + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn to_non_system_column(self) -> Self; } /// See [`FieldExt`]. @@ -1121,6 +1126,35 @@ impl FieldExt for Field { self.set_metadata(metadata); self } + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// See [`FieldExt::to_non_system_column`] for more information on what a system column is. + fn to_non_system_column(mut self) -> Self { + let mut metadata = self.metadata().clone(); + metadata.remove("datafusion.system_column"); + self.set_metadata(metadata); + self + } +} + +impl FieldExt for Arc { + /// Check if this field is a system column. + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn is_system_column(&self) -> bool { + FieldExt::is_system_column(self.as_ref()) + } + + /// Mark this field as a system column. + /// See [`FieldExt::to_system_column`] for more information on what a system column is. + fn to_system_column(self) -> Self { + Arc::new(FieldExt::to_system_column(Arc::unwrap_or_clone(self))) + } + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// See [`FieldExt::to_non_system_column`] for more information on what a system column is. 
+ fn to_non_system_column(self) -> Self { + Arc::new(FieldExt::to_non_system_column(Arc::unwrap_or_clone(self))) + } } #[cfg(test)] diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 03c4ad7c013e..1e4858bc8dd0 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -64,6 +64,7 @@ pub mod joins; mod path_partition; pub mod select; mod sql_api; +pub mod system_columns; async fn register_aggregate_csv_by_sql(ctx: &SessionContext) { let testdata = test_util::arrow_test_data(); diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 50f90f92b476..6e81bf6410c1 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -16,7 +16,7 @@ // under the License. use super::*; -use datafusion_common::{FieldExt, ScalarValue}; +use datafusion_common::ScalarValue; #[tokio::test] async fn test_list_query_parameters() -> Result<()> { @@ -350,268 +350,3 @@ async fn test_version_function() { assert_eq!(version.value(0), expected_version); } - -#[tokio::test] -async fn test_select_system_column() { - let batch = record_batch!( - ("id", UInt8, [1, 2, 3]), - ("bank_account", UInt64, [9000, 100, 1000]), - ("_rowid", UInt32, [0, 1, 2]), - ("_file", Utf8, ["file-0", "file-1", "file-2"]) - ) - .unwrap(); - let batch = batch - .with_schema(Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt8, true), - Field::new("bank_account", DataType::UInt64, true), - Field::new("_rowid", DataType::UInt32, true).to_system_column(), - Field::new("_file", DataType::Utf8, true).to_system_column(), - ]))) - .unwrap(); - - let ctx = SessionContext::new_with_config( - SessionConfig::new().with_information_schema(true), - ); - let _ = ctx.register_batch("test", batch); - - let select0 = "SELECT * FROM test order by id"; - let df = ctx.sql(select0).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - 
"+----+--------------+", - "| id | bank_account |", - "+----+--------------+", - "| 1 | 9000 |", - "| 2 | 100 |", - "| 3 | 1000 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select1 = "SELECT _rowid FROM test order by _rowid"; - let df = ctx.sql(select1).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+", - "| _rowid |", - "+--------+", - "| 0 |", - "| 1 |", - "| 2 |", - "+--------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select2 = "SELECT _rowid, id FROM test order by _rowid"; - let df = ctx.sql(select2).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+----+", - "| _rowid | id |", - "+--------+----+", - "| 0 | 1 |", - "| 1 | 2 |", - "| 2 | 3 |", - "+--------+----+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0"; - let df = ctx.sql(select3).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+----+", - "| _rowid | id |", - "+--------+----+", - "| 0 | 1 |", - "+--------+----+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select4 = "SELECT _rowid FROM test LIMIT 1"; - let df = ctx.sql(select4).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+", - "| _rowid |", - "+--------+", - "| 0 |", - "+--------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; - let df = ctx.sql(select5).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+----+", - "| _rowid | id |", - "+--------+----+", - "| 1 | 2 |", - "+--------+----+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select6 = "SELECT _rowid, _file FROM test order by _rowid"; - let df = 
ctx.sql(select6).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+--------+--------+", - "| _rowid | _file |", - "+--------+--------+", - "| 0 | file-0 |", - "| 1 | file-1 |", - "| 2 | file-2 |", - "+--------+--------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let select6 = "SELECT id FROM test order by _rowid asc"; - let df = ctx.sql(select6).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+", - "| id |", - "+----+", - "| 1 |", - "| 2 |", - "| 3 |", - "+----+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let show_columns = "show columns from test;"; - let df_columns = ctx.sql(show_columns).await.unwrap(); - let batchs = df_columns - .select(vec![col("column_name"), col("data_type")]) - .unwrap() - .collect() - .await - .unwrap(); - let expected = [ - "+--------------+-----------+", - "| column_name | data_type |", - "+--------------+-----------+", - "| id | UInt8 |", - "| bank_account | UInt64 |", - "+--------------+-----------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - let batch = record_batch!( - ("other_id", UInt8, [1, 2, 3]), - ("bank_account", UInt64, [9, 10, 11]), - ("_rowid", UInt32, [10, 11, 12]) // not a system column! - ) - .unwrap(); - let _ = ctx.register_batch("test2", batch); - - // Normally _rowid would be a name conflict and throw an error during planning. - // But when it's a conflict between a system column and a non system column, - // the non system column should be used. 
- let select7 = - "SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id"; - let df = ctx.sql(select7).await.unwrap(); - let batchs = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 1 | 1 | 10 |", - "| 2 | 2 | 11 |", - "| 3 | 3 | 12 |", - "+----+----------+---------+", - ]; - assert_batches_sorted_eq!(expected, &batchs); - - // Sanity check: for other columns we do get a conflict - let select7 = - "SELECT id, other_id, bank_account FROM test INNER JOIN test2 ON id = other_id"; - assert!(ctx.sql(select7).await.is_err()); - - // Demonstrate that we can join on _rowid - let batch = record_batch!( - ("other_id", UInt8, [2, 3, 4]), - ("_rowid", UInt32, [2, 3, 4]) - ) - .unwrap(); - let batch = batch - .with_schema(Arc::new(Schema::new(vec![ - Field::new("other_id", DataType::UInt8, true), - Field::new("_rowid", DataType::UInt32, true).to_system_column(), - ]))) - .unwrap(); - let _ = ctx.register_batch("test2", batch); - - let select8 = "SELECT id, other_id, _rowid FROM test JOIN test2 ON _rowid = _rowid"; - let df = ctx.sql(select8).await.unwrap(); - let batches = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 2 | 2 | 2 |", - "+----+----------+---------+", - ]; - assert_batches_sorted_eq!(expected, &batches); - - // Once passed through a projection, system columns are no longer available - let select9 = r" - WITH cte AS (SELECT * FROM test) - SELECT * FROM cte - "; - let df = ctx.sql(select9).await.unwrap(); - let batches = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 2 | 2 | 2 |", - "+----+----------+---------+", - ]; - assert_batches_sorted_eq!(expected, &batches); - let select10 = r" - 
WITH cte AS (SELECT * FROM test) - SELECT _rowid FROM cte - "; - let df = ctx.sql(select10).await.unwrap(); - let batches = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 2 | 2 | 2 |", - "+----+----------+---------+", - ]; - assert_batches_sorted_eq!(expected, &batches); - - // And if passed explicitly selected and passed through a projection - // they are no longer system columns. - let select11 = r" - WITH cte AS (SELECT id, _rowid FROM test) - SELECT * FROM cte - "; - let df = ctx.sql(select11).await.unwrap(); - let batches = df.collect().await.unwrap(); - #[rustfmt::skip] - let expected = [ - "+----+---------+", - "| id | _rowid |", - "+----+---------+", - "| 2 | 2 |", - "+----+---------+", - ]; - assert_batches_sorted_eq!(expected, &batches); -} diff --git a/datafusion/core/tests/sql/system_columns.rs b/datafusion/core/tests/sql/system_columns.rs new file mode 100644 index 000000000000..bda62ae5fc87 --- /dev/null +++ b/datafusion/core/tests/sql/system_columns.rs @@ -0,0 +1,263 @@ +use super::*; +use datafusion_common::FieldExt; + +#[tokio::test] +async fn test_basic_select() { + let ctx = setup_test_context().await; + + let select = "SELECT * FROM test order by id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+", + "| id | bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 100 |", + "| 3 | 1000 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_select() { + let ctx = setup_test_context().await; + + // Basic _rowid select + let select = "SELECT _rowid FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+", + "| _rowid |", + 
"+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // _rowid with regular column + let select = "SELECT _rowid, id FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_filtering() { + let ctx = setup_test_context().await; + + // Filter by exact _rowid + let select = "SELECT _rowid, id FROM test WHERE _rowid = 0"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Filter by _rowid with and operator + let select = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 1 | 2 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Filter without selecting + let select = "SELECT id FROM test WHERE _rowid = 0"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 1 |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_ordering() { + let ctx = setup_test_context().await; + + let select = "SELECT id FROM test order by _rowid asc"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = 
[ + "+----+", + "| id |", + "+----+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_joins() { + let ctx = setup_test_context().await; + setup_test2_for_joins(&ctx).await; + + // Join with non-system _rowid column and select _row_id + // Normally this would result in an AmbiguousReference error because both tables have a _rowid column + // But when this conflict is between a system column and a regular column, the regular column is chosen + // to resolve the conflict without error. + let select = + "SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 1 | 1 | 10 |", + "| 2 | 2 | 11 |", + "| 3 | 3 | 12 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + // Same in this case, `test._rowid` is discarded because it is a system column + let select = + "SELECT test.*, test2._row_id FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 1 | 1 | 10 |", + "| 2 | 2 | 11 |", + "| 3 | 3 | 12 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Join on system _rowid columns + setup_test2_with_system_rowid(&ctx).await; + let select = "SELECT id, other_id, _rowid FROM test JOIN test2 ON _rowid = _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+---------+", + "| id | other_id | _rowid |", + "+----+----------+---------+", + "| 2 | 2 
| 2 |", + "+----+----------+---------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_with_cte() { + let ctx = setup_test_context().await; + + // System columns not available after CTE + let select = r" + WITH cte AS (SELECT * FROM test) + SELECT _rowid FROM cte + "; + assert!(ctx.sql(select).await.is_err()); + + // Explicitly selected system columns become regular columns + let select = r" + WITH cte AS (SELECT id, _rowid FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +async fn setup_test_context() -> SessionContext { + let batch = record_batch!( + ("id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9000, 100, 1000]), + ("_rowid", UInt32, [0, 1, 2]), + ("_file", Utf8, ["file-0", "file-1", "file-2"]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + Field::new("_file", DataType::Utf8, true).to_system_column(), + ]))) + .unwrap(); + + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let _ = ctx.register_batch("test", batch); + ctx +} + +async fn setup_test2_for_joins(ctx: &SessionContext) { + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_rowid", UInt32, [10, 11, 12]) + ) + .unwrap(); + let _ = ctx.register_batch("test2", batch); +} + +async fn setup_test2_with_system_rowid(ctx: &SessionContext) { + let batch = record_batch!( + ("other_id", UInt8, [2, 3, 4]), + ("_rowid", UInt32, [2, 3, 4]) + ) + 
.unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("other_id", DataType::UInt8, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + ]))) + .unwrap(); + let _ = ctx.register_batch("test2", batch); +} diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index b0c28e145525..5740cd112e25 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,7 +54,8 @@ use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, - Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, + FieldExt, FunctionalDependencies, Result, Result, ScalarValue, ScalarValue, + TableReference, TableReference, ToDFSchema, ToDFSchema, UnnestOptions, UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; @@ -1443,6 +1444,13 @@ pub fn build_join_schema( .into_iter() .chain(right.metadata().clone()) .collect(); + let qualified_fields = match join_type { + JoinType::LeftMark => qualified_fields + .into_iter() + .map(|(q, f)| (q, f.to_non_system_column())) + .collect(), + _ => qualified_fields, + }; let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?; dfschema.with_functional_dependencies(func_dependencies) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 907f77105fd2..2f4cb83c6b2e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -830,6 +830,19 @@ pub fn exprlist_to_fields<'a>( .collect::>>()? 
.into_iter() .flatten() + .collect::>(); + // // Deduplicate system columns and non system columns by preferring non system columns + // let mut non_system_columns = HashSet::new(); + // for (_, f) in result.iter() { + // if f.is_system_column() { + // non_system_columns.insert(f.name().to_string()); + // } + // } + let result = result + .into_iter() + // .filter(|(_, f)| !(f.is_system_column() && non_system_columns.contains(f.name()))) + // And any system columns that are included in the result cease to be system columns + .map(|(q, f)| (q, f.to_non_system_column())) .collect(); Ok(result) } From 7eff84e800e97ae9a94d8f63e9ac942beed8dbf6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 12:55:33 -0800 Subject: [PATCH 12/16] better tests, handle joins --- datafusion/common/src/dfschema.rs | 23 ++ datafusion/core/tests/sql/system_columns.rs | 230 +++++++++++++++----- datafusion/expr/src/logical_plan/builder.rs | 9 +- 3 files changed, 200 insertions(+), 62 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 5865281655b3..4d2a19ab76a8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -494,6 +494,29 @@ impl DFSchema { // Project a.id as id TableScan b id // In this case, there isn't `ambiguous name` problem. When `matches` just contains // one field without qualifier, we should return it. + // Another scenario where we can disambiguate is when we have a conflict between + // a system column and a non system column. + // In this case we return the non system column. 
+ let mut non_system_columns = HashSet::new(); + for (_, f) in matches.iter() { + if !f.is_system_column() { + non_system_columns.insert(f.name().to_string()); + } + } + let matches_filtered = matches + .iter() + .filter_map(|(q, f)| { + if f.is_system_column() && non_system_columns.contains(f.name()) { + None + } else { + Some((q, f)) + } + }) + .collect::>(); + if matches_filtered.len() == 1 { + let (q, f) = matches_filtered[0]; + return Ok((*q, *f)); + } let fields_without_qualifier = matches .iter() .filter(|(q, _)| q.is_none()) diff --git a/datafusion/core/tests/sql/system_columns.rs b/datafusion/core/tests/sql/system_columns.rs index bda62ae5fc87..413cbdeb83f5 100644 --- a/datafusion/core/tests/sql/system_columns.rs +++ b/datafusion/core/tests/sql/system_columns.rs @@ -2,9 +2,10 @@ use super::*; use datafusion_common::FieldExt; #[tokio::test] -async fn test_basic_select() { +async fn test_system_column_select() { let ctx = setup_test_context().await; + // System columns are not included in wildcard select let select = "SELECT * FROM test order by id"; let df = ctx.sql(select).await.unwrap(); let batches = df.collect().await.unwrap(); @@ -19,13 +20,8 @@ async fn test_basic_select() { "+----+--------------+", ]; assert_batches_sorted_eq!(expected, &batches); -} - -#[tokio::test] -async fn test_system_column_select() { - let ctx = setup_test_context().await; - // Basic _rowid select + // But they are if explicitly selected let select = "SELECT _rowid FROM test order by _rowid"; let df = ctx.sql(select).await.unwrap(); let batches = df.collect().await.unwrap(); @@ -41,7 +37,7 @@ async fn test_system_column_select() { ]; assert_batches_sorted_eq!(expected, &batches); - // _rowid with regular column + // They can be selected alongside regular columns let select = "SELECT _rowid, id FROM test order by _rowid"; let df = ctx.sql(select).await.unwrap(); let batches = df.collect().await.unwrap(); @@ -56,6 +52,22 @@ async fn test_system_column_select() { 
"+--------+----+", ]; assert_batches_sorted_eq!(expected, &batches); + + // As well as wildcard select + let select = "SELECT *, _rowid FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+--------+", + "| id | bank_account | _rowid |", + "+----+--------------+--------+", + "| 1 | 9000 | 0 |", + "| 2 | 100 | 1 |", + "| 3 | 1000 | 2 |", + "+----+--------------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); } #[tokio::test] @@ -128,7 +140,102 @@ async fn test_system_column_ordering() { #[tokio::test] async fn test_system_column_joins() { let ctx = setup_test_context().await; - setup_test2_for_joins(&ctx).await; + + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_rowid", UInt32, [0, 1, 3]) + ) + .unwrap(); + let _ = ctx.register_batch("test2", batch); + + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_rowid", UInt32, [0, 1, 3]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("other_id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + ]))) + .unwrap(); + let _ = ctx.register_batch("test2sys", batch); + + let select = "SELECT id, other_id FROM test INNER JOIN test2 USING (_rowid)"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = "SELECT id, other_id, _rowid FROM test INNER JOIN test2 USING (_rowid)"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + 
let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = + "SELECT id, other_id FROM test LEFT JOIN test2 ON test._rowid = test2._rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = + "SELECT id, other_id, _rowid FROM test LEFT JOIN test2 ON test._rowid = test2._rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "| 3 | | |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + let select = + "SELECT id, other_id FROM test JOIN test2 ON test._rowid = test2._rowid % 2"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 2 | 3 |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); // Join with non-system _rowid column and select _row_id // Normally this would result in an AmbiguousReference error because both tables have a _rowid column @@ -140,46 +247,55 @@ async fn test_system_column_joins() { let batches = df.collect().await.unwrap(); #[rustfmt::skip] let expected = [ - "+----+----------+---------+", + "+----+----------+--------+", "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 1 | 1 | 10 |", - "| 2 | 2 | 11 |", - "| 3 | 
3 | 12 |", - "+----+----------+---------+", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "| 3 | 3 | 3 |", + "+----+----------+--------+", ]; assert_batches_sorted_eq!(expected, &batches); + // but if it's a conflict between two system columns we do get an error + let select = + "SELECT id, other_id, _rowid FROM test INNER JOIN test2sys ON id = other_id"; + assert!(ctx.sql(select).await.is_err()); // Same in this case, `test._rowid` is discarded because it is a system column let select = - "SELECT test.*, test2._row_id FROM test INNER JOIN test2 ON id = other_id"; + "SELECT test.*, test2._rowid FROM test INNER JOIN test2 ON id = other_id"; let df = ctx.sql(select).await.unwrap(); let batches = df.collect().await.unwrap(); #[rustfmt::skip] let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 1 | 1 | 10 |", - "| 2 | 2 | 11 |", - "| 3 | 3 | 12 |", - "+----+----------+---------+", + "+----+--------------+--------+", + "| id | bank_account | _rowid |", + "+----+--------------+--------+", + "| 1 | 9000 | 0 |", + "| 2 | 100 | 1 |", + "| 3 | 1000 | 3 |", + "+----+--------------+--------+", ]; assert_batches_sorted_eq!(expected, &batches); // Join on system _rowid columns - setup_test2_with_system_rowid(&ctx).await; - let select = "SELECT id, other_id, _rowid FROM test JOIN test2 ON _rowid = _rowid"; + let select = + "SELECT id, other_id FROM test JOIN test2sys ON test._rowid = test2sys._rowid"; let df = ctx.sql(select).await.unwrap(); let batches = df.collect().await.unwrap(); #[rustfmt::skip] let expected = [ - "+----+----------+---------+", - "| id | other_id | _rowid |", - "+----+----------+---------+", - "| 2 | 2 | 2 |", - "+----+----------+---------+", + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "+----+----------+", ]; assert_batches_sorted_eq!(expected, &batches); + + // there should be an ambiguity error since two 
system columns are joined with the same name + let select = "SELECT id, other_id FROM test JOIN test2sys ON _rowid = _rowid"; + assert!(ctx.sql(select).await.is_err()); } #[tokio::test] @@ -213,6 +329,35 @@ async fn test_system_column_with_cte() { assert_batches_sorted_eq!(expected, &batches); } +#[tokio::test] +async fn test_system_column_in_subquery() { + let ctx = setup_test_context().await; + + // System columns not available in subquery + let select = r" + SELECT _rowid FROM (SELECT * FROM test) + "; + assert!(ctx.sql(select).await.is_err()); + + // Explicitly selected system columns become regular columns + let select = r" + SELECT * FROM (SELECT id, _rowid FROM test) + "; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + async fn setup_test_context() -> SessionContext { let batch = record_batch!( ("id", UInt8, [1, 2, 3]), @@ -236,28 +381,3 @@ async fn setup_test_context() -> SessionContext { let _ = ctx.register_batch("test", batch); ctx } - -async fn setup_test2_for_joins(ctx: &SessionContext) { - let batch = record_batch!( - ("other_id", UInt8, [1, 2, 3]), - ("bank_account", UInt64, [9, 10, 11]), - ("_rowid", UInt32, [10, 11, 12]) - ) - .unwrap(); - let _ = ctx.register_batch("test2", batch); -} - -async fn setup_test2_with_system_rowid(ctx: &SessionContext) { - let batch = record_batch!( - ("other_id", UInt8, [2, 3, 4]), - ("_rowid", UInt32, [2, 3, 4]) - ) - .unwrap(); - let batch = batch - .with_schema(Arc::new(Schema::new(vec![ - Field::new("other_id", DataType::UInt8, true), - Field::new("_rowid", DataType::UInt32, true).to_system_column(), - ]))) - .unwrap(); - let _ = ctx.register_batch("test2", batch); -} diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs index 5740cd112e25..79cca210582e 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -56,6 +56,8 @@ use datafusion_common::{ plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FieldExt, FunctionalDependencies, Result, Result, ScalarValue, ScalarValue, TableReference, TableReference, ToDFSchema, ToDFSchema, UnnestOptions, UnnestOptions, + FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema, + UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; @@ -1444,13 +1446,6 @@ pub fn build_join_schema( .into_iter() .chain(right.metadata().clone()) .collect(); - let qualified_fields = match join_type { - JoinType::LeftMark => qualified_fields - .into_iter() - .map(|(q, f)| (q, f.to_non_system_column())) - .collect(), - _ => qualified_fields, - }; let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?; dfschema.with_functional_dependencies(func_dependencies) } From 193532d59fa9103a3c318c24418d3105544ef77c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 12:58:42 -0800 Subject: [PATCH 13/16] add header --- datafusion/core/tests/sql/system_columns.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/core/tests/sql/system_columns.rs b/datafusion/core/tests/sql/system_columns.rs index 413cbdeb83f5..13002745b5e5 100644 --- a/datafusion/core/tests/sql/system_columns.rs +++ b/datafusion/core/tests/sql/system_columns.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use super::*; use datafusion_common::FieldExt; From 366398fe3d52b1394e486ed4891c9a1fd181ff85 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:01:15 -0800 Subject: [PATCH 14/16] fix rebase --- datafusion/expr/src/logical_plan/builder.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 79cca210582e..b0c28e145525 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,10 +54,7 @@ use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, - FieldExt, FunctionalDependencies, Result, Result, ScalarValue, ScalarValue, - TableReference, TableReference, ToDFSchema, ToDFSchema, UnnestOptions, UnnestOptions, - FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema, - UnnestOptions, + Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; From 2cae61d4e15c98b133f46f776fb7f239e078047a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco 
<1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:08:45 -0800 Subject: [PATCH 15/16] remove comment --- datafusion/expr/src/utils.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 2f4cb83c6b2e..1d96a5cf6b22 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -831,17 +831,10 @@ pub fn exprlist_to_fields<'a>( .into_iter() .flatten() .collect::>(); - // // Deduplicate system columns and non system columns by preferring non system columns - // let mut non_system_columns = HashSet::new(); - // for (_, f) in result.iter() { - // if f.is_system_column() { - // non_system_columns.insert(f.name().to_string()); - // } - // } + + // After a projection any system columns that are included in the result cease to be system columns let result = result .into_iter() - // .filter(|(_, f)| !(f.is_system_column() && non_system_columns.contains(f.name()))) - // And any system columns that are included in the result cease to be system columns .map(|(q, f)| (q, f.to_non_system_column())) .collect(); Ok(result) From af6e9725965938e68a6f401dd4ef7fe9eed0c6f1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:13:17 -0800 Subject: [PATCH 16/16] cleanup --- datafusion/expr/src/utils.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 1d96a5cf6b22..3c655527bd04 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -830,13 +830,10 @@ pub fn exprlist_to_fields<'a>( .collect::>>()? 
.into_iter() .flatten() - .collect::>(); - - // After a projection any system columns that are included in the result cease to be system columns - let result = result - .into_iter() + // After a projection any system columns that are included in the result cease to be system columns .map(|(q, f)| (q, f.to_non_system_column())) - .collect(); + .collect::>(); + Ok(result) }