Skip to content

Commit e84b999

Browse files
authored
Implement GetIndexedField for map-typed columns (#7825)
* Implement GetIndexedField for map-typed columns * Drop explicit dep on arrow-ord and use re-exported kernel
1 parent d33595a commit e84b999

4 files changed

Lines changed: 73 additions & 3 deletions

File tree

10.3 KB
Binary file not shown.

datafusion/expr/src/field_util.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,19 @@ impl GetFieldAccessSchema {
4747
match self {
4848
Self::NamedStructField{ name } => {
4949
match (data_type, name) {
50+
(DataType::Map(fields, _), _) => {
51+
match fields.data_type() {
52+
DataType::Struct(fields) if fields.len() == 2 => {
53+
// Arrow's MapArray is essentially a ListArray of structs with two columns. They are
54+
// often named "key", and "value", but we don't require any specific naming here;
55+
// instead, we assume that the second columnis the "value" column both here and in
56+
// execution.
57+
let value_field = fields.get(1).expect("fields should have exactly two members");
58+
Ok(Field::new("map", value_field.data_type().clone(), true))
59+
},
60+
_ => plan_err!("Map fields must contain a Struct with exactly 2 fields"),
61+
}
62+
}
5063
(DataType::Struct(fields), ScalarValue::Utf8(Some(s))) => {
5164
if s.is_empty() {
5265
plan_err!(
@@ -60,7 +73,7 @@ impl GetFieldAccessSchema {
6073
(DataType::Struct(_), _) => plan_err!(
6174
"Only utf8 strings are valid as an indexed field in a struct"
6275
),
63-
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
76+
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List`, `Struct`, or `Map` types, got {other}"),
6477
}
6578
}
6679
Self::ListIndex{ key_dt } => {

datafusion/physical-expr/src/expressions/get_indexed_field.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
//! get field of a `ListArray`
1919
2020
use crate::PhysicalExpr;
21-
use arrow::array::Array;
2221
use datafusion_common::exec_err;
2322

2423
use crate::array_expressions::{array_element, array_slice};
2524
use crate::physical_expr::down_cast_any_ref;
2625
use arrow::{
26+
array::{Array, Scalar, StringArray},
2727
datatypes::{DataType, Schema},
2828
record_batch::RecordBatch,
2929
};
30-
use datafusion_common::{cast::as_struct_array, DataFusionError, Result, ScalarValue};
30+
use datafusion_common::{
31+
cast::{as_map_array, as_struct_array},
32+
DataFusionError, Result, ScalarValue,
33+
};
3134
use datafusion_expr::{field_util::GetFieldAccessSchema, ColumnarValue};
3235
use std::fmt::Debug;
3336
use std::hash::{Hash, Hasher};
@@ -183,6 +186,14 @@ impl PhysicalExpr for GetIndexedFieldExpr {
183186
let array = self.arg.evaluate(batch)?.into_array(batch.num_rows());
184187
match &self.field {
185188
GetFieldAccessExpr::NamedStructField{name} => match (array.data_type(), name) {
189+
(DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => {
190+
let map_array = as_map_array(array.as_ref())?;
191+
let key_scalar = Scalar::new(StringArray::from(vec![k.clone()]));
192+
let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?;
193+
let entries = arrow::compute::filter(map_array.entries(), &keys)?;
194+
let entries_struct_array = as_struct_array(entries.as_ref())?;
195+
Ok(ColumnarValue::Array(entries_struct_array.column(1).clone()))
196+
}
186197
(DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
187198
let as_struct_array = as_struct_array(&array)?;
188199
match as_struct_array.column_by_name(k) {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
statement ok
19+
CREATE EXTERNAL TABLE data
20+
STORED AS PARQUET
21+
LOCATION '../core/tests/data/parquet_map.parquet';
22+
23+
query I
24+
SELECT SUM(ints['bytes']) FROM data;
25+
----
26+
5636785
27+
28+
query I
29+
SELECT SUM(ints['bytes']) FROM data WHERE strings['method'] == 'GET';
30+
----
31+
649668
32+
33+
query TI
34+
SELECT strings['method'] AS method, COUNT(*) as count FROM data GROUP BY method ORDER BY count DESC;
35+
----
36+
POST 41
37+
HEAD 33
38+
PATCH 30
39+
OPTION 29
40+
GET 27
41+
PUT 25
42+
DELETE 24
43+
44+
query T
45+
SELECT strings['not_found'] FROM data LIMIT 1;
46+
----

0 commit comments

Comments
 (0)