diff --git a/README.md b/README.md index d4b953f..46e6429 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ Simple usage: ```python import datafusion from datafusion import functions as f +from datafusion import col import pyarrow # create a context @@ -54,8 +55,8 @@ df = ctx.create_dataframe([[batch]]) # create a new statement df = df.select( - f.col("a") + f.col("b"), - f.col("a") - f.col("b"), + col("a") + col("b"), + col("a") - col("b"), ) # execute and collect the first (and only) batch @@ -68,12 +69,18 @@ assert result.column(1) == pyarrow.array([-3, -3, -3]) ### UDFs ```python +from datafusion import udf + def is_null(array: pyarrow.Array) -> pyarrow.Array: return array.is_null() -udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_()) +is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable') + +df = df.select(is_null_arr(col("a"))) + +result = df.collect()[0] -df = df.select(udf(f.col("a"))) +assert result.column(0) == pyarrow.array([False] * 3) ``` ### UDAF @@ -81,18 +88,16 @@ df = df.select(udf(f.col("a"))) ```python import pyarrow import pyarrow.compute +from datafusion import udaf, Accumulator -class Accumulator: +class MyAccumulator(Accumulator): """ Interface of a user-defined accumulation. """ def __init__(self): self._sum = pyarrow.scalar(0.0) - def to_scalars(self) -> [pyarrow.Scalar]: - return [self._sum] - def update(self, values: pyarrow.Array) -> None: # not nice since pyarrow scalars can't be summed yet. This breaks on `None` self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py()) @@ -101,18 +106,25 @@ class Accumulator: # not nice since pyarrow scalars can't be summed yet. This breaks on `None` self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py()) + def state(self) -> pyarrow.Array: + return pyarrow.array([self._sum.as_py()]) + def evaluate(self) -> pyarrow.Scalar: return self._sum -df = ... 
+df = ctx.create_dataframe([[batch]]) -udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()]) +my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable') df = df.aggregate( [], - [udaf(f.col("a"))] + [my_udaf(col("a"))] ) + +result = df.collect()[0] + +assert result.column(0) == pyarrow.array([6.0]) ``` ## How to install (from pip)