Skip to content
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub enum BuiltinScalarFunction {
ArraySlice,
/// array_to_string
ArrayToString,
/// array_union
ArrayUnion,
/// cardinality
Cardinality,
/// construct an array from columns
Expand Down Expand Up @@ -400,6 +402,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Flatten => Volatility::Immutable,
BuiltinScalarFunction::ArraySlice => Volatility::Immutable,
BuiltinScalarFunction::ArrayToString => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::Cardinality => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
Expand Down Expand Up @@ -597,6 +600,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayToString => Ok(Utf8),
BuiltinScalarFunction::ArrayUnion => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::Cardinality => Ok(UInt64),
BuiltinScalarFunction::MakeArray => match input_expr_types.len() {
0 => Ok(List(Arc::new(Field::new("item", Null, true)))),
Expand Down Expand Up @@ -900,6 +904,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayToString => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Cardinality => Signature::any(1, self.volatility()),
BuiltinScalarFunction::MakeArray => {
// 0 or more arguments of arbitrary type
Expand Down Expand Up @@ -1523,6 +1528,7 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
"array_join",
"list_join",
],
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
BuiltinScalarFunction::Cardinality => &["cardinality"],
BuiltinScalarFunction::MakeArray => &["make_array", "make_list"],

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,8 @@ scalar_expr!(
array delimiter,
"converts each element to its text representation."
);
nary_scalar_expr!(ArrayUnion, array_union, "returns an array of the elements in the union of array1 and array2 without duplicates.");

scalar_expr!(
Cardinality,
cardinality,
Expand Down
86 changes: 86 additions & 0 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use arrow::array::*;
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::datatypes::{DataType, Field, UInt64Type};
use arrow::row::{RowConverter, SortField, Row};
use arrow_buffer::NullBuffer;

use arrow_schema::FieldRef;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
};
Expand All @@ -36,6 +38,7 @@ use datafusion_common::{
};

use itertools::Itertools;
use std::collections::HashSet;

macro_rules! downcast_arg {
($ARG:expr, $ARRAY_TYPE:ident) => {{
Expand Down Expand Up @@ -1358,6 +1361,89 @@ macro_rules! to_string {
}};
}

fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
field: &FieldRef
) -> Result<GenericListArray<OffsetSize>> {
let converter =
RowConverter::new(vec![SortField::new(l.value_type().clone())])?;

let nulls = NullBuffer::union(l.nulls(), r.nulls());
let l_values = l.values().clone();
let r_values = r.values().clone();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;

// Might be worth adding an upstream OffsetBufferBuilder
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really neat implementation @edmondop.

offsets.push(OffsetSize::usize_as(0));
let mut rows = Vec::with_capacity(l_values.num_rows() + r_values.num_rows());
let mut dedup = HashSet::new();
// Needed to preserve ordering
let mut row_elements:Vec<Row<'_>> = vec![];
for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
for i in l_slice {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
row_elements.push(left_row);
}
}
for i in r_slice {
let right_row=r_values.row(i);
if dedup.insert(right_row){
row_elements.push(right_row);
}
}

rows.extend(row_elements.iter());
offsets.push(OffsetSize::usize_as(rows.len()));
dedup.clear();
row_elements.clear();
}

let values = converter.convert_rows(rows)?;
let offsets = OffsetBuffer::new(offsets.into());
let result = values[0].clone();
Ok(GenericListArray::<OffsetSize>::new(
field.clone(), offsets, result, nulls,
))
}

/// Array_union SQL function
pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_union needs two arguments");
}
let array1 = &args[0];
let array2 = &args[1];
match (array1.data_type(), array2.data_type()) {
(DataType::Null, _) => Ok(array2.clone()),
(_, DataType::Null) => Ok(array1.clone()),
(DataType::List(field_ref), DataType::List(_)) => {
check_datatypes("array_union", &[&array1, &array2])?;
let list1 = array1.as_list::<i32>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer datafusion::common::cast as_list_array and as_large_list_array

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion::common::cast and arrow::array::cast are doing the similar thing. The difference is that common::cast return datafusion error, while arrow::array::cast return Arror error. To me, return datafusion error in datafusion project make much more senses for me. Also, mixing these two casting is quite messy, we can have arrow casting inside common::cast and call common::cast for other crate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found that arrow::array::cast does not return Result<>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, I don't have strong opinion on which to use yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we directly use as_list_array here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the offset i32 branch you mean @Weijun-H ? I like the fact that the code has the same structure for LargeList and List

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I think as_list_array can avoid panic because it returns the Return type.

let list2 = array2.as_list::<i32>();
let result = union_generic_lists::<i32>(list1, list2, field_ref)?;
Ok(Arc::new(result))
}
(DataType::LargeList(field_ref), DataType::LargeList(_)) => {
check_datatypes("array_union", &[&array1, &array2])?;
let list1 = array1.as_list::<i64>();
let list2 = array2.as_list::<i64>();
let result = union_generic_lists::<i64>(list1, list2, field_ref)?;
Ok(Arc::new(result))
}
_ => {
return internal_err!(
"array_union only support list with offsets of type int32 and int64"
);
}
}
}

