diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 3d1394876..8fa832918 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -1,9 +1,10 @@ import operator from collections import defaultdict from functools import reduce -from typing import Callable, Dict, List, Tuple, Union +from typing import Any, Callable, Dict, List, Tuple, Union import logging +import pandas as pd import dask.dataframe as dd from dask_sql.utils import new_temporary_column @@ -13,45 +14,50 @@ logger = logging.getLogger(__name__) -class GroupDatasetDescription: +class ReduceAggregation(dd.Aggregation): """ - Helper class to put dataframes which are filtered according to a specific column - into a dictionary. - Applying the same filter twice on the same dataframe does not give different - dataframes. Therefore we only hash these dataframes according to the column - they are filtered by. + A special form of an aggregation, that applies a given operation + on all elements in a group with "reduce". """ - def __init__(self, df: dd.DataFrame, filtered_column: str = ""): - self.df = df - self.filtered_column = filtered_column + def __init__(self, name: str, operation: Callable): + series_aggregate = lambda s: s.aggregate(lambda x: reduce(operation, x)) - def __eq__(self, rhs: "GroupDatasetDescription") -> bool: - """They are equal of they are filtered by the same column""" - return self.filtered_column == rhs.filtered_column + super().__init__(name, series_aggregate, series_aggregate) - def __hash__(self) -> str: - return hash(self.filtered_column) - def __repr__(self) -> str: - return f"GroupDatasetDescription({self.filtered_column})" +class AggregationOnPandas(dd.Aggregation): + """ + A special form of an aggregation, which does not apply the given function + (given as attribute name) directly to the dask groupby, but + via the groupby().apply() method. This is needed to call + functions directly on the pandas dataframes, but should be done + very carefully (as it is a performance bottleneck). + """ + def __init__(self, function_name: str): + def _f(s): + return s.apply(lambda s0: getattr(s0.dropna(), function_name)()) -# Description of an aggregation in the form of a mapping -# input column -> output column -> aggregation -AggregationDescription = Dict[str, Dict[str, Union[str, dd.Aggregation]]] + super().__init__(function_name, _f, _f) -class ReduceAggregation(dd.Aggregation): +class AggregationSpecification: """ - A special form of an aggregation, that applies a given operation - on all elements in a group with "reduce". + Most of the aggregations in SQL are already + implemented 1:1 in dask and can just be called via their name + (e.g. AVG is the mean). However sometimes those already + implemented functions only work well for numerical + functions. This small container class therefore + can have an additional aggregation function, which is + valid for non-numerical types. 
""" - def __init__(self, name: str, operation: Callable): - series_aggregate = lambda s: s.aggregate(lambda x: reduce(operation, x)) - - super().__init__(name, series_aggregate, series_aggregate) + def __init__(self, numerical_aggregation, non_numerical_aggregation=None): + self.numerical_aggregation = numerical_aggregation + self.non_numerical_aggregation = ( + non_numerical_aggregation or numerical_aggregation + ) class LogicalAggregatePlugin(BaseRelPlugin): @@ -63,31 +69,45 @@ class LogicalAggregatePlugin(BaseRelPlugin): group over, in the second case we "cheat" and add a 1-column to the dataframe, which allows us to reuse every aggregation function we already know of. + As NULLs are not groupable in dask, we handle them special + by adding a temporary column which is True for all NULL values + and False otherwise (and also group by it). The rest is just a lot of column-name-bookkeeping. Fortunately calcite will already make sure, that each aggregation function will only every be called with a single input column (by splitting the inner calculation to a step before). + + Open TODO: So far we are following the dask default + to only have a single partition after the group by (which is usual + a reasonable assumption). It would be nice to control + these things via HINTs. """ class_name = "org.apache.calcite.rel.logical.LogicalAggregate" AGGREGATION_MAPPING = { - "$sum0": "sum", - "any_value": dd.Aggregation( - "any_value", - lambda s: s.sample(n=1).values, - lambda s0: s0.sample(n=1).values, + "$sum0": AggregationSpecification("sum", AggregationOnPandas("sum")), + "any_value": AggregationSpecification( + dd.Aggregation( + "any_value", + lambda s: s.sample(n=1).values, + lambda s0: s0.sample(n=1).values, + ) + ), + "avg": AggregationSpecification("mean", AggregationOnPandas("mean")), + "bit_and": AggregationSpecification( + ReduceAggregation("bit_and", operator.and_) ), - "avg": "mean", - "bit_and": ReduceAggregation("bit_and", operator.and_), - "bit_or": ReduceAggregation("bit_or", operator.or_), - "bit_xor": ReduceAggregation("bit_xor", operator.xor), - "count": "count", - "every": dd.Aggregation("every", lambda s: s.all(), lambda s0: s0.all()), - "max": "max", - "min": "min", - "single_value": "first", + "bit_or": AggregationSpecification(ReduceAggregation("bit_or", operator.or_)), + "bit_xor": AggregationSpecification(ReduceAggregation("bit_xor", operator.xor)), + "count": AggregationSpecification("count"), + "every": AggregationSpecification( + dd.Aggregation("every", lambda s: s.all(), lambda s0: s0.all()) + ), + "max": AggregationSpecification("max", AggregationOnPandas("max")), + "min": AggregationSpecification("min", AggregationOnPandas("min")), + "single_value": AggregationSpecification("first"), } def convert( @@ -110,65 +130,22 @@ def convert( cc.get_backend_by_frontend_index(i) for i in group_column_indices ] - # Always keep an additional column around for empty groups and aggregates - additional_column_name = new_temporary_column(df) - - # NOTE: it might be the case that - # we do not need this additional - # column, but hopefully adding a single - # column of 1 is not so problematic... 
- df = df.assign(**{additional_column_name: 1}) - cc = cc.add(additional_column_name) dc = DataContainer(df, cc) - # Collect all aggregates - filtered_aggregations, output_column_order = self._collect_aggregations( - rel, dc, group_columns, additional_column_name, context - ) - if not group_columns: # There was actually no GROUP BY specified in the SQL # Still, this plan can also be used if we need to aggregate something over the full # data sample # To reuse the code, we just create a new column at the end with a single value - # It is important to do this after creating the aggregations, - # as we do not want this additional column to be used anywhere - group_columns = [additional_column_name] - logger.debug("Performing full-table aggregation") - # Now we can perform the aggregates - # We iterate through all pairs of (possible pre-filtered) - # dataframes and the aggregations to perform in this data... - df_agg = None - for filtered_df_desc, aggregation in filtered_aggregations.items(): - filtered_column = filtered_df_desc.filtered_column - if filtered_column: - logger.debug( - f"Aggregating {dict(aggregation)} on the data filtered by {filtered_column}" - ) - else: - logger.debug(f"Aggregating {dict(aggregation)} on the data") - - # ... we perform the aggregations ... - filtered_df = filtered_df_desc.df - # TODO: we could use the type information for - # pre-calculating the meta information - filtered_df_agg = filtered_df.groupby(by=group_columns).agg(aggregation) - - # ... fix the column names to a single level ... - filtered_df_agg.columns = filtered_df_agg.columns.get_level_values(-1) - - # ... and finally concat the new data with the already present columns - if df_agg is None: - df_agg = filtered_df_agg - else: - df_agg = df_agg.assign( - **{col: filtered_df_agg[col] for col in filtered_df_agg.columns} - ) + # Do all aggregates + df_result, output_column_order = self._do_aggregations( + rel, dc, group_columns, context, + ) # SQL does not care about the index, but we do not want to have any multiindices - df_agg = df_agg.reset_index(drop=True) + df_agg = df_result.reset_index(drop=True) # Fix the column names and the order of them, as this was messed with during the aggregations df_agg.columns = df_agg.columns.get_level_values(-1) @@ -179,48 +156,103 @@ def convert( dc = self.fix_dtype_to_row_type(dc, rel.getRowType()) return dc - def _collect_aggregations( + def _do_aggregations( self, rel: "org.apache.calcite.rel.RelNode", dc: DataContainer, group_columns: List[str], - additional_column_name: str, context: "dask_sql.Context", - ) -> Tuple[ - Dict[GroupDatasetDescription, AggregationDescription], List[int], - ]: + ) -> Tuple[dd.DataFrame, List[str]]: """ - Create a mapping of dataframe -> aggregations (in the form input colum, output column, aggregation) - and the expected order of output columns. + Main functionality: return the result dataframe + and the output column order """ - aggregations = defaultdict(lambda: defaultdict(dict)) - output_column_order = [] df = dc.df cc = dc.column_container - # SQL needs to copy the old content also. As the values of the group columns + # We might need it later. + # If not, lets hope that adding a single column should not + # be a huge problem... 
+ additional_column_name = new_temporary_column(df) + df = df.assign(**{additional_column_name: 1}) + + # Add an entry for every grouped column, as SQL wants them first + output_column_order = group_columns.copy() + + # Collect all aggregations we need to do + collected_aggregations, output_column_order = self._collect_aggregations( + rel, df, cc, context, additional_column_name, output_column_order + ) + + # SQL needs to have a column with the grouped values as the first + # output column. + # As the values of the group columns # are the same for a single group anyways, we just use the first row for col in group_columns: - aggregations[GroupDatasetDescription(df)][col][col] = "first" - output_column_order.append(col) + collected_aggregations[None].append((col, col, "first")) + + # Now we can go ahead and use these grouped aggregations + # to perform the actual aggregation + # It is very important to start with the non-filtered entry. + # Otherwise we might loose some entries in the grouped columns + key = None + aggregations = collected_aggregations.pop(key) + df_result = self._perform_aggregation( + df, None, aggregations, additional_column_name, group_columns, + ) + + # Now we can also the the rest + for filter_column, aggregations in collected_aggregations.items(): + agg_result = self._perform_aggregation( + df, filter_column, aggregations, additional_column_name, group_columns, + ) + + # ... and finally concat the new data with the already present columns + df_result = df_result.assign( + **{col: agg_result[col] for col in agg_result.columns} + ) + + return df_result, output_column_order + + def _collect_aggregations( + self, + rel: "org.apache.calcite.rel.RelNode", + df: dd.DataFrame, + cc: ColumnContainer, + context: "dask_sql.Context", + additional_column_name: str, + output_column_order: List[str], + ) -> Tuple[Dict[Tuple[str, str], List[Tuple[str, str, Any]]], List[str]]: + """ + Collect all aggregations together, which have the same filter column + so that the aggregations only need to be done once. 
+ + Returns the aggregations as mapping filter_column -> List of Aggregations + where the aggregations are in the form (input_col, output_col, aggregation function (or string)) + """ + collected_aggregations = defaultdict(list) - # Now collect all aggregations for agg_call in rel.getNamedAggCalls(): - output_col = str(agg_call.getValue()) expr = agg_call.getKey() - if expr.hasFilter(): - filter_column = cc.get_backend_by_frontend_index(expr.filterArg) - filter_expression = df[filter_column] - filtered_df = df[filter_expression] - - grouped_df = GroupDatasetDescription(filtered_df, filter_column) + # Find out about the input column + inputs = expr.getArgList() + if len(inputs) == 1: + input_col = cc.get_backend_by_frontend_index(inputs[0]) + elif len(inputs) == 0: + input_col = additional_column_name else: - grouped_df = GroupDatasetDescription(df) + raise NotImplementedError("Can not cope with more than one input") - if expr.isDistinct(): - raise NotImplementedError("DISTINCT is not implemented (yet)") + # Extract flags (filtering/distinct) + if expr.isDistinct(): # pragma: no cover + raise ValueError("Apache Calcite should optimize them away!") + filter_column = None + if expr.hasFilter(): + filter_column = cc.get_backend_by_frontend_index(expr.filterArg) + + # Find out which aggregation function to use aggregation_name = str(expr.getAggregation().getName()) aggregation_name = aggregation_name.lower() try: @@ -232,16 +264,74 @@ def _collect_aggregations( raise NotImplementedError( f"Aggregation function {aggregation_name} not implemented (yet)." ) + if isinstance(aggregation_function, AggregationSpecification): + dtype = df[input_col].dtype + if pd.api.types.is_numeric_dtype(dtype): + aggregation_function = aggregation_function.numerical_aggregation + else: + aggregation_function = ( + aggregation_function.non_numerical_aggregation + ) - inputs = expr.getArgList() - if len(inputs) == 1: - input_col = cc.get_backend_by_frontend_index(inputs[0]) - elif len(inputs) == 0: - input_col = additional_column_name - else: - raise NotImplementedError("Can not cope with more than one input") + # Finally, extract the output column name + output_col = str(agg_call.getValue()) - aggregations[grouped_df][input_col][output_col] = aggregation_function + # Store the aggregation + key = filter_column + value = (input_col, output_col, aggregation_function) + collected_aggregations[key].append(value) output_column_order.append(output_col) - return aggregations, output_column_order + return collected_aggregations, output_column_order + + def _perform_aggregation( + self, + df: dd.DataFrame, + filter_column: str, + aggregations: List[Tuple[str, str, Any]], + additional_column_name: str, + group_columns: List[str], + ): + tmp_df = df + + if filter_column: + filter_expression = tmp_df[filter_column] + tmp_df = tmp_df[filter_expression] + + logger.debug(f"Filtered by {filter_column} before aggregation.") + + # SQL and dask are treating null columns a bit different: + # SQL will put them to the front, dask will just ignore them + # Therefore we use the same trick as fugue does: + # we will group by both the NaN and the real column value + group_columns_and_nulls = [] + for group_column in group_columns: + # the ~ makes NaN come first + is_null_column = ~(tmp_df[group_column].isnull()) + non_nan_group_column = tmp_df[group_column].fillna(0) + + group_columns_and_nulls += [is_null_column, non_nan_group_column] + + if not group_columns_and_nulls: + # This can happen in statements like + # SELECT SUM(x) FROM data + # 
without any groupby statement + group_columns_and_nulls = [additional_column_name] + + grouped_df = tmp_df.groupby(by=group_columns_and_nulls) + + # Convert into the correct format for dask + aggregations_dict = defaultdict(dict) + for aggregation in aggregations: + input_col, output_col, aggregation_f = aggregation + + aggregations_dict[input_col][output_col] = aggregation_f + + # Now apply the aggregation + logger.debug(f"Performing aggregation {dict(aggregations_dict)}") + agg_result = grouped_df.agg(aggregations_dict) + + # ... fix the column names to a single level ... + agg_result.columns = agg_result.columns.get_level_values(-1) + + return agg_result diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index 8c2dd8e0f..093845304 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -100,6 +100,23 @@ def convert( f"common_{i}": df_rhs_renamed.iloc[:, index] for i, index in enumerate(rhs_on) } + + # SQL compatibility: when joining on columns that + # contain NULLs, pandas will actually happily + # keep those NULLs. That is however not compatible with + # SQL, so we get rid of them here + if join_type in ["inner", "right"]: + df_lhs_filter = reduce( + operator.and_, + [~df_lhs_renamed.iloc[:, index].isna() for index in lhs_on], + ) + df_lhs_renamed = df_lhs_renamed[df_lhs_filter] + if join_type in ["inner", "left"]: + df_rhs_filter = reduce( + operator.and_, + [~df_rhs_renamed.iloc[:, index].isna() for index in rhs_on], + ) + df_rhs_renamed = df_rhs_renamed[df_rhs_filter] else: # We are in the complex join case # where we have no column to merge on diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index 3038e445e..4c4a22ad3 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -135,7 +135,7 @@ def _sort_first_column( col = df[first_sort_column] is_na = col.isna().persist() if is_na.any().compute(): - df_is_na = df[is_na].reset_index(drop=True) + df_is_na = df[is_na].reset_index(drop=True).repartition(1) df_not_is_na = ( df[~is_na] .set_index(first_sort_column, drop=False) diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py index ffd82bd57..8d5666a2f 100644 --- a/dask_sql/physical/rex/core/call.py +++ b/dask_sql/physical/rex/core/call.py @@ -240,6 +240,21 @@ def null(self, df: SeriesOrScalar,) -> SeriesOrScalar: return pd.isna(df) or df is None or np.isnan(df) +class IsNotDistinctOperation(Operation): + """The is not distinct operator""" + + def __init__(self): + super().__init__(self.not_distinct) + + def not_distinct(self, lhs: SeriesOrScalar, rhs: SeriesOrScalar) -> SeriesOrScalar: + """ + Returns true where `lhs` is not distinct from `rhs` (or both are null). 
+ """ + is_null = IsNullOperation() + + return (is_null(lhs) & is_null(rhs)) | (lhs == rhs) + + class RegexOperation(Operation): """An abstract regex operation, which transforms the SQL regex into something python can understand""" @@ -627,6 +642,8 @@ class RexCallPlugin(BaseRexPlugin): "-": ReduceOperation(operation=operator.sub, unary_operation=lambda x: -x), "/": ReduceOperation(operation=SQLDivisionOperator()), "*": ReduceOperation(operation=operator.mul), + "is distinct from": NotOperation().of(IsNotDistinctOperation()), + "is not distinct from": IsNotDistinctOperation(), # special operations "cast": lambda x: x, "case": CaseOperation(), diff --git a/docs/pages/sql.rst b/docs/pages/sql.rst index ace1297ab..c371084af 100644 --- a/docs/pages/sql.rst +++ b/docs/pages/sql.rst @@ -199,14 +199,16 @@ Limitatons ``dask-sql`` is still in early development, therefore exist some limitations: -* Not all operations and aggregations are implemented already, most prominently: ``WINDOW`` is not implemented so far. -* ``GROUP BY`` aggregations can not use ``DISTINCT`` +Not all operations and aggregations are implemented already, most prominently: ``WINDOW`` is not implemented so far. .. note:: Whenever you find a not already implemented operation, keyword or functionality, please raise an issue at our `issue tracker `_ with your use-case. +Dask/pandas and SQL treat null-values (or nan) differently on sorting, grouping and joining. +``dask-sql`` tries to follow the SQL standard as much as possible, so results might be different to what you expect from Dask/pandas. + Apart from those functional limitations, there is a operation which need special care: ``ORDER BY```. Normally, ``dask-sql`` calls create a ``dask`` data frame, which gets only computed when you call the ``.compute()`` member. Due to internal constraints, this is currently not the case for ``ORDER BY``. @@ -218,4 +220,5 @@ Including this operation will trigger a calculation of the full data frame alrea The data inside ``dask`` is partitioned, to distribute it over the cluster. ``head`` will only return the first N elements from the first partition - even if N is larger than the partition size. As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``. - ``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to precalculate the first partition to find out, if it needs to have a look into all data or not. + ``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - + but will also need to precalculate the first partition to find out, if it needs to have a look into all data or not. diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py new file mode 100644 index 000000000..b3f1dab1b --- /dev/null +++ b/tests/integration/test_compatibility.py @@ -0,0 +1,884 @@ +""" +The tests in this module are taken from +the fugue-sql module to test the compatibility +with their "understanding" of SQL +They run randomized tests and compare with sqlite. 
+ +There are some changes compared to the fugueSQL +tests, especially when it comes to sort order: +dask-sql does not enforce a specific order after groupby +""" + +import sqlite3 +from datetime import datetime, timedelta + +import pandas as pd +import numpy as np +from pandas.testing import assert_frame_equal +from dask_sql import Context + + +def eq_sqlite(sql, **dfs): + c = Context() + engine = sqlite3.connect(":memory:") + + for name, df in dfs.items(): + c.create_table(name, df) + df.to_sql(name, engine, index=False) + + dask_result = c.sql(sql).compute().reset_index(drop=True) + sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True) + + assert_frame_equal(dask_result, sqlite_result, check_dtype=False) + + +def make_rand_df(size: int, **kwargs): + np.random.seed(0) + data = {} + for k, v in kwargs.items(): + if not isinstance(v, tuple): + v = (v, 0.0) + dt, null_ct = v[0], v[1] + if dt is int: + s = np.random.randint(10, size=size) + elif dt is bool: + s = np.where(np.random.randint(2, size=size), True, False) + elif dt is float: + s = np.random.rand(size) + elif dt is str: + r = [f"ssssss{x}" for x in range(10)] + c = np.random.randint(10, size=size) + s = np.array([r[x] for x in c]) + elif dt is datetime: + rt = [datetime(2020, 1, 1) + timedelta(days=x) for x in range(10)] + c = np.random.randint(10, size=size) + s = np.array([rt[x] for x in c]) + else: + raise NotImplementedError + ps = pd.Series(s) + if null_ct > 0: + idx = np.random.choice(size, null_ct, replace=False).tolist() + ps[idx] = None + data[k] = ps + return pd.DataFrame(data) + + +def test_basic_select_from(): + df = make_rand_df(5, a=(int, 2), b=(str, 3), c=(float, 4)) + eq_sqlite("SELECT 1 AS a, 1.5 AS b, 'x' AS c") + eq_sqlite("SELECT 1+2 AS a, 1.5*3 AS b, 'x' AS c") + eq_sqlite("SELECT * FROM a", a=df) + eq_sqlite("SELECT * FROM a AS x", a=df) + eq_sqlite("SELECT b AS bb, a+1-2*3.0/4 AS cc, x.* FROM a AS x", a=df) + eq_sqlite("SELECT *, 1 AS x, 2.5 AS y, 'z' AS z FROM a AS x", a=df) + eq_sqlite("SELECT *, -(1.0+a)/3 AS x, +(2.5) AS y FROM a AS x", a=df) + + +def test_case_when(): + a = make_rand_df(100, a=(int, 20), b=(str, 30), c=(float, 40)) + eq_sqlite( + """ + SELECT a,b,c, + CASE + WHEN a<10 THEN a+3 + WHEN c<0.5 THEN a+5 + ELSE (1+2)*3 + a + END AS d + FROM a + """, + a=a, + ) + + +def test_drop_duplicates(): + # simplest + a = make_rand_df(100, a=int, b=int) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a + ORDER BY a NULLS LAST, b NULLS FIRST + """, + a=a, + ) + # mix of number and nan + a = make_rand_df(100, a=(int, 50), b=(int, 50)) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a + ORDER BY a NULLS LAST, b NULLS FIRST + """, + a=a, + ) + # mix of number and string and nulls + a = make_rand_df(100, a=(int, 50), b=(str, 50), c=float) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a + ORDER BY a NULLS LAST, b NULLS FIRST + """, + a=a, + ) + + +def test_order_by_no_limit(): + a = make_rand_df(100, a=(int, 50), b=(str, 50), c=float) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a + ORDER BY a NULLS LAST, b NULLS FIRST + """, + a=a, + ) + + +def test_order_by_limit(): + a = make_rand_df(100, a=(int, 50), b=(str, 50), c=float) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a LIMIT 0 + """, + a=a, + ) + eq_sqlite( + """ + SELECT DISTINCT b, a FROM a ORDER BY a NULLS FIRST, b NULLS FIRST LIMIT 2 + """, + a=a, + ) + eq_sqlite( + """ + SELECT b, a FROM a + ORDER BY a NULLS LAST, b NULLS FIRST LIMIT 10 + """, + a=a, + ) + + +def test_where(): + df = make_rand_df(100, a=(int, 30), b=(str, 30), c=(float, 
30)) + eq_sqlite("SELECT * FROM a WHERE TRUE OR TRUE", a=df) + eq_sqlite("SELECT * FROM a WHERE TRUE AND TRUE", a=df) + eq_sqlite("SELECT * FROM a WHERE FALSE OR FALSE", a=df) + eq_sqlite("SELECT * FROM a WHERE FALSE AND FALSE", a=df) + + eq_sqlite("SELECT * FROM a WHERE TRUE OR b<='ssssss8'", a=df) + eq_sqlite("SELECT * FROM a WHERE TRUE AND b<='ssssss8'", a=df) + eq_sqlite("SELECT * FROM a WHERE FALSE OR b<='ssssss8'", a=df) + eq_sqlite("SELECT * FROM a WHERE FALSE AND b<='ssssss8'", a=df) + eq_sqlite("SELECT * FROM a WHERE a=10 OR b<='ssssss8'", a=df) + eq_sqlite("SELECT * FROM a WHERE c IS NOT NULL OR (a<5 AND b IS NOT NULL)", a=df) + + df = make_rand_df(100, a=(float, 30), b=(float, 30), c=(float, 30)) + eq_sqlite("SELECT * FROM a WHERE a<0.5 AND b<0.5 AND c<0.5", a=df) + eq_sqlite("SELECT * FROM a WHERE a<0.5 OR b<0.5 AND c<0.5", a=df) + eq_sqlite("SELECT * FROM a WHERE a IS NULL OR (b<0.5 AND c<0.5)", a=df) + eq_sqlite("SELECT * FROM a WHERE a*b IS NULL OR (b*c<0.5 AND c*a<0.5)", a=df) + + +def test_in_between(): + df = make_rand_df(10, a=(int, 3), b=(str, 3)) + eq_sqlite("SELECT * FROM a WHERE a IN (2,4,6)", a=df) + eq_sqlite("SELECT * FROM a WHERE a BETWEEN 2 AND 4+1", a=df) + eq_sqlite("SELECT * FROM a WHERE a NOT IN (2,4,6) AND a IS NOT NULL", a=df) + eq_sqlite("SELECT * FROM a WHERE a NOT BETWEEN 2 AND 4+1 AND a IS NOT NULL", a=df) + + +def test_join_inner(): + a = make_rand_df(100, a=(int, 40), b=(str, 40), c=(float, 40)) + b = make_rand_df(80, d=(float, 10), a=(int, 10), b=(str, 10)) + eq_sqlite( + """ + SELECT + a.*, d, d*c AS x + FROM a + INNER JOIN b ON a.a=b.a AND a.b=b.b + ORDER BY a.a NULLS FIRST, a.b NULLS FIRST, a.c NULLS FIRST, d NULLS FIRST + """, + a=a, + b=b, + ) + + +def test_join_left(): + a = make_rand_df(100, a=(int, 40), b=(str, 40), c=(float, 40)) + b = make_rand_df(80, d=(float, 10), a=(int, 10), b=(str, 10)) + eq_sqlite( + """ + SELECT + a.*, d, d*c AS x + FROM a LEFT JOIN b ON a.a=b.a AND a.b=b.b + ORDER BY a.a NULLS FIRST, a.b NULLS FIRST, a.c NULLS FIRST, d NULLS FIRST + """, + a=a, + b=b, + ) + + +def test_join_cross(): + a = make_rand_df(10, a=(int, 4), b=(str, 4), c=(float, 4)) + b = make_rand_df(20, dd=(float, 1), aa=(int, 1), bb=(str, 1)) + eq_sqlite("SELECT * FROM a CROSS JOIN b", a=a, b=b) + + +def test_join_multi(): + a = make_rand_df(100, a=(int, 40), b=(str, 40), c=(float, 40)) + b = make_rand_df(80, d=(float, 10), a=(int, 10), b=(str, 10)) + c = make_rand_df(80, dd=(float, 10), a=(int, 10), b=(str, 10)) + eq_sqlite( + """ + SELECT a.*,d,dd FROM a + INNER JOIN b ON a.a=b.a AND a.b=b.b + INNER JOIN c ON a.a=c.a AND c.b=b.b + ORDER BY a.a NULLS FIRST, a.b NULLS FIRST, a.c NULLS FIRST, dd NULLS FIRST, d NULLS FIRST + """, + a=a, + b=b, + c=c, + ) + + +def test_agg_count_no_group_by(): + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + COUNT(a) AS c_a, + COUNT(DISTINCT a) AS cd_a, + COUNT(b) AS c_b, + COUNT(DISTINCT b) AS cd_b, + COUNT(c) AS c_c, + COUNT(DISTINCT c) AS cd_c, + COUNT(d) AS c_d, + COUNT(DISTINCT d) AS cd_d, + COUNT(e) AS c_e, + COUNT(DISTINCT a) AS cd_e + FROM a + """, + a=a, + ) + + +def test_agg_count(): + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + a, b, a+1 AS c, + COUNT(c) AS c_c, + COUNT(DISTINCT c) AS cd_c, + COUNT(d) AS c_d, + COUNT(DISTINCT d) AS cd_d, + COUNT(e) AS c_e, + COUNT(DISTINCT a) AS cd_e + FROM a GROUP BY a, b + """, + a=a, + ) + + +def 
test_agg_sum_avg_no_group_by(): + eq_sqlite( + """ + SELECT + SUM(a) AS sum_a, + AVG(a) AS avg_a + FROM a + """, + a=pd.DataFrame({"a": [float("nan")]}), + ) + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + SUM(a) AS sum_a, + AVG(a) AS avg_a, + SUM(c) AS sum_c, + AVG(c) AS avg_c, + SUM(e) AS sum_e, + AVG(e) AS avg_e, + SUM(a)+AVG(e) AS mix_1, + SUM(a+e) AS mix_2 + FROM a + """, + a=a, + ) + + +def test_agg_sum_avg(): + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + a,b, a+1 AS c, + SUM(c) AS sum_c, + AVG(c) AS avg_c, + SUM(e) AS sum_e, + AVG(e) AS avg_e, + SUM(a)+AVG(e) AS mix_1, + SUM(a+e) AS mix_2 + FROM a GROUP BY a,b + """, + a=a, + ) + + +def test_agg_min_max_no_group_by(): + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + MIN(a) AS min_a, + MAX(a) AS max_a, + MIN(b) AS min_b, + MAX(b) AS max_b, + MIN(c) AS min_c, + MAX(c) AS max_c, + MIN(d) AS min_d, + MAX(d) AS max_d, + MIN(e) AS min_e, + MAX(e) AS max_e, + MIN(a+e) AS mix_1, + MIN(a)+MIN(e) AS mix_2 + FROM a + """, + a=a, + ) + + +def test_agg_min_max(): + a = make_rand_df( + 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + ) + eq_sqlite( + """ + SELECT + a, b, a+1 AS c, + MIN(c) AS min_c, + MAX(c) AS max_c, + MIN(d) AS min_d, + MAX(d) AS max_d, + MIN(e) AS min_e, + MAX(e) AS max_e, + MIN(a+e) AS mix_1, + MIN(a)+MIN(e) AS mix_2 + FROM a GROUP BY a, b + """, + a=a, + ) + + +# TODO: Except not implemented so far +# def test_window_row_number(): +# a = make_rand_df(100, a=int, b=(float, 50)) +# eq_sqlite( +# """ +# SELECT *, +# ROW_NUMBER() OVER (ORDER BY a ASC, b DESC NULLS FIRST) AS a1, +# ROW_NUMBER() OVER (ORDER BY a ASC, b DESC NULLS LAST) AS a2, +# ROW_NUMBER() OVER (ORDER BY a ASC, b ASC NULLS FIRST) AS a3, +# ROW_NUMBER() OVER (ORDER BY a ASC, b ASC NULLS LAST) AS a4, +# ROW_NUMBER() OVER (PARTITION BY a ORDER BY a,b DESC) AS a5 +# FROM a +# """, +# a=a, +# ) + +# a = make_rand_df( +# 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=float +# ) +# eq_sqlite( +# """ +# SELECT *, +# ROW_NUMBER() OVER (ORDER BY a ASC, b DESC NULLS FIRST, e) AS a1, +# ROW_NUMBER() OVER (ORDER BY a ASC, b DESC NULLS LAST, e) AS a2, +# ROW_NUMBER() OVER (PARTITION BY a ORDER BY a,b DESC, e) AS a3, +# ROW_NUMBER() OVER (PARTITION BY a,c ORDER BY a,b DESC, e) AS a4 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_row_number_partition_by(): +# a = make_rand_df(100, a=int, b=(float, 50)) +# eq_sqlite( +# """ +# SELECT *, +# ROW_NUMBER() OVER (PARTITION BY a ORDER BY a,b DESC) AS a5 +# FROM a +# """, +# a=a, +# ) + +# a = make_rand_df( +# 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=float +# ) +# eq_sqlite( +# """ +# SELECT *, +# ROW_NUMBER() OVER (PARTITION BY a ORDER BY a,b DESC, e) AS a3, +# ROW_NUMBER() OVER (PARTITION BY a,c ORDER BY a,b DESC, e) AS a4 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_ranks(): +# a = make_rand_df(100, a=int, b=(float, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT *, +# RANK() OVER (PARTITION BY a ORDER BY b DESC NULLS FIRST, c) AS a1, +# DENSE_RANK() OVER (ORDER BY a ASC, b DESC NULLS LAST, c DESC) AS a2, +# PERCENT_RANK() OVER (ORDER BY a ASC, b ASC NULLS LAST, c) AS a4 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def 
test_window_ranks_partition_by(): +# a = make_rand_df(100, a=int, b=(float, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT *, +# RANK() OVER (PARTITION BY a ORDER BY b DESC NULLS FIRST, c) AS a1, +# DENSE_RANK() OVER +# (PARTITION BY a ORDER BY a ASC, b DESC NULLS LAST, c DESC) +# AS a2, +# PERCENT_RANK() OVER +# (PARTITION BY a ORDER BY a ASC, b ASC NULLS LAST, c) AS a4 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_lead_lag(): +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT +# LEAD(b,1) OVER (ORDER BY a) AS a1, +# LEAD(b,2,10) OVER (ORDER BY a) AS a2, +# LEAD(b,1) OVER (PARTITION BY c ORDER BY a) AS a3, +# LEAD(b,1) OVER (PARTITION BY c ORDER BY b, a ASC NULLS LAST) AS a5, + +# LAG(b,1) OVER (ORDER BY a) AS b1, +# LAG(b,2,10) OVER (ORDER BY a) AS b2, +# LAG(b,1) OVER (PARTITION BY c ORDER BY a) AS b3, +# LAG(b,1) OVER (PARTITION BY c ORDER BY b, a ASC NULLS LAST) AS b5 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_lead_lag_partition_by(): +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT +# LEAD(b,1,10) OVER (PARTITION BY c ORDER BY a) AS a3, +# LEAD(b,1) OVER (PARTITION BY c ORDER BY b, a ASC NULLS LAST) AS a5, + +# LAG(b,1) OVER (PARTITION BY c ORDER BY a) AS b3, +# LAG(b,1) OVER (PARTITION BY c ORDER BY b, a ASC NULLS LAST) AS b5 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_sum_avg(): +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# for func in ["SUM", "AVG"]: +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER () AS a1, +# {func}(b) OVER (PARTITION BY c) AS a2, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6 +# FROM a +# """, +# a=a, +# ) +# # >= 1.1.0 has bug on these agg function with groupby+rolloing +# # https://github.com/pandas-dev/pandas/issues/35557 +# if pd.__version__ < "1.1": +# # irregular windows +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS a6, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) AS a7, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND UNBOUNDED FOLLOWING) AS a8 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_sum_avg_partition_by(): +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# for func in ["SUM", "AVG"]: +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6 +# FROM a +# """, +# a=a, +# ) +# # 1.1.0 has bug on these agg function with groupby+rolloing +# # https://github.com/pandas-dev/pandas/issues/35557 +# if pd.__version__ < "1.1": +# # irregular windows +# eq_sqlite( +# f""" +# SELECT a,b, +# 
{func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS a6, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) AS a7, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND UNBOUNDED FOLLOWING) AS a8 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_min_max(): +# for func in ["MIN", "MAX"]: +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER () AS a1, +# {func}(b) OVER (PARTITION BY c) AS a2, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6 +# FROM a +# """, +# a=a, +# ) +# # < 1.1.0 has bugs on these agg function with rolloing (no group by) +# if pd.__version__ >= "1.1": +# # irregular windows +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS a6, +# {func}(b) OVER (ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) AS a7, +# {func}(b) OVER (ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND UNBOUNDED FOLLOWING) AS a8 +# FROM a +# """, +# a=a, +# ) +# # == 1.1.0 has bugs on these agg function with rolloing (with group by) +# # https://github.com/pandas-dev/pandas/issues/35557 +# # < 1.1.0 has bugs on nulls when rolling with forward looking +# if pd.__version__ < "1.1": +# b = make_rand_df(10, a=float, b=(int, 0), c=(str, 0)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS a6 +# FROM a +# """, +# a=b, +# ) + +# TODO: Except not implemented so far +# def test_window_min_max_partition_by(): +# for func in ["MIN", "MAX"]: +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY c) AS a2, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6 +# FROM a +# """, +# a=a, +# ) +# # >= 1.1.0 has bugs on these agg function with rolloing (with group by) +# # https://github.com/pandas-dev/pandas/issues/35557 +# # < 1.1.0 has bugs on nulls when rolling with forward looking +# if pd.__version__ < "1.1": +# b = make_rand_df(10, a=float, b=(int, 0), c=(str, 0)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS a6 +# FROM a +# """, +# a=b, +# ) + +# TODO: Except not implemented so far +# def test_window_count(): +# for func in ["COUNT"]: +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER () AS a1, +# {func}(b) OVER (PARTITION BY c) AS a2, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 
2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6, + +# {func}(c) OVER () AS b1, +# {func}(c) OVER (PARTITION BY c) AS b2, +# {func}(c) OVER (PARTITION BY c,b) AS b3, +# {func}(c) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS b4, +# {func}(c) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS b5, +# {func}(c) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS b6 +# FROM a +# """, +# a=a, +# ) +# # < 1.1.0 has bugs on these agg function with rolloing (no group by) +# # == 1.1.0 has this bug +# # https://github.com/pandas-dev/pandas/issues/35579 +# if pd.__version__ >= "1.1": +# # irregular windows +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS a6, +# {func}(b) OVER (PARTITION BY c ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS a9, + +# {func}(c) OVER (ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS b6, +# {func}(c) OVER (PARTITION BY c ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS b9 +# FROM a +# """, +# a=a, +# ) + +# TODO: Except not implemented so far +# def test_window_count_partition_by(): +# for func in ["COUNT"]: +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY c) AS a2, +# {func}(b+a) OVER (PARTITION BY c,b) AS a3, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS a4, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS a5, +# {func}(b+a) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS a6, + +# {func}(c) OVER (PARTITION BY c) AS b2, +# {func}(c) OVER (PARTITION BY c,b) AS b3, +# {func}(c) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS b4, +# {func}(c) OVER (PARTITION BY b ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS b5, +# {func}(c) OVER (PARTITION BY b ORDER BY a +# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +# AS b6 +# FROM a +# """, +# a=a, +# ) +# # < 1.1.0 has bugs on these agg function with rolloing (no group by) +# # == 1.1.0 has this bug +# # https://github.com/pandas-dev/pandas/issues/35579 +# if pd.__version__ >= "1.1": +# # irregular windows +# eq_sqlite( +# f""" +# SELECT a,b, +# {func}(b) OVER (PARTITION BY c ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS a9, + +# {func}(c) OVER (PARTITION BY c ORDER BY a DESC +# ROWS BETWEEN 2 PRECEDING AND 0 PRECEDING) AS b9 +# FROM a +# """, +# a=a, +# ) + +# TODO: Windowing not implemented so far +# def test_nested_query(): +# a = make_rand_df(100, a=float, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT * FROM ( +# SELECT *, +# ROW_NUMBER() OVER (PARTITION BY c ORDER BY b, a ASC NULLS LAST) AS r +# FROM a) +# WHERE r=1 +# """, +# a=a, +# ) + + +def test_union(): + a = make_rand_df(30, b=(int, 10), c=(str, 10)) + b = make_rand_df(80, b=(int, 50), c=(str, 50)) + c = make_rand_df(100, b=(int, 50), c=(str, 50)) + eq_sqlite( + """ + SELECT * FROM a + UNION SELECT * FROM b + UNION SELECT * FROM c + ORDER BY b NULLS FIRST, c NULLS FIRST + """, + a=a, + b=b, + c=c, + ) + eq_sqlite( + """ + SELECT * FROM a + UNION ALL SELECT * FROM b + 
UNION ALL SELECT * FROM c + ORDER BY b NULLS FIRST, c NULLS FIRST + """, + a=a, + b=b, + c=c, + ) + + +# TODO: Except not implemented so far +# def test_except(): +# a = make_rand_df(30, b=(int, 10), c=(str, 10)) +# b = make_rand_df(80, b=(int, 50), c=(str, 50)) +# c = make_rand_df(100, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT * FROM c +# EXCEPT SELECT * FROM b +# EXCEPT SELECT * FROM c +# """, +# a=a, +# b=b, +# c=c, +# ) + +# TODO: Intersect not implemented so far +# def test_intersect(): +# a = make_rand_df(30, b=(int, 10), c=(str, 10)) +# b = make_rand_df(80, b=(int, 50), c=(str, 50)) +# c = make_rand_df(100, b=(int, 50), c=(str, 50)) +# eq_sqlite( +# """ +# SELECT * FROM c +# INTERSECT SELECT * FROM b +# INTERSECT SELECT * FROM c +# """, +# a=a, +# b=b, +# c=c, +# ) + + +def test_with(): + a = make_rand_df(30, a=(int, 10), b=(str, 10)) + b = make_rand_df(80, ax=(int, 10), bx=(str, 10)) + eq_sqlite( + """ + WITH + aa AS ( + SELECT a AS aa, b AS bb FROM a + ), + c AS ( + SELECT aa-1 AS aa, bb FROM aa + ) + SELECT * FROM c UNION SELECT * FROM b + ORDER BY aa NULLS FIRST, bb NULLS FIRST + """, + a=a, + b=b, + ) + + +def test_integration_1(): + a = make_rand_df(100, a=int, b=str, c=float, d=int, e=bool, f=str, g=str, h=float) + eq_sqlite( + """ + WITH + a1 AS ( + SELECT a+1 AS a, b, c FROM a + ), + a2 AS ( + SELECT a,MAX(b) AS b_max, AVG(c) AS c_avg FROM a GROUP BY a + ), + a3 AS ( + SELECT d+2 AS d, f, g, h FROM a WHERE e + ) + SELECT a1.a,b,c,b_max,c_avg,f,g,h FROM a1 + INNER JOIN a2 ON a1.a=a2.a + LEFT JOIN a3 ON a1.a=a3.d + ORDER BY a1.a NULLS FIRST, b NULLS FIRST, c NULLS FIRST, f NULLS FIRST, g NULLS FIRST, h NULLS FIRST + """, + a=a, + ) diff --git a/tests/integration/test_groupby.py b/tests/integration/test_groupby.py index 76d35939d..d305d6b06 100644 --- a/tests/integration/test_groupby.py +++ b/tests/integration/test_groupby.py @@ -127,7 +127,7 @@ def test_group_by_nan(c): ) df = df.compute() - expected_df = pd.DataFrame({"c": [3, 1]}) + expected_df = pd.DataFrame({"c": [3, float("nan"), 1]}) # The dtype in pandas 1.0.5 and pandas 1.1.0 are different, so # we can not check here assert_frame_equal(df, expected_df, check_dtype=False) @@ -206,3 +206,16 @@ def test_aggregations(c): } ) assert_frame_equal(df.sort_values("user_id").reset_index(drop=True), expected_df) + + df = c.sql( + """ + SELECT + MAX(a) AS "max", + MIN(a) AS "min" + FROM string_table + """ + ) + df = df.compute() + + expected_df = pd.DataFrame({"max": ["a normal string"], "min": ["%_%"]}) + assert_frame_equal(df.reset_index(drop=True), expected_df) diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py index 5d8c83807..ffec77a84 100644 --- a/tests/integration/test_sort.py +++ b/tests/integration/test_sort.py @@ -228,6 +228,29 @@ def test_sort_with_nan_more_columns(): ) +def test_sort_with_nan_many_partitions(): + c = Context() + df = pd.DataFrame({"a": [float("nan"), 1] * 30, "b": [1, 2, 3] * 20,}) + c.create_table("df", dd.from_pandas(df, npartitions=10)) + + df_result = ( + c.sql("SELECT * FROM df ORDER BY a NULLS FIRST, b ASC NULLS FIRST") + .compute() + .reset_index(drop=True) + ) + + assert_frame_equal( + df_result, + pd.DataFrame( + { + "a": [float("nan")] * 30 + [1] * 30, + "b": [1] * 10 + [2] * 10 + [3] * 10 + [1] * 10 + [2] * 10 + [3] * 10, + } + ), + check_names=False, + ) + + def test_sort_strings(c): string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]}) c.create_table("string_table", string_table)
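
Two minimal standalone sketches (not part of the patch above; the names df, left, right, g, k, x are purely illustrative) of the pandas/dask behaviors this change works around. The first shows the null-aware groupby trick used in _perform_aggregation: grouping by both the negated null mask and the NaN-filled values keeps rows whose group key is NULL, which a plain groupby would silently drop.

import pandas as pd

# Illustrative data: one group key is NaN.
df = pd.DataFrame({"g": [1.0, 1.0, float("nan")], "x": [10, 20, 30]})

# A plain groupby silently drops the NaN group:
print(df.groupby("g")["x"].sum())  # only the 1.0 group (sum 30) survives

# The trick from _perform_aggregation: group by the negated null mask
# (the ~ makes NULLs sort first) together with the NaN-filled values.
is_not_null = ~df["g"].isnull()
non_nan = df["g"].fillna(0)
print(df.groupby([is_not_null, non_nan])["x"].sum())  # the NaN group (sum 30) is kept

The second sketch illustrates why join.py filters NULL keys before inner/left/right joins: pandas merge happily matches NaN keys with each other, whereas SQL drops NULL keys from an INNER JOIN.

left = pd.DataFrame({"k": [1.0, float("nan")], "a": [1, 2]})
right = pd.DataFrame({"k": [1.0, float("nan")], "b": [3, 4]})

# pandas matches the NaN keys with each other -> 2 rows;
# an SQL INNER JOIN would return only the k=1.0 row.
print(pd.merge(left, right, on="k", how="inner"))

# Mimicking the patch: drop NULL keys on both sides before the inner join.
print(pd.merge(left[~left["k"].isna()], right[~right["k"].isna()], on="k", how="inner"))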