diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index defbbe737a9d..ba6b63260e06 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -242,14 +242,28 @@ impl Debug for MedianAccumulator { impl Accumulator for MedianAccumulator { fn state(&mut self) -> Result> { - let all_values = self - .all_values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(*x), &self.data_type)) - .collect::>>()?; + // Convert `all_values` to `ListArray` and return a single List ScalarValue - let arr = ScalarValue::new_list_nullable(&all_values, &self.data_type); - Ok(vec![ScalarValue::List(arr)]) + // Build offsets + let offsets = + OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32])); + + // Build inner array + let values_array = PrimitiveArray::::new( + ScalarBuffer::from(std::mem::take(&mut self.all_values)), + None, + ) + .with_data_type(self.data_type.clone()); + + // Build the result list array + let list_array = ListArray::new( + Arc::new(Field::new_list_field(self.data_type.clone(), true)), + offsets, + Arc::new(values_array), + None, + ); + + Ok(vec![ScalarValue::List(Arc::new(list_array))]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {