Skip to content
This repository was archived by the owner on Jul 25, 2022. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Simple usage:
```python
import datafusion
from datafusion import functions as f
from datafusion import col
import pyarrow

# create a context
Expand All @@ -54,8 +55,8 @@ df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
f.col("a") + f.col("b"),
f.col("a") - f.col("b"),
col("a") + col("b"),
col("a") - col("b"),
)

# execute and collect the first (and only) batch
Expand All @@ -68,31 +69,35 @@ assert result.column(1) == pyarrow.array([-3, -3, -3])
### UDFs

```python
from datafusion import udf

def is_null(array: pyarrow.Array) -> pyarrow.Array:
return array.is_null()

udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_())
is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

df = df.select(is_null_arr(col("a")))

result = df.collect()

df = df.select(udf(f.col("a")))
assert result.column(0) == pyarrow.array([False] * 3)
```

### UDAF

```python
import pyarrow
import pyarrow.compute
from datafusion import udaf, Accumulator


class Accumulator:
class MyAccumulator(Accumulator):
"""
Interface of a user-defined accumulation.
"""
def __init__(self):
self._sum = pyarrow.scalar(0.0)

def to_scalars(self) -> [pyarrow.Scalar]:
return [self._sum]

def update(self, values: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
Expand All @@ -101,18 +106,25 @@ class Accumulator:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

def state(self) -> pyarrow.Array:
return pyarrow.array([self._sum.as_py()])

def evaluate(self) -> pyarrow.Scalar:
return self._sum


df = ...
df = ctx.create_dataframe([[batch]])

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])
my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')

df = df.aggregate(
[],
[udaf(f.col("a"))]
[my_udaf(col("a"))]
)

result = df.collect()[0]

assert result.column(0) == pyarrow.array([6.0])
```

## How to install (from pip)
Expand Down