From 7553836694c96cd3ce59f303f595d2edd322aee0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 13 Aug 2024 11:52:48 -0400 Subject: [PATCH 01/26] Add window function as template for others and function builder --- python/datafusion/expr.py | 15 ++++ python/datafusion/functions.py | 16 +++- python/datafusion/tests/test_dataframe.py | 1 + src/expr.rs | 95 ++++++++++++++++++++++- src/functions.rs | 8 ++ 5 files changed, 130 insertions(+), 5 deletions(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 71fcf397b..dd1f25310 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -355,6 +355,10 @@ def is_null(self) -> Expr: """Returns ``True`` if this expression is null.""" return Expr(self.expr.is_null()) + def is_not_null(self) -> Expr: + """Returns ``True`` if this expression is not null.""" + return Expr(self.expr.is_not_null()) + def cast(self, to: pa.DataType[Any]) -> Expr: """Cast to a new data type.""" return Expr(self.expr.cast(to)) @@ -405,6 +409,17 @@ def column_name(self, plan: LogicalPlan) -> str: """Compute the output column name based on the provided logical plan.""" return self.expr.column_name(plan) + def order_by(self, *exprs: Expr) -> ExprFuncBuilder: + return ExprFuncBuilder(self.expr.order_by(list(e.expr for e in exprs))) + + +class ExprFuncBuilder: + def __init__(self, builder: expr_internal.ExprFuncBuilder): + self.builder = builder + + def build(self) -> Expr: + return Expr(self.builder.build()) + class WindowFrame: """Defines a window frame for performing window operations.""" diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index ec0c1104d..12caeacf5 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -27,6 +27,11 @@ from datafusion.expr import CaseBuilder, Expr, WindowFrame from datafusion.context import SessionContext +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import pyarrow as pa + __all__ = [ "abs", "acos", @@ -247,6 +252,7 @@ "var_samp", "when", "window", + "lead", ] @@ -1022,12 +1028,12 @@ def struct(*args: Expr) -> Expr: return Expr(f.struct(*args)) -def named_struct(name_pairs: list[(str, Expr)]) -> Expr: +def named_struct(name_pairs: list[tuple[str, Expr]]) -> Expr: """Returns a struct with the given names and arguments pairs.""" - name_pairs = [[Expr.literal(pair[0]), pair[1]] for pair in name_pairs] + name_pair_exprs = [[Expr.literal(pair[0]), pair[1]] for pair in name_pairs] # flatten - name_pairs = [x.expr for xs in name_pairs for x in xs] + name_pairs = [x.expr for xs in name_pair_exprs for x in xs] return Expr(f.named_struct(*name_pairs)) @@ -1748,3 +1754,7 @@ def bool_and(arg: Expr, distinct: bool = False) -> Expr: def bool_or(arg: Expr, distinct: bool = False) -> Expr: """Computes the boolean OR of the arguement.""" return Expr(f.bool_or(arg.expr, distinct=distinct)) + + +def lead(arg: Expr, shift_offset: int = 1, default_value: pa.Scalar | None = None): + return Expr(f.lead(arg.expr, shift_offset, default_value)) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 477bc0fce..5526f9c6e 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -306,6 +306,7 @@ def test_distinct(): f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]), [5, 6, None], ), + ("lead", f.lead(column("b")).order_by(column("b").sort()).build(), [5, 6, None]), ( "previous", f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]), diff --git a/src/expr.rs b/src/expr.rs index 04bfc85c2..a40679e03 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -16,10 +16,11 @@ // under the License. use datafusion_expr::utils::exprlist_to_fields; -use datafusion_expr::LogicalPlan; +use datafusion_expr::{ExprFuncBuilder, ExprFunctionExt, LogicalPlan}; use pyo3::{basic::CompareOp, prelude::*}; use std::convert::{From, Into}; use std::sync::Arc; +use window::PyWindowFrame; use arrow::pyarrow::ToPyArrow; use datafusion::arrow::datatypes::{DataType, Field}; @@ -32,7 +33,7 @@ use datafusion_expr::{ lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast, }; -use crate::common::data_type::{DataTypeMap, RexType}; +use crate::common::data_type::{DataTypeMap, NullTreatment, RexType}; use crate::errors::{py_runtime_err, py_type_err, py_unsupported_variant_err, DataFusionError}; use crate::expr::aggregate_expr::PyAggregateFunction; use crate::expr::binary_expr::PyBinaryExpr; @@ -281,6 +282,10 @@ impl PyExpr { self.expr.clone().is_null().into() } + pub fn is_not_null(&self) -> PyExpr { + self.expr.clone().is_not_null().into() + } + pub fn cast(&self, to: PyArrowType) -> PyExpr { // self.expr.cast_to() requires DFSchema to validate that the cast // is supported, omit that for now @@ -510,6 +515,92 @@ impl PyExpr { pub fn column_name(&self, plan: PyLogicalPlan) -> PyResult { self._column_name(&plan.plan()).map_err(py_runtime_err) } + + // Expression Function Builder functions + + pub fn order_by(&self, order_by: Vec) -> PyExprFuncBuilder { + let order_by = order_by.iter().map(|e| e.expr.clone()).collect(); + self.expr.clone().order_by(order_by).into() + } + + pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder { + self.expr.clone().filter(filter.expr.clone()).into() + } + + pub fn distinct(&self) -> PyExprFuncBuilder { + self.expr.clone().distinct().into() + } + + pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder { + self.expr + .clone() + .null_treatment(Some(null_treatment.into())) + .into() + } + + pub fn partition_by(&self, partition_by: Vec) -> PyExprFuncBuilder { + let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect(); + self.expr.clone().partition_by(partition_by).into() + } + + pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder { + self.expr.clone().window_frame(window_frame.into()).into() + } +} + +#[pyclass(name = "ExprFuncBuilder", module = "datafusion.expr", subclass)] +#[derive(Debug, Clone)] +pub struct PyExprFuncBuilder { + pub builder: ExprFuncBuilder, +} + +impl From for PyExprFuncBuilder { + fn from(builder: ExprFuncBuilder) -> Self { + Self { builder } + } +} + +#[pymethods] +impl PyExprFuncBuilder { + pub fn order_by(&self, order_by: Vec) -> PyExprFuncBuilder { + let order_by = order_by.iter().map(|e| e.expr.clone()).collect(); + self.builder.clone().order_by(order_by).into() + } + + pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder { + self.builder.clone().filter(filter.expr.clone()).into() + } + + pub fn distinct(&self) -> PyExprFuncBuilder { + self.builder.clone().distinct().into() + } + + pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder { + self.builder + .clone() + .null_treatment(Some(null_treatment.into())) + .into() + } + + pub fn partition_by(&self, partition_by: Vec) -> PyExprFuncBuilder { + let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect(); + self.builder.clone().partition_by(partition_by).into() + } + + pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder { + self.builder + .clone() + .window_frame(window_frame.into()) + .into() + } + + pub fn build(&self) -> PyResult { + self.builder + .clone() + .build() + .map(|expr| expr.into()) + .map_err(|err| err.into()) + } } impl PyExpr { diff --git a/src/functions.rs b/src/functions.rs index 252563621..c51b1431c 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -16,6 +16,7 @@ // under the License. use datafusion::functions_aggregate::all_default_aggregate_functions; +use datafusion_expr::window_function; use datafusion_expr::ExprFunctionExt; use pyo3::{prelude::*, wrap_pyfunction}; @@ -890,6 +891,11 @@ aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg_udaf); aggregate_function!(max, functions_aggregate::min_max::max_udaf); aggregate_function!(min, functions_aggregate::min_max::min_udaf); +#[pyfunction] +pub fn lead(arg: PyExpr, shift_offset: i64, default_value: Option) -> PyExpr { + window_function::lead(arg.expr, Some(shift_offset), default_value).into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1075,5 +1081,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(array_slice))?; m.add_wrapped(wrap_pyfunction!(flatten))?; + m.add_wrapped(wrap_pyfunction!(lead))?; + Ok(()) } From 4cfb65070febfbfded71dbb44bfcd379f33b9adb Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 13 Aug 2024 13:59:52 -0400 Subject: [PATCH 02/26] Adding docstrings --- python/datafusion/expr.py | 86 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index dd1f25310..57ad80dab 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -23,7 +23,7 @@ from __future__ import annotations from ._internal import expr as expr_internal, LogicalPlan -from datafusion.common import RexType, DataTypeMap +from datafusion.common import NullTreatment, RexType, DataTypeMap from typing import Any import pyarrow as pa @@ -410,14 +410,98 @@ def column_name(self, plan: LogicalPlan) -> str: return self.expr.column_name(plan) def order_by(self, *exprs: Expr) -> ExprFuncBuilder: + """Set the ordering for a window or aggregate function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ return ExprFuncBuilder(self.expr.order_by(list(e.expr for e in exprs))) + def filter(self, filter: Expr) -> ExprFuncBuilder: + """Filter an aggregate function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ + return ExprFuncBuilder(self.expr.filter(filter.expr)) + + def distinct(self) -> ExprFuncBuilder: + """Only evaluate distinct values for an aggregate function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ + return ExprFuncBuilder(self.expr.distinct()) + + def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder: + """Set the treatment for ``null`` values for a window or aggregate function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ + return ExprFuncBuilder(self.expr.null_treatment(null_treatment)) + + def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder: + """Set the partitioning for a window function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ + return ExprFuncBuilder( + self.expr.partition_by(list(e.expr for e in partition_by)) + ) + + def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder: + """Set the frame fora window function. + + This function will create an :py:class:`ExprFuncBuilder` that can be used to + set parameters for either window or aggregate functions. If used on any other + type of expression, an error will be generated when ``build()`` is called. + """ + return ExprFuncBuilder(self.expr.window_frame(window_frame)) + class ExprFuncBuilder: def __init__(self, builder: expr_internal.ExprFuncBuilder): self.builder = builder + def order_by(self, *exprs: Expr) -> ExprFuncBuilder: + """Set the ordering for a window or aggregate function. + + Values given in ``exprs`` must be sort expressions. You can convert any other + expression to a sort expression using `.sort()`. + """ + return ExprFuncBuilder(self.builder.order_by(list(e.expr for e in exprs))) + + def filter(self, filter: Expr) -> ExprFuncBuilder: + """Filter values during aggregation.""" + return ExprFuncBuilder(self.builder.filter(filter.expr)) + + def distinct(self) -> ExprFuncBuilder: + """Only evaluate distinct values during aggregation.""" + return ExprFuncBuilder(self.builder.distinct()) + + def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder: + """Set how nulls are treated for either window or aggregate functions.""" + return ExprFuncBuilder(self.builder.null_treatment(null_treatment)) + + def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder: + """Set partitioning for window functions.""" + return ExprFuncBuilder( + self.builder.partition_by(list(e.expr for e in partition_by)) + ) + + def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder: + """Set window frame for window functions.""" + return ExprFuncBuilder(self.builder.window_frame(window_frame)) + def build(self) -> Expr: + """Create an expression from a Function Builder.""" return Expr(self.builder.build()) From 0803153c35244a1899652626857bf2d5a7a8e8e6 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 13 Aug 2024 14:11:56 -0400 Subject: [PATCH 03/26] Change last_value to use function builder instead of explicitly passing values --- python/datafusion/functions.py | 25 ++++++++----------------- src/functions.rs | 28 ++-------------------------- 2 files changed, 10 insertions(+), 43 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 12caeacf5..0d843b1b4 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1712,23 +1712,13 @@ def first_value( ) -def last_value( - arg: Expr, - distinct: bool = False, - filter: bool = None, - order_by: Expr | None = None, - null_treatment: common.NullTreatment | None = None, -) -> Expr: - """Returns the last value in a group of values.""" - return Expr( - f.last_value( - arg.expr, - distinct=distinct, - filter=filter, - order_by=order_by, - null_treatment=null_treatment, - ) - ) +def last_value(arg: Expr) -> Expr: + """Returns the last value in a group of values. + + To set parameters on this expression, use ``.order_by()``, ``.distinct()``, + ``.filter()``, or ``.null_treatment()``. + """ + return Expr(f.last_value(arg.expr)) def bit_and(arg: Expr, distinct: bool = False) -> Expr: @@ -1757,4 +1747,5 @@ def bool_or(arg: Expr, distinct: bool = False) -> Expr: def lead(arg: Expr, shift_offset: int = 1, default_value: pa.Scalar | None = None): + """Create a lead window function.""" return Expr(f.lead(arg.expr, shift_offset, default_value)) diff --git a/src/functions.rs b/src/functions.rs index c51b1431c..e5a186e3a 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -349,32 +349,8 @@ pub fn first_value( } #[pyfunction] -pub fn last_value( - expr: PyExpr, - distinct: bool, - filter: Option, - order_by: Option>, - null_treatment: Option, -) -> PyResult { - let agg_fn = functions_aggregate::expr_fn::last_value(vec![expr.expr]); - - // luckily, I can guarantee initializing a builder with an `order_by` default of empty vec - let order_by = order_by - .map(|x| x.into_iter().map(|x| x.expr).collect::>()) - .unwrap_or_default(); - let mut builder = agg_fn.order_by(order_by); - - if distinct { - builder = builder.distinct(); - } - - if let Some(filter) = filter { - builder = builder.filter(filter.expr); - } - - builder = builder.null_treatment(null_treatment.map(DFNullTreatment::from)); - - Ok(builder.build()?.into()) +pub fn last_value(expr: PyExpr) -> PyExpr { + functions_aggregate::expr_fn::last_value(vec![expr.expr]).into() } #[pyfunction] From 309d2366f13a917d84abc7fc1945debcdb968567 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 21 Aug 2024 08:41:22 -0400 Subject: [PATCH 04/26] Allow any value for lead function default value and add unit test --- python/datafusion/functions.py | 34 +++++++++++++++++++---- python/datafusion/tests/test_dataframe.py | 10 ++++--- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 0d843b1b4..03bbf9e4d 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -27,10 +27,10 @@ from datafusion.expr import CaseBuilder, Expr, WindowFrame from datafusion.context import SessionContext -from typing import TYPE_CHECKING +from typing import Any, Optional +from typing_extensions import deprecated -if TYPE_CHECKING: - import pyarrow as pa +import pyarrow as pa __all__ = [ "abs", @@ -389,7 +389,15 @@ def window( window_frame: WindowFrame | None = None, ctx: SessionContext | None = None, ) -> Expr: - """Creates a new Window function expression.""" + """Creates a new Window function expression. + + This interface is deprecateted. Instead of using this interface, users should call + the window functions directly. For example, to perform a lag use + + ``` + df.select(functions.lag(col("a")).partition_by(col("b")).build()) + ``` + """ args = [a.expr for a in args] partition_by = [e.expr for e in partition_by] if partition_by is not None else None order_by = [o.expr for o in order_by] if order_by is not None else None @@ -1746,6 +1754,20 @@ def bool_or(arg: Expr, distinct: bool = False) -> Expr: return Expr(f.bool_or(arg.expr, distinct=distinct)) -def lead(arg: Expr, shift_offset: int = 1, default_value: pa.Scalar | None = None): - """Create a lead window function.""" +def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) -> Expr: + """Create a lead window function. + + Lead operation will return the argument that is in the next shift_offset-th row in + the partition. For example ``lead(col("b"), shift_offset=3, default_value=5)`` will + return the 3rd following value in column ``b``. At the end of the partition, where + no futher values can be returned it will return the default value of 5. + + Args: + arg: Value to return + shift_offset: Number of rows following the current row. + default_value: Value to return if shift_offet row does not exist. + """ + if not isinstance(default_value, pa.Scalar): + default_value = pa.scalar(default_value) + return Expr(f.lead(arg.expr, shift_offset, default_value)) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 5526f9c6e..95d31798b 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -301,12 +301,14 @@ def test_distinct(): f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]), [1, 1, 2], ), + ("lead", f.lead(column("b")).order_by(column("b").sort()).build(), [5, 6, None]), ( - "next", - f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]), - [5, 6, None], + "lead_by_2", + f.lead(column("b"), shift_offset=2, default_value=-1) + .order_by(column("b").sort()) + .build(), + [6, -1, -1], ), - ("lead", f.lead(column("b")).order_by(column("b").sort()).build(), [5, 6, None]), ( "previous", f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]), From 37b154edc28a8e0e10263f733679647e10bbf4c1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 21 Aug 2024 08:48:45 -0400 Subject: [PATCH 05/26] Add lead window function and unit tests --- python/datafusion/functions.py | 20 ++++++++++++++++++++ python/datafusion/tests/test_dataframe.py | 9 ++++++--- src/functions.rs | 6 ++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 03bbf9e4d..3cfb2bcc8 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -253,6 +253,7 @@ "when", "window", "lead", + "lag", ] @@ -1771,3 +1772,22 @@ def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) default_value = pa.scalar(default_value) return Expr(f.lead(arg.expr, shift_offset, default_value)) + + +def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) -> Expr: + """Create a lag window function. + + Lag operation will return the argument that is in the previous shift_offset-th row + in the partition. For example ``lag(col("b"), shift_offset=3, default_value=5)`` + will return the 3rd previous value in column ``b``. At the beginnig of the + partition, where no values can be returned it will return the default value of 5. + + Args: + arg: Value to return + shift_offset: Number of rows before the current row. + default_value: Value to return if shift_offet row does not exist. + """ + if not isinstance(default_value, pa.Scalar): + default_value = pa.scalar(default_value) + + return Expr(f.lag(arg.expr, shift_offset, default_value)) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 95d31798b..1c8763626 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -309,10 +309,13 @@ def test_distinct(): .build(), [6, -1, -1], ), + ("lag", f.lag(column("b")).order_by(column("b").sort()).build(), [None, 4, 5]), ( - "previous", - f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]), - [None, 4, 5], + "lag_by_2", + f.lag(column("b"), shift_offset=2, default_value=-1) + .order_by(column("b").sort()) + .build(), + [-1, -1, 4], ), pytest.param( "first_value", diff --git a/src/functions.rs b/src/functions.rs index e5a186e3a..6679dfca2 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -872,6 +872,11 @@ pub fn lead(arg: PyExpr, shift_offset: i64, default_value: Option) window_function::lead(arg.expr, Some(shift_offset), default_value).into() } +#[pyfunction] +pub fn lag(arg: PyExpr, shift_offset: i64, default_value: Option) -> PyExpr { + window_function::lag(arg.expr, Some(shift_offset), default_value).into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1058,6 +1063,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(flatten))?; m.add_wrapped(wrap_pyfunction!(lead))?; + m.add_wrapped(wrap_pyfunction!(lag))?; Ok(()) } From 58a480779ae64f4cf300383410f3a484092a0ec1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 21 Aug 2024 09:30:19 -0400 Subject: [PATCH 06/26] Temporarily commenting out deprecated functions in documenation so builder will pass --- .../user-guide/common-operations/windows.rst | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/docs/source/user-guide/common-operations/windows.rst b/docs/source/user-guide/common-operations/windows.rst index 5ef3c986c..263a310ad 100644 --- a/docs/source/user-guide/common-operations/windows.rst +++ b/docs/source/user-guide/common-operations/windows.rst @@ -15,13 +15,16 @@ .. specific language governing permissions and limitations .. under the License. +.. _window_functions: + Window Functions ================ -In this section you will learn about window functions. A window function utilizes values from one or multiple rows to -produce a result for each individual row, unlike an aggregate function that provides a single value for multiple rows. +In this section you will learn about window functions. A window function utilizes values from one or +multiple rows to produce a result for each individual row, unlike an aggregate function that +provides a single value for multiple rows. -The functionality of window functions in DataFusion is supported by the dedicated :py:func:`~datafusion.functions.window` function. +The window functions are availble in the :py:mod:`~datafusion.functions` module. We'll use the pokemon dataset (from Ritchie Vink) in the following examples. @@ -40,17 +43,18 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples. ctx = SessionContext() df = ctx.read_csv("pokemon.csv") -Here is an example that shows how to compare each pokemons’s attack power with the average attack power in its ``"Type 1"`` +Here is an example that shows how to compare each pokemons’s attack power with the average attack +power in its ``"Type 1"`` .. ipython:: python df.select( col('"Name"'), col('"Attack"'), - f.alias( - f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]), - "Average Attack", - ) + #f.alias( + # f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]), + # "Average Attack", + #) ) You can also control the order in which rows are processed by window functions by providing @@ -61,15 +65,15 @@ a list of ``order_by`` functions for the ``order_by`` parameter. df.select( col('"Name"'), col('"Attack"'), - f.alias( - f.window( - "rank", - [], - partition_by=[col('"Type 1"')], - order_by=[f.order_by(col('"Attack"'))], - ), - "rank", - ), + #f.alias( + # f.window( + # "rank", + # [], + # partition_by=[col('"Type 1"')], + # order_by=[f.order_by(col('"Attack"'))], + # ), + # "rank", + #), ) The possible window functions are: From f09496e87e878871c9f9ca2f2236f4a04a208089 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 21 Aug 2024 09:33:25 -0400 Subject: [PATCH 07/26] Expose row_number window function --- python/datafusion/functions.py | 10 ++++++++++ python/datafusion/tests/test_dataframe.py | 2 +- src/functions.rs | 6 ++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 3cfb2bcc8..ed9907165 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1791,3 +1791,13 @@ def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) - default_value = pa.scalar(default_value) return Expr(f.lag(arg.expr, shift_offset, default_value)) + + +def row_number() -> Expr: + """Create a row number window function. + + Returns the row number of the window function. To set window function parameters + use the window builder approach described in the ref:`_window_functions` online + documentation. + """ + return Expr(f.row_number()) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 1c8763626..16a67b468 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -279,7 +279,7 @@ def test_distinct(): data_test_window_functions = [ - ("row", f.window("row_number", [], order_by=[f.order_by(column("c"))]), [2, 1, 3]), + ("row", f.row_number().order_by(column("c").sort()).build(), [2, 1, 3]), ("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]), ( "dense_rank", diff --git a/src/functions.rs b/src/functions.rs index 6679dfca2..1ef092df2 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -877,6 +877,11 @@ pub fn lag(arg: PyExpr, shift_offset: i64, default_value: Option) - window_function::lag(arg.expr, Some(shift_offset), default_value).into() } +#[pyfunction] +pub fn row_number() -> PyExpr { + window_function::row_number().into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1064,6 +1069,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(lead))?; m.add_wrapped(wrap_pyfunction!(lag))?; + m.add_wrapped(wrap_pyfunction!(row_number))?; Ok(()) } From c242728ec3820894c3435bfa33129b4f9f9bfb5f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 21 Aug 2024 09:53:11 -0400 Subject: [PATCH 08/26] Add rank window function --- python/datafusion/functions.py | 39 +++++++++++++++++++++-- python/datafusion/tests/test_dataframe.py | 2 +- src/functions.rs | 6 ++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index ed9907165..a462bbd86 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1763,6 +1763,9 @@ def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) return the 3rd following value in column ``b``. At the end of the partition, where no futher values can be returned it will return the default value of 5. + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + Args: arg: Value to return shift_offset: Number of rows following the current row. @@ -1782,6 +1785,9 @@ def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) - will return the 3rd previous value in column ``b``. At the beginnig of the partition, where no values can be returned it will return the default value of 5. + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + Args: arg: Value to return shift_offset: Number of rows before the current row. @@ -1796,8 +1802,35 @@ def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) - def row_number() -> Expr: """Create a row number window function. - Returns the row number of the window function. To set window function parameters - use the window builder approach described in the ref:`_window_functions` online - documentation. + Returns the row number of the window function. + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. """ return Expr(f.row_number()) + + +def rank() -> Expr: + """Create a rank window function. + + Returns the rank based upon the window order. Consecutive equal values will receive + the same rank, but the next different value will not be consecutive but rather the + number of rows that preceed it plus one. This is similar to Olympic medals. If two + people tie for gold, the next place is bronze. There would be no silver medal. Here + is an example of a dataframe with a window ordered by descending ``points`` and the + associated rank. + + ``` + +--------+------+ + | points | rank | + +--------+------+ + | 100 | 1 | + | 100 | 1 | + | 50 | 3 | + +--------+------+ + ``` + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + """ + return Expr(f.rank()) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 16a67b468..95d397737 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -280,7 +280,7 @@ def test_distinct(): data_test_window_functions = [ ("row", f.row_number().order_by(column("c").sort()).build(), [2, 1, 3]), - ("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]), + ("rank", f.rank().order_by(column("c").sort()).build(), [2, 1, 2]), ( "dense_rank", f.window("dense_rank", [], order_by=[f.order_by(column("c"))]), diff --git a/src/functions.rs b/src/functions.rs index 1ef092df2..3a06a33fc 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -882,6 +882,11 @@ pub fn row_number() -> PyExpr { window_function::row_number().into() } +#[pyfunction] +pub fn rank() -> PyExpr { + window_function::rank().into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1070,6 +1075,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(lead))?; m.add_wrapped(wrap_pyfunction!(lag))?; m.add_wrapped(wrap_pyfunction!(row_number))?; + m.add_wrapped(wrap_pyfunction!(rank))?; Ok(()) } From 18f7c81688f3d931ce871e01ca7d8d01d22137d4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 22 Aug 2024 19:31:08 -0400 Subject: [PATCH 09/26] Add percent rank and dense rank --- python/datafusion/dataframe.py | 7 ++- python/datafusion/functions.py | 56 +++++++++++++++++++++++ python/datafusion/tests/test_dataframe.py | 2 +- src/functions.rs | 12 +++++ 4 files changed, 72 insertions(+), 5 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 4f1760135..0e7d82e29 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -123,11 +123,10 @@ def select(self, *exprs: Expr | str) -> DataFrame: df = df.select("a", col("b"), col("a").alias("alternate_a")) """ - exprs = [ - arg.expr if isinstance(arg, Expr) else Expr.column(arg).expr - for arg in exprs + exprs_internal = [ + Expr.column(arg).expr if isinstance(arg, str) else arg.expr for arg in exprs ] - return DataFrame(self.df.select(*exprs)) + return DataFrame(self.df.select(*exprs_internal)) def filter(self, *predicates: Expr) -> DataFrame: """Return a DataFrame for which ``predicate`` evaluates to ``True``. diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index a462bbd86..87b6f3f06 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -251,9 +251,14 @@ "var_pop", "var_samp", "when", + # Window Functions "window", "lead", "lag", + "row_number", + "rank", + "dense_rank", + "percent_rank", ] @@ -1820,6 +1825,8 @@ def rank() -> Expr: is an example of a dataframe with a window ordered by descending ``points`` and the associated rank. + You should set ``order_by`` to produce meaningful results. + ``` +--------+------+ | points | rank | @@ -1827,6 +1834,55 @@ def rank() -> Expr: | 100 | 1 | | 100 | 1 | | 50 | 3 | + | 25 | 4 | + +--------+------+ + ``` + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + """ + return Expr(f.rank()) + + +def dense_rank() -> Expr: + """Create a dense_rank window function. + + This window function is similar to :py:func:`rank` except that the returned values + will be consecutive. Here is an example of a dataframe with a window ordered by + descending ``points`` and the associated dense rank. + + ``` + +--------+------------+ + | points | dense_rank | + +--------+------------+ + | 100 | 1 | + | 100 | 1 | + | 50 | 2 | + | 25 | 3 | + +--------+------------+ + ``` + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + """ + return Expr(f.rank()) + + +def percent_rank() -> Expr: + """Create a percent_rank window function. + + This window function is similar to :py:func:`rank` except that the returned values + will be consecutive. Here is an example of a dataframe with a window ordered by + descending ``points`` and the associated dense rank. + + ``` + +--------+------+ + | points | rank | + +--------+------+ + | 100 | 1 | + | 100 | 1 | + | 50 | 2 | + | 25 | 3 | +--------+------+ ``` diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 95d397737..238f1df3e 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -283,7 +283,7 @@ def test_distinct(): ("rank", f.rank().order_by(column("c").sort()).build(), [2, 1, 2]), ( "dense_rank", - f.window("dense_rank", [], order_by=[f.order_by(column("c"))]), + f.dense_rank().order_by((column("c").sort())).build(), [2, 1, 2], ), ( diff --git a/src/functions.rs b/src/functions.rs index 3a06a33fc..2fecef4f2 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -887,6 +887,16 @@ pub fn rank() -> PyExpr { window_function::rank().into() } +#[pyfunction] +pub fn dense_rank() -> PyExpr { + window_function::dense_rank().into() +} + +#[pyfunction] +pub fn percent_rank() -> PyExpr { + window_function::percent_rank().into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1076,6 +1086,8 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(lag))?; m.add_wrapped(wrap_pyfunction!(row_number))?; m.add_wrapped(wrap_pyfunction!(rank))?; + m.add_wrapped(wrap_pyfunction!(dense_rank))?; + m.add_wrapped(wrap_pyfunction!(percent_rank))?; Ok(()) } From eb5598d849c65eb960917eadb9918d1d3ab5dac1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 22 Aug 2024 19:37:33 -0400 Subject: [PATCH 10/26] Add cume_dist --- python/datafusion/functions.py | 51 +++++++++++++++++------ python/datafusion/tests/test_dataframe.py | 4 +- src/functions.rs | 6 +++ 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 87b6f3f06..1813c542d 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -259,6 +259,7 @@ "rank", "dense_rank", "percent_rank", + "cume_dist", ] @@ -1865,28 +1866,54 @@ def dense_rank() -> Expr: To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. """ - return Expr(f.rank()) + return Expr(f.dense_rank()) def percent_rank() -> Expr: """Create a percent_rank window function. This window function is similar to :py:func:`rank` except that the returned values - will be consecutive. Here is an example of a dataframe with a window ordered by - descending ``points`` and the associated dense rank. + are the percentage from 0.0 to 1.0 from first to last. Here is an example of a + dataframe with a window ordered by descending ``points`` and the associated percent + rank. ``` - +--------+------+ - | points | rank | - +--------+------+ - | 100 | 1 | - | 100 | 1 | - | 50 | 2 | - | 25 | 3 | - +--------+------+ + +--------+--------------+ + | points | percent_rank | + +--------+--------------+ + | 100 | 0.0 | + | 100 | 0.0 | + | 50 | 0.666667 | + | 25 | 1.0 | + +--------+--------------+ ``` To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. """ - return Expr(f.rank()) + return Expr(f.percent_rank()) + + +def cume_dist() -> Expr: + """Create a cumulative distribution window function. + + This window function is similar to :py:func:`rank` except that the returned values + are the ratio of the row number to the total numebr of rows. Here is an example of a + dataframe with a window ordered by descending ``points`` and the associated + cumulative distribution. + + ``` + +--------+-----------+ + | points | cume_dist | + +--------+-----------+ + | 100 | 0.5 | + | 100 | 0.5 | + | 50 | 0.75 | + | 25 | 1.0 | + +--------+-----------+ + ``` + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + """ + return Expr(f.cume_dist()) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 238f1df3e..253ed6df6 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -288,12 +288,12 @@ def test_distinct(): ), ( "percent_rank", - f.window("percent_rank", [], order_by=[f.order_by(column("c"))]), + f.percent_rank().order_by(column("c").sort()).build(), [0.5, 0, 0.5], ), ( "cume_dist", - f.window("cume_dist", [], order_by=[f.order_by(column("b"))]), + f.cume_dist().order_by(column("b").sort()).build(), [0.3333333333333333, 0.6666666666666666, 1.0], ), ( diff --git a/src/functions.rs b/src/functions.rs index 2fecef4f2..7c6186790 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -897,6 +897,11 @@ pub fn percent_rank() -> PyExpr { window_function::percent_rank().into() } +#[pyfunction] +pub fn cume_dist() -> PyExpr { + window_function::cume_dist().into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1088,6 +1093,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(rank))?; m.add_wrapped(wrap_pyfunction!(dense_rank))?; m.add_wrapped(wrap_pyfunction!(percent_rank))?; + m.add_wrapped(wrap_pyfunction!(cume_dist))?; Ok(()) } From f92d064303e7a5225a7e81ba4e239c44b8a62b98 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 22 Aug 2024 19:51:17 -0400 Subject: [PATCH 11/26] Add ntile window function --- python/datafusion/functions.py | 28 +++++++++++++++++++++++ python/datafusion/tests/test_dataframe.py | 2 +- src/functions.rs | 7 ++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 1813c542d..63ec6f515 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1917,3 +1917,31 @@ def cume_dist() -> Expr: ref:`_window_functions` online documentation. """ return Expr(f.cume_dist()) + + +def ntile(groups: int) -> Expr: + """Create a n-tile window function. + + This window function orders the window frame into a give number of groups based on + the ordering criteria. It then returns which group the current row is assigned to. + Here is an example of a dataframe with a window ordered by descending ``points`` + and the associated n-tile function. + + ``` + +--------+-------+ + | points | ntile | + +--------+-------+ + | 120 | 1 | + | 100 | 1 | + | 80 | 2 | + | 60 | 2 | + | 40 | 3 | + | 20 | 3 | + +--------+-------+ + ``` + + To set window function parameters use the window builder approach described in the + ref:`_window_functions` online documentation. + """ + # Developer note: ntile only accepts literal values. + return Expr(f.ntile(Expr.literal(groups).expr)) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 253ed6df6..8cbd16787 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -298,7 +298,7 @@ def test_distinct(): ), ( "ntile", - f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]), + f.ntile(2).order_by(column("c").sort()).build(), [1, 1, 2], ), ("lead", f.lead(column("b")).order_by(column("b").sort()).build(), [5, 6, None]), diff --git a/src/functions.rs b/src/functions.rs index 7c6186790..3b981b052 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -902,6 +902,11 @@ pub fn cume_dist() -> PyExpr { window_function::cume_dist().into() } +#[pyfunction] +pub fn ntile(arg: PyExpr) -> PyExpr { + window_function::ntile(arg.into()).into() +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(abs))?; m.add_wrapped(wrap_pyfunction!(acos))?; @@ -1087,6 +1092,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(array_slice))?; m.add_wrapped(wrap_pyfunction!(flatten))?; + // Window Functions m.add_wrapped(wrap_pyfunction!(lead))?; m.add_wrapped(wrap_pyfunction!(lag))?; m.add_wrapped(wrap_pyfunction!(row_number))?; @@ -1094,6 +1100,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(dense_rank))?; m.add_wrapped(wrap_pyfunction!(percent_rank))?; m.add_wrapped(wrap_pyfunction!(cume_dist))?; + m.add_wrapped(wrap_pyfunction!(ntile))?; Ok(()) } From 8ab55e597213ba6e649d3a3d583ecf80317e50e4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 22 Aug 2024 20:05:00 -0400 Subject: [PATCH 12/26] Add comment to update when upstream merges --- python/datafusion/tests/test_dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 8cbd16787..da56571ae 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -317,6 +317,7 @@ def test_distinct(): .build(), [-1, -1, 4], ), + # TODO update once upstream merges https://github.com/apache/datafusion-python/issues/833 pytest.param( "first_value", f.window("first_value", [column("a")], order_by=[f.order_by(column("b"))]), From ca397ba3e1f634aadf2d35645ba39502deaed4d1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 23 Aug 2024 08:46:03 -0400 Subject: [PATCH 13/26] Window frame required calling inner value --- python/datafusion/expr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 57ad80dab..098cc0cea 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -463,7 +463,7 @@ def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder: set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when ``build()`` is called. """ - return ExprFuncBuilder(self.expr.window_frame(window_frame)) + return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame)) class ExprFuncBuilder: @@ -498,7 +498,7 @@ def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder: def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder: """Set window frame for window functions.""" - return ExprFuncBuilder(self.builder.window_frame(window_frame)) + return ExprFuncBuilder(self.builder.window_frame(window_frame.window_frame)) def build(self) -> Expr: """Create an expression from a Function Builder.""" From 0c867546bff360ed819e64175e0b1ea7363e8c22 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 23 Aug 2024 09:00:23 -0400 Subject: [PATCH 14/26] Add unit test for avg as window function --- python/datafusion/tests/test_dataframe.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index da56571ae..668c36def 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -317,7 +317,7 @@ def test_distinct(): .build(), [-1, -1, 4], ), - # TODO update once upstream merges https://github.com/apache/datafusion-python/issues/833 + # TODO update all aggregate functions as windows once upstream merges https://github.com/apache/datafusion-python/issues/833 pytest.param( "first_value", f.window("first_value", [column("a")], order_by=[f.order_by(column("b"))]), @@ -337,6 +337,11 @@ def test_distinct(): ), [None, 5, 5], ), + pytest.param( + "avg", + f.window("avg", [column("b")]), + [4.0, 4.5, 5.0], + ), ] From 1ee2691978868faddb9350ccf8eff0f21a2805a0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 23 Aug 2024 10:19:52 -0400 Subject: [PATCH 15/26] Working on documentation for window functions --- .../common-operations/aggregations.rst | 2 + .../user-guide/common-operations/windows.rst | 89 ++++++++++----- python/datafusion/functions.py | 108 ++++++++---------- 3 files changed, 111 insertions(+), 88 deletions(-) diff --git a/docs/source/user-guide/common-operations/aggregations.rst b/docs/source/user-guide/common-operations/aggregations.rst index b9202129e..7ad402210 100644 --- a/docs/source/user-guide/common-operations/aggregations.rst +++ b/docs/source/user-guide/common-operations/aggregations.rst @@ -15,6 +15,8 @@ .. specific language governing permissions and limitations .. under the License. +.. _aggregation: + Aggregation ============ diff --git a/docs/source/user-guide/common-operations/windows.rst b/docs/source/user-guide/common-operations/windows.rst index 263a310ad..cf4722fa6 100644 --- a/docs/source/user-guide/common-operations/windows.rst +++ b/docs/source/user-guide/common-operations/windows.rst @@ -43,21 +43,21 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples. ctx = SessionContext() df = ctx.read_csv("pokemon.csv") -Here is an example that shows how to compare each pokemons’s attack power with the average attack -power in its ``"Type 1"`` +Here is an example that shows how you can compare each pokemon's speed to the speed of the +previous row in the DataFrame. .. ipython:: python df.select( col('"Name"'), - col('"Attack"'), - #f.alias( - # f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]), - # "Average Attack", - #) + col('"Speed"'), + f.lag(col('"Speed"')).alias("Previous Speed") ) -You can also control the order in which rows are processed by window functions by providing +Setting Parameters +------------------ + +You can control the order in which rows are processed by window functions by providing a list of ``order_by`` functions for the ``order_by`` parameter. .. ipython:: python @@ -65,33 +65,64 @@ a list of ``order_by`` functions for the ``order_by`` parameter. df.select( col('"Name"'), col('"Attack"'), - #f.alias( - # f.window( - # "rank", - # [], - # partition_by=[col('"Type 1"')], - # order_by=[f.order_by(col('"Attack"'))], - # ), - # "rank", - #), + col('"Type 1"'), + f.rank() + .partition_by(col('"Type 1"')) + .order_by(col('"Attack"').sort(ascending=True)) + .build() + .alias("rank"), + ).sort(col('"Type 1"').sort(), col('"Attack"').sort()) + +Window Functions can be configured using a builder approach to set a few parameters. +To create a builder you simply need to call any one of these functions + +- :py:func:`datafusion.expr.Expr.order_by` to set the window ordering. +- :py:func:`datafusion.expr.Expr.null_treatment` to set how ``null`` values should be handled. +- :py:func:`datafusion.expr.Expr.partition_by` to set the partitions for processing. +- :py:func:`datafusion.expr.Expr.window_frame` to set boundary of operation. + +After these parameters are set, you must call ``build()`` on the resultant object to get an +expression as shown in the example above. + +Aggregate Functions +------------------- + +You can use any :ref:`Aggregation Function` as a window function. Currently +aggregate functions must use the deprecated +:py:func:`datafusion.functions.window` API but this should be resolved in +DataFusion 42.0 (`Issue Link `_). Here +is an example that shows how to compare each pokemons’s attack power with the average attack +power in its ``"Type 1"`` using the :py:func:`datafusion.functions.avg` function. + +.. ipython:: python + :okwarning: + + df.select( + col('"Name"'), + col('"Attack"'), + col('"Type 1"'), + f.window("avg", [col('"Attack"')]) + .partition_by(col('"Type 1"')) + .build() + .alias("Average Attack"), ) +Available Functions +------------------- + The possible window functions are: 1. Rank Functions - - rank - - dense_rank - - row_number - - ntile + - :py:func:`datafusion.functions.rank` + - :py:func:`datafusion.functions.dense_rank` + - :py:func:`datafusion.functions.ntile` + - :py:func:`datafusion.functions.row_number` 2. Analytical Functions - - cume_dist - - percent_rank - - lag - - lead - - first_value - - last_value - - nth_value + - :py:func:`datafusion.functions.cume_dist` + - :py:func:`datafusion.functions.percent_rank` + - :py:func:`datafusion.functions.lag` + - :py:func:`datafusion.functions.lead` 3. Aggregate Functions - - All aggregate functions can be used as window functions. + - All :ref:`Aggregation Functions` can be used as window functions. diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 63ec6f515..bd7c1cca3 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -260,6 +260,7 @@ "dense_rank", "percent_rank", "cume_dist", + "ntile", ] @@ -1826,18 +1827,16 @@ def rank() -> Expr: is an example of a dataframe with a window ordered by descending ``points`` and the associated rank. - You should set ``order_by`` to produce meaningful results. + You should set ``order_by`` to produce meaningful results:: - ``` - +--------+------+ - | points | rank | - +--------+------+ - | 100 | 1 | - | 100 | 1 | - | 50 | 3 | - | 25 | 4 | - +--------+------+ - ``` + +--------+------+ + | points | rank | + +--------+------+ + | 100 | 1 | + | 100 | 1 | + | 50 | 3 | + | 25 | 4 | + +--------+------+ To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1850,18 +1849,16 @@ def dense_rank() -> Expr: This window function is similar to :py:func:`rank` except that the returned values will be consecutive. Here is an example of a dataframe with a window ordered by - descending ``points`` and the associated dense rank. + descending ``points`` and the associated dense rank:: - ``` - +--------+------------+ - | points | dense_rank | - +--------+------------+ - | 100 | 1 | - | 100 | 1 | - | 50 | 2 | - | 25 | 3 | - +--------+------------+ - ``` + +--------+------------+ + | points | dense_rank | + +--------+------------+ + | 100 | 1 | + | 100 | 1 | + | 50 | 2 | + | 25 | 3 | + +--------+------------+ To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1875,18 +1872,16 @@ def percent_rank() -> Expr: This window function is similar to :py:func:`rank` except that the returned values are the percentage from 0.0 to 1.0 from first to last. Here is an example of a dataframe with a window ordered by descending ``points`` and the associated percent - rank. + rank:: - ``` - +--------+--------------+ - | points | percent_rank | - +--------+--------------+ - | 100 | 0.0 | - | 100 | 0.0 | - | 50 | 0.666667 | - | 25 | 1.0 | - +--------+--------------+ - ``` + +--------+--------------+ + | points | percent_rank | + +--------+--------------+ + | 100 | 0.0 | + | 100 | 0.0 | + | 50 | 0.666667 | + | 25 | 1.0 | + +--------+--------------+ To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1900,18 +1895,16 @@ def cume_dist() -> Expr: This window function is similar to :py:func:`rank` except that the returned values are the ratio of the row number to the total numebr of rows. Here is an example of a dataframe with a window ordered by descending ``points`` and the associated - cumulative distribution. + cumulative distribution:: - ``` - +--------+-----------+ - | points | cume_dist | - +--------+-----------+ - | 100 | 0.5 | - | 100 | 0.5 | - | 50 | 0.75 | - | 25 | 1.0 | - +--------+-----------+ - ``` + +--------+-----------+ + | points | cume_dist | + +--------+-----------+ + | 100 | 0.5 | + | 100 | 0.5 | + | 50 | 0.75 | + | 25 | 1.0 | + +--------+-----------+ To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1925,23 +1918,20 @@ def ntile(groups: int) -> Expr: This window function orders the window frame into a give number of groups based on the ordering criteria. It then returns which group the current row is assigned to. Here is an example of a dataframe with a window ordered by descending ``points`` - and the associated n-tile function. - - ``` - +--------+-------+ - | points | ntile | - +--------+-------+ - | 120 | 1 | - | 100 | 1 | - | 80 | 2 | - | 60 | 2 | - | 40 | 3 | - | 20 | 3 | - +--------+-------+ - ``` + and the associated n-tile function:: + + +--------+-------+ + | points | ntile | + +--------+-------+ + | 120 | 1 | + | 100 | 1 | + | 80 | 2 | + | 60 | 2 | + | 40 | 3 | + | 20 | 3 | + +--------+-------+ To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. """ - # Developer note: ntile only accepts literal values. return Expr(f.ntile(Expr.literal(groups).expr)) From 6d5497398473eb9557214dba94471860ae1acb78 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 06:20:24 -0400 Subject: [PATCH 16/26] Add pyo build config file to git ignore since this is user specific --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 0030b907b..84dd566ee 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ apache-rat-*.jar CHANGELOG.md.bak docs/mdbook/book + +.pyo3_build_config + From 37022e32436ae3a8d07b265ac5fe3103c3926d82 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 07:15:34 -0400 Subject: [PATCH 17/26] Add examples to docstring --- python/datafusion/functions.py | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index bd7c1cca3..9ebd8f0fd 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1770,6 +1770,18 @@ def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) return the 3rd following value in column ``b``. At the end of the partition, where no futher values can be returned it will return the default value of 5. + Here is an example of both the ``lead`` and :py:func:`datafusion.functions.lag` + functions on a simple DataFrame:: + + +--------+------+-----+ + | points | lead | lag | + +--------+------+-----+ + | 100 | 100 | | + | 100 | 50 | 100 | + | 50 | 25 | 100 | + | 25 | | 50 | + +--------+------+-----+ + To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1792,6 +1804,18 @@ def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) - will return the 3rd previous value in column ``b``. At the beginnig of the partition, where no values can be returned it will return the default value of 5. + Here is an example of both the ``lag`` and :py:func:`datafusion.functions.lead` + functions on a simple DataFrame:: + + +--------+------+-----+ + | points | lead | lag | + +--------+------+-----+ + | 100 | 100 | | + | 100 | 50 | 100 | + | 50 | 25 | 100 | + | 25 | | 50 | + +--------+------+-----+ + To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. @@ -1811,6 +1835,17 @@ def row_number() -> Expr: Returns the row number of the window function. + Here is an example of the ``row_number`` on a simple DataFrame:: + + +--------+------------+ + | points | row number | + +--------+------------+ + | 100 | 1 | + | 100 | 2 | + | 50 | 3 | + | 25 | 4 | + +--------+------------+ + To set window function parameters use the window builder approach described in the ref:`_window_functions` online documentation. """ From ebf7c964f21c00407ae311c3a4ec09f41c3f95ec Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 29 Aug 2024 20:49:28 -0400 Subject: [PATCH 18/26] Optionally add window function parameters during function call --- python/datafusion/functions.py | 248 +++++++++++++++++++--- python/datafusion/tests/test_dataframe.py | 165 ++++++++++---- src/functions.rs | 189 +++++++++++++++-- 3 files changed, 517 insertions(+), 85 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 9ebd8f0fd..037fe94e4 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1762,7 +1762,15 @@ def bool_or(arg: Expr, distinct: bool = False) -> Expr: return Expr(f.bool_or(arg.expr, distinct=distinct)) -def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) -> Expr: +def lead( + arg: Expr, + shift_offset: int = 1, + default_value: Optional[Any] = None, + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a lead window function. Lead operation will return the argument that is in the next shift_offset-th row in @@ -1789,14 +1797,42 @@ def lead(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) arg: Value to return shift_offset: Number of rows following the current row. default_value: Value to return if shift_offet row does not exist. + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - if not isinstance(default_value, pa.Scalar): + if not isinstance(default_value, pa.Scalar) and default_value is not None: default_value = pa.scalar(default_value) - return Expr(f.lead(arg.expr, shift_offset, default_value)) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + return Expr( + f.lead( + arg.expr, + shift_offset, + default_value, + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) -> Expr: + +def lag( + arg: Expr, + shift_offset: int = 1, + default_value: Optional[Any] = None, + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a lag window function. Lag operation will return the argument that is in the previous shift_offset-th row @@ -1816,21 +1852,43 @@ def lag(arg: Expr, shift_offset: int = 1, default_value: Optional[Any] = None) - | 25 | | 50 | +--------+------+-----+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. - Args: arg: Value to return shift_offset: Number of rows before the current row. default_value: Value to return if shift_offet row does not exist. + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ if not isinstance(default_value, pa.Scalar): default_value = pa.scalar(default_value) - return Expr(f.lag(arg.expr, shift_offset, default_value)) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.lag( + arg.expr, + shift_offset, + default_value, + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def row_number() -> Expr: +def row_number( + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a row number window function. Returns the row number of the window function. @@ -1846,13 +1904,34 @@ def row_number() -> Expr: | 25 | 4 | +--------+------------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. + Args: + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - return Expr(f.row_number()) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.row_number( + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def rank() -> Expr: +def rank( + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a rank window function. Returns the rank based upon the window order. Consecutive equal values will receive @@ -1873,13 +1952,34 @@ def rank() -> Expr: | 25 | 4 | +--------+------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. + Args: + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - return Expr(f.rank()) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.rank( + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def dense_rank() -> Expr: +def dense_rank( + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a dense_rank window function. This window function is similar to :py:func:`rank` except that the returned values @@ -1895,13 +1995,34 @@ def dense_rank() -> Expr: | 25 | 3 | +--------+------------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. + Args: + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - return Expr(f.dense_rank()) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + return Expr( + f.dense_rank( + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def percent_rank() -> Expr: + +def percent_rank( + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a percent_rank window function. This window function is similar to :py:func:`rank` except that the returned values @@ -1918,13 +2039,34 @@ def percent_rank() -> Expr: | 25 | 1.0 | +--------+--------------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. + Args: + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - return Expr(f.percent_rank()) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.percent_rank( + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def cume_dist() -> Expr: +def cume_dist( + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a cumulative distribution window function. This window function is similar to :py:func:`rank` except that the returned values @@ -1941,13 +2083,35 @@ def cume_dist() -> Expr: | 25 | 1.0 | +--------+-----------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. + Args: + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. """ - return Expr(f.cume_dist()) + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.cume_dist( + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) -def ntile(groups: int) -> Expr: +def ntile( + groups: int, + partition_by: Optional[list[Expr]] = None, + order_by: Optional[list[Expr]] = None, + window_frame: Optional[WindowFrame] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Create a n-tile window function. This window function orders the window frame into a give number of groups based on @@ -1966,7 +2130,25 @@ def ntile(groups: int) -> Expr: | 20 | 3 | +--------+-------+ - To set window function parameters use the window builder approach described in the - ref:`_window_functions` online documentation. - """ - return Expr(f.ntile(Expr.literal(groups).expr)) + Args: + groups: Number of groups for the n-tile to be divided into. + partition_by: Expressions to partition the window frame on. + order_by: Set ordering within the window frame. + window_frame: Override default window frame. + null_treatment: Specify how nulls are to be treated. + """ + partition_cols = ( + [col.expr for col in partition_by] if partition_by is not None else None + ) + order_cols = [col.expr for col in order_by] if order_by is not None else None + window_val = window_frame.window_frame if window_frame is not None else None + + return Expr( + f.ntile( + Expr.literal(groups).expr, + partition_by=partition_cols, + order_by=order_cols, + window_frame=window_val, + null_treatment=null_treatment, + ) + ) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 668c36def..78b65cf26 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -30,6 +30,7 @@ literal, udf, ) +from datafusion.common import NullTreatment @pytest.fixture @@ -84,6 +85,23 @@ def aggregate_df(): return ctx.sql("select c1, sum(c2) from test group by c1") +@pytest.fixture +def partitioned_df(): + ctx = SessionContext() + + # create a RecordBatch and a new DataFrame from it + batch = pa.RecordBatch.from_arrays( + [ + pa.array([0, 1, 2, 3, 4, 5, 6]), + pa.array([7, None, 7, 8, 9, None, 9]), + pa.array(["A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], + ) + + return ctx.create_dataframe([[batch]]) + + def test_select(df): df = df.select( column("a") + column("b"), @@ -279,79 +297,154 @@ def test_distinct(): data_test_window_functions = [ - ("row", f.row_number().order_by(column("c").sort()).build(), [2, 1, 3]), - ("rank", f.rank().order_by(column("c").sort()).build(), [2, 1, 2]), + ( + "row", + f.row_number(order_by=[column("b"), column("a").sort(ascending=False)]), + [4, 2, 3, 5, 7, 1, 6], + ), + ( + "row_w_params", + f.row_number( + order_by=[column("b"), column("a")], + partition_by=[column("c")], + window_frame=WindowFrame("rows", 1, 0), + null_treatment=NullTreatment.RESPECT_NULLS, + ), + [2, 1, 3, 4, 2, 1, 3], + ), + ("rank", f.rank(order_by=[column("b")]), [3, 1, 3, 5, 6, 1, 6]), + ( + "rank_w_params", + f.rank(order_by=[column("b"), column("a")], partition_by=[column("c")]), + [2, 1, 3, 4, 2, 1, 3], + ), ( "dense_rank", - f.dense_rank().order_by((column("c").sort())).build(), - [2, 1, 2], + f.dense_rank(order_by=[column("b")]), + [2, 1, 2, 3, 4, 1, 4], + ), + ( + "dense_rank_w_params", + f.dense_rank(order_by=[column("b"), column("a")], partition_by=[column("c")]), + [2, 1, 3, 4, 2, 1, 3], ), ( "percent_rank", - f.percent_rank().order_by(column("c").sort()).build(), - [0.5, 0, 0.5], + f.round(f.percent_rank(order_by=[column("b")]), literal(3)), + [0.333, 0.0, 0.333, 0.667, 0.833, 0.0, 0.833], + ), + ( + "percent_rank_w_params", + f.round( + f.percent_rank( + order_by=[column("b"), column("a")], partition_by=[column("c")] + ), + literal(3), + ), + [0.333, 0.0, 0.667, 1.0, 0.5, 0.0, 1.0], ), ( "cume_dist", - f.cume_dist().order_by(column("b").sort()).build(), - [0.3333333333333333, 0.6666666666666666, 1.0], + f.round(f.cume_dist(order_by=[column("b")]), literal(3)), + [0.571, 0.286, 0.571, 0.714, 1.0, 0.286, 1.0], + ), + ( + "cume_dist_w_params", + f.round( + f.cume_dist( + order_by=[column("b"), column("a")], partition_by=[column("c")] + ), + literal(3), + ), + [0.5, 0.25, 0.75, 1.0, 0.667, 0.333, 1.0], ), ( "ntile", - f.ntile(2).order_by(column("c").sort()).build(), - [1, 1, 2], + f.ntile(2, order_by=[column("b")]), + [1, 1, 1, 2, 2, 1, 2], ), - ("lead", f.lead(column("b")).order_by(column("b").sort()).build(), [5, 6, None]), ( - "lead_by_2", - f.lead(column("b"), shift_offset=2, default_value=-1) - .order_by(column("b").sort()) - .build(), - [6, -1, -1], + "ntile_w_params", + f.ntile(2, order_by=[column("b"), column("a")], partition_by=[column("c")]), + [1, 1, 2, 2, 1, 1, 2], ), - ("lag", f.lag(column("b")).order_by(column("b").sort()).build(), [None, 4, 5]), + ("lead", f.lead(column("b"), order_by=[column("b")]), [7, None, 8, 9, 9, 7, None]), ( - "lag_by_2", - f.lag(column("b"), shift_offset=2, default_value=-1) - .order_by(column("b").sort()) - .build(), - [-1, -1, 4], + "lead_w_params", + f.lead( + column("b"), + shift_offset=2, + default_value=-1, + order_by=[column("b"), column("a")], + partition_by=[column("c")], + window_frame=WindowFrame("rows", 3, 0), + ), + [8, 7, -1, -1, -1, 9, -1], + ), + ("lag", f.lag(column("b"), order_by=[column("b")]), [None, None, 7, 7, 8, None, 9]), + ( + "lag_w_params", + f.lag( + column("b"), + shift_offset=2, + default_value=-1, + order_by=[column("b"), column("a")], + partition_by=[column("c")], + window_frame=WindowFrame("rows", 3, 0), + ), + [-1, -1, None, 7, -1, -1, None], ), # TODO update all aggregate functions as windows once upstream merges https://github.com/apache/datafusion-python/issues/833 pytest.param( "first_value", - f.window("first_value", [column("a")], order_by=[f.order_by(column("b"))]), - [1, 1, 1], + f.window( + "first_value", + [column("a")], + order_by=[f.order_by(column("b"))], + partition_by=[column("c")], + ), + [1, 1, 1, 1, 5, 5, 5], ), pytest.param( "last_value", - f.window("last_value", [column("b")], order_by=[f.order_by(column("b"))]), - [4, 5, 6], + f.window("last_value", [column("a")]) + .window_frame(WindowFrame("rows", 0, None)) + .order_by(column("b").sort()) + .partition_by(column("c")) + .build(), + [3, 3, 3, 3, 6, 6, 6], ), pytest.param( - "2nd_value", + "3rd_value", f.window( "nth_value", - [column("b"), literal(2)], - order_by=[f.order_by(column("b"))], + [column("b"), literal(3)], + order_by=[f.order_by(column("a"))], ), - [None, 5, 5], + [None, None, 7, 7, 7, 7, 7], ), pytest.param( "avg", - f.window("avg", [column("b")]), - [4.0, 4.5, 5.0], + f.round(f.window("avg", [column("b")]), literal(3)), + [7.0, 7.0, 7.0, 7.333, 7.75, 7.75, 8.0], ), ] @pytest.mark.parametrize("name,expr,result", data_test_window_functions) -def test_window_functions(df, name, expr, result): - df = df.select(column("a"), column("b"), column("c"), f.alias(expr, name)) - +def test_window_functions(partitioned_df, name, expr, result): + df = partitioned_df.select( + column("a"), column("b"), column("c"), f.alias(expr, name) + ) + df.sort(column("a").sort()).show() table = pa.Table.from_batches(df.collect()) - expected = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [8, 5, 8], name: result} + expected = { + "a": [0, 1, 2, 3, 4, 5, 6], + "b": [7, None, 7, 8, 9, None, 9], + "c": ["A", "A", "A", "A", "B", "B", "B"], + name: result, + } assert table.sort_by("a").to_pydict() == expected diff --git a/src/functions.rs b/src/functions.rs index 3b981b052..b9b11d1b7 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -867,44 +867,201 @@ aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg_udaf); aggregate_function!(max, functions_aggregate::min_max::max_udaf); aggregate_function!(min, functions_aggregate::min_max::min_udaf); +fn add_builder_fns_to_window( + window_fn: Expr, + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + // Since ExprFuncBuilder::new() is private, set an empty partition and then + // override later if appropriate. + let mut builder = window_fn.partition_by(vec![]); + + if let Some(partition_cols) = partition_by { + builder = builder.partition_by( + partition_cols + .into_iter() + .map(|col| col.clone().into()) + .collect(), + ); + } + + if let Some(order_by_cols) = order_by { + let order_by_cols = order_by_cols + .into_iter() + .map(|col| { + let order_by_expr: Expr = col.into(); + if let Expr::Sort(_) = order_by_expr { + order_by_expr + } else { + order_by_expr.sort(true, true) + } + }) + .collect(); + builder = builder.order_by(order_by_cols); + } + + if let Some(window_frame_vals) = window_frame { + builder = builder.window_frame(window_frame_vals.into()); + } + + if let Some(null_treatment_val) = null_treatment { + builder = builder.null_treatment(Some(null_treatment_val.into())); + } + + builder.build().map(|e| e.into()).map_err(|err| err.into()) +} + #[pyfunction] -pub fn lead(arg: PyExpr, shift_offset: i64, default_value: Option) -> PyExpr { - window_function::lead(arg.expr, Some(shift_offset), default_value).into() +pub fn lead( + arg: PyExpr, + shift_offset: i64, + default_value: Option, + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn lag(arg: PyExpr, shift_offset: i64, default_value: Option) -> PyExpr { - window_function::lag(arg.expr, Some(shift_offset), default_value).into() +pub fn lag( + arg: PyExpr, + shift_offset: i64, + default_value: Option, + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn row_number() -> PyExpr { - window_function::row_number().into() +pub fn row_number( + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::row_number(); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn rank() -> PyExpr { - window_function::rank().into() +pub fn rank( + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::rank(); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn dense_rank() -> PyExpr { - window_function::dense_rank().into() +pub fn dense_rank( + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::dense_rank(); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn percent_rank() -> PyExpr { - window_function::percent_rank().into() +pub fn percent_rank( + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::percent_rank(); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn cume_dist() -> PyExpr { - window_function::cume_dist().into() +pub fn cume_dist( + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::cume_dist(); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } #[pyfunction] -pub fn ntile(arg: PyExpr) -> PyExpr { - window_function::ntile(arg.into()).into() +pub fn ntile( + arg: PyExpr, + partition_by: Option>, + order_by: Option>, + window_frame: Option, + null_treatment: Option, +) -> PyResult { + let window_fn = window_function::ntile(arg.into()); + + add_builder_fns_to_window( + window_fn, + partition_by, + order_by, + window_frame, + null_treatment, + ) } pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { From 14c71695678058ba126bf85a461e7fa2dc242fee Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 30 Aug 2024 08:30:31 -0400 Subject: [PATCH 19/26] Update sort and order_by to apply automatic ordering if any other expression is given --- src/dataframe.rs | 3 ++- src/expr.rs | 23 +++++++++++++++++++---- src/functions.rs | 13 ++----------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 22b05226c..d7abab400 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -39,6 +39,7 @@ use pyo3::types::{PyCapsule, PyTuple}; use tokio::task::JoinHandle; use crate::errors::py_datafusion_err; +use crate::expr::to_sort_expressions; use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; @@ -150,7 +151,7 @@ impl PyDataFrame { #[pyo3(signature = (*exprs))] fn sort(&self, exprs: Vec) -> PyResult { - let exprs = exprs.into_iter().map(|e| e.into()).collect(); + let exprs = to_sort_expressions(exprs); let df = self.df.as_ref().clone().sort(exprs)?; Ok(Self::new(df)) } diff --git a/src/expr.rs b/src/expr.rs index a40679e03..697682d4c 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -519,8 +519,10 @@ impl PyExpr { // Expression Function Builder functions pub fn order_by(&self, order_by: Vec) -> PyExprFuncBuilder { - let order_by = order_by.iter().map(|e| e.expr.clone()).collect(); - self.expr.clone().order_by(order_by).into() + self.expr + .clone() + .order_by(to_sort_expressions(order_by)) + .into() } pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder { @@ -560,11 +562,24 @@ impl From for PyExprFuncBuilder { } } +pub fn to_sort_expressions(order_by: Vec) -> Vec { + order_by + .iter() + .map(|e| e.expr.clone()) + .map(|e| match e { + Expr::Sort(_) => e, + _ => e.sort(true, true), + }) + .collect() +} + #[pymethods] impl PyExprFuncBuilder { pub fn order_by(&self, order_by: Vec) -> PyExprFuncBuilder { - let order_by = order_by.iter().map(|e| e.expr.clone()).collect(); - self.builder.clone().order_by(order_by).into() + self.builder + .clone() + .order_by(to_sort_expressions(order_by)) + .into() } pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder { diff --git a/src/functions.rs b/src/functions.rs index b9b11d1b7..2062bf72b 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -24,6 +24,7 @@ use crate::common::data_type::NullTreatment; use crate::context::PySessionContext; use crate::errors::DataFusionError; use crate::expr::conditional_expr::PyCaseBuilder; +use crate::expr::to_sort_expressions; use crate::expr::window::PyWindowFrame; use crate::expr::PyExpr; use datafusion::execution::FunctionRegistry; @@ -888,17 +889,7 @@ fn add_builder_fns_to_window( } if let Some(order_by_cols) = order_by { - let order_by_cols = order_by_cols - .into_iter() - .map(|col| { - let order_by_expr: Expr = col.into(); - if let Expr::Sort(_) = order_by_expr { - order_by_expr - } else { - order_by_expr.sort(true, true) - } - }) - .collect(); + let order_by_cols = to_sort_expressions(order_by_cols); builder = builder.order_by(order_by_cols); } From b5f33e85e7d0539ec9ffab7a915a9cfd794d6858 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 30 Aug 2024 08:31:19 -0400 Subject: [PATCH 20/26] Update unit tests to be cleaner and use default sort on expressions --- python/datafusion/tests/test_dataframe.py | 30 ++++++++++------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 78b65cf26..850caaca2 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -267,7 +267,7 @@ def test_join(): df = df.join(df1, join_keys=(["a"], ["a"]), how="inner") df.show() - df = df.sort(column("l.a").sort(ascending=True)) + df = df.sort(column("l.a")) table = pa.Table.from_batches(df.collect()) expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]} @@ -281,17 +281,13 @@ def test_distinct(): [pa.array([1, 2, 3, 1, 2, 3]), pa.array([4, 5, 6, 4, 5, 6])], names=["a", "b"], ) - df_a = ( - ctx.create_dataframe([[batch]]) - .distinct() - .sort(column("a").sort(ascending=True)) - ) + df_a = ctx.create_dataframe([[batch]]).distinct().sort(column("a")) batch = pa.RecordBatch.from_arrays( [pa.array([1, 2, 3]), pa.array([4, 5, 6])], names=["a", "b"], ) - df_b = ctx.create_dataframe([[batch]]).sort(column("a").sort(ascending=True)) + df_b = ctx.create_dataframe([[batch]]).sort(column("a")) assert df_a.collect() == df_b.collect() @@ -409,7 +405,7 @@ def test_distinct(): "last_value", f.window("last_value", [column("a")]) .window_frame(WindowFrame("rows", 0, None)) - .order_by(column("b").sort()) + .order_by(column("b")) .partition_by(column("c")) .build(), [3, 3, 3, 3, 6, 6, 6], @@ -436,7 +432,7 @@ def test_window_functions(partitioned_df, name, expr, result): df = partitioned_df.select( column("a"), column("b"), column("c"), f.alias(expr, name) ) - df.sort(column("a").sort()).show() + df.sort(column("a")).show() table = pa.Table.from_batches(df.collect()) expected = { @@ -617,9 +613,9 @@ def test_intersect(): [pa.array([3]), pa.array([6])], names=["a", "b"], ) - df_c = ctx.create_dataframe([[batch]]).sort(column("a").sort(ascending=True)) + df_c = ctx.create_dataframe([[batch]]).sort(column("a")) - df_a_i_b = df_a.intersect(df_b).sort(column("a").sort(ascending=True)) + df_a_i_b = df_a.intersect(df_b).sort(column("a")) assert df_c.collect() == df_a_i_b.collect() @@ -643,9 +639,9 @@ def test_except_all(): [pa.array([1, 2]), pa.array([4, 5])], names=["a", "b"], ) - df_c = ctx.create_dataframe([[batch]]).sort(column("a").sort(ascending=True)) + df_c = ctx.create_dataframe([[batch]]).sort(column("a")) - df_a_e_b = df_a.except_all(df_b).sort(column("a").sort(ascending=True)) + df_a_e_b = df_a.except_all(df_b).sort(column("a")) assert df_c.collect() == df_a_e_b.collect() @@ -678,9 +674,9 @@ def test_union(ctx): [pa.array([1, 2, 3, 3, 4, 5]), pa.array([4, 5, 6, 6, 7, 8])], names=["a", "b"], ) - df_c = ctx.create_dataframe([[batch]]).sort(column("a").sort(ascending=True)) + df_c = ctx.create_dataframe([[batch]]).sort(column("a")) - df_a_u_b = df_a.union(df_b).sort(column("a").sort(ascending=True)) + df_a_u_b = df_a.union(df_b).sort(column("a")) assert df_c.collect() == df_a_u_b.collect() @@ -702,9 +698,9 @@ def test_union_distinct(ctx): [pa.array([1, 2, 3, 4, 5]), pa.array([4, 5, 6, 7, 8])], names=["a", "b"], ) - df_c = ctx.create_dataframe([[batch]]).sort(column("a").sort(ascending=True)) + df_c = ctx.create_dataframe([[batch]]).sort(column("a")) - df_a_u_b = df_a.union(df_b, True).sort(column("a").sort(ascending=True)) + df_a_u_b = df_a.union(df_b, True).sort(column("a")) assert df_c.collect() == df_a_u_b.collect() assert df_c.collect() == df_a_u_b.collect() From 5fd129cfd24671f0370aacea4c2a0cd42d9490c5 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 30 Aug 2024 15:52:27 -0400 Subject: [PATCH 21/26] Ignore vscode folder specific settings --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 84dd566ee..aaeaaa5b1 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ target /docs/temp /docs/build .DS_Store +.vscode # Byte-compiled / optimized / DLL files __pycache__/ From 8863931451619910126874e0210f9ac0bd88f2c3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 30 Aug 2024 21:07:46 -0400 Subject: [PATCH 22/26] Window frames should only apply to aggregate functions used as window functions. Also pass in scalar pyarrow values so we can set a range other than a uint --- python/datafusion/expr.py | 12 +++- python/datafusion/functions.py | 41 ++--------- python/datafusion/tests/test_dataframe.py | 5 +- src/expr/window.rs | 12 ++-- src/functions.rs | 88 +++++------------------ 5 files changed, 39 insertions(+), 119 deletions(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 098cc0cea..c7272bb3b 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -24,7 +24,7 @@ from ._internal import expr as expr_internal, LogicalPlan from datafusion.common import NullTreatment, RexType, DataTypeMap -from typing import Any +from typing import Any, Optional import pyarrow as pa # The following are imported from the internal representation. We may choose to @@ -509,7 +509,7 @@ class WindowFrame: """Defines a window frame for performing window operations.""" def __init__( - self, units: str, start_bound: int | None, end_bound: int | None + self, units: str, start_bound: Optional[Any], end_bound: Optional[Any] ) -> None: """Construct a window frame using the given parameters. @@ -522,6 +522,14 @@ def __init__( will be set to unbounded. If unit type is ``groups``, this parameter must be set. """ + if not isinstance(start_bound, pa.Scalar) and start_bound is not None: + start_bound = pa.scalar(start_bound) + if units == "rows" or units == "groups": + start_bound = start_bound.cast(pa.uint64()) + if not isinstance(end_bound, pa.Scalar) and end_bound is not None: + end_bound = pa.scalar(end_bound) + if units == "rows" or units == "groups": + end_bound = end_bound.cast(pa.uint64()) self.window_frame = expr_internal.WindowFrame(units, start_bound, end_bound) def get_frame_units(self) -> str: diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 037fe94e4..8f20255bb 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -399,12 +399,11 @@ def window( ) -> Expr: """Creates a new Window function expression. - This interface is deprecateted. Instead of using this interface, users should call - the window functions directly. For example, to perform a lag use + This interface will soon be deprecated. Instead of using this interface, + users should call the window functions directly. For example, to perform a + lag use:: - ``` - df.select(functions.lag(col("a")).partition_by(col("b")).build()) - ``` + df.select(functions.lag(col("a")).partition_by(col("b")).build()) """ args = [a.expr for a in args] partition_by = [e.expr for e in partition_by] if partition_by is not None else None @@ -1768,7 +1767,6 @@ def lead( default_value: Optional[Any] = None, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a lead window function. @@ -1799,7 +1797,6 @@ def lead( default_value: Value to return if shift_offet row does not exist. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ if not isinstance(default_value, pa.Scalar) and default_value is not None: @@ -1809,7 +1806,6 @@ def lead( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.lead( @@ -1818,7 +1814,6 @@ def lead( default_value, partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -1830,7 +1825,6 @@ def lag( default_value: Optional[Any] = None, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a lag window function. @@ -1858,7 +1852,6 @@ def lag( default_value: Value to return if shift_offet row does not exist. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ if not isinstance(default_value, pa.Scalar): @@ -1868,7 +1861,6 @@ def lag( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.lag( @@ -1877,7 +1869,6 @@ def lag( default_value, partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -1886,7 +1877,6 @@ def lag( def row_number( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a row number window function. @@ -1907,20 +1897,17 @@ def row_number( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.row_number( partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -1929,7 +1916,6 @@ def row_number( def rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a rank window function. @@ -1955,20 +1941,17 @@ def rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.rank( partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -1977,7 +1960,6 @@ def rank( def dense_rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a dense_rank window function. @@ -1998,20 +1980,17 @@ def dense_rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.dense_rank( partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -2020,7 +1999,6 @@ def dense_rank( def percent_rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a percent_rank window function. @@ -2042,20 +2020,17 @@ def percent_rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.percent_rank( partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -2064,7 +2039,6 @@ def percent_rank( def cume_dist( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a cumulative distribution window function. @@ -2086,20 +2060,17 @@ def cume_dist( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.cume_dist( partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) @@ -2109,7 +2080,6 @@ def ntile( groups: int, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - window_frame: Optional[WindowFrame] = None, null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a n-tile window function. @@ -2134,21 +2104,18 @@ def ntile( groups: Number of groups for the n-tile to be divided into. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - window_frame: Override default window frame. null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None ) order_cols = [col.expr for col in order_by] if order_by is not None else None - window_val = window_frame.window_frame if window_frame is not None else None return Expr( f.ntile( Expr.literal(groups).expr, partition_by=partition_cols, order_by=order_cols, - window_frame=window_val, null_treatment=null_treatment, ) ) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 850caaca2..634d40de3 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -303,7 +303,6 @@ def test_distinct(): f.row_number( order_by=[column("b"), column("a")], partition_by=[column("c")], - window_frame=WindowFrame("rows", 1, 0), null_treatment=NullTreatment.RESPECT_NULLS, ), [2, 1, 3, 4, 2, 1, 3], @@ -373,7 +372,6 @@ def test_distinct(): default_value=-1, order_by=[column("b"), column("a")], partition_by=[column("c")], - window_frame=WindowFrame("rows", 3, 0), ), [8, 7, -1, -1, -1, 9, -1], ), @@ -386,7 +384,6 @@ def test_distinct(): default_value=-1, order_by=[column("b"), column("a")], partition_by=[column("c")], - window_frame=WindowFrame("rows", 3, 0), ), [-1, -1, None, 7, -1, -1, None], ), @@ -421,7 +418,7 @@ def test_distinct(): ), pytest.param( "avg", - f.round(f.window("avg", [column("b")]), literal(3)), + f.round(f.window("avg", [column("b")], order_by=[column("a")]), literal(3)), [7.0, 7.0, 7.0, 7.333, 7.75, 7.75, 8.0], ), ] diff --git a/src/expr/window.rs b/src/expr/window.rs index 786651194..7eb586082 100644 --- a/src/expr/window.rs +++ b/src/expr/window.rs @@ -168,7 +168,11 @@ fn not_window_function_err(expr: Expr) -> PyErr { impl PyWindowFrame { #[new] #[pyo3(signature=(unit, start_bound, end_bound))] - pub fn new(unit: &str, start_bound: Option, end_bound: Option) -> PyResult { + pub fn new( + unit: &str, + start_bound: Option, + end_bound: Option, + ) -> PyResult { let units = unit.to_ascii_lowercase(); let units = match units.as_str() { "rows" => WindowFrameUnits::Rows, @@ -182,9 +186,7 @@ impl PyWindowFrame { } }; let start_bound = match start_bound { - Some(start_bound) => { - WindowFrameBound::Preceding(ScalarValue::UInt64(Some(start_bound))) - } + Some(start_bound) => WindowFrameBound::Preceding(start_bound), None => match units { WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), @@ -197,7 +199,7 @@ impl PyWindowFrame { }, }; let end_bound = match end_bound { - Some(end_bound) => WindowFrameBound::Following(ScalarValue::UInt64(Some(end_bound))), + Some(end_bound) => WindowFrameBound::Following(end_bound), None => match units { WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::UInt64(None)), diff --git a/src/functions.rs b/src/functions.rs index 2062bf72b..552367a93 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -18,6 +18,7 @@ use datafusion::functions_aggregate::all_default_aggregate_functions; use datafusion_expr::window_function; use datafusion_expr::ExprFunctionExt; +use datafusion_expr::WindowFrame; use pyo3::{prelude::*, wrap_pyfunction}; use crate::common::data_type::NullTreatment; @@ -596,9 +597,11 @@ fn window( ctx: Option, ) -> PyResult { let fun = find_window_fn(name, ctx)?; + let window_frame = window_frame - .unwrap_or_else(|| PyWindowFrame::new("rows", None, Some(0)).unwrap()) - .into(); + .map(|w| w.into()) + .unwrap_or(WindowFrame::new(order_by.as_ref().map(|v| !v.is_empty()))); + Ok(PyExpr { expr: datafusion_expr::Expr::WindowFunction(WindowFunction { fun, @@ -612,6 +615,10 @@ fn window( .unwrap_or_default() .into_iter() .map(|x| x.expr) + .map(|e| match e { + Expr::Sort(_) => e, + _ => e.sort(true, true), + }) .collect::>(), window_frame, null_treatment: None, @@ -872,7 +879,6 @@ fn add_builder_fns_to_window( window_fn: Expr, partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { // Since ExprFuncBuilder::new() is private, set an empty partition and then @@ -893,10 +899,6 @@ fn add_builder_fns_to_window( builder = builder.order_by(order_by_cols); } - if let Some(window_frame_vals) = window_frame { - builder = builder.window_frame(window_frame_vals.into()); - } - if let Some(null_treatment_val) = null_treatment { builder = builder.null_treatment(Some(null_treatment_val.into())); } @@ -911,18 +913,11 @@ pub fn lead( default_value: Option, partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] @@ -932,108 +927,66 @@ pub fn lag( default_value: Option, partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] pub fn row_number( partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::row_number(); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] pub fn rank( partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::rank(); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] pub fn dense_rank( partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::dense_rank(); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] pub fn percent_rank( partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::percent_rank(); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] pub fn cume_dist( partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::cume_dist(); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } #[pyfunction] @@ -1041,18 +994,11 @@ pub fn ntile( arg: PyExpr, partition_by: Option>, order_by: Option>, - window_frame: Option, null_treatment: Option, ) -> PyResult { let window_fn = window_function::ntile(arg.into()); - add_builder_fns_to_window( - window_fn, - partition_by, - order_by, - window_frame, - null_treatment, - ) + add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) } pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { From 47af8290d4fb95daf560675127c1a49f64cac510 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 30 Aug 2024 22:07:51 -0400 Subject: [PATCH 23/26] Remove deprecated warning until we actually have a way to use all functions without calling window() --- python/datafusion/functions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 8f20255bb..b0661eed6 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -28,7 +28,6 @@ from datafusion.context import SessionContext from typing import Any, Optional -from typing_extensions import deprecated import pyarrow as pa From 009c361a72e8521bdc235842ce629b78bdb376ee Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 31 Aug 2024 07:30:06 -0400 Subject: [PATCH 24/26] Built in window functions do not have any impact by setting null_treatment so remove from user facing --- python/datafusion/functions.py | 24 ---------------- python/datafusion/tests/test_dataframe.py | 2 -- src/functions.rs | 34 ++++++----------------- 3 files changed, 9 insertions(+), 51 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index b0661eed6..d4b24994a 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1766,7 +1766,6 @@ def lead( default_value: Optional[Any] = None, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a lead window function. @@ -1796,7 +1795,6 @@ def lead( default_value: Value to return if shift_offet row does not exist. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ if not isinstance(default_value, pa.Scalar) and default_value is not None: default_value = pa.scalar(default_value) @@ -1813,7 +1811,6 @@ def lead( default_value, partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -1824,7 +1821,6 @@ def lag( default_value: Optional[Any] = None, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a lag window function. @@ -1851,7 +1847,6 @@ def lag( default_value: Value to return if shift_offet row does not exist. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ if not isinstance(default_value, pa.Scalar): default_value = pa.scalar(default_value) @@ -1868,7 +1863,6 @@ def lag( default_value, partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -1876,7 +1870,6 @@ def lag( def row_number( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a row number window function. @@ -1896,7 +1889,6 @@ def row_number( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -1907,7 +1899,6 @@ def row_number( f.row_number( partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -1915,7 +1906,6 @@ def row_number( def rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a rank window function. @@ -1940,7 +1930,6 @@ def rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -1951,7 +1940,6 @@ def rank( f.rank( partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -1959,7 +1947,6 @@ def rank( def dense_rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a dense_rank window function. @@ -1979,7 +1966,6 @@ def dense_rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -1990,7 +1976,6 @@ def dense_rank( f.dense_rank( partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -1998,7 +1983,6 @@ def dense_rank( def percent_rank( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a percent_rank window function. @@ -2019,7 +2003,6 @@ def percent_rank( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -2030,7 +2013,6 @@ def percent_rank( f.percent_rank( partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -2038,7 +2020,6 @@ def percent_rank( def cume_dist( partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a cumulative distribution window function. @@ -2059,7 +2040,6 @@ def cume_dist( Args: partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -2070,7 +2050,6 @@ def cume_dist( f.cume_dist( partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) @@ -2079,7 +2058,6 @@ def ntile( groups: int, partition_by: Optional[list[Expr]] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Create a n-tile window function. @@ -2103,7 +2081,6 @@ def ntile( groups: Number of groups for the n-tile to be divided into. partition_by: Expressions to partition the window frame on. order_by: Set ordering within the window frame. - null_treatment: Specify how nulls are to be treated. """ partition_cols = ( [col.expr for col in partition_by] if partition_by is not None else None @@ -2115,6 +2092,5 @@ def ntile( Expr.literal(groups).expr, partition_by=partition_cols, order_by=order_cols, - null_treatment=null_treatment, ) ) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 634d40de3..c2a5f22ba 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -30,7 +30,6 @@ literal, udf, ) -from datafusion.common import NullTreatment @pytest.fixture @@ -303,7 +302,6 @@ def test_distinct(): f.row_number( order_by=[column("b"), column("a")], partition_by=[column("c")], - null_treatment=NullTreatment.RESPECT_NULLS, ), [2, 1, 3, 4, 2, 1, 3], ), diff --git a/src/functions.rs b/src/functions.rs index 552367a93..2675240fc 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -879,7 +879,6 @@ fn add_builder_fns_to_window( window_fn: Expr, partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { // Since ExprFuncBuilder::new() is private, set an empty partition and then // override later if appropriate. @@ -899,10 +898,6 @@ fn add_builder_fns_to_window( builder = builder.order_by(order_by_cols); } - if let Some(null_treatment_val) = null_treatment { - builder = builder.null_treatment(Some(null_treatment_val.into())); - } - builder.build().map(|e| e.into()).map_err(|err| err.into()) } @@ -913,11 +908,10 @@ pub fn lead( default_value: Option, partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] @@ -927,66 +921,57 @@ pub fn lag( default_value: Option, partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] pub fn row_number( partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::row_number(); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] -pub fn rank( - partition_by: Option>, - order_by: Option>, - null_treatment: Option, -) -> PyResult { +pub fn rank(partition_by: Option>, order_by: Option>) -> PyResult { let window_fn = window_function::rank(); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] pub fn dense_rank( partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::dense_rank(); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] pub fn percent_rank( partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::percent_rank(); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] pub fn cume_dist( partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::cume_dist(); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } #[pyfunction] @@ -994,11 +979,10 @@ pub fn ntile( arg: PyExpr, partition_by: Option>, order_by: Option>, - null_treatment: Option, ) -> PyResult { let window_fn = window_function::ntile(arg.into()); - add_builder_fns_to_window(window_fn, partition_by, order_by, null_treatment) + add_builder_fns_to_window(window_fn, partition_by, order_by) } pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { From bc3be5d4b6f64703efda75020e52489cd234f9ef Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 31 Aug 2024 08:01:28 -0400 Subject: [PATCH 25/26] Update user documentation on how to pass parameters for different window functions and what their impacts are --- .../user-guide/common-operations/windows.rst | 120 +++++++++++++++--- 1 file changed, 105 insertions(+), 15 deletions(-) diff --git a/docs/source/user-guide/common-operations/windows.rst b/docs/source/user-guide/common-operations/windows.rst index cf4722fa6..609176897 100644 --- a/docs/source/user-guide/common-operations/windows.rst +++ b/docs/source/user-guide/common-operations/windows.rst @@ -57,6 +57,10 @@ previous row in the DataFrame. Setting Parameters ------------------ + +Ordering +^^^^^^^^ + You can control the order in which rows are processed by window functions by providing a list of ``order_by`` functions for the ``order_by`` parameter. @@ -66,28 +70,114 @@ a list of ``order_by`` functions for the ``order_by`` parameter. col('"Name"'), col('"Attack"'), col('"Type 1"'), - f.rank() - .partition_by(col('"Type 1"')) - .order_by(col('"Attack"').sort(ascending=True)) - .build() - .alias("rank"), - ).sort(col('"Type 1"').sort(), col('"Attack"').sort()) + f.rank( + partition_by=[col('"Type 1"')], + order_by=[col('"Attack"').sort(ascending=True)], + ).alias("rank"), + ).sort(col('"Type 1"'), col('"Attack"')) + +Partitions +^^^^^^^^^^ + +A window function can take a list of ``partition_by`` columns similar to an +:ref:`Aggregation Function`. This will cause the window values to be evaluated +independently for each of the partitions. In the example above, we found the rank of each +Pokemon per ``Type 1`` partitions. We can see the first couple of each partition if we do +the following: + +.. ipython:: python + + df.select( + col('"Name"'), + col('"Attack"'), + col('"Type 1"'), + f.rank( + partition_by=[col('"Type 1"')], + order_by=[col('"Attack"').sort(ascending=True)], + ).alias("rank"), + ).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank")) + +Window Frame +^^^^^^^^^^^^ + +When using aggregate functions, the Window Frame of defines the rows over which it operates. +If you do not specify a Window Frame, the frame will be set depending on the following +criteria. + +* If an ``order_by`` clause is set, the default window frame is defined as the rows between + unbounded preceeding and the current row. +* If an ``order_by`` is not set, the default frame is defined as the rows betwene unbounded + and unbounded following (the entire partition). + +Window Frames are defined by three parameters: unit type, starting bound, and ending bound. + +The unit types available are: -Window Functions can be configured using a builder approach to set a few parameters. -To create a builder you simply need to call any one of these functions +* Rows: The starting and ending boundaries are defined by the number of rows relative to the + current row. +* Range: When using Range, the ``order_by`` clause must have exactly one term. The boundaries + are defined bow how close the rows are to the value of the expression in the ``order_by`` + parameter. +* Groups: A "group" is the set of all rows that have equivalent values for all terms in the + ``order_by`` clause. -- :py:func:`datafusion.expr.Expr.order_by` to set the window ordering. -- :py:func:`datafusion.expr.Expr.null_treatment` to set how ``null`` values should be handled. -- :py:func:`datafusion.expr.Expr.partition_by` to set the partitions for processing. -- :py:func:`datafusion.expr.Expr.window_frame` to set boundary of operation. +In this example we perform a "rolling average" of the speed of the current Pokemon and the +two preceeding rows. -After these parameters are set, you must call ``build()`` on the resultant object to get an -expression as shown in the example above. +.. ipython:: python + + from datafusion.expr import WindowFrame + + df.select( + col('"Name"'), + col('"Speed"'), + f.window("avg", + [col('"Speed"')], + order_by=[col('"Speed"')], + window_frame=WindowFrame("rows", 2, 0) + ).alias("Previous Speed") + ) + +Null Treatment +^^^^^^^^^^^^^^ + +When using aggregate functions as window functions, it is often useful to specify how null values +should be treated. In order to do this you need to use the builder function. In future releases +we expect this to be simplified in the interface. + +One common usage for handling nulls is the case where you want to find the last value up to the +current row. In the following example we demonstrate how setting the null treatment to ignore +nulls will fill in with the value of the most recent non-null row. To do this, we also will set +the window frame so that we only process up to the current row. + +In this example, we filter down to one specific type of Pokemon that does have some entries in +it's ``Type 2`` column that are null. + +.. ipython:: python + + from datafusion.common import NullTreatment + + df.filter(col('"Type 1"') == lit("Bug")).select( + '"Name"', + '"Type 2"', + f.window("last_value", [col('"Type 2"')]) + .window_frame(WindowFrame("rows", None, 0)) + .order_by(col('"Speed"')) + .null_treatment(NullTreatment.IGNORE_NULLS) + .build() + .alias("last_wo_null"), + f.window("last_value", [col('"Type 2"')]) + .window_frame(WindowFrame("rows", None, 0)) + .order_by(col('"Speed"')) + .null_treatment(NullTreatment.RESPECT_NULLS) + .build() + .alias("last_with_null") + ) Aggregate Functions ------------------- -You can use any :ref:`Aggregation Function` as a window function. Currently +You can use any :ref:`Aggregation Function` as a window function. Currently aggregate functions must use the deprecated :py:func:`datafusion.functions.window` API but this should be resolved in DataFusion 42.0 (`Issue Link `_). Here From 070b595f3441332f76cbaf4921048cdcd49267b5 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 31 Aug 2024 08:18:30 -0400 Subject: [PATCH 26/26] Make first_value and last_value identical in the interface --- python/datafusion/functions.py | 30 ++++++++++++++---- python/datafusion/tests/test_functions.py | 1 + src/functions.rs | 37 +++++++++++++++++------ 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index d4b24994a..28201c1d1 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1710,29 +1710,47 @@ def regr_syy(y: Expr, x: Expr, distinct: bool = False) -> Expr: def first_value( arg: Expr, distinct: bool = False, - filter: bool = None, - order_by: Expr | None = None, - null_treatment: common.NullTreatment | None = None, + filter: Optional[bool] = None, + order_by: Optional[list[Expr]] = None, + null_treatment: Optional[common.NullTreatment] = None, ) -> Expr: """Returns the first value in a group of values.""" + order_by_cols = [e.expr for e in order_by] if order_by is not None else None + return Expr( f.first_value( arg.expr, distinct=distinct, filter=filter, - order_by=order_by, + order_by=order_by_cols, null_treatment=null_treatment, ) ) -def last_value(arg: Expr) -> Expr: +def last_value( + arg: Expr, + distinct: bool = False, + filter: Optional[bool] = None, + order_by: Optional[list[Expr]] = None, + null_treatment: Optional[common.NullTreatment] = None, +) -> Expr: """Returns the last value in a group of values. To set parameters on this expression, use ``.order_by()``, ``.distinct()``, ``.filter()``, or ``.null_treatment()``. """ - return Expr(f.last_value(arg.expr)) + order_by_cols = [e.expr for e in order_by] if order_by is not None else None + + return Expr( + f.last_value( + arg.expr, + distinct=distinct, + filter=filter, + order_by=order_by_cols, + null_treatment=null_treatment, + ) + ) def bit_and(arg: Expr, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_functions.py b/python/datafusion/tests/test_functions.py index e5429bd60..fe092c456 100644 --- a/python/datafusion/tests/test_functions.py +++ b/python/datafusion/tests/test_functions.py @@ -963,6 +963,7 @@ def test_first_last_value(df): assert result.column(3) == pa.array(["!"]) assert result.column(4) == pa.array([6]) assert result.column(5) == pa.array([datetime(2020, 7, 2)]) + df.show() def test_binary_string_functions(df): diff --git a/src/functions.rs b/src/functions.rs index 2675240fc..aed4de474 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -319,18 +319,15 @@ pub fn regr_syy(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult, order_by: Option>, null_treatment: Option, ) -> PyResult { - // If we initialize the UDAF with order_by directly, then it gets over-written by the builder - let agg_fn = functions_aggregate::expr_fn::first_value(expr.expr, None); - - // luckily, I can guarantee initializing a builder with an `order_by` default of empty vec + // Since ExprFuncBuilder::new() is private, we can guarantee initializing + // a builder with an `order_by` default of empty vec let order_by = order_by .map(|x| x.into_iter().map(|x| x.expr).collect::>()) .unwrap_or_default(); @@ -351,8 +348,30 @@ pub fn first_value( } #[pyfunction] -pub fn last_value(expr: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::last_value(vec![expr.expr]).into() +pub fn first_value( + expr: PyExpr, + distinct: bool, + filter: Option, + order_by: Option>, + null_treatment: Option, +) -> PyResult { + // If we initialize the UDAF with order_by directly, then it gets over-written by the builder + let agg_fn = functions_aggregate::expr_fn::first_value(expr.expr, None); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) +} + +#[pyfunction] +pub fn last_value( + expr: PyExpr, + distinct: bool, + filter: Option, + order_by: Option>, + null_treatment: Option, +) -> PyResult { + let agg_fn = functions_aggregate::expr_fn::last_value(vec![expr.expr]); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) } #[pyfunction]