Skip to content

Commit a98b6a0

Browse files
committed
Support timestamp types for min/max
1 parent d96dfa2 commit a98b6a0

File tree

1 file changed

+71
-31
lines changed

1 file changed

+71
-31
lines changed

datafusion/physical-expr/src/aggregate/min_max.rs

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323

2424
use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
2525
use arrow::compute;
26-
use arrow::datatypes::{DataType, TimeUnit};
26+
use arrow::datatypes::{DataType, TimeUnit, Date32Type, Date64Type, Time32SecondType, Time32MillisecondType, Time64NanosecondType, Time64MicrosecondType, TimestampSecondType, TimestampMillisecondType, TimestampMicrosecondType, TimestampNanosecondType};
2727
use arrow::{
2828
array::{
2929
ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
@@ -144,7 +144,14 @@ impl AggregateExpr for Max {
144144
}
145145

146146
fn groups_accumulator_supported(&self) -> bool {
147-
self.data_type.is_primitive()
147+
use DataType::*;
148+
matches!(self.data_type,
149+
Int8 | Int16 | Int32 | Int64 |
150+
UInt8 | UInt16 | UInt32 | UInt64 |
151+
Float32 | Float64 | Decimal128(_,_)|
152+
Date32 | Date64 | Time32(_) | Time64(_)
153+
|Timestamp(_,_)
154+
)
148155
}
149156

150157
fn create_row_accumulator(
@@ -158,32 +165,46 @@ impl AggregateExpr for Max {
158165
}
159166

160167
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
168+
use DataType::*;
169+
use TimeUnit::*;
170+
161171
match self.data_type {
162-
DataType::Int8 => instantiate_min_max_accumulator!(self, Int8Type, false),
163-
DataType::Int16 => instantiate_min_max_accumulator!(self, Int16Type, false),
164-
DataType::Int32 => instantiate_min_max_accumulator!(self, Int32Type, false),
165-
DataType::Int64 => instantiate_min_max_accumulator!(self, Int64Type, false),
166-
DataType::UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, false),
167-
DataType::UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, false),
168-
DataType::UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, false),
169-
DataType::UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, false),
170-
DataType::Float32 => {
172+
Int8 => instantiate_min_max_accumulator!(self, Int8Type, false),
173+
Int16 => instantiate_min_max_accumulator!(self, Int16Type, false),
174+
Int32 => instantiate_min_max_accumulator!(self, Int32Type, false),
175+
Int64 => instantiate_min_max_accumulator!(self, Int64Type, false),
176+
UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, false),
177+
UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, false),
178+
UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, false),
179+
UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, false),
180+
Float32 => {
171181
instantiate_min_max_accumulator!(self, Float32Type, false)
172182
}
173-
DataType::Float64 => {
183+
Float64 => {
174184
instantiate_min_max_accumulator!(self, Float64Type, false)
175185
}
186+
Date32 => instantiate_min_max_accumulator!(self, Date32Type, false),
187+
Date64 => instantiate_min_max_accumulator!(self, Date64Type, false),
188+
Time32(Second) => instantiate_min_max_accumulator!(self, Time32SecondType, false),
189+
Time32(Millisecond) => instantiate_min_max_accumulator!(self, Time32MillisecondType, false),
190+
Time64(Microsecond) => instantiate_min_max_accumulator!(self, Time64MicrosecondType, false),
191+
Time64(Nanosecond) => instantiate_min_max_accumulator!(self, Time64NanosecondType, false),
192+
Timestamp(Second, _) => instantiate_min_max_accumulator!(self, TimestampSecondType, false),
193+
Timestamp(Millisecond, _) => instantiate_min_max_accumulator!(self, TimestampMillisecondType, false),
194+
Timestamp(Microsecond, _) => instantiate_min_max_accumulator!(self, TimestampMicrosecondType, false),
195+
Timestamp(Nanosecond, _) => instantiate_min_max_accumulator!(self, TimestampNanosecondType, false),
176196

177197
// It would be nice to have a fast implementation for Strings as well
178198
// https://github.com/apache/arrow-datafusion/issues/6906
179-
DataType::Decimal128(_, _) => {
199+
Decimal128(_, _) => {
180200
Ok(Box::new(MinMaxGroupsPrimitiveAccumulator::<
181201
Decimal128Type,
182202
false,
183203
>::new(&self.data_type)))
184204
}
185-
_ => Err(DataFusionError::NotImplemented(format!(
186-
"MinMaxGroupsPrimitiveAccumulator not supported for {}",
205+
// This is only reached if groups_accumulator_supported is out of sync
206+
_ => Err(DataFusionError::Internal(format!(
207+
"MinMaxGroupsPrimitiveAccumulator not supported for max({})",
187208
self.data_type
188209
))),
189210
}
@@ -890,34 +911,53 @@ impl AggregateExpr for Min {
890911
}
891912

892913
fn groups_accumulator_supported(&self) -> bool {
893-
self.data_type.is_primitive()
914+
use DataType::*;
915+
matches!(self.data_type,
916+
Int8 | Int16 | Int32 | Int64 |
917+
UInt8 | UInt16 | UInt32 | UInt64 |
918+
Float32 | Float64 | Decimal128(_,_)|
919+
Date32 | Date64 | Time32(_) | Time64(_)
920+
|Timestamp(_,_)
921+
)
894922
}
895923

896924
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
925+
use DataType::*;
926+
use TimeUnit::*;
897927
match self.data_type {
898-
DataType::Int8 => instantiate_min_max_accumulator!(self, Int8Type, true),
899-
DataType::Int16 => instantiate_min_max_accumulator!(self, Int16Type, true),
900-
DataType::Int32 => instantiate_min_max_accumulator!(self, Int32Type, true),
901-
DataType::Int64 => instantiate_min_max_accumulator!(self, Int64Type, true),
902-
DataType::UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, true),
903-
DataType::UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, true),
904-
DataType::UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, true),
905-
DataType::UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, true),
906-
DataType::Float32 => {
928+
Int8 => instantiate_min_max_accumulator!(self, Int8Type, true),
929+
Int16 => instantiate_min_max_accumulator!(self, Int16Type, true),
930+
Int32 => instantiate_min_max_accumulator!(self, Int32Type, true),
931+
Int64 => instantiate_min_max_accumulator!(self, Int64Type, true),
932+
UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, true),
933+
UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, true),
934+
UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, true),
935+
UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, true),
936+
Float32 => {
907937
instantiate_min_max_accumulator!(self, Float32Type, true)
908938
}
909-
DataType::Float64 => {
939+
Float64 => {
910940
instantiate_min_max_accumulator!(self, Float64Type, true)
911941
}
912-
913-
DataType::Decimal128(_, _) => {
942+
Date32 => instantiate_min_max_accumulator!(self, Date32Type, true),
943+
Date64 => instantiate_min_max_accumulator!(self, Date64Type, true),
944+
Time32(Second) => instantiate_min_max_accumulator!(self, Time32SecondType, true),
945+
Time32(Millisecond) => instantiate_min_max_accumulator!(self, Time32MillisecondType, true),
946+
Time64(Microsecond) => instantiate_min_max_accumulator!(self, Time64MicrosecondType, true),
947+
Time64(Nanosecond) => instantiate_min_max_accumulator!(self, Time64NanosecondType, true),
948+
Timestamp(Second, _) => instantiate_min_max_accumulator!(self, TimestampSecondType, true),
949+
Timestamp(Millisecond, _) => instantiate_min_max_accumulator!(self, TimestampMillisecondType, true),
950+
Timestamp(Microsecond, _) => instantiate_min_max_accumulator!(self, TimestampMicrosecondType, true),
951+
Timestamp(Nanosecond, _) => instantiate_min_max_accumulator!(self, TimestampNanosecondType, true),
952+
Decimal128(_, _) => {
914953
Ok(Box::new(MinMaxGroupsPrimitiveAccumulator::<
915954
Decimal128Type,
916955
true,
917956
>::new(&self.data_type)))
918957
}
919-
_ => Err(DataFusionError::NotImplemented(format!(
920-
"MinMaxGroupsPrimitiveAccumulator not supported for {}",
958+
// This is only reached if groups_accumulator_supported is out of sync
959+
_ => Err(DataFusionError::Internal(format!(
960+
"MinMaxGroupsPrimitiveAccumulator not supported for min({})",
921961
self.data_type
922962
))),
923963
}
@@ -1332,7 +1372,7 @@ where
13321372
}
13331373

13341374
fn size(&self) -> usize {
1335-
self.min_max.capacity() * std::mem::size_of::<usize>() + self.null_state.size()
1375+
self.min_max.capacity() * std::mem::size_of::<T>() + self.null_state.size()
13361376
}
13371377
}
13381378

0 commit comments

Comments
 (0)