Skip to content

Commit 6f1ca29

Browse files
authored
fix: metadata of join schema (#16221)
1 parent 7248259 commit 6f1ca29

File tree

2 files changed

+69
-4
lines changed

2 files changed

+69
-4
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,12 +1626,19 @@ pub fn build_join_schema(
16261626
join_type,
16271627
left.fields().len(),
16281628
);
1629-
let metadata = left
1629+
1630+
let (schema1, schema2) = match join_type {
1631+
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1632+
_ => (right, left),
1633+
};
1634+
1635+
let metadata = schema1
16301636
.metadata()
16311637
.clone()
16321638
.into_iter()
1633-
.chain(right.metadata().clone())
1639+
.chain(schema2.metadata().clone())
16341640
.collect();
1641+
16351642
let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
16361643
dfschema.with_functional_dependencies(func_dependencies)
16371644
}
@@ -2870,4 +2877,31 @@ mod tests {
28702877

28712878
Ok(())
28722879
}
2880+
2881+
#[test]
2882+
fn test_join_metadata() -> Result<()> {
2883+
let left_schema = DFSchema::new_with_metadata(
2884+
vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2885+
HashMap::from([("key".to_string(), "left".to_string())]),
2886+
)?;
2887+
let right_schema = DFSchema::new_with_metadata(
2888+
vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2889+
HashMap::from([("key".to_string(), "right".to_string())]),
2890+
)?;
2891+
2892+
let join_schema =
2893+
build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2894+
assert_eq!(
2895+
join_schema.metadata(),
2896+
&HashMap::from([("key".to_string(), "left".to_string())])
2897+
);
2898+
let join_schema =
2899+
build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2900+
assert_eq!(
2901+
join_schema.metadata(),
2902+
&HashMap::from([("key".to_string(), "right".to_string())])
2903+
);
2904+
2905+
Ok(())
2906+
}
28732907
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,18 @@ pub fn build_join_schema(
314314
JoinType::RightSemi | JoinType::RightAnti => right_fields().unzip(),
315315
};
316316

317-
let metadata = left
317+
let (schema1, schema2) = match join_type {
318+
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
319+
_ => (right, left),
320+
};
321+
322+
let metadata = schema1
318323
.metadata()
319324
.clone()
320325
.into_iter()
321-
.chain(right.metadata().clone())
326+
.chain(schema2.metadata().clone())
322327
.collect();
328+
323329
(fields.finish().with_metadata(metadata), column_indices)
324330
}
325331

@@ -1498,6 +1504,7 @@ pub(super) fn swap_join_projection(
14981504
#[cfg(test)]
14991505
mod tests {
15001506
use super::*;
1507+
use std::collections::HashMap;
15011508
use std::pin::Pin;
15021509

15031510
use arrow::array::Int32Array;
@@ -2495,4 +2502,28 @@ mod tests {
24952502
assert_eq!(col.name(), name);
24962503
assert_eq!(col.index(), index);
24972504
}
2505+
2506+
#[test]
2507+
fn test_join_metadata() -> Result<()> {
2508+
let left_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)])
2509+
.with_metadata(HashMap::from([("key".to_string(), "left".to_string())]));
2510+
2511+
let right_schema = Schema::new(vec![Field::new("b", DataType::Int32, false)])
2512+
.with_metadata(HashMap::from([("key".to_string(), "right".to_string())]));
2513+
2514+
let (join_schema, _) =
2515+
build_join_schema(&left_schema, &right_schema, &JoinType::Left);
2516+
assert_eq!(
2517+
join_schema.metadata(),
2518+
&HashMap::from([("key".to_string(), "left".to_string())])
2519+
);
2520+
let (join_schema, _) =
2521+
build_join_schema(&left_schema, &right_schema, &JoinType::Right);
2522+
assert_eq!(
2523+
join_schema.metadata(),
2524+
&HashMap::from([("key".to_string(), "right".to_string())])
2525+
);
2526+
2527+
Ok(())
2528+
}
24982529
}

0 commit comments

Comments
 (0)