File tree Expand file tree Collapse file tree 1 file changed +21
-7
lines changed
datafusion/functions-aggregate/src Expand file tree Collapse file tree 1 file changed +21
-7
lines changed Original file line number Diff line number Diff line change @@ -242,14 +242,28 @@ impl<T: ArrowNumericType> Debug for MedianAccumulator<T> {
242242
243243impl < T : ArrowNumericType > Accumulator for MedianAccumulator < T > {
244244 fn state ( & mut self ) -> Result < Vec < ScalarValue > > {
245- let all_values = self
246- . all_values
247- . iter ( )
248- . map ( |x| ScalarValue :: new_primitive :: < T > ( Some ( * x) , & self . data_type ) )
249- . collect :: < Result < Vec < _ > > > ( ) ?;
245+ // Convert `all_values` to `ListArray` and return a single List ScalarValue
250246
251- let arr = ScalarValue :: new_list_nullable ( & all_values, & self . data_type ) ;
252- Ok ( vec ! [ ScalarValue :: List ( arr) ] )
247+ // Build offsets
248+ let offsets =
249+ OffsetBuffer :: new ( ScalarBuffer :: from ( vec ! [ 0 , self . all_values. len( ) as i32 ] ) ) ;
250+
251+ // Build inner array
252+ let values_array = PrimitiveArray :: < T > :: new (
253+ ScalarBuffer :: from ( std:: mem:: take ( & mut self . all_values ) ) ,
254+ None ,
255+ )
256+ . with_data_type ( self . data_type . clone ( ) ) ;
257+
258+ // Build the result list array
259+ let list_array = ListArray :: new (
260+ Arc :: new ( Field :: new_list_field ( self . data_type . clone ( ) , true ) ) ,
261+ offsets,
262+ Arc :: new ( values_array) ,
263+ None ,
264+ ) ;
265+
266+ Ok ( vec ! [ ScalarValue :: List ( Arc :: new( list_array) ) ] )
253267 }
254268
255269 fn update_batch ( & mut self , values : & [ ArrayRef ] ) -> Result < ( ) > {
You can’t perform that action at this time.
0 commit comments