Skip to content
Merged
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
arrow = "51.0.0"
arrow-schema = "51.0.0"
datafusion-common = "37.0.0"
datafusion-expr = "37.0.0"
Expand All @@ -13,7 +14,6 @@ log = "0.4.21"
datafusion-execution = "37.0.0"

[dev-dependencies]
arrow = "51.0.0"
datafusion = "37.0.0"
tokio = { version = "1.37.0", features = ["full"] }

Expand All @@ -25,3 +25,4 @@ print_stdout = "warn"
# certain lints which we don't want to enforce (for now)
pedantic = { level = "warn", priority = -1 }
missing_errors_doc = "allow"
cast_possible_truncation = "allow"
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
# datafusion-functions-json

methods to implement:
## Done

* [x] `json_get(json: str, *keys: str | int) -> JsonUnion` - Get a value from a JSON object by it's "path"
* [x] `json_get_str(json: str, *keys: str | int) -> str` - Get a string value from a JSON object by it's "path"
* [x] `json_get_int(json: str, *keys: str | int) -> int` - Get an integer value from a JSON object by it's "path"
* [x] `json_get_float(json: str, *keys: str | int) -> float` - Get a float value from a JSON object by it's "path"
* [x] `json_get_bool(json: str, *keys: str | int) -> bool` - Get a boolean value from a JSON object by it's "path"
* [x] `json_get_json(json: str, *keys: str | int) -> str` - Get any value from a JSON object by it's "path", represented as a string
* [x] `json_obj_contains(json: str, key: str) -> bool` - true if a JSON object has a specific key
* [ ] `json_obj_contains_all(json: str, keys: list[str]) -> bool` - true if a JSON object has all of a list of keys
* [ ] `json_obj_contains_any(json: str, keys: list[str]) -> bool` - true if a JSON object has all of a list of keys

## TODO

* [ ] `json_obj_keys(json: str) -> list[str]` - get the keys of a JSON object
* [ ] `json_length(json: str) -> int` - get the length of a JSON object or array
* [ ] `json_obj_values(json: str) -> list[Any]` - get the values of a JSON object
* [ ] `json_obj_contains_all(json: str, keys: list[str]) -> bool` - true if a JSON object has all of the keys
* [ ] `json_obj_contains_any(json: str, keys: list[str]) -> bool` - true if a JSON object has any of the keys
* [ ] `json_is_obj(json: str) -> bool` - true if the JSON is an object
* [ ] `json_array_contains(json: str, key: Any) -> bool` - true if a JSON array has a specific value
* [ ] `json_array_items(json: str) -> list[Any]` - get the items of a JSON array
* [ ] `json_array_items_str(json: str) -> list[Any]` - get the items of a JSON array
* [ ] `json_is_array(json: str) -> bool` - true if the JSON is an array
* [ ] `json_get(json: str, key: str | int) -> Any` - get the value of a key in a JSON object or array
* [ ] `json_get_path(json: str, key: list[str | int]) -> Any` - is this possible?
* [ ] `json_length(json: str) -> int` - get the length of a JSON object or array
* [ ] `json_valid(json: str) -> bool` - true if the JSON is valid
* [ ] `json_cast(json: str) -> Any` - cast the JSON to a native type???
199 changes: 199 additions & 0 deletions src/common_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use std::str::Utf8Error;

use arrow::array::{as_string_array, Array, ArrayRef, Int64Array, StringArray};
use arrow_schema::DataType;
use datafusion_common::{exec_err, plan_err, Result as DataFusionResult, ScalarValue};
use datafusion_expr::ColumnarValue;
use jiter::{Jiter, JiterError, Peek};

pub fn check_args(args: &[DataType], fn_name: &str) -> DataFusionResult<()> {
let first = match args.get(0) {
Some(arg) => arg,
None => return plan_err!("The `{fn_name}` function requires one or more arguments."),
};
if !matches!(first, DataType::Utf8) {
return plan_err!("Unexpected argument type to `{fn_name}` at position 1, expected a string.");
}
args[1..].iter().enumerate().try_for_each(|(index, arg)| match arg {
DataType::Utf8 | DataType::UInt64 | DataType::Int64 => Ok(()),
_ => plan_err!(
"Unexpected argument type to `{fn_name}` at position {}, expected string or int.",
index + 2
),
})
}

#[derive(Debug)]
pub enum JsonPath<'s> {
Key(&'s str),
Index(usize),
None,
}

impl From<u64> for JsonPath<'_> {
fn from(index: u64) -> Self {
JsonPath::Index(index as usize)
}
}

impl From<i64> for JsonPath<'_> {
fn from(index: i64) -> Self {
match usize::try_from(index) {
Ok(i) => Self::Index(i),
Err(_) => Self::None,
}
}
}

impl<'s> JsonPath<'s> {
pub fn extract_path(args: &'s [ColumnarValue]) -> Vec<Self> {
args[1..]
.iter()
.map(|arg| match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => Self::Key(s),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => (*i).into(),
ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => (*i).into(),
_ => Self::None,
})
.collect()
}
}

