Skip to content
38 changes: 30 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1468,19 +1468,37 @@ impl ValuesFields {
}
}

// `name_map` tracks a mapping between a field name and the number of appearances of that field.
//
// Some field names might already arrive at this function with a count (the number of times
// they appeared) as a suffix, e.g. `id:1`, so there is still a chance of name collisions.
// For example, if the three fields "col:1", "col" and "col" are passed to this function, a
// counter-only approach would rename them to col:1, col, col:1, causing a later error when
// building the DFSchema. That is why we need the `seen` set: it guarantees the resulting
// field names are always unique.
//
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
let mut seen: HashSet<String> = HashSet::new();

fields
.into_iter()
.map(|field| {
let counter = name_map.entry(field.name().to_string()).or_insert(0);
*counter += 1;
if *counter > 1 {
let new_name = format!("{}:{}", field.name(), *counter - 1);
Field::new(new_name, field.data_type().clone(), field.is_nullable())
} else {
field.as_ref().clone()
Comment on lines 1479 to -1482
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the root cause of the issue: when doing joins, there is a function, requalify_sides_if_needed, that handles aliasing the columns so that the resulting schema of a join (let in_join_schema = left.schema().join(right.schema())?;) can be created. However, if we had a query like:

select
 *
from
  first_agg
  LEFT JOIN fourth_random_table ON first_agg.id = fourth_random_table.id
  LEFT JOIN second_agg  ON first_agg.id = second_agg.id
  LEFT JOIN third_agg  ON first_agg.id = third_agg.id

The first JOIN to be converted to a logical plan, LEFT JOIN third_agg ON first_agg.id = third_agg.id, will work: the join schema column names will stay as they are, with an alias. However, the subsequent JOINs will fail, since the consumer performs the following steps for each JOIN:

  1. After handling the innermost join the resulting join schema is [left.id] [right.id]
  2. For the second join we "carry" the previous schema, so in requalify_sides_if_needed we would have [id, left.id] [id, right.id]. We would therefore have to alias again -> [left.id, left.id] [right.id, right.id], and because of this function we would end up having: [left.id:1 , left.id] [right.id:1 , right.id]
  3. On the outermost and final join the process would be repeated: [id, left.id:1 , left.id] [id, right.id:1 , right.id] ->
    [left.id:1, left.id:1 , left.id] [right.id:1, right.id:1 , right.id] and because of id:1 being repeated with the current change_redundant_column algorithm, the query will fail with Schema contains duplicate unqualified field name "id:1" 🟥

Moreover we can observe that if we do just two levels of joins we would get no error:

select
*
  from
first_agg
  LEFT JOIN fourth_random_table ON first_agg.id = fourth_random_table.id
  LEFT JOIN second_agg  ON first_agg.id = second_agg.id

let base_name = field.name();
let count = name_map.entry(base_name.clone()).or_insert(0);
let mut new_name = base_name.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with trying to avoid this clone, but I could not come up with anything that was reasonable

Since this function is only called when creating subqueries I think it is fine

https://github.com/search?q=repo%3Aapache%2Fdatafusion%20change_redundant_column&type=code


// Loop until we find a name that hasn't been used
while seen.contains(&new_name) {
*count += 1;
new_name = format!("{}:{}", base_name, count);
}

seen.insert(new_name.clone());

let mut modified_field =
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
modified_field.set_metadata(field.metadata().clone());
modified_field
})
.collect()
}
Expand Down Expand Up @@ -2730,10 +2748,13 @@ mod tests {
let t1_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_3 = Field::new("a", DataType::Int32, false);
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
let t1_field_2 = Field::new("b", DataType::Int32, false);
let t2_field_2 = Field::new("b", DataType::Int32, false);

let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3];
let field_vec = vec![
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
];
let remove_redundant = change_redundant_column(&Fields::from(field_vec));

assert_eq!(
Expand All @@ -2744,6 +2765,7 @@ mod tests {
Field::new("b", DataType::Int32, false),
Field::new("b:1", DataType::Int32, false),
Field::new("a:2", DataType::Int32, false),
Field::new("a:1:1", DataType::Int32, false),
]
);
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::PhysicalExpr;

use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, Result};
use datafusion_common::Result;

/// Stores the mapping between source expressions and target expressions for a
/// projection.
Expand Down Expand Up @@ -66,9 +66,9 @@ impl ProjectionMapping {
let idx = col.index();
let matching_input_field = input_schema.field(idx);
if col.name() != matching_input_field.name() {
return internal_err!("Input field name {} does not match with the projection expression {}",
matching_input_field.name(),col.name())
}
let fixed_col = Column::new(col.name(), idx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Ok(Transformed::yes(Arc::new(fixed_col)));
}
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Ok(Transformed::yes(Arc::new(matching_input_column)))
Expand Down
3 changes: 1 addition & 2 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1835,8 +1835,7 @@ fn requalify_sides_if_needed(
})
}) {
// These names have no connection to the original plan, but they'll make the columns
// (mostly) unique. There may be cases where this still causes duplicates, if either left
// or right side itself contains duplicate names with different qualifiers.
// (mostly) unique.
Ok((
left.alias(TableReference::bare("left"))?,
right.alias(TableReference::bare("right"))?,
Expand Down
27 changes: 27 additions & 0 deletions datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,33 @@ mod tests {
Ok(())
}

// Checks the logical plan produced for the `multiple_joins.json` test plan.
//
// The plan is rendered to a string via `test_plan_to_string` and compared
// verbatim against the expected text, so any change in join nesting, aliasing
// or column naming makes this test fail.
#[tokio::test]
async fn test_multiple_joins() -> Result<()> {
let plan_str = test_plan_to_string("multiple_joins.json").await?;
// The expected plan contains three nested `Left Join` nodes whose inputs are
// wrapped in `SubqueryAlias: left` / `SubqueryAlias: right`. Note the `:1`
// suffix on the second `left.count(Int64(1))` column in the projection, which
// keeps it distinct from the first column of the same name.
assert_eq!(
plan_str,
"Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\
\n Left Join: left.id = right.id\
\n SubqueryAlias: left\
\n Left Join: left.id = right.id\
\n SubqueryAlias: left\
\n Left Join: left.id = right.id\
\n SubqueryAlias: left\
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
\n Values: (Int64(1)), (Int64(2))\
\n SubqueryAlias: right\
\n Aggregate: groupBy=[[id, category]], aggr=[[]]\
\n Values: (Int64(1), Utf8(\"info\")), (Int64(2), Utf8(\"low\"))\
\n SubqueryAlias: right\
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
\n Values: (Int64(1)), (Int64(2))\
\n SubqueryAlias: right\
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
\n Values: (Int64(1)), (Int64(2))"
);
Ok(())
}

#[tokio::test]
async fn test_select_window_count() -> Result<()> {
let plan_str = test_plan_to_string("select_window_count.substrait.json").await?;
Expand Down
Loading