Skip to content

Commit 49a671f

Browse files
committed
revert removed test in array distinct
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
1 parent 9f75d9c commit 49a671f

2 files changed

Lines changed: 39 additions & 21 deletions

File tree

datafusion/common/src/scalar.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2055,6 +2055,43 @@ impl ScalarValue {
20552055
}
20562056
}
20572057

2058+
/// Retrieve ScalarValue for each row in `array`
2059+
///
2060+
/// Example
2061+
/// ```
2062+
/// use datafusion_common::ScalarValue;
2063+
/// use arrow::array::ListArray;
2064+
/// use arrow::datatypes::{DataType, Int32Type};
2065+
///
2066+
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2067+
/// Some(vec![Some(1), Some(2), Some(3)]),
2068+
/// None,
2069+
/// Some(vec![Some(4), Some(5)])
2070+
/// ]);
2071+
///
2072+
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
2073+
///
2074+
/// let expected = vec![
2075+
/// vec![
2076+
/// ScalarValue::Int32(Some(1)),
2077+
/// ScalarValue::Int32(Some(2)),
2078+
/// ScalarValue::Int32(Some(3)),
2079+
/// ],
2080+
/// vec![],
2081+
/// vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(5))]
2082+
/// ];
2083+
///
2084+
/// assert_eq!(scalar_vec, expected);
2085+
/// ```
2086+
pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result<Vec<Vec<Self>>> {
2087+
let data_type = array.data_type().to_owned();
2088+
2089+
match data_type {
2090+
DataType::List(_) => Self::convert_list_array_to_scalar_vec(array),
2091+
_ => Ok(vec![Self::convert_non_list_array_to_scalars(array)?]),
2092+
}
2093+
}
2094+
20582095
// TODO: Support more types after other ScalarValue is wrapped with ArrayRef
20592096
/// Get raw data (inner array) inside ScalarValue
20602097
pub fn raw_data(&self) -> Result<ArrayRef> {
@@ -3088,7 +3125,7 @@ mod tests {
30883125

30893126
let l12 = arrays_into_list_array([l1, l2]).unwrap();
30903127
let arr = Arc::new(l12) as ArrayRef;
3091-
3128+
30923129
let actual = ScalarValue::convert_list_array_to_scalar_vec(&arr).unwrap();
30933130
let expected = vec![
30943131
vec![

datafusion/physical-expr/src/aggregate/array_agg_distinct.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,10 @@ impl Accumulator for DistinctArrayAggAccumulator {
133133
Ok(vec![self.evaluate()?])
134134
}
135135

136-
// fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
137-
// assert_eq!(values.len(), 1, "batch input should only include 1 column!");
138-
139-
// let scalars = ScalarValue::convert_non_list_array_to_scalars(&values[0])?;
140-
// self.values.extend(scalars);
141-
// Ok(())
142-
// }
143136
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
144137
assert_eq!(values.len(), 1, "batch input should only include 1 column!");
145138

146139
let array = &values[0];
147-
println!("array: {:?}", array);
148140
let scalars = ScalarValue::convert_array_to_scalar_vec(array)?;
149141
for scalar in scalars {
150142
self.values.extend(scalar)
@@ -157,18 +149,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
157149
return Ok(());
158150
}
159151

160-
assert_eq!(
161-
states.len(),
162-
1,
163-
"array_agg_distinct states must contain single array"
164-
);
165-
166-
let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&states[0])?;
167-
for scalars in scalar_vec {
168-
self.values.extend(scalars)
169-
}
170-
171-
Ok(())
152+
self.update_batch(states)
172153
}
173154

174155
fn evaluate(&self) -> Result<ScalarValue> {

0 commit comments

Comments
 (0)