1717
1818//! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions.
1919
20- use crate :: make_array:: { empty_array_type, make_array_inner} ;
2120use crate :: utils:: make_scalar_function;
22- use arrow:: array:: { new_empty_array , Array , ArrayRef , GenericListArray , OffsetSizeTrait } ;
21+ use arrow:: array:: { Array , ArrayRef , GenericListArray , OffsetSizeTrait } ;
2322use arrow:: buffer:: OffsetBuffer ;
2423use arrow:: compute;
2524use arrow:: datatypes:: { DataType , Field , FieldRef } ;
2625use arrow:: row:: { RowConverter , SortField } ;
26+ use arrow_array:: { new_null_array, LargeListArray , ListArray } ;
2727use arrow_schema:: DataType :: { FixedSizeList , LargeList , List , Null } ;
2828use datafusion_common:: cast:: { as_large_list_array, as_list_array} ;
29- use datafusion_common:: { exec_err, internal_err, Result } ;
29+ use datafusion_common:: { exec_err, internal_err, plan_err , Result } ;
3030use datafusion_expr:: scalar_doc_sections:: DOC_SECTION_ARRAY ;
3131use datafusion_expr:: {
3232 ColumnarValue , Documentation , ScalarUDFImpl , Signature , Volatility ,
@@ -92,7 +92,8 @@ impl ScalarUDFImpl for ArrayUnion {
9292
9393 fn return_type ( & self , arg_types : & [ DataType ] ) -> Result < DataType > {
9494 match ( & arg_types[ 0 ] , & arg_types[ 1 ] ) {
95- ( & Null , dt) => Ok ( dt. clone ( ) ) ,
95+ ( Null , Null ) => Ok ( DataType :: new_list ( Null , true ) ) ,
96+ ( Null , dt) => Ok ( dt. clone ( ) ) ,
9697 ( dt, Null ) => Ok ( dt. clone ( ) ) ,
9798 ( dt, _) => Ok ( dt. clone ( ) ) ,
9899 }
@@ -180,9 +181,10 @@ impl ScalarUDFImpl for ArrayIntersect {
180181
181182 fn return_type ( & self , arg_types : & [ DataType ] ) -> Result < DataType > {
182183 match ( arg_types[ 0 ] . clone ( ) , arg_types[ 1 ] . clone ( ) ) {
183- ( Null , Null ) | ( Null , _) => Ok ( Null ) ,
184- ( _, Null ) => Ok ( empty_array_type ( ) ) ,
185- ( dt, _) => Ok ( dt) ,
184+ ( Null , Null ) => Ok ( DataType :: new_list ( Null , true ) ) ,
185+ ( Null , dt) => Ok ( dt. clone ( ) ) ,
186+ ( dt, Null ) => Ok ( dt. clone ( ) ) ,
187+ ( dt, _) => Ok ( dt. clone ( ) ) ,
186188 }
187189 }
188190
@@ -266,19 +268,13 @@ impl ScalarUDFImpl for ArrayDistinct {
266268
267269 fn return_type ( & self , arg_types : & [ DataType ] ) -> Result < DataType > {
268270 match & arg_types[ 0 ] {
269- List ( field) | FixedSizeList ( field, _) => Ok ( List ( Arc :: new ( Field :: new (
270- "item" ,
271- field. data_type ( ) . clone ( ) ,
272- true ,
273- ) ) ) ) ,
274- LargeList ( field) => Ok ( LargeList ( Arc :: new ( Field :: new (
275- "item" ,
276- field. data_type ( ) . clone ( ) ,
277- true ,
278- ) ) ) ) ,
279- _ => exec_err ! (
280- "Not reachable, data_type should be List, LargeList or FixedSizeList"
281- ) ,
271+ List ( field) | FixedSizeList ( field, _) => {
272+ Ok ( DataType :: new_list ( field. data_type ( ) . clone ( ) , true ) )
273+ }
274+ LargeList ( field) => {
275+ Ok ( DataType :: new_large_list ( field. data_type ( ) . clone ( ) , true ) )
276+ }
277+ arg_type => plan_err ! ( "{} does not support type {arg_type}" , self . name( ) ) ,
282278 }
283279 }
284280
@@ -329,22 +325,18 @@ fn array_distinct_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
329325 return exec_err ! ( "array_distinct needs one argument" ) ;
330326 }
331327
332- // handle null
333- if args[ 0 ] . data_type ( ) == & Null {
334- return Ok ( Arc :: clone ( & args[ 0 ] ) ) ;
335- }
336-
337- // handle for list & largelist
338- match args[ 0 ] . data_type ( ) {
328+ let array = & args[ 0 ] ;
329+ match array. data_type ( ) {
330+ Null => Ok ( Arc :: clone ( array) ) ,
339331 List ( field) => {
340- let array = as_list_array ( & args [ 0 ] ) ?;
332+ let array = as_list_array ( array ) ?;
341333 general_array_distinct ( array, field)
342334 }
343335 LargeList ( field) => {
344- let array = as_large_list_array ( & args [ 0 ] ) ?;
336+ let array = as_large_list_array ( array ) ?;
345337 general_array_distinct ( array, field)
346338 }
347- array_type => exec_err ! ( "array_distinct does not support type '{array_type :?}'" ) ,
339+ arg_type => exec_err ! ( "array_distinct does not support type '{arg_type :?}'" ) ,
348340 }
349341}
350342
@@ -369,80 +361,69 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
369361 field : Arc < Field > ,
370362 set_op : SetOp ,
371363) -> Result < ArrayRef > {
372- if matches ! ( l. value_type( ) , Null ) {
364+ if l . is_empty ( ) || l. value_type ( ) . is_null ( ) {
373365 let field = Arc :: new ( Field :: new ( "item" , r. value_type ( ) , true ) ) ;
374366 return general_array_distinct :: < OffsetSize > ( r, & field) ;
375- } else if matches ! ( r. value_type( ) , Null ) {
367+ } else if r . is_empty ( ) || r. value_type ( ) . is_null ( ) {
376368 let field = Arc :: new ( Field :: new ( "item" , l. value_type ( ) , true ) ) ;
377369 return general_array_distinct :: < OffsetSize > ( l, & field) ;
378370 }
379371
380- // Handle empty array at rhs case
381- // array_union(arr, []) -> arr;
382- // array_intersect(arr, []) -> [];
383- if r. value_length ( 0 ) . is_zero ( ) {
384- if set_op == SetOp :: Union {
385- return Ok ( Arc :: new ( l. clone ( ) ) as ArrayRef ) ;
386- } else {
387- return Ok ( Arc :: new ( r. clone ( ) ) as ArrayRef ) ;
388- }
389- }
390-
391372 if l. value_type ( ) != r. value_type ( ) {
392- return internal_err ! ( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ) ;
373+ return internal_err ! (
374+ "{set_op} is not implemented for {} and {}" ,
375+ l. data_type( ) ,
376+ r. data_type( )
377+ ) ;
393378 }
394379
395- let dt = l. value_type ( ) ;
396-
397380 let mut offsets = vec ! [ OffsetSize :: usize_as( 0 ) ] ;
398381 let mut new_arrays = vec ! [ ] ;
399-
400- let converter = RowConverter :: new ( vec ! [ SortField :: new( dt) ] ) ?;
382+ let converter = RowConverter :: new ( vec ! [ SortField :: new( l. value_type( ) ) ] ) ?;
401383 for ( first_arr, second_arr) in l. iter ( ) . zip ( r. iter ( ) ) {
402- if let ( Some ( first_arr) , Some ( second_arr) ) = ( first_arr, second_arr) {
403- let l_values = converter. convert_columns ( & [ first_arr] ) ?;
404- let r_values = converter. convert_columns ( & [ second_arr] ) ?;
405-
406- let l_iter = l_values. iter ( ) . sorted ( ) . dedup ( ) ;
407- let values_set: HashSet < _ > = l_iter. clone ( ) . collect ( ) ;
408- let mut rows = if set_op == SetOp :: Union {
409- l_iter. collect :: < Vec < _ > > ( )
410- } else {
411- vec ! [ ]
412- } ;
413- for r_val in r_values. iter ( ) . sorted ( ) . dedup ( ) {
414- match set_op {
415- SetOp :: Union => {
416- if !values_set. contains ( & r_val) {
417- rows. push ( r_val) ;
418- }
419- }
420- SetOp :: Intersect => {
421- if values_set. contains ( & r_val) {
422- rows. push ( r_val) ;
423- }
424- }
425- }
426- }
384+ let l_values = if let Some ( first_arr) = first_arr {
385+ converter. convert_columns ( & [ first_arr] ) ?
386+ } else {
387+ converter. convert_columns ( & [ ] ) ?
388+ } ;
427389
428- let last_offset = match offsets. last ( ) . copied ( ) {
429- Some ( offset) => offset,
430- None => return internal_err ! ( "offsets should not be empty" ) ,
431- } ;
432- offsets. push ( last_offset + OffsetSize :: usize_as ( rows. len ( ) ) ) ;
433- let arrays = converter. convert_rows ( rows) ?;
434- let array = match arrays. first ( ) {
435- Some ( array) => Arc :: clone ( array) ,
436- None => {
437- return internal_err ! ( "{set_op}: failed to get array from rows" ) ;
438- }
439- } ;
440- new_arrays. push ( array) ;
390+ let r_values = if let Some ( second_arr) = second_arr {
391+ converter. convert_columns ( & [ second_arr] ) ?
392+ } else {
393+ converter. convert_columns ( & [ ] ) ?
394+ } ;
395+
396+ let l_iter = l_values. iter ( ) . sorted ( ) . dedup ( ) ;
397+ let values_set: HashSet < _ > = l_iter. clone ( ) . collect ( ) ;
398+ let mut rows = if set_op == SetOp :: Union {
399+ l_iter. collect ( )
400+ } else {
401+ vec ! [ ]
402+ } ;
403+
404+ for r_val in r_values. iter ( ) . sorted ( ) . dedup ( ) {
405+ match set_op {
406+ SetOp :: Union if !values_set. contains ( & r_val) => rows. push ( r_val) ,
407+ SetOp :: Intersect if values_set. contains ( & r_val) => rows. push ( r_val) ,
408+ _ => ( ) ,
409+ }
441410 }
411+
412+ let last_offset = match offsets. last ( ) {
413+ Some ( offset) => * offset,
414+ None => return internal_err ! ( "offsets should not be empty" ) ,
415+ } ;
416+
417+ offsets. push ( last_offset + OffsetSize :: usize_as ( rows. len ( ) ) ) ;
418+ let arrays = converter. convert_rows ( rows) ?;
419+ new_arrays. push ( match arrays. first ( ) {
420+ Some ( array) => Arc :: clone ( array) ,
421+ None => return internal_err ! ( "{set_op}: failed to get array from rows" ) ,
422+ } ) ;
442423 }
443424
444425 let offsets = OffsetBuffer :: new ( offsets. into ( ) ) ;
445- let new_arrays_ref = new_arrays. iter ( ) . map ( |v| v. as_ref ( ) ) . collect :: < Vec < _ > > ( ) ;
426+ let new_arrays_ref: Vec < _ > = new_arrays. iter ( ) . map ( |v| v. as_ref ( ) ) . collect ( ) ;
446427 let values = compute:: concat ( & new_arrays_ref) ?;
447428 let arr = GenericListArray :: < OffsetSize > :: try_new ( field, offsets, values, None ) ?;
448429 Ok ( Arc :: new ( arr) )
@@ -453,38 +434,60 @@ fn general_set_op(
453434 array2 : & ArrayRef ,
454435 set_op : SetOp ,
455436) -> Result < ArrayRef > {
437+ fn empty_array ( data_type : & DataType , len : usize , large : bool ) -> Result < ArrayRef > {
438+ let field = Arc :: new ( Field :: new_list_field ( data_type. clone ( ) , true ) ) ;
439+ let values = new_null_array ( data_type, len) ;
440+ if large {
441+ Ok ( Arc :: new ( LargeListArray :: try_new (
442+ field,
443+ OffsetBuffer :: new_zeroed ( len) ,
444+ values,
445+ None ,
446+ ) ?) )
447+ } else {
448+ Ok ( Arc :: new ( ListArray :: try_new (
449+ field,
450+ OffsetBuffer :: new_zeroed ( len) ,
451+ values,
452+ None ,
453+ ) ?) )
454+ }
455+ }
456+
456457 match ( array1. data_type ( ) , array2. data_type ( ) ) {
458+ ( Null , Null ) => Ok ( Arc :: new ( ListArray :: new_null (
459+ Arc :: new ( Field :: new_list_field ( Null , true ) ) ,
460+ array1. len ( ) ,
461+ ) ) ) ,
457462 ( Null , List ( field) ) => {
458463 if set_op == SetOp :: Intersect {
459- return Ok ( new_empty_array ( & Null ) ) ;
464+ return empty_array ( field . data_type ( ) , array1 . len ( ) , false ) ;
460465 }
461466 let array = as_list_array ( & array2) ?;
462467 general_array_distinct :: < i32 > ( array, field)
463468 }
464469
465470 ( List ( field) , Null ) => {
466471 if set_op == SetOp :: Intersect {
467- return make_array_inner ( & [ ] ) ;
472+ return empty_array ( field . data_type ( ) , array1 . len ( ) , false ) ;
468473 }
469474 let array = as_list_array ( & array1) ?;
470475 general_array_distinct :: < i32 > ( array, field)
471476 }
472477 ( Null , LargeList ( field) ) => {
473478 if set_op == SetOp :: Intersect {
474- return Ok ( new_empty_array ( & Null ) ) ;
479+ return empty_array ( field . data_type ( ) , array1 . len ( ) , true ) ;
475480 }
476481 let array = as_large_list_array ( & array2) ?;
477482 general_array_distinct :: < i64 > ( array, field)
478483 }
479484 ( LargeList ( field) , Null ) => {
480485 if set_op == SetOp :: Intersect {
481- return make_array_inner ( & [ ] ) ;
486+ return empty_array ( field . data_type ( ) , array1 . len ( ) , true ) ;
482487 }
483488 let array = as_large_list_array ( & array1) ?;
484489 general_array_distinct :: < i64 > ( array, field)
485490 }
486- ( Null , Null ) => Ok ( new_empty_array ( & Null ) ) ,
487-
488491 ( List ( field) , List ( _) ) => {
489492 let array1 = as_list_array ( & array1) ?;
490493 let array2 = as_list_array ( & array2) ?;
0 commit comments