/// Array_to_string SQL function
pub fn array_to_string(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = &args[0];
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ pub fn create_physical_fun(
BuiltinScalarFunction::MakeArray => {
Arc::new(|args| make_scalar_function(array_expressions::make_array)(args))
}

BuiltinScalarFunction::ArrayUnion => {
Arc::new(|args| make_scalar_function(array_expressions::array_union)(args))
}
// struct functions
BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr),

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ enum ScalarFunction {
ArrayPopBack = 116;
StringToArray = 117;
ToTimestampNanos = 118;
ArrayUnion = 119;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll,
ScalarFunction::ArraySlice => Self::ArraySlice,
ScalarFunction::ArrayToString => Self::ArrayToString,
ScalarFunction::ArrayUnion => Self::ArrayUnion,
ScalarFunction::Cardinality => Self::Cardinality,
ScalarFunction::Array => Self::MakeArray,
ScalarFunction::NullIf => Self::NullIf,
Expand Down Expand Up @@ -1411,6 +1412,12 @@ pub fn parse_expr(
ScalarFunction::ArrayNdims => {
Ok(array_ndims(parse_expr(&args[0], registry)?))
}
ScalarFunction::ArrayUnion => Ok(array(
args.to_owned()
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, _>>()?,
)),
ScalarFunction::Sqrt => Ok(sqrt(parse_expr(&args[0], registry)?)),
ScalarFunction::Cbrt => Ok(cbrt(parse_expr(&args[0], registry)?)),
ScalarFunction::Sin => Ok(sin(parse_expr(&args[0], registry)?)),
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll,
BuiltinScalarFunction::ArraySlice => Self::ArraySlice,
BuiltinScalarFunction::ArrayToString => Self::ArrayToString,
BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion,
BuiltinScalarFunction::Cardinality => Self::Cardinality,
BuiltinScalarFunction::MakeArray => Self::Array,
BuiltinScalarFunction::NullIf => Self::NullIf,
Expand Down
46 changes: 46 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,52 @@ select array_to_string(make_array(), ',')
----
(empty)


## array_union (aliases: `list_union`)

# array_union scalar function #1
query ?
select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
----
[1, 2, 3, 4, 5, 6]

# array_union scalar function #2
query ?
select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
----
[1, 2, 3, 4, 5, 6, 7, 8]

# array_union scalar function #3
query ?
select array_union([1,2,3], []);
----
[1, 2, 3]

# array_union scalar function #4
query ?
select array_union([1, 2, 3, 4], [5, 4]);
----
[1, 2, 3, 4, 5]

statement ok
CREATE TABLE arrays_with_repeating_elements_for_union
AS VALUES
([1], [2]),
([2, 3], [3]),
([3], [3, 4])
;

query ?
select array_union(column1, column2) from arrays_with_repeating_elements_for_union;
----
[1, 2]
[2, 3]
[3, 4]

statement ok
drop table arrays_with_repeating_elements_for_union;


# list_to_string scalar function #4 (function alias `array_to_string`)
query TTT
select list_to_string(['h', 'e', 'l', 'l', 'o'], ','), list_to_string([1, 2, 3, 4, 5], '-'), list_to_string([1.0, 2.0, 3.0], '|');
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ Unlike to some databases the math functions in Datafusion works the same way as
| array_replace_all(array, from, to) | Replaces all occurrences of the specified element with another specified element. `array_replace_all([1, 2, 2, 3, 2, 1, 4], 2, 5) -> [1, 5, 5, 3, 5, 1, 4]` |
| array_slice(array, index) | Returns a slice of the array. `array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6) -> [3, 4, 5, 6]` |
| array_to_string(array, delimiter) | Converts each element to its text representation. `array_to_string([1, 2, 3, 4], ',') -> 1,2,3,4` |
| array_union(array1, array2) | Returns an array of the elements in the union of array1 and array2 without duplicates. `array_union([1, 2, 3, 4], [5, 6, 3, 4]) -> [1, 2, 3, 4, 5, 6]`
| cardinality(array) | Returns the total number of elements in the array. `cardinality([[1, 2, 3], [4, 5, 6]]) -> 6` |
| make_array(value1, [value2 [, ...]]) | Returns an Arrow array using the specified input expressions. `make_array(1, 2, 3) -> [1, 2, 3]` |
| trim_array(array, n) | Deprecated |
Expand Down
41 changes: 41 additions & 0 deletions docs/source/user-guide/sql/scalar_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,47 @@ array_to_string(array, delimiter)
- list_join
- list_to_string

### `array_union`

Returns an array of elements that are present in both arrays (all elements from both arrays) with out duplicates.

```
array_union(array1, array2)
```

#### Arguments

- **array1**: Array expression.
Can be a constant, column, or function, and any combination of array operators.
- **array2**: Array expression.
Can be a constant, column, or function, and any combination of array operators.

#### Example

```
❯ select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 3, 4]); |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6] |
+----------------------------------------------------+
❯ select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 7, 8]); |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6] |
+----------------------------------------------------+
```


----


#### Aliases

- list_union


### `cardinality`

Returns the total number of elements in the array.
Expand Down