diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py
index 1170ba07c..5857a3321 100644
--- a/dask_sql/physical/utils/sort.py
+++ b/dask_sql/physical/utils/sort.py
@@ -2,8 +2,14 @@
 import dask.dataframe as dd
 import pandas as pd
+from dask.utils import M

-from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column
+from dask_sql.utils import make_pickable_without_dask_sql
+
+try:
+    import dask_cudf
+except ImportError:
+    dask_cudf = None


 def apply_sort(
@@ -12,6 +18,46 @@ def apply_sort(
     sort_ascending: List[bool],
     sort_null_first: List[bool],
 ) -> dd.DataFrame:
+    # if we have a single partition, we can sometimes sort with map_partitions
+    if df.npartitions == 1:
+        if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):
+            # cudf only supports null positioning if `ascending` is a single boolean:
+            # https://github.com/rapidsai/cudf/issues/9400
+            if (all(sort_ascending) or not any(sort_ascending)) and not any(
+                sort_null_first[1:]
+            ):
+                return df.map_partitions(
+                    M.sort_values,
+                    by=sort_columns,
+                    ascending=all(sort_ascending),
+                    na_position="first" if sort_null_first[0] else "last",
+                )
+            if not any(sort_null_first):
+                return df.map_partitions(
+                    M.sort_values, by=sort_columns, ascending=sort_ascending
+                )
+        elif not any(sort_null_first[1:]):
+            return df.map_partitions(
+                M.sort_values,
+                by=sort_columns,
+                ascending=sort_ascending,
+                na_position="first" if sort_null_first[0] else "last",
+            )
+
+    # dask-cudf only supports ascending sort / nulls last:
+    # https://github.com/rapidsai/cudf/pull/9250
+    # https://github.com/rapidsai/cudf/pull/9264
+    if (
+        dask_cudf is not None
+        and isinstance(df, dask_cudf.DataFrame)
+        and all(sort_ascending)
+        and not any(sort_null_first)
+    ):
+        try:
+            return df.sort_values(sort_columns, ignore_index=True)
+        except ValueError:
+            pass
+
     # Split the first column. We need to handle this one with set_index
     first_sort_column = sort_columns[0]
     first_sort_ascending = sort_ascending[0]
diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py
index 4566d3690..7fc3ff32b 100644
--- a/tests/integration/fixtures.py
+++ b/tests/integration/fixtures.py
@@ -9,6 +9,11 @@
 from dask.distributed import Client
 from pandas.testing import assert_frame_equal

+try:
+    import cudf
+except ImportError:
+    cudf = None
+

 @pytest.fixture()
 def timeseries_df(c):
@@ -86,6 +91,21 @@ def datetime_table():
     )


+@pytest.fixture()
+def gpu_user_table_1(user_table_1):
+    return cudf.from_pandas(user_table_1) if cudf else None
+
+
+@pytest.fixture()
+def gpu_df(df):
+    return cudf.from_pandas(df) if cudf else None
+
+
+@pytest.fixture()
+def gpu_long_table(long_table):
+    return cudf.from_pandas(long_table) if cudf else None
+
+
 @pytest.fixture()
 def c(
     df_simple,
@@ -97,6 +117,9 @@ def c(
     user_table_nan,
     string_table,
     datetime_table,
+    gpu_user_table_1,
+    gpu_df,
+    gpu_long_table,
 ):
     dfs = {
         "df_simple": df_simple,
@@ -108,6 +131,9 @@ def c(
         "user_table_nan": user_table_nan,
         "string_table": string_table,
         "datetime_table": datetime_table,
+        "gpu_user_table_1": gpu_user_table_1,
+        "gpu_df": gpu_df,
+        "gpu_long_table": gpu_long_table,
     }

     # Lazy import, otherwise the pytest framework has problems
@@ -115,6 +141,8 @@ def c(
     c = Context()

     for df_name, df in dfs.items():
+        if df is None:
+            continue
         dask_df = dd.from_pandas(df, npartitions=3)
         c.create_table(df_name, dask_df)

diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py
index 2165699ca..eb9c18337 100644
--- a/tests/integration/test_show.py
+++ b/tests/integration/test_show.py
@@ -2,6 +2,11 @@
 import pytest
 from pandas.testing import assert_frame_equal

+try:
+    import cudf
+except ImportError:
+    cudf = None
+

 def test_schemas(c):
     df = c.sql("SHOW SCHEMAS")
@@ -36,6 +41,21 @@ def test_tables(c):
             "string_table",
             "datetime_table",
         ]
+        if cudf is None
+        else [
+            "df",
+            "df_simple",
+            "user_table_1",
+            "user_table_2",
+            "long_table",
+            "user_table_inf",
+            "user_table_nan",
+            "string_table",
+            "datetime_table",
+            "gpu_user_table_1",
+            "gpu_df",
+            "gpu_long_table",
+        ]
     }
 )
diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py
index 34dc94e4f..fb8af592c 100644
--- a/tests/integration/test_sort.py
+++ b/tests/integration/test_sort.py
@@ -15,12 +15,12 @@ def test_sort(c, user_table_1, df):
     ORDER BY b, user_id DESC
     """
     )
-    df_result = df_result.compute().reset_index(drop=True)
-    df_expected = user_table_1.sort_values(
-        ["b", "user_id"], ascending=[True, False]
-    ).reset_index(drop=True)
+    df_result = df_result.compute()
+    df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False])

-    assert_frame_equal(df_result, df_expected)
+    assert_frame_equal(
+        df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
+    )

     df_result = c.sql(
         """
@@ -87,17 +87,23 @@ def test_sort_by_alias(c, user_table_1):
     assert_frame_equal(df_result, df_expected)


-def test_sort_with_nan():
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_with_nan(gpu):
+    if gpu:
+        xd = pytest.importorskip("cudf")
+    else:
+        xd = pd
+
     c = Context()
-    df = pd.DataFrame(
+    df = xd.DataFrame(
         {"a": [1, 2, float("nan"), 2], "b": [4, float("nan"), 5, float("inf")]}
     )
     c.create_table("df", df)

     df_result = c.sql("SELECT * FROM df ORDER BY a").compute().reset_index(drop=True)
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]}
         ),
     )
@@ -107,9 +113,9 @@ def test_sort_with_nan():
         .compute()
         .reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [float("nan"), 1, 2, 2], "b": [5, 4, float("nan"), float("inf")]}
         ),
     )
@@ -117,9 +123,9 @@ def test_sort_with_nan():
     df_result = (
         c.sql("SELECT * FROM df ORDER BY a NULLS LAST").compute().reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]}
         ),
     )
@@ -127,9 +133,9 @@ def test_sort_with_nan():
     df_result = (
         c.sql("SELECT * FROM df ORDER BY a ASC").compute().reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]}
         ),
     )
@@ -139,9 +145,9 @@ def test_sort_with_nan():
         .compute()
         .reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [float("nan"), 1, 2, 2], "b": [5, 4, float("nan"), float("inf")]}
         ),
     )
@@ -151,9 +157,9 @@ def test_sort_with_nan():
         .compute()
         .reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]}
         ),
     )
@@ -161,10 +167,10 @@ def test_sort_with_nan():
     df_result = (
         c.sql("SELECT * FROM df ORDER BY a DESC").compute().reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
-            {"a": [float("nan"), 2, 2, 1], "b": [5, float("inf"), float("nan"), 4]}
+        xd.DataFrame(
+            {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]}
         ),
     )

@@ -173,10 +179,10 @@ def test_sort_with_nan():
         .compute()
         .reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
-            {"a": [float("nan"), 2, 2, 1], "b": [5, float("inf"), float("nan"), 4]}
+        xd.DataFrame(
+            {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]}
         ),
     )

@@ -185,17 +191,23 @@ def test_sort_with_nan():
         .compute()
         .reset_index(drop=True)
     )
-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
-            {"a": [2, 2, 1, float("nan")], "b": [float("inf"), float("nan"), 4, 5]}
+        xd.DataFrame(
+            {"a": [2, 2, 1, float("nan")], "b": [float("nan"), float("inf"), 4, 5]}
         ),
     )


-def test_sort_with_nan_more_columns():
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_with_nan_more_columns(gpu):
+    if gpu:
+        xd = pytest.importorskip("cudf")
+    else:
+        xd = pd
+
     c = Context()
-    df = pd.DataFrame(
+    df = xd.DataFrame(
         {
             "a": [1, 1, 2, 2, float("nan"), float("nan")],
             "b": [1, 1, 2, float("nan"), float("inf"), 5],
@@ -211,9 +223,7 @@ def test_sort_with_nan_more_columns():
         .c.compute()
         .reset_index(drop=True)
     )
-    assert_series_equal(
-        df_result, pd.Series([5, 6, float("nan"), 1, 3, 4]), check_names=False
-    )
+    dd.assert_eq(df_result, xd.Series([5, 6, float("nan"), 1, 3, 4]), check_names=False)

     df_result = (
         c.sql(
@@ -222,14 +232,18 @@ def test_sort_with_nan_more_columns():
         .c.compute()
         .reset_index(drop=True)
     )
-    assert_series_equal(
-        df_result, pd.Series([1, float("nan"), 4, 3, 5, 6]), check_names=False
-    )
+    dd.assert_eq(df_result, xd.Series([1, float("nan"), 4, 3, 5, 6]), check_names=False)
+
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_with_nan_many_partitions(gpu):
+    if gpu:
+        xd = pytest.importorskip("cudf")
+    else:
+        xd = pd


-def test_sort_with_nan_many_partitions():
     c = Context()
-    df = pd.DataFrame({"a": [float("nan"), 1] * 30, "b": [1, 2, 3] * 20,})
+    df = xd.DataFrame({"a": [float("nan"), 1] * 30, "b": [1, 2, 3] * 20,})
     c.create_table("df", dd.from_pandas(df, npartitions=10))

     df_result = (
@@ -238,9 +252,9 @@ def test_sort_with_nan_many_partitions():
         .reset_index(drop=True)
     )

-    assert_frame_equal(
+    dd.assert_eq(
         df_result,
-        pd.DataFrame(
+        xd.DataFrame(
             {
                 "a": [float("nan")] * 30 + [1] * 30,
                 "b": [1] * 10 + [2] * 10 + [3] * 10 + [1] * 10 + [2] * 10 + [3] * 10,
@@ -248,16 +262,22 @@ def test_sort_with_nan_many_partitions():
         ),
     )

-    df = pd.DataFrame({"a": [float("nan"), 1] * 30})
+    df = xd.DataFrame({"a": [float("nan"), 1] * 30})
     c.create_table("df", dd.from_pandas(df, npartitions=10))

     df_result = c.sql("SELECT * FROM df ORDER BY a").compute().reset_index(drop=True)

-    assert_frame_equal(df_result, pd.DataFrame({"a": [1] * 30 + [float("nan")] * 30,}))
+    dd.assert_eq(df_result, xd.DataFrame({"a": [1] * 30 + [float("nan")] * 30,}))


-def test_sort_strings(c):
-    string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]})
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_strings(c, gpu):
+    if gpu:
+        xd = pytest.importorskip("cudf")
+    else:
+        xd = pd
+
+    string_table = xd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]})
     c.create_table("string_table", string_table)

     df_result = c.sql(
@@ -271,13 +291,19 @@ def test_sort_strings(c):
     df_result = df_result.compute().reset_index(drop=True)
     df_expected = string_table.sort_values(["a"], ascending=True).reset_index(drop=True)

-    assert_frame_equal(df_result, df_expected)
+    dd.assert_eq(df_result, df_expected)
+
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_sort_not_allowed(c, gpu):
+    if gpu:
+        table_name = "gpu_user_table_1"
+    else:
+        table_name = "user_table_1"


-def test_sort_not_allowed(c):
     # Wrong column
     with pytest.raises(Exception):
-        c.sql("SELECT * FROM user_table_1 ORDER BY 42")
+        c.sql(f"SELECT * FROM {table_name} ORDER BY 42")


 def test_limit(c, long_table):
@@ -310,3 +336,113 @@ def test_limit(c, long_table):
     df_result = df_result.compute()

     assert_frame_equal(df_result, long_table.iloc[101 : 101 + 101])
+
+
+@pytest.mark.gpu
+def test_sort_gpu(c, gpu_user_table_1, gpu_df):
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM gpu_user_table_1
+    ORDER BY b, user_id DESC
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = gpu_user_table_1.sort_values(
+        ["b", "user_id"], ascending=[True, False]
+    )
+
+    dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True))
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM gpu_df
+    ORDER BY b DESC, a DESC
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = gpu_df.sort_values(["b", "a"], ascending=[False, False])
+
+    dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True))
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM gpu_df
+    ORDER BY a DESC, b
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = gpu_df.sort_values(["a", "b"], ascending=[False, True])
+
+    dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True))
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM gpu_df
+    ORDER BY b, a
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = gpu_df.sort_values(["b", "a"], ascending=[True, True])
+
+    dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True))
+
+
+@pytest.mark.gpu
+def test_sort_gpu_by_alias(c, gpu_user_table_1):
+    df_result = c.sql(
+        """
+    SELECT
+        b AS my_column
+    FROM gpu_user_table_1
+    ORDER BY my_column, user_id DESC
+    """
+    )
+    df_result = (
+        df_result.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
+    )
+    df_expected = gpu_user_table_1.sort_values(
+        ["b", "user_id"], ascending=[True, False]
+    ).reset_index(drop=True)[["b"]]
+
+    dd.assert_eq(df_result, df_expected)
+
+
+@pytest.mark.gpu
+def test_limit_gpu(c, gpu_long_table):
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 101")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[:101])
+
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 200")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[:200])
+
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[:100])
+
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100 OFFSET 99")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[99 : 99 + 100])
+
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100 OFFSET 100")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[100 : 100 + 100])
+
+    df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 101 OFFSET 101")
+    df_result = df_result.compute()
+
+    dd.assert_eq(df_result, gpu_long_table.iloc[101 : 101 + 101])
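
Usage note (not part of the patch): a minimal sketch of how the new single-partition fast path in apply_sort can be exercised through the SQL layer. It assumes a dask-sql installation with this change applied; the dask_cudf branch additionally needs a GPU with cudf installed, so the sketch sticks to the pandas-backed case. The table name "df" and the example data are illustrative only.

    import pandas as pd
    import dask.dataframe as dd

    from dask_sql import Context

    c = Context()

    # One partition, mixed sort directions, and NULLS FIRST only on the leading key:
    # per the patch, this takes the map_partitions(M.sort_values, ...) branch
    # instead of the set_index-based multi-partition path.
    df = pd.DataFrame({"a": [2.0, float("nan"), 1.0], "b": [1, 2, 3]})
    c.create_table("df", dd.from_pandas(df, npartitions=1))

    result = c.sql("SELECT * FROM df ORDER BY a NULLS FIRST, b DESC").compute()
    print(result)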