diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 0ca83452bd026..775f8ec87e381 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1122,9 +1122,13 @@ dependencies = [ name = "datafusion-common" version = "31.0.0" dependencies = [ + "ahash", "arrow", "arrow-array", + "arrow-buffer", + "arrow-schema", "chrono", + "half", "num_cpus", "object_store", "parquet", diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index f2b8f1a1e4bed..b5cdec1be17b6 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -39,10 +39,14 @@ default = ["parquet"] pyarrow = ["pyo3", "arrow/pyarrow"] [dependencies] +ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.16", default-features = false, features = ["snappy"], optional = true } arrow = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } chrono = { workspace = true } +half = { version = "2.1", default-features = false } num_cpus = "1.13.0" object_store = { version = "0.7.0", default-features = false, optional = true } parquet = { workspace = true, optional = true } diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs similarity index 98% rename from datafusion/physical-expr/src/hash_utils.rs rename to datafusion/common/src/hash_utils.rs index 379e0eba52772..9198461e00bf9 100644 --- a/datafusion/physical-expr/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -17,19 +17,19 @@ //! Functionality used both on logical and physical plans +use std::sync::Arc; + use ahash::RandomState; use arrow::array::*; use arrow::datatypes::*; use arrow::row::Rows; use arrow::{downcast_dictionary_array, downcast_primitive_array}; use arrow_buffer::i256; -use datafusion_common::{ - cast::{ - as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, - }, - internal_err, DataFusionError, Result, + +use crate::cast::{ + as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, }; -use std::sync::Arc; +use crate::error::{DataFusionError, Result, _internal_err}; // Combines two hashes into one hash #[inline] @@ -51,7 +51,7 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: } } -pub(crate) trait HashValue { +pub trait HashValue { fn hash_one(&self, state: &RandomState) -> u64; } @@ -337,7 +337,7 @@ pub fn create_hashes<'a>( } _ => { // This is internal because we should have caught this before. - return internal_err!( + return _internal_err!( "Unsupported data type in hasher: {}", col.data_type() ); diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index eeb5b2681370c..71782f67046d5 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -25,6 +25,7 @@ mod error; pub mod file_options; pub mod format; mod functional_dependencies; +pub mod hash_utils; mod join_type; pub mod parsers; #[cfg(feature = "pyarrow")] diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index c92bbbb74f16a..bdc476f5b3a1c 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -24,7 +24,6 @@ use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use crate::hash_utils::HashValue; use crate::physical_expr::down_cast_any_ref; use crate::utils::expr_list_eq_any_order; use crate::PhysicalExpr; @@ -37,6 +36,7 @@ use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use arrow::util::bit_iterator::BitIndexIterator; use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use datafusion_common::hash_utils::HashValue; use datafusion_common::{ cast::{as_boolean_array, as_generic_binary_array, as_string_array}, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e83dee2e6c804..48d5f4e1308b3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -28,7 +28,6 @@ pub mod equivalence; pub mod execution_props; pub mod expressions; pub mod functions; -pub mod hash_utils; pub mod intervals; pub mod math_expressions; mod partitioning; @@ -49,6 +48,9 @@ pub mod utils; pub mod var_provider; pub mod window; +// For backwards compatibility +pub use datafusion_common::hash_utils; + pub use aggregate::groups_accumulator::{ EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, }; diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index 746537557d463..10ff9edb8912f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -22,9 +22,9 @@ use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; use arrow_array::{Array, ArrayRef}; use arrow_schema::{DataType, SchemaRef}; +use datafusion_common::hash_utils::create_hashes; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; -use datafusion_physical_expr::hash_utils::create_hashes; use datafusion_physical_expr::EmitTo; use hashbrown::raw::RawTable; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 76adf7611d6f6..aca10893db3d0 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -375,10 +375,11 @@ pub mod windows; use crate::repartition::RepartitionExec; use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; +pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; use datafusion_execution::TaskContext; pub use datafusion_physical_expr::{ - expressions, functions, hash_utils, ordering_equivalence_properties_helper, udf, + expressions, functions, ordering_equivalence_properties_helper, udf, }; #[cfg(test)] diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 4108b4220599c..dfef0ddefa032 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -43,6 +43,8 @@ use arrow::{ datatypes::{Schema, SchemaBuilder, SchemaRef}, record_batch::RecordBatch, }; + +use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::{ evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices, get_record_batch_at_indices, get_row_at_idx, @@ -51,7 +53,6 @@ use datafusion_common::{exec_err, plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::window_state::{PartitionBatchState, WindowAggState}; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr::hash_utils::create_hashes; use datafusion_physical_expr::window::{ PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState, };