Merged
332 changes: 211 additions & 121 deletions dask_sql/physical/rel/logical/aggregate.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions dask_sql/physical/rel/logical/join.py
@@ -100,6 +100,23 @@ def convert(
f"common_{i}": df_rhs_renamed.iloc[:, index]
for i, index in enumerate(rhs_on)
}

# SQL compatibility: when joining on columns that
# contain NULLs, pandas happily keeps those NULLs
# and even matches them against each other. That is
# not compatible with SQL, so we drop those rows here
if join_type in ["inner", "right"]:
df_lhs_filter = reduce(
operator.and_,
[~df_lhs_renamed.iloc[:, index].isna() for index in lhs_on],
)
df_lhs_renamed = df_lhs_renamed[df_lhs_filter]
if join_type in ["inner", "left"]:
df_rhs_filter = reduce(
operator.and_,
[~df_rhs_renamed.iloc[:, index].isna() for index in rhs_on],
)
df_rhs_renamed = df_rhs_renamed[df_rhs_filter]
else:
# We are in the complex join case
# where we have no column to merge on
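The filter above exists because pandas treats NaN join keys as equal to each other, while SQL requires NULL keys to never match. A small, self-contained illustration of the difference (not taken from the PR, just demonstrating the behavior it corrects):

```python
import pandas as pd

left = pd.DataFrame({"k": [1.0, None], "a": ["x", "y"]})
right = pd.DataFrame({"k": [1.0, None], "b": ["u", "v"]})

# pandas happily matches the NaN keys against each other ...
pandas_result = left.merge(right, on="k", how="inner")

# ... while SQL semantics require dropping NULL join keys first,
# which is exactly what the filter in the diff above does
sql_result = left[left["k"].notna()].merge(
    right[right["k"].notna()], on="k", how="inner"
)
```

Note that the sides to filter depend on the join type: an outer side keeps its NULL rows (they survive as unmatched rows), which is why the diff filters the left side only for inner/right joins and the right side only for inner/left joins.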
2 changes: 1 addition & 1 deletion dask_sql/physical/rel/logical/sort.py
@@ -135,7 +135,7 @@ def _sort_first_column(
col = df[first_sort_column]
is_na = col.isna().persist()
if is_na.any().compute():
df_is_na = df[is_na].reset_index(drop=True)
df_is_na = df[is_na].reset_index(drop=True).repartition(1)
df_not_is_na = (
df[~is_na]
.set_index(first_sort_column, drop=False)
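The `.repartition(1)` added here collapses all NULL rows into a single partition, so they can be prepended (or appended) to the sorted non-NULL rows as one cheap block. The overall nulls-first strategy can be sketched in plain pandas (an illustrative sketch, not the Dask implementation):

```python
import pandas as pd

df = pd.DataFrame({"x": [3.0, None, 1.0, None, 2.0]})
is_na = df["x"].isna()

# SQL-style NULLS FIRST: all null rows up front, remaining rows sorted by x
result = pd.concat([df[is_na], df[~is_na].sort_values("x")], ignore_index=True)
```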
17 changes: 17 additions & 0 deletions dask_sql/physical/rex/core/call.py
@@ -240,6 +240,21 @@ def null(self, df: SeriesOrScalar,) -> SeriesOrScalar:
return pd.isna(df) or df is None or np.isnan(df)


class IsNotDistinctOperation(Operation):
"""The is not distinct operator"""

def __init__(self):
super().__init__(self.not_distinct)

def not_distinct(self, lhs: SeriesOrScalar, rhs: SeriesOrScalar) -> SeriesOrScalar:
"""
Returns true where `lhs` is not distinct from `rhs` (or both are null).
"""
is_null = IsNullOperation()

return (is_null(lhs) & is_null(rhs)) | (lhs == rhs)


class RegexOperation(Operation):
"""An abstract regex operation, which transforms the SQL regex into something python can understand"""

@@ -627,6 +642,8 @@ class RexCallPlugin(BaseRexPlugin):
"-": ReduceOperation(operation=operator.sub, unary_operation=lambda x: -x),
"/": ReduceOperation(operation=SQLDivisionOperator()),
"*": ReduceOperation(operation=operator.mul),
"is distinct from": NotOperation().of(IsNotDistinctOperation()),
"is not distinct from": IsNotDistinctOperation(),
# special operations
"cast": lambda x: x,
"case": CaseOperation(),
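`IS NOT DISTINCT FROM` is SQL's null-safe equality: it is true when both sides are equal *or* both are NULL, and `IS DISTINCT FROM` is simply its negation (which is why the mapping wraps it in `NotOperation`). The core expression from `not_distinct` can be checked standalone with plain pandas Series (a small illustration, not part of the PR):

```python
import numpy as np
import pandas as pd

lhs = pd.Series([1.0, np.nan, 1.0, np.nan])
rhs = pd.Series([1.0, np.nan, 2.0, 1.0])

# IS NOT DISTINCT FROM: ordinary == is false for NaN vs NaN,
# so the both-null case has to be added back in explicitly
not_distinct = (lhs.isna() & rhs.isna()) | (lhs == rhs)
```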
9 changes: 6 additions & 3 deletions docs/pages/sql.rst
@@ -199,14 +199,16 @@ Limitations

``dask-sql`` is still in early development; therefore, some limitations exist:

Not all operations and aggregations are implemented yet, most prominently: ``WINDOW`` is not implemented so far.

.. note::

Whenever you find a not already implemented operation, keyword
or functionality, please raise an issue at our `issue tracker <https://github.com/nils-braun/dask-sql/issues>`_ with your use-case.

Dask/pandas and SQL treat null values (or NaN) differently when sorting, grouping and joining.
``dask-sql`` tries to follow the SQL standard as closely as possible, so results might differ from what you would expect from Dask/pandas.
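One concrete example of this difference (an illustration in plain pandas, not from the PR): SQL collects NULL keys into a group of their own when grouping, while pandas drops them by default.

```python
import pandas as pd

df = pd.DataFrame({"k": [1.0, None, None], "v": [1, 2, 3]})

# pandas silently drops the NULL group by default ...
default_groups = df.groupby("k").sum()

# ... while SQL keeps the NULL keys as one extra group
# (dropna is available in pandas >= 1.1)
sql_like_groups = df.groupby("k", dropna=False).sum()
```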

Apart from those functional limitations, there is one operation which needs special care: ``ORDER BY``.
Normally, ``dask-sql`` calls create a ``dask`` data frame, which is only computed when you call its ``.compute()`` method.
Due to internal constraints, this is currently not the case for ``ORDER BY``.
@@ -218,4 +220,5 @@ Including this operation will trigger a calculation of the full data frame already
The data inside ``dask`` is partitioned to distribute it over the cluster.
``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
``LIMIT`` on the other hand will always return the first N elements - no matter how many partitions they are scattered over -
but it also needs to precompute the first partition to find out whether it has to look at all of the data.