pub fn get_invoke<C: FromIterator<Option<I>> + 'static, I>(
args: &[ColumnarValue],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
to_scalar: impl Fn(Option<I>) -> ScalarValue,
) -> DataFusionResult<ColumnarValue> {
match &args[0] {
ColumnarValue::Array(json_array) => {
let result_collect = match &args[1] {
ColumnarValue::Array(a) => {
if let Some(str_path_array) = a.as_any().downcast_ref::<StringArray>() {
let paths = str_path_array.iter().map(|opt_key| opt_key.map(JsonPath::Key));
zip_apply(json_array, paths, jiter_find)
} else if let Some(int_path_array) = a.as_any().downcast_ref::<Int64Array>() {
let paths = int_path_array.iter().map(|opt_index| opt_index.map(Into::into));
zip_apply(json_array, paths, jiter_find)
} else {
return exec_err!("unexpected second argument type, expected string or int array");
}
}
ColumnarValue::Scalar(_) => {
let path = JsonPath::extract_path(args);
as_string_array(json_array)
.iter()
.map(|opt_json| jiter_find(opt_json, &path).ok())
.collect::<C>()
}
};
to_array(result_collect).map(ColumnarValue::from)
}
ColumnarValue::Scalar(ScalarValue::Utf8(s)) => {
let path = JsonPath::extract_path(args);
let v = jiter_find(s.as_ref().map(String::as_str), &path).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}
ColumnarValue::Scalar(_) => {
exec_err!("unexpected first argument type, expected string")
}
}
}

fn zip_apply<'a, P: Iterator<Item = Option<JsonPath<'a>>>, C: FromIterator<Option<I>> + 'static, I>(
json_array: &ArrayRef,
paths: P,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
as_string_array(json_array)
.iter()
.zip(paths)
.map(|(opt_json, opt_path)| {
if let Some(path) = opt_path {
jiter_find(opt_json, &[path]).ok()
} else {
None
}
})
.collect::<C>()
}

pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
if let Some(json_str) = opt_json {
let mut jiter = Jiter::new(json_str.as_bytes(), false);
if let Ok(peek) = jiter.peek() {
if let Ok(peek_found) = jiter_json_find_step(&mut jiter, peek, path) {
return Some((jiter, peek_found));
}
}
}
None
}
macro_rules! get_err {
() => {
Err(GetError)
};
}
pub(crate) use get_err;

fn jiter_json_find_step(jiter: &mut Jiter, peek: Peek, path: &[JsonPath]) -> Result<Peek, GetError> {
let (first, rest) = match path.split_first() {
Some(first_rest) => first_rest,
None => return Ok(peek),
};
let next_peek = match peek {
Peek::Array => match first {
JsonPath::Index(index) => jiter_array_get(jiter, *index),
_ => get_err!(),
},
Peek::Object => match first {
JsonPath::Key(key) => jiter_object_get(jiter, key),
_ => get_err!(),
},
_ => get_err!(),
}?;
jiter_json_find_step(jiter, next_peek, rest)
}

fn jiter_array_get(jiter: &mut Jiter, find_key: usize) -> Result<Peek, GetError> {
let mut peek_opt = jiter.known_array()?;

let mut index: usize = 0;
while let Some(peek) = peek_opt {
if index == find_key {
return Ok(peek);
}
jiter.next_skip()?;
index += 1;
peek_opt = jiter.array_step()?;
}
get_err!()
}

fn jiter_object_get(jiter: &mut Jiter, find_key: &str) -> Result<Peek, GetError> {
let mut opt_key = jiter.known_object()?;

while let Some(key) = opt_key {
if key == find_key {
let value_peek = jiter.peek()?;
return Ok(value_peek);
}
jiter.next_skip()?;
opt_key = jiter.next_key()?;
}
get_err!()
}

pub struct GetError;

impl From<JiterError> for GetError {
fn from(_: JiterError) -> Self {
GetError
}
}

impl From<Utf8Error> for GetError {
fn from(_: Utf8Error) -> Self {
GetError
}
}
49 changes: 49 additions & 0 deletions src/common_macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/// Creates external API `ScalarUDF` for an array UDF. Specifically, creates
///
/// Creates a singleton `ScalarUDF` of the `$udf_impl` function named `$expr_fn_name _udf` and a
/// function named `$expr_fn_name _udf` which returns that function.
///
/// This is used to ensure creating the list of `ScalarUDF` only happens once.
///
/// # Arguments
/// * `udf_impl`: name of the [`ScalarUDFImpl`]
/// * `expr_fn_name`: name of the `expr_fn` function to be created
/// * `arg`: 0 or more named arguments for the function
/// * `doc`: documentation string for the function
///
/// Copied mostly from, `/datafusion/functions-array/src/macros.rs`.
///
/// [`ScalarUDFImpl`]: datafusion_expr::ScalarUDFImpl
macro_rules! make_udf_function {
($udf_impl:ty, $expr_fn_name:ident, $($arg:ident)*, $doc:expr) => {
paste::paste! {
#[doc = $doc]
#[must_use] pub fn $expr_fn_name($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr {
datafusion_expr::Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction::new_udf(
[< $expr_fn_name _udf >](),
vec![$($arg),*],
))
}

/// Singleton instance of [`$udf_impl`], ensures the UDF is only created once
/// named for example `STATIC_JSON_OBJ_CONTAINS`
static [< STATIC_ $expr_fn_name:upper >]: std::sync::OnceLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
std::sync::OnceLock::new();

/// ScalarFunction that returns a [`ScalarUDF`] for [`$udf_impl`]
///
/// [`ScalarUDF`]: datafusion_expr::ScalarUDF
pub fn [< $expr_fn_name _udf >]() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
[< STATIC_ $expr_fn_name:upper >]
.get_or_init(|| {
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
<$udf_impl>::default(),
))
})
.clone()
}
}
};
}

pub(crate) use make_udf_function;
Loading