1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -65,6 +65,7 @@ datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-proto = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
object_store = { workspace = true, features = ["aws", "http"] }
214 changes: 214 additions & 0 deletions datafusion-examples/examples/system_columns.rs
@@ -0,0 +1,214 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::record_batch;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

use datafusion::common::FieldExt;
use datafusion::{assert_batches_eq, prelude::*};

/// This example shows how to mark fields as system columns.
/// System columns are columns that are meant to be semi-public stores of a table's internal details.
/// For example, `ctid` in Postgres would be considered a system column
/// (Postgres calls these "system columns"; see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)).
///
/// DataFusion allows fields to be declared as system columns by setting the `datafusion.system_column` key in the field's metadata
/// to `true`.
///
/// As an example of how this works in practice, if you have the following Postgres table:
///
/// ```sql
/// CREATE TABLE t (x int);
/// INSERT INTO t VALUES (1);
/// ```
///
/// and then run `SELECT * FROM t`, you would get the following result:
///
/// ```text
/// +---+
/// | x |
/// +---+
/// | 1 |
/// +---+
/// ```
///
/// But if you run `SELECT ctid, * FROM t`, you would get the following result (ignore the actual value of `ctid`; this is just an example):
///
/// ```text
/// +-----+---+
/// | ctid| x |
/// +-----+---+
/// | 0 | 1 |
/// +-----+---+
/// ```
#[tokio::main]
async fn main() {
let batch = record_batch!(
("a", Int32, [1, 2, 3]),
("b", Utf8, ["foo", "bar", "baz"]),
("_row_num", UInt32, [1, 2, 3])
)
.unwrap();
let batch = batch
.with_schema(Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, true),
Field::new("_row_num", DataType::UInt32, true).to_system_column(),
])))
.unwrap();

let ctx = SessionContext::new();
let _ = ctx.register_batch("t", batch);

let res = ctx
.sql("SELECT a, b FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+---+-----+",
"| a | b |",
"+---+-----+",
"| 1 | foo |",
"| 2 | bar |",
"| 3 | baz |",
"+---+-----+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT _row_num FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+----------+",
"| _row_num |",
"+----------+",
"| 1 |",
"| 2 |",
"| 3 |",
"+----------+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT * FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
// does not include _row_num
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+---+-----+",
"| a | b |",
"+---+-----+",
"| 1 | foo |",
"| 2 | bar |",
"| 3 | baz |",
"+---+-----+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT *, _row_num FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+---+-----+----------+",
"| a | b | _row_num |",
"+---+-----+----------+",
"| 1 | foo | 1 |",
"| 2 | bar | 2 |",
"| 3 | baz | 3 |",
"+---+-----+----------+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT t._row_num FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+----------+",
"| _row_num |",
"+----------+",
"| 1 |",
"| 2 |",
"| 3 |",
"+----------+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT t.* FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
// does not include _row_num
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+---+-----+",
"| a | b |",
"+---+-----+",
"| 1 | foo |",
"| 2 | bar |",
"| 3 | baz |",
"+---+-----+",
];
assert_batches_eq!(expected, &res);

let res = ctx
.sql("SELECT t.*, _row_num FROM t")
.await
.unwrap()
.collect()
.await
.unwrap();
#[rustfmt::skip]
let expected: Vec<&str> = vec![
"+---+-----+----------+",
"| a | b | _row_num |",
"+---+-----+----------+",
"| 1 | foo | 1 |",
"| 2 | bar | 2 |",
"| 3 | baz | 3 |",
"+---+-----+----------+",
];
assert_batches_eq!(expected, &res);
}
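Note: `to_system_column()` is just a convenience. The sketch below shows the hand-rolled equivalent of the helper, setting the `datafusion.system_column` metadata key on the Arrow `Field` directly (the key name comes from the `FieldExt` implementation further down in this diff); the trait is the intended public API.

```rust
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

// Hand-rolled equivalent of `Field::to_system_column()`: attach the
// `datafusion.system_column` metadata key with the value "true".
fn mark_system_column(field: Field) -> Field {
    let mut metadata: HashMap<String, String> = field.metadata().clone();
    metadata.insert("datafusion.system_column".to_string(), "true".to_string());
    field.with_metadata(metadata)
}

fn main() {
    let row_num = mark_system_column(Field::new("_row_num", DataType::UInt32, true));
    assert_eq!(
        row_num.metadata().get("datafusion.system_column").map(String::as_str),
        Some("true")
    );
}
```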
4 changes: 4 additions & 0 deletions datafusion/catalog/src/information_schema.rs
@@ -31,6 +31,7 @@ use async_trait::async_trait;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
use datafusion_common::FieldExt;
use datafusion_execution::TaskContext;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
@@ -190,6 +191,9 @@ impl InformationSchemaConfig {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
if field.is_system_column() {
continue;
}
builder.add_column(
&catalog_name,
&schema_name,
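To illustrate the effect of this hunk: fields marked as system columns are skipped when `information_schema.columns` is populated. The sketch below reuses the `t` table from the example above and assumes a session with the information schema enabled; it is an illustration, not output taken from the PR.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: `_row_num` is a system column on `t`, so the loop above skips it
// and only `a` and `b` should appear in information_schema.columns.
// Assumes `t` was registered as in the system_columns example and that the
// session was created with the information schema enabled, e.g.
// `SessionContext::new_with_config(SessionConfig::new().with_information_schema(true))`.
async fn show_visible_columns(ctx: &SessionContext) -> Result<()> {
    ctx.sql("SELECT column_name FROM information_schema.columns WHERE table_name = 't'")
        .await?
        .show()
        .await
}
```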
124 changes: 124 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -494,6 +494,29 @@ impl DFSchema {
// Project a.id as id TableScan b id
// In this case, there isn't `ambiguous name` problem. When `matches` just contains
// one field without qualifier, we should return it.
// Another scenario where we can disambiguate is when we have a conflict between
// a system column and a non-system column.
// In this case we return the non-system column.
let mut non_system_columns = HashSet::new();
for (_, f) in matches.iter() {
if !f.is_system_column() {
non_system_columns.insert(f.name().to_string());
}
}
let matches_filtered = matches
.iter()
.filter_map(|(q, f)| {
if f.is_system_column() && non_system_columns.contains(f.name()) {
None
} else {
Some((q, f))
}
})
.collect::<Vec<_>>();
if matches_filtered.len() == 1 {
let (q, f) = matches_filtered[0];
return Ok((*q, *f));
}
let fields_without_qualifier = matches
.iter()
.filter(|(q, _)| q.is_none())
@@ -1056,6 +1079,107 @@ pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String
}
}

/// Extension trait to manage DataFusion-specific metadata on Arrow fields.
pub trait FieldExt {
/// Check if this field is a system column.
///
/// System columns are columns that are meant to be semi-public stores of a table's internal details.
/// For example, `ctid` in Postgres would be considered a system column
/// (Postgres calls these "system columns"; see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)).
///
/// DataFusion allows fields to be declared as system columns by setting the `datafusion.system_column` key in the field's metadata
/// to `true`.
///
/// As an example of how this works in practice, if you have the following Postgres table:
///
/// ```sql
/// CREATE TABLE t (x int);
/// INSERT INTO t VALUES (1);
/// ```
///
/// and then run `SELECT * FROM t`, you would get the following result:
///
/// ```text
/// +---+
/// | x |
/// +---+
/// | 1 |
/// +---+
/// ```
///
/// But if you run `SELECT ctid, * FROM t`, you would get the following result (ignore the actual value of `ctid`; this is just an example):
///
/// ```text
/// +-----+---+
/// | ctid| x |
/// +-----+---+
/// | 0 | 1 |
/// +-----+---+
/// ```
fn is_system_column(&self) -> bool;

/// Mark this field as a system column.
///
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_system_column(self) -> Self;

/// Mark this field as a non-system column by removing the `datafusion.system_column` key from the field's metadata.
///
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_non_system_column(self) -> Self;
}

/// See [`FieldExt`].
impl FieldExt for Field {
/// Check if this field is a system column.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn is_system_column(&self) -> bool {
self.metadata()
.get("datafusion.system_column")
Contributor: Make it a const?

Contributor Author: 👍🏻 will do next iteration

.map(|v| v.to_lowercase().starts_with("t"))
.unwrap_or(false)
}

/// Mark this field as a system column.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_system_column(mut self) -> Self {
let mut metadata = self.metadata().clone();
metadata.insert("datafusion.system_column".to_string(), "true".to_string());
self.set_metadata(metadata);
self
}

/// Mark this field as a non-system column by removing the `datafusion.system_column` key from the field's metadata.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_non_system_column(mut self) -> Self {
let mut metadata = self.metadata().clone();
metadata.remove("datafusion.system_column");
self.set_metadata(metadata);
self
}
}

impl FieldExt for Arc<Field> {
/// Check if this field is a system column.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn is_system_column(&self) -> bool {
FieldExt::is_system_column(self.as_ref())
}

/// Mark this field as a system column.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_system_column(self) -> Self {
Arc::new(FieldExt::to_system_column(Arc::unwrap_or_clone(self)))
}

/// Mark this field as a non-system column by removing the `datafusion.system_column` key from the field's metadata.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_non_system_column(self) -> Self {
Arc::new(FieldExt::to_non_system_column(Arc::unwrap_or_clone(self)))
}
Contributor Author, on lines +1170 to +1180: Can probably make these no op / zero cost by calling is_system_column() first.

}

#[cfg(test)]
mod tests {
use crate::assert_contains;
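A sketch of the disambiguation rule added in the first `dfschema.rs` hunk: if an unqualified name matches both a system column and a regular column, resolution prefers the non-system field instead of reporting an ambiguity. The table names and join below are illustrative (not taken from the PR), and the exact behavior depends on how the join schema is built.

```rust
use std::sync::Arc;

use arrow::array::record_batch;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::FieldExt;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn disambiguation_sketch() -> Result<()> {
    let ctx = SessionContext::new();

    // `t1._rowid` is marked as a system column...
    let t1 = record_batch!(("a", Int32, [1]), ("_rowid", UInt32, [42]))?
        .with_schema(Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("_rowid", DataType::UInt32, true).to_system_column(),
        ])))?;
    // ...while `t2._rowid` is an ordinary column.
    let t2 = record_batch!(("a", Int32, [1]), ("_rowid", UInt32, [7]))?;

    ctx.register_batch("t1", t1)?;
    ctx.register_batch("t2", t2)?;

    // With the new logic, the unqualified `_rowid` should resolve to the
    // non-system column from `t2` rather than failing as ambiguous.
    ctx.sql("SELECT _rowid FROM t1 JOIN t2 ON t1.a = t2.a")
        .await?
        .show()
        .await
}
```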
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
@@ -52,7 +52,7 @@ pub mod utils;
pub use arrow;
pub use column::Column;
pub use dfschema::{
qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema,
qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldExt, SchemaExt, ToDFSchema,
};
pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
@@ -64,6 +64,7 @@ pub mod joins;
mod path_partition;
pub mod select;
mod sql_api;
pub mod system_columns;

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = test_util::arrow_test_data();