68 changes: 68 additions & 0 deletions datafusion/common/src/datatype.rs
@@ -18,6 +18,7 @@
//! [`DataTypeExt`] and [`FieldExt`] extension traits for working with [`DataType`]s and [`Field`]s

use crate::arrow::datatypes::{DataType, Field, FieldRef};
use crate::metadata::FieldMetadata;
use std::sync::Arc;

/// DataFusion extension methods for Arrow [`DataType`]
@@ -62,6 +63,20 @@ impl DataTypeExt for DataType {

/// DataFusion extension methods for Arrow [`Field`] and [`FieldRef`]
pub trait FieldExt {
/// Rename the field, returning a new Field with the given name
fn renamed(self, new_name: &str) -> Self;

/// Retype the field with the given data type (unlike [`Field::with_data_type`],
/// this avoids copying when the data type is unchanged for [`FieldRef`]s)
fn retyped(self, new_data_type: DataType) -> Self;

/// Add field metadata to the field
fn with_field_metadata(self, metadata: &FieldMetadata) -> Self;

/// Add optional field metadata; `None` leaves the field unchanged
fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self;

/// Returns a new Field representing a List of this Field's DataType.
///
/// For example if input represents an `Int32`, the return value will
Expand Down Expand Up @@ -130,6 +145,31 @@ pub trait FieldExt {
}

impl FieldExt for Field {
fn renamed(self, new_name: &str) -> Self {
// check before allocating a new field
if self.name() == new_name {
self
} else {
self.with_name(new_name)
}
}

fn retyped(self, new_data_type: DataType) -> Self {
self.with_data_type(new_data_type)
}

fn with_field_metadata(self, metadata: &FieldMetadata) -> Self {
metadata.add_to_field(self)
}

fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self {
if let Some(metadata) = metadata {
self.with_field_metadata(metadata)
} else {
self
}
}

fn into_list(self) -> Self {
DataType::List(Arc::new(self.into_list_item())).into_nullable_field()
}
@@ -149,6 +189,34 @@ impl FieldExt for Field {
}

impl FieldExt for Arc<Field> {
fn renamed(self, new_name: &str) -> Self {
if self.name() == new_name {
self
} else {
Arc::new(Arc::unwrap_or_clone(self).with_name(new_name))
}
}

fn retyped(self, new_data_type: DataType) -> Self {
if self.data_type() == &new_data_type {
self
} else {
Arc::new(Arc::unwrap_or_clone(self).with_data_type(new_data_type))
}
}

fn with_field_metadata(self, metadata: &FieldMetadata) -> Self {
metadata.add_to_field_ref(self)
}

fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self {
if let Some(metadata) = metadata {
self.with_field_metadata(metadata)
} else {
self
}
}

fn into_list(self) -> Self {
DataType::List(self.into_list_item())
.into_nullable_field()
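A minimal usage sketch of the new extension methods (the trait path `datafusion_common::datatype::FieldExt` and the `main` harness are assumptions for illustration; only behaviour shown in the diff above is relied on):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef};
// Assumed import path for the trait added in this file.
use datafusion_common::datatype::FieldExt;

fn main() {
    let field: FieldRef = Arc::new(Field::new("a", DataType::Int32, true));

    // `retyped` returns the same Arc when the data type is unchanged,
    // so no new Field is allocated.
    let same = Arc::clone(&field).retyped(DataType::Int32);
    assert!(Arc::ptr_eq(&field, &same));

    // Renaming to a different name clones the underlying Field once
    // (via `Arc::unwrap_or_clone`) and wraps it in a new Arc.
    let renamed = Arc::clone(&field).renamed("b");
    assert_eq!(renamed.name(), "b");
}
```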
32 changes: 15 additions & 17 deletions datafusion/common/src/dfschema.rs
@@ -353,13 +353,13 @@ impl DFSchema {

/// Returns an immutable reference of a specific `Field` instance selected using an
/// offset within the internal `fields` vector
pub fn field(&self, i: usize) -> &Field {
pub fn field(&self, i: usize) -> &FieldRef {
Author's note: the changes in this file are the public API changes; these methods now return a reference to the Arc (`&FieldRef`) rather than the `Field` inside it, so call sites can clone the Arc cheaply when needed.

&self.inner.fields[i]
}

/// Returns an immutable reference of a specific `Field` instance selected using an
/// offset within the internal `fields` vector and its qualifier
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
(self.field_qualifiers[i].as_ref(), self.field(i))
}

@@ -415,7 +415,7 @@ impl DFSchema {
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Result<&Field> {
) -> Result<&FieldRef> {
if let Some(qualifier) = qualifier {
self.field_with_qualified_name(qualifier, name)
} else {
@@ -428,7 +428,7 @@
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Result<(Option<&TableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &FieldRef)> {
if let Some(qualifier) = qualifier {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
@@ -440,10 +440,10 @@
}

/// Find all fields having the given qualifier
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
self.iter()
.filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
.map(|(_, f)| f.as_ref())
.map(|(_, f)| f)
.collect()
}

@@ -459,22 +459,20 @@
}

/// Find all fields that match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
self.fields()
.iter()
.filter(|field| field.name() == name)
.map(|f| f.as_ref())
.collect()
}

/// Find all fields that match the given name and return them with their qualifier
pub fn qualified_fields_with_unqualified_name(
&self,
name: &str,
) -> Vec<(Option<&TableReference>, &Field)> {
) -> Vec<(Option<&TableReference>, &FieldRef)> {
self.iter()
.filter(|(_, field)| field.name() == name)
.map(|(qualifier, field)| (qualifier, field.as_ref()))
.collect()
}

@@ -499,7 +497,7 @@
pub fn qualified_field_with_unqualified_name(
&self,
name: &str,
) -> Result<(Option<&TableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &FieldRef)> {
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
@@ -528,7 +526,7 @@
}

/// Find the field with the given name
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
self.qualified_field_with_unqualified_name(name)
.map(|(_, field)| field)
}
@@ -538,7 +536,7 @@
&self,
qualifier: &TableReference,
name: &str,
) -> Result<&Field> {
) -> Result<&FieldRef> {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
@@ -550,7 +548,7 @@
pub fn qualified_field_from_column(
&self,
column: &Column,
) -> Result<(Option<&TableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &FieldRef)> {
self.qualified_field_with_name(column.relation.as_ref(), &column.name)
}

@@ -1221,7 +1219,7 @@ pub trait ExprSchema: std::fmt::Debug {
}

// Return the column's field
fn field_from_column(&self, col: &Column) -> Result<&Field>;
fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
}

// Implement `ExprSchema` for `Arc<DFSchema>`
@@ -1242,13 +1240,13 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
self.as_ref().data_type_and_nullable(col)
}

fn field_from_column(&self, col: &Column) -> Result<&Field> {
fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
self.as_ref().field_from_column(col)
}
}

impl ExprSchema for DFSchema {
fn field_from_column(&self, col: &Column) -> Result<&Field> {
fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
match &col.relation {
Some(r) => self.field_with_qualified_name(r, &col.name),
None => self.field_with_unqualified_name(&col.name),
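As the author's note above points out, lookups now hand back `&FieldRef`, so call sites that need an owned handle can bump the reference count instead of deep-copying the `Field`. A hedged sketch of such a call site (the helper `owned_fields_named` is hypothetical, not part of this PR):

```rust
use std::sync::Arc;

use arrow::datatypes::FieldRef;
use datafusion_common::DFSchema;

/// Hypothetical helper: collect owned handles to every field named `name`.
/// Before this PR this needed `Arc::new(field.clone())` (a deep copy of the
/// Field); with `Vec<&FieldRef>` a reference-count bump is enough.
fn owned_fields_named(schema: &DFSchema, name: &str) -> Vec<FieldRef> {
    schema
        .fields_with_unqualified_name(name)
        .into_iter()
        .map(Arc::clone) // cheap: clones the Arc, not the Field
        .collect()
}

fn main() {
    // An empty schema is enough to show the shape of the call.
    let schema = DFSchema::empty();
    assert!(owned_fields_named(&schema, "a").is_empty());
}
```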
12 changes: 11 additions & 1 deletion datafusion/common/src/metadata.rs
@@ -17,7 +17,7 @@

use std::{collections::BTreeMap, sync::Arc};

use arrow::datatypes::{DataType, Field};
use arrow::datatypes::{DataType, Field, FieldRef};
use hashbrown::HashMap;

use crate::{error::_plan_err, DataFusionError, ScalarValue};
@@ -320,6 +320,16 @@ impl FieldMetadata {

field.with_metadata(self.to_hashmap())
}

/// Updates the metadata on the FieldRef with this metadata, if it is not empty.
pub fn add_to_field_ref(&self, mut field_ref: FieldRef) -> FieldRef {
if self.inner.is_empty() {
return field_ref;
}

Arc::make_mut(&mut field_ref).set_metadata(self.to_hashmap());
field_ref
}
}

impl From<&Field> for FieldMetadata {
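A rough sketch of the copy-on-write behaviour of `add_to_field_ref`. The import path `datafusion_common::metadata::FieldMetadata` and the assumption that `From<&Field>` simply copies the field's (possibly empty) metadata are not verified against this PR:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef};
// Assumed import path; `FieldMetadata` may also be re-exported at the crate root.
use datafusion_common::metadata::FieldMetadata;

fn main() {
    let field: FieldRef = Arc::new(Field::new("a", DataType::Int32, true));

    // Metadata borrowed from a field that has none: `add_to_field_ref`
    // returns the original Arc untouched.
    let empty = FieldMetadata::from(&Field::new("e", DataType::Int32, true));
    let untouched = empty.add_to_field_ref(Arc::clone(&field));
    assert!(Arc::ptr_eq(&field, &untouched));

    // Non-empty metadata: `Arc::make_mut` copies the Field (the Arc is shared
    // here) and sets the metadata on the copy.
    let source = Field::new("src", DataType::Utf8, true)
        .with_metadata(HashMap::from([("key".to_string(), "value".to_string())]));
    let nonempty = FieldMetadata::from(&source);
    let updated = nonempty.add_to_field_ref(Arc::clone(&field));
    assert_eq!(updated.metadata().get("key").map(String::as_str), Some("value"));
    assert!(!Arc::ptr_eq(&field, &updated));
}
```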
7 changes: 4 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -49,6 +49,7 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow_schema::FieldRef;
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
@@ -2232,7 +2233,7 @@ impl DataFrame {
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
if qualifier.eq(&qualifier_rename) && field == field_rename {
(
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
@@ -2393,7 +2394,7 @@ impl DataFrame {
.schema()
.fields()
.iter()
.map(|f| f.as_ref().clone())
.map(Arc::clone)
.collect()
} else {
self.find_columns(&columns)?
@@ -2430,7 +2431,7 @@
}

// Helper to find columns from names
fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> {
let schema = self.logical_plan().schema();
names
.iter()
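The `.map(Arc::clone)` change above is a general clone-avoidance pattern; a small standalone sketch using only arrow types (no DataFusion APIs assumed):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef, Fields};

fn main() {
    let fields: Fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]
    .into();

    // Old pattern: deep-copies every Field out of its Arc.
    let copied: Vec<Field> = fields.iter().map(|f| f.as_ref().clone()).collect();

    // New pattern: only bumps the reference counts.
    let shared: Vec<FieldRef> = fields.iter().map(Arc::clone).collect();

    assert_eq!(copied.len(), shared.len());
    assert!(Arc::ptr_eq(&fields[0], &shared[0]));
}
```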
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
@@ -1207,7 +1207,7 @@ impl DefaultPhysicalPlanner {
let filter_df_fields = filter_df_fields
.into_iter()
.map(|(qualifier, field)| {
(qualifier.cloned(), Arc::new(field.clone()))
(qualifier.cloned(), Arc::clone(field))
})
.collect();
