Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder {
find_valid_equijoin_key_pair(
&normalized_left_key,
&normalized_right_key,
self.plan.schema().clone(),
right.schema().clone(),
self.plan.schema(),
right.schema(),
)?.ok_or_else(||
plan_datafusion_err!(
"can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool {
/// Check whether all columns are from the schema.
pub fn check_all_columns_from_schema(
columns: &HashSet<Column>,
schema: DFSchemaRef,
schema: &DFSchema,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This takes the schema by reference rather than value now.

Since DFSchemaRef is an Arc, I don't expect this to make any measurable performance improvement, but I think it makes it clearer what is going on and avoids a bunch of clones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

) -> Result<bool> {
for col in columns.iter() {
let exist = schema.is_column_from_schema(col);
Expand All @@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema(
pub fn find_valid_equijoin_key_pair(
left_key: &Expr,
right_key: &Expr,
left_schema: DFSchemaRef,
right_schema: DFSchemaRef,
left_schema: &DFSchema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. Should we use a reference instead of an Arc whenever possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in general we should use a reference when the underlying callsite doesn't actually need an owned copy.

So in this case, the callsite just needs a &DFSchema so there is no need to go through the overhead of creating the Arc

I think the runtime overhead of creating another Arc is likely to be unmeasurable in all but the most extreme circumstances. However, I think the code is often easier to reason about when it takes a plain reference.

right_schema: &DFSchema,
) -> Result<Option<(Expr, Expr)>> {
let left_using_columns = left_key.to_columns()?;
let right_using_columns = right_key.to_columns()?;
Expand All @@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair(
return Ok(None);
}

if check_all_columns_from_schema(&left_using_columns, left_schema.clone())?
&& check_all_columns_from_schema(&right_using_columns, right_schema.clone())?
if check_all_columns_from_schema(&left_using_columns, left_schema)?
&& check_all_columns_from_schema(&right_using_columns, right_schema)?
{
return Ok(Some((left_key.clone(), right_key.clone())));
} else if check_all_columns_from_schema(&right_using_columns, left_schema)?
Expand Down
43 changes: 29 additions & 14 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin {
left = find_inner_join(
&left,
&mut all_inputs,
&mut possible_join_keys,
&possible_join_keys,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

find_inner_join does not modify possible_join_keys so update the signature to make that clear

&mut all_join_keys,
)?;
}
Expand Down Expand Up @@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
/// Recursively accumulate possible_join_keys and inputs from inner joins
/// (including cross joins).
///
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
Expand All @@ -159,22 +161,19 @@ fn try_flatten_join_inputs(
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
let left = &*(join.left);
let right = &*(join.right);
vec![left, right]
vec![&join.left, &join.right]
}
LogicalPlan::CrossJoin(join) => {
let left = &*(join.left);
let right = &*(join.right);
vec![left, right]
vec![&join.left, &join.right]
}
_ => {
return plan_err!("flatten_join_inputs just can call join/cross_join");
}
};

for child in children.iter() {
match *child {
let child = child.as_ref();
match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
Expand All @@ -184,27 +183,39 @@ fn try_flatten_join_inputs(
return Ok(false);
}
}
_ => all_inputs.push((*child).clone()),
_ => all_inputs.push(child.clone()),
}
}
Ok(true)
}

/// Finds the next to join with the left input plan,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time studying this code, so I wanted to document what it is doing.

///
/// Finds the next `right` from `rights` that can be joined with `left_input`
/// plan based on the join keys in `possible_join_keys`.
///
/// If such a matching `right` is found:
/// 1. Adds the matching join keys to `all_join_keys`.
/// 2. Returns `left_input JOIN right ON (all join keys)`.
///
/// If no matching `right` is found:
/// 1. Removes the first plan from `rights`
/// 2. Returns `left_input CROSS JOIN right`.
fn find_inner_join(
left_input: &LogicalPlan,
rights: &mut Vec<LogicalPlan>,
possible_join_keys: &mut Vec<(Expr, Expr)>,
possible_join_keys: &[(Expr, Expr)],
all_join_keys: &mut HashSet<(Expr, Expr)>,
) -> Result<LogicalPlan> {
for (i, right_input) in rights.iter().enumerate() {
let mut join_keys = vec![];

for (l, r) in &mut *possible_join_keys {
for (l, r) in possible_join_keys.iter() {
let key_pair = find_valid_equijoin_key_pair(
l,
r,
left_input.schema().clone(),
right_input.schema().clone(),
left_input.schema(),
right_input.schema(),
)?;

// Save join keys
Expand All @@ -215,6 +226,7 @@ fn find_inner_join(
}
}

// Found one or more matching join keys
if !join_keys.is_empty() {
all_join_keys.extend(join_keys.clone());
let right_input = rights.remove(i);
Expand All @@ -236,6 +248,9 @@ fn find_inner_join(
}));
}
}

// no matching right plan had any join keys, cross join with the first right
// plan
let right = rights.remove(0);
let join_schema = Arc::new(build_join_schema(
left_input.schema(),
Expand Down
14 changes: 5 additions & 9 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema};
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
use std::sync::Arc;
// equijoin predicate
type EquijoinPredicate = (Expr, Expr);

Expand Down Expand Up @@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {

fn split_eq_and_noneq_join_predicate(
filter: Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction_owned(filter);

Expand All @@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate(
op: Operator::Eq,
ref right,
}) => {
let join_key_pair = find_valid_equijoin_key_pair(
left,
right,
left_schema.clone(),
right_schema.clone(),
)?;
let join_key_pair =
find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;

if let Some((left_expr, right_expr)) = join_key_pair {
let left_expr_type = left_expr.get_type(left_schema)?;
Expand Down Expand Up @@ -172,6 +167,7 @@ mod tests {
use datafusion_expr::{
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
};
use std::sync::Arc;

fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_display_indent(
Expand Down