Skip to content

Commit 66d6554

Browse files
committed
Fix union_schema to merge metadata for both fields and schema
1 parent 32a1d2b commit 66d6554

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

datafusion/physical-plan/src/union.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,9 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
468468
}
469469

470470
fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
471-
let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
471+
let first_schema = inputs[0].schema();
472+
473+
let fields = (0..first_schema.fields().len())
472474
.map(|i| {
473475
inputs
474476
.iter()
@@ -477,25 +479,30 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
477479
let field = input.schema().field(i).clone();
478480
let mut metadata = field.metadata().clone();
479481

480-
let other_side_metdata = inputs
481-
.get(input_idx ^ (1 << 0))
482-
.map(|other_input| {
483-
other_input.schema().field(i).metadata().clone()
484-
})
485-
.unwrap_or_default();
482+
let other_metadatas = inputs
483+
.iter()
484+
.enumerate()
485+
.filter(|(other_idx, _)| *other_idx != input_idx)
486+
.flat_map(|(_, other_input)| {
487+
other_input.schema().field(i).metadata().clone().into_iter()
488+
});
486489

487-
metadata.extend(other_side_metdata);
490+
metadata.extend(other_metadatas);
488491
field.with_metadata(metadata)
489492
})
490-
.find_or_first(|f| f.is_nullable())
493+
.find_or_first(Field::is_nullable)
494+
// We can unwrap this because if inputs was empty, this would've already panicked when we
495+
// indexed into inputs[0].
491496
.unwrap()
492497
})
498+
.collect::<Vec<_>>();
499+
500+
let all_metadata_merged = inputs
501+
.iter()
502+
.flat_map(|i| i.schema().metadata().clone().into_iter())
493503
.collect();
494504

495-
Arc::new(Schema::new_with_metadata(
496-
fields,
497-
inputs[0].schema().metadata().clone(),
498-
))
505+
Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
499506
}
500507

501508
/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one

0 commit comments

Comments
 (0)