Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,38 @@ impl DFSchema {
}
}

/// materialize all dictionay columns in this schema. Returns a new schema
/// with no dictionaries, or None if there are no dictionaries to
/// materialize.
pub fn materialize_dictionaries(&self) -> Option<Self> {
if self
.fields
.iter()
.all(|f| matches!(f.data_type(), DataType::Dictionary(_, _)))
{
return None;
}
let new_fields = self
.fields
.iter()
.map(|field| field.clone().materialize_dictionaries())
.collect::<Vec<_>>();
Some(Self {
fields: new_fields,
metadata: self.metadata.clone(),
functional_dependencies: self.functional_dependencies.clone(),
})
}

pub fn without_dictionaries(mut self) -> Self {
self.fields = self
.fields
.into_iter()
.map(|field| field.materialize_dictionaries())
.collect::<Vec<_>>();
self
}

/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
Expand Down Expand Up @@ -815,6 +847,18 @@ pub struct DFField {
}

impl DFField {
pub fn materialize_dictionaries(mut self) -> DFField {
if let DataType::Dictionary(_, value_type) = self.field.data_type() {
let new_field = self
.field
.as_ref()
.clone()
.with_data_type(value_type.as_ref().clone());
self.field = Arc::new(new_field);
}
self
}

/// Creates a new `DFField`
pub fn new<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
Expand Down
12 changes: 6 additions & 6 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,16 +551,16 @@ impl LogicalPlanBuilder {
let left_plan: LogicalPlan = self.plan;
let right_plan: LogicalPlan = plan;

Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
union(left_plan, right_plan)?,
)))))
Ok(Self::from(LogicalPlan::Distinct(Distinct::new_all(
Arc::new(union(left_plan, right_plan)?),
))))
}

/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(self) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
self.plan,
)))))
Ok(Self::from(LogicalPlan::Distinct(Distinct::new_all(
Arc::new(self.plan),
))))
}

/// Project first values of the specified expression list according to the provided
Expand Down
41 changes: 32 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
LogicalPlan::Distinct(Distinct::All { schema, .. }) => schema,
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
Expand Down Expand Up @@ -392,7 +392,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(Distinct::All(_))
| LogicalPlan::Distinct(Distinct::All { .. })
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
Expand Down Expand Up @@ -421,7 +421,7 @@ impl LogicalPlan {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
Distinct::All { input, .. } | Distinct::On(DistinctOn { input, .. }),
) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
Expand Down Expand Up @@ -482,7 +482,7 @@ impl LogicalPlan {
Ok(Some(select_expr[0].clone()))
}
LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Distinct(Distinct::All(input))
| LogicalPlan::Distinct(Distinct::All { input, .. })
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
Expand Down Expand Up @@ -813,7 +813,9 @@ impl LogicalPlan {
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Distinct::All { .. } => {
Distinct::new_all(Arc::new(inputs[0].clone()))
}
Distinct::On(DistinctOn {
on_expr,
select_expr,
Expand Down Expand Up @@ -1077,7 +1079,7 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
Distinct::All { input, .. } | Distinct::On(DistinctOn { input, .. }),
) => input.max_rows(),
LogicalPlan::Values(v) => Some(v.values.len()),
LogicalPlan::Unnest(_) => None,
Expand Down Expand Up @@ -1661,7 +1663,7 @@ impl LogicalPlan {
write!(f, "{}", statement.display())
}
LogicalPlan::Distinct(distinct) => match distinct {
Distinct::All(_) => write!(f, "Distinct:"),
Distinct::All{..} => write!(f, "Distinct:"),
Distinct::On(DistinctOn {
on_expr,
select_expr,
Expand Down Expand Up @@ -2261,11 +2263,26 @@ pub struct Limit {
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
All {
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
},
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
On(DistinctOn),
}

impl Distinct {
pub fn new_all(input: Arc<LogicalPlan>) -> Self {
// distinct plans materialize dictionaries
let schema = input
.schema()
.materialize_dictionaries()
.map(Arc::new)
.unwrap_or_else(|| input.schema().clone());
Self::All { input, schema }
}
}

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
Expand Down Expand Up @@ -2300,7 +2317,8 @@ impl DistinctOn {
let schema = DFSchema::new_with_metadata(
exprlist_to_fields(&select_expr, &input)?,
input.schema().metadata().clone(),
)?;
)?
.without_dictionaries();

let mut distinct_on = DistinctOn {
on_expr,
Expand Down Expand Up @@ -2378,6 +2396,11 @@ impl Aggregate {
let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

let mut fields = exprlist_to_fields(grouping_expr.iter(), &input)?;
// Agregates do not preserve dictionary encoding for grouping columns.
fields = fields
.into_iter()
.map(|field| field.materialize_dictionaries())
.collect::<Vec<_>>();

// Even columns that cannot be null will become nullable when used in a grouping set.
if is_grouping_set {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/eliminate_nested_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ impl OptimizerRule for EliminateNestedUnion {
schema: schema.clone(),
})))
}
LogicalPlan::Distinct(Distinct::All(plan)) => match plan.as_ref() {
LogicalPlan::Distinct(Distinct::All { input, .. }) => match input.as_ref() {
LogicalPlan::Union(Union { inputs, schema }) => {
let inputs = inputs
.iter()
.map(extract_plan_from_distinct)
.flat_map(extract_plans_from_union)
.collect::<Vec<_>>();

Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(
Ok(Some(LogicalPlan::Distinct(Distinct::new_all(Arc::new(
LogicalPlan::Union(Union {
inputs,
schema: schema.clone(),
Expand Down Expand Up @@ -94,7 +94,7 @@ fn extract_plans_from_union(plan: &Arc<LogicalPlan>) -> Vec<Arc<LogicalPlan>> {

fn extract_plan_from_distinct(plan: &Arc<LogicalPlan>) -> &Arc<LogicalPlan> {
match plan.as_ref() {
LogicalPlan::Distinct(Distinct::All(plan)) => plan,
LogicalPlan::Distinct(Distinct::All { input, .. }) => input,
_ => plan,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn optimize_projections(
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::Distinct(Distinct::All(_)) => {
| LogicalPlan::Distinct(Distinct::All { .. }) => {
// These plans require all their fields, and their children should
// be treated as final plans -- otherwise, we may have schema a
// mismatch.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
LogicalPlan::Distinct(Distinct::All { input, .. }) => {
let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
Expand Down
33 changes: 19 additions & 14 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ impl From<StreamType> for SendableRecordBatchStream {
}
}
}

/// Hash aggregate execution plan
#[derive(Debug)]
pub struct AggregateExec {
Expand All @@ -254,9 +253,6 @@ pub struct AggregateExec {
limit: Option<usize>,
/// Input plan, could be a partial aggregate or the input to the aggregate
pub input: Arc<dyn ExecutionPlan>,
/// Original aggregation schema, could be different from `schema` before dictionary group
/// keys get materialized
original_schema: SchemaRef,
/// Schema after the aggregate is applied
schema: SchemaRef,
/// Input schema before any aggregation is applied. For partial aggregate this will be the
Expand Down Expand Up @@ -299,7 +295,6 @@ impl AggregateExec {
&original_schema,
group_by.expr.len(),
));
let original_schema = Arc::new(original_schema);
AggregateExec::try_new_with_schema(
mode,
group_by,
Expand All @@ -308,7 +303,6 @@ impl AggregateExec {
input,
input_schema,
schema,
original_schema,
)
}

Expand All @@ -329,7 +323,6 @@ impl AggregateExec {
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
schema: SchemaRef,
original_schema: SchemaRef,
) -> Result<Self> {
let input_eq_properties = input.equivalence_properties();
// Get GROUP BY expressions:
Expand Down Expand Up @@ -382,7 +375,6 @@ impl AggregateExec {
aggr_expr,
filter_expr,
input,
original_schema,
schema,
input_schema,
projection_mapping,
Expand Down Expand Up @@ -414,6 +406,25 @@ impl AggregateExec {
self.group_by.output_exprs()
}

/// The schema of the grouping expressions, as fed to [`GroupValues`]
pub fn group_schema(&self) -> Result<SchemaRef> {
let input_schema = self.input.schema();
let contains_null_expr = self.group_by.contains_null();

let mut fields = vec![];
for (expr, name) in &self.group_by.expr {
fields.push(Field::new(
name,
expr.data_type(&input_schema)?,
// In cases where we have multiple grouping sets, we will use NULL expressions in
// order to align the grouping sets. So the field must be nullable even if the underlying
// schema field is not.
contains_null_expr || expr.nullable(&input_schema)?,
))
}
Ok(Arc::new(Schema::new(fields)))
}

/// Aggregate expressions
pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
&self.aggr_expr
Expand Down Expand Up @@ -693,7 +704,6 @@ impl ExecutionPlan for AggregateExec {
children[0].clone(),
self.input_schema.clone(),
self.schema.clone(),
self.original_schema.clone(),
)?;
me.limit = self.limit;
Ok(Arc::new(me))
Expand Down Expand Up @@ -818,11 +828,6 @@ fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema {
Schema::new(fields)
}

fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
let group_fields = schema.fields()[0..group_count].to_vec();
Arc::new(Schema::new(group_fields))
}

/// Determines the lexical ordering requirement for an aggregate expression.
///
/// # Parameters
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use std::vec;
use crate::aggregates::group_values::{new_group_values, GroupValues};
use crate::aggregates::order::GroupOrderingFull;
use crate::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
PhysicalGroupBy,
evaluate_group_by, evaluate_many, evaluate_optional, AggregateMode, PhysicalGroupBy,
};
use crate::common::IPCWriter;
use crate::metrics::{BaselineMetrics, RecordOutput};
Expand Down Expand Up @@ -326,7 +325,8 @@ impl GroupedHashAggregateStream {

// we need to use original schema so RowConverter in group_values below
// will do the proper coversion of dictionaries into value types
let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len());
//let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len());
let group_schema = agg.group_schema()?;
let spill_expr = group_schema
.fields
.into_iter()
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
// all inputs should have the same schema

let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
.map(|i| {
inputs
Expand Down