diff --git a/conftest.py b/conftest.py
index 62559c061..ef816512d 100644
--- a/conftest.py
+++ b/conftest.py
@@ -14,7 +14,7 @@ def pytest_addoption(parser):
 def pytest_runtest_setup(item):
     # TODO: get pyarrow strings and p2p shuffle working
     dask.config.set({"dataframe.convert-string": False})
-    dask.config.set({"dataframe.shuffle.algorithm": "tasks"})
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
     if "gpu" in item.keywords:
         if not item.config.getoption("--rungpu"):
             pytest.skip("need --rungpu option to run")
diff --git a/dask_sql/context.py b/dask_sql/context.py
index d20e919e9..83d7820b9 100644
--- a/dask_sql/context.py
+++ b/dask_sql/context.py
@@ -853,7 +853,7 @@ def _get_ral(self, sql):
             except DFOptimizationException as oe:
                 # Use original plan and warn about inability to optimize plan
                 rel = nonOptimizedRel
-                logger.warn(str(oe))
+                logger.warning(str(oe))
         else:
             rel = nonOptimizedRel

diff --git a/dask_sql/mappings.py b/dask_sql/mappings.py
index ca0e23691..de13cc6e4 100644
--- a/dask_sql/mappings.py
+++ b/dask_sql/mappings.py
@@ -37,7 +37,7 @@
     pd.UInt16Dtype(): SqlTypeName.SMALLINT,
     np.uint8: SqlTypeName.TINYINT,
     pd.UInt8Dtype(): SqlTypeName.TINYINT,
-    np.bool8: SqlTypeName.BOOLEAN,
+    np.bool_: SqlTypeName.BOOLEAN,
     pd.BooleanDtype(): SqlTypeName.BOOLEAN,
     str: SqlTypeName.VARCHAR,
     np.object_: SqlTypeName.VARCHAR,
@@ -55,7 +55,7 @@
     "SqlTypeName.INTEGER": np.int32,
     "SqlTypeName.SMALLINT": np.int16,
     "SqlTypeName.TINYINT": np.int8,
-    "SqlTypeName.BOOLEAN": np.bool8,
+    "SqlTypeName.BOOLEAN": np.bool_,
     "SqlTypeName.VARCHAR": str,
     "SqlTypeName.CHAR": str,
     "SqlTypeName.NULL": type(None),
@@ -100,7 +100,7 @@ def python_to_sql_type(python_type) -> "DaskTypeMap":
     if isinstance(python_type, np.dtype):
         python_type = python_type.type

-    if pd.api.types.is_datetime64tz_dtype(python_type):
+    if isinstance(python_type, pd.DatetimeTZDtype):
         return DaskTypeMap(
             SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
             unit=str(python_type.unit),
@@ -277,8 +277,8 @@ def similar_type(lhs: type, rhs: type) -> bool:
     is_object = pdt.is_object_dtype
     is_string = pdt.is_string_dtype
     is_dt_ns = pdt.is_datetime64_ns_dtype
-    is_dt_tz = lambda t: is_dt_ns(t) and pdt.is_datetime64tz_dtype(t)
-    is_dt_ntz = lambda t: is_dt_ns(t) and not pdt.is_datetime64tz_dtype(t)
+    is_dt_tz = lambda t: is_dt_ns(t) and isinstance(t, pd.DatetimeTZDtype)
+    is_dt_ntz = lambda t: is_dt_ns(t) and not isinstance(t, pd.DatetimeTZDtype)
     is_td_ns = pdt.is_timedelta64_ns_dtype
     is_bool = pdt.is_bool_dtype

@@ -334,8 +334,8 @@ def cast_column_to_type(col: dd.Series, expected_type: str):
     pdt = pd.api.types

     is_dt_ns = pdt.is_datetime64_ns_dtype
-    is_dt_tz = lambda t: is_dt_ns(t) and pdt.is_datetime64tz_dtype(t)
-    is_dt_ntz = lambda t: is_dt_ns(t) and not pdt.is_datetime64tz_dtype(t)
+    is_dt_tz = lambda t: is_dt_ns(t) and isinstance(t, pd.DatetimeTZDtype)
+    is_dt_ntz = lambda t: is_dt_ns(t) and not isinstance(t, pd.DatetimeTZDtype)

     current_type = col.dtype

diff --git a/dask_sql/physical/rel/logical/window.py b/dask_sql/physical/rel/logical/window.py
index 42b0f9613..adebed8c1 100644
--- a/dask_sql/physical/rel/logical/window.py
+++ b/dask_sql/physical/rel/logical/window.py
@@ -341,7 +341,9 @@ def _apply_window(

     # TODO: That is a bit of a hack. We should really use the real column dtype
     meta = df._meta.assign(**{col: 0.0 for col in newly_created_columns})
-    df = df.groupby(group_columns, dropna=False).apply(filled_map, meta=meta)
+    df = df.groupby(group_columns, dropna=False)[df.columns.tolist()].apply(
+        filled_map, meta=meta
+    )
     logger.debug(
         f"Having created a dataframe {LoggableDataFrame(df)} after windowing. Will now drop {temporary_columns}."
     )
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 8db8ca048..a6e3ac98e 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -16,7 +16,7 @@
 from dask.highlevelgraph import HighLevelGraph
 from dask.utils import random_state_data

-from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT, PANDAS_GT_200
+from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT
 from dask_sql._datafusion_lib import SqlTypeName
 from dask_sql.datacontainer import DataContainer
 from dask_sql.mappings import (
@@ -311,7 +311,7 @@ def false_(
         Returns false on nan.
         """
         if is_frame(df):
-            return ~df.fillna(True)
+            return ~df.astype("boolean").fillna(True)

         return not pd.isna(df) and df is not None and not np.isnan(df) and not bool(df)

@@ -331,7 +331,7 @@ def true_(
         Returns false on nan.
         """
         if is_frame(df):
-            return df.fillna(False)
+            return df.astype("boolean").fillna(False)

         return not pd.isna(df) and df is not None and not np.isnan(df) and bool(df)

@@ -794,11 +794,11 @@ def _round_datetime(self, *operands):

         unit_map = {
             "DAY": "D",
-            "HOUR": "H",
-            "MINUTE": "T",
-            "SECOND": "S",
+            "HOUR": "h",
+            "MINUTE": "min",
+            "SECOND": "s",
             "MICROSECOND": "U",
-            "MILLISECOND": "L",
+            "MILLISECOND": "ms",
         }

         try:
@@ -960,7 +960,7 @@ def date_part(self, what, df: SeriesOrScalar):
         elif what in {"SECOND", "SECONDS"}:
             return df.second
         elif what in {"WEEK", "WEEKS"}:
-            return df.isocalendar().week if PANDAS_GT_200 else df.week
+            return df.isocalendar().week
         elif what in {"YEAR", "YEARS"}:
             return df.year
         elif what == "DATE":
diff --git a/dask_sql/server/presto_jdbc.py b/dask_sql/server/presto_jdbc.py
index 02f77a1b4..56c1d919c 100644
--- a/dask_sql/server/presto_jdbc.py
+++ b/dask_sql/server/presto_jdbc.py
@@ -25,7 +25,7 @@ def create_meta_data(c: Context):
     """
     if c is None:
-        logger.warn("Context None: jdbc meta data not created")
+        logger.warning("Context None: jdbc meta data not created")
         return

     catalog = ""
     system_schema = "system_jdbc"
diff --git a/docs/source/sql.rst b/docs/source/sql.rst
index b56818b56..0ed924495 100644
--- a/docs/source/sql.rst
+++ b/docs/source/sql.rst
@@ -112,7 +112,7 @@ For this, it uses the following mapping:
 +-----------------------+----------------+
 | From Python Type      | To SQL Type    |
 +=======================+================+
-| ``np.bool8``          | ``BOOLEAN``    |
+| ``np.bool_``          | ``BOOLEAN``    |
 +-----------------------+----------------+
 | ``np.datetime64``     | ``TIMESTAMP``  |
 +-----------------------+----------------+
diff --git a/pyproject.toml b/pyproject.toml
index 75ec4519f..ccb507a09 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -89,3 +89,18 @@ locked = true

 [tool.isort]
 profile = "black"
+
+[tool.pytest.ini_options]
+markers = [
+    "gpu: marks tests that require GPUs (skipped by default, run with --rungpu)",
+    "queries: marks tests that run test queries (skipped by default, run with --runqueries)",
+]
+addopts = "-v -rsxfE --color=yes --cov dask_sql --cov-config=.coveragerc --cov-report=term-missing"
+filterwarnings = [
+    "error:::dask_sql[.*]",
+    "error:::dask[.*]",
+    "ignore:Need to do a cross-join:ResourceWarning:dask_sql[.*]",
+    "ignore:Dask doesn't support Dask frames:ResourceWarning:dask_sql[.*]",
+    "ignore:Running on a single-machine scheduler:UserWarning:dask[.*]",
+]
+xfail_strict = true
diff --git a/pytest.ini b/pytest.ini
deleted file mode 100644
index 168bed5ee..000000000
--- a/pytest.ini
+++ /dev/null
@@ -1,11 +0,0 @@
-[pytest]
-addopts =
-    --cov dask_sql
-    --cov-config=.coveragerc
-    --cov-report=term-missing
-testpaths =
-    tests
-markers =
-    gpu: marks tests that require GPUs (skipped by default, run with '--rungpu')
-    queries: marks tests that run test queries (skipped by default, run with '--runqueries')
-xfail_strict=true
diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py
index cd4e38928..669260215 100644
--- a/tests/integration/fixtures.py
+++ b/tests/integration/fixtures.py
@@ -8,7 +8,7 @@
 from dask.datasets import timeseries as dd_timeseries
 from dask.distributed import Client

-from tests.utils import assert_eq
+from tests.utils import assert_eq, convert_nullable_columns

 try:
     import cudf
@@ -333,20 +333,17 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs):

         # allow that the names are different
         # as expressions are handled differently
-        dask_result.columns = sql_result.columns
+        sql_result.columns = dask_result.columns

-        # replace all pd.NA scalars, which are resistent to
-        # check_dype=False and .astype()
-        dask_result = dask_result.replace({pd.NA: None})
+        sql_result = sql_result.convert_dtypes()
+        dask_result = dask_result.convert_dtypes()

-        if sort_columns:
-            sql_result = sql_result.sort_values(sort_columns)
-            dask_result = dask_result.sort_values(sort_columns)
+        convert_nullable_columns(sql_result)
+        convert_nullable_columns(dask_result)

-        sql_result = sql_result.reset_index(drop=True)
-        dask_result = dask_result.reset_index(drop=True)
-
-        assert_eq(sql_result, dask_result, check_dtype=False, **kwargs)
+        assert_eq(
+            sql_result, dask_result, check_dtype=False, check_index=False, **kwargs
+        )

     return _assert_query_gives_same_result

diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py
index 7b8808629..e9d372c88 100644
--- a/tests/integration/test_compatibility.py
+++ b/tests/integration/test_compatibility.py
@@ -19,22 +19,10 @@
 from dask_sql import Context
 from dask_sql.utils import ParsingException
-from tests.utils import assert_eq
+from tests.utils import assert_eq, convert_nullable_columns


-def cast_datetime_to_string(df):
-    cols = df.select_dtypes(include=["datetime64[ns]"]).columns.tolist()
-
-    if not cols:
-        return df
-
-    for col in cols:
-        df[col] = df[col].dt.strftime("%Y-%m-%d %H:%M:%S")
-
-    return df
-
-
-def eq_sqlite(sql, check_index=True, **dfs):
+def eq_sqlite(sql, **dfs):
     c = Context()
     engine = sqlite3.connect(":memory:")

@@ -42,18 +30,21 @@ def eq_sqlite(sql, check_index=True, **dfs):
         c.create_table(name, df)
         df.to_sql(name, engine, index=False)

-    dask_result = c.sql(sql).reset_index(drop=True)
-    sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True)
+    dask_result = c.sql(sql).compute().convert_dtypes()
+    sqlite_result = pd.read_sql(sql, engine).convert_dtypes()
+
+    convert_nullable_columns(dask_result)
+    convert_nullable_columns(sqlite_result)

-    # casting to object to ensure equality with sql-lite
-    # which returns object dtype for datetime inputs
-    dask_result = cast_datetime_to_string(dask_result)
+    datetime_cols = dask_result.select_dtypes(
+        include=["datetime64[ns]"]
+    ).columns.tolist()
+    for col in datetime_cols:
+        sqlite_result[col] = pd.to_datetime(sqlite_result[col])

-    # Make sure SQL and Dask use the same "NULL" value
-    dask_result = dask_result.fillna(np.NaN)
-    sqlite_result = sqlite_result.fillna(np.NaN)
+    sqlite_result = sqlite_result.astype(dask_result.dtypes)

-    assert_eq(dask_result, sqlite_result, check_dtype=False, check_index=check_index)
+    assert_eq(dask_result, sqlite_result, check_dtype=False, check_index=False)


 def make_rand_df(size: int, **kwargs):
@@ -953,7 +944,6 @@ def test_union():
         UNION ALL SELECT * FROM c
         ORDER BY b NULLS FIRST, c NULLS FIRST
         """,
-        check_index=False,
         a=a,
         b=b,
         c=c,
diff --git a/tests/integration/test_groupby.py b/tests/integration/test_groupby.py
index 92a2464ab..341e9ef33 100644
--- a/tests/integration/test_groupby.py
+++ b/tests/integration/test_groupby.py
@@ -171,7 +171,7 @@ def test_group_by_case(c):
     )


-def test_group_by_nan(c):
+def test_group_by_nan(c, user_table_nan):
     return_df = c.sql(
         """
         SELECT
@@ -180,7 +180,7 @@ def test_group_by_nan(c):
         GROUP BY c
         """
     )
-    expected_df = pd.DataFrame({"c": [3, float("nan"), 1]})
+    expected_df = user_table_nan.drop_duplicates(subset=["c"])

     # we return nullable int dtype instead of float
     assert_eq(return_df, expected_df, check_dtype=False)
diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py
index e47721108..8254ccbfe 100644
--- a/tests/integration/test_join.py
+++ b/tests/integration/test_join.py
@@ -364,6 +364,9 @@ def test_conditional_join_with_limit(c):
     assert_eq(actual_df, expected_df, check_index=False)


+@pytest.mark.filterwarnings(
+    "ignore:You are merging on int and float:UserWarning:dask.dataframe.multi"
+)
 def test_intersect(c):

     # Join df_simple against itself
diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index c341965ce..1dcba616b 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -502,7 +502,9 @@ def test_describe_model(c):
         .sort_index()
     )
     # test
-    result = c.sql("DESCRIBE MODEL ex_describe_model")["Params"].apply(lambda x: str(x))
+    result = c.sql("DESCRIBE MODEL ex_describe_model")["Params"].apply(
+        lambda x: str(x), meta=("Params", "object")
+    )

     assert_eq(expected_series, result)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index e262bac96..5f0af726e 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -39,19 +39,20 @@ def test_case(c, df):
         """
     )
     expected_df = pd.DataFrame(index=df.index)
-    expected_df["S1"] = df.a.apply(lambda a: 1 if a == 3 else pd.NA)
+    expected_df["S1"] = df.a.apply(lambda a: 1 if a == 3 else np.NaN)
     expected_df["S2"] = df.a.apply(lambda a: a if a > 0 else 1)
-    expected_df["S3"] = df.a.apply(lambda a: 3 if a == 4 else a + 1)
-    expected_df["S4"] = df.a.apply(lambda a: 1 if a == 3 else 2 if a > 0 else a)
+    expected_df["S3"] = df.a.apply(lambda a: 3 if a == 4 else a + 1).astype("Int64")
+    expected_df["S4"] = df.a.apply(lambda a: 1 if a == 3 else 2 if a > 0 else a).astype(
+        "Int64"
+    )
     expected_df["S5"] = df.a.apply(
         lambda a: "in-between" if ((1 <= a < 2) or (a > 2)) else "out-of-range"
     )
     expected_df["S6"] = df.a.apply(lambda a: 42 if ((a < 2) or (3 < a < 4)) else 47)
     expected_df["S7"] = df.a.apply(lambda a: 1 if (1 < a <= 4) else 0)
-    expected_df["S8"] = df.a.apply(lambda a: 5 if a == 2 else a + 1)
+    expected_df["S8"] = df.a.apply(lambda a: 5 if a == 2 else a + 1).astype("Int64")

-    # Do not check dtypes, as pandas versions are inconsistent here
-    assert_eq(result_df, expected_df, check_dtype=False)
+    assert_eq(result_df, expected_df)


 def test_intervals(c):
@@ -392,6 +393,9 @@ def test_null(c):
     assert_eq(df, expected_df)


+@pytest.mark.filterwarnings(
+    "ignore:divide by zero:RuntimeWarning:dask_sql.physical.rex.core.call"
+)
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
 def test_coalesce(c, gpu):
     df = dd.from_pandas(
@@ -415,13 +419,14 @@ def test_coalesce(c, gpu):
     expected_df = pd.DataFrame(
         {
             "c1": [3],
-            "c2": [np.nan],
+            "c2": [pd.NA],
             "c3": ["hi"],
             "c4": ["bye"],
             "c5": ["1.5"],
             "c6": [2.0],
         }
     )
+    expected_df["c2"] = expected_df["c2"].astype("Int8")

     assert_eq(df, expected_df, check_dtype=False)

@@ -456,7 +461,7 @@ def test_boolean_operations(c):
     )  # turn into a bool column

     c.create_table("df", df)
-    df = c.sql(
+    result_df = c.sql(
         """
         SELECT
             b IS TRUE AS t,
@@ -470,19 +475,15 @@ def test_boolean_operations(c):

     expected_df = pd.DataFrame(
         {
-            "t": [True, False, False],
-            "f": [False, True, False],
-            "nt": [False, True, True],
-            "nf": [True, False, True],
-            "u": [False, False, True],
-            "nu": [True, True, False],
+            "t": df.b.astype("boolean").fillna(False),
+            "f": ~df.b.astype("boolean").fillna(True),
+            "nt": ~df.b.astype("boolean").fillna(False),
+            "nf": df.b.astype("boolean").fillna(True),
+            "u": df.b.isna(),
+            "nu": ~df.b.isna().astype("boolean"),
         },
-        dtype="bool",
     )
-    expected_df["nt"] = expected_df["nt"].astype("boolean")
-    expected_df["nf"] = expected_df["nf"].astype("boolean")
-    expected_df["nu"] = expected_df["nu"].astype("boolean")
-    assert_eq(df, expected_df)
+    assert_eq(result_df, expected_df, check_dtype=False)


 def test_math_operations(c, df):
diff --git a/tests/unit/test_call.py b/tests/unit/test_call.py
index cb3a4b624..7e867b14f 100644
--- a/tests/unit/test_call.py
+++ b/tests/unit/test_call.py
@@ -51,11 +51,17 @@ def test_case():
 def test_is_true():
     op = call.IsTrueOperation()

-    assert_eq(op(df1.a > 2), pd.Series([False, False, True]), check_names=False)
+    assert_eq(
+        op(df1.a > 2),
+        pd.Series([False, False, True]),
+        check_names=False,
+        check_dtype=False,
+    )
     assert_eq(
         op(df3.a),
-        pd.Series([True, False, False], dtype="boolean"),
+        pd.Series([True, False, False]),
         check_names=False,
+        check_dtype=False,
     )

     assert op(1)
@@ -68,11 +74,17 @@ def test_is_true():
 def test_is_false():
     op = call.IsFalseOperation()

-    assert_eq(op(df1.a > 2), pd.Series([True, True, False]), check_names=False)
+    assert_eq(
+        op(df1.a > 2),
+        pd.Series([True, True, False]),
+        check_names=False,
+        check_dtype=False,
+    )
     assert_eq(
         op(df3.a),
-        pd.Series([False, False, True], dtype="boolean"),
+        pd.Series([False, False, True]),
         check_names=False,
+        check_dtype=False,
     )

     assert not op(1)
diff --git a/tests/utils.py b/tests/utils.py
index 10622bf50..291c3bc53 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -14,3 +14,22 @@ def assert_eq(*args, **kwargs):
     kwargs.setdefault("scheduler", scheduler)

     return _assert_eq(*args, **kwargs)
+
+
+def convert_nullable_columns(df):
+    """
+    Convert certain nullable columns in `df` to non-nullable columns
+    when trying to handle np.NaN and pd.NA would otherwise cause issues.
+    """
+    dtypes_mapping = {
+        "Int64": "float64",
+        "Float64": "float64",
+        "boolean": "float64",
+    }
+
+    for dtype in dtypes_mapping:
+        selected_cols = df.select_dtypes(include=[dtype]).columns.tolist()
+        if selected_cols:
+            df[selected_cols] = df[selected_cols].astype(dtypes_mapping[dtype])
+
+    return df
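
The new `convert_nullable_columns` helper added to tests/utils.py above downcasts pandas nullable dtypes (Int64, Float64, boolean) to float64 in place, so pd.NA values become np.nan before frames are compared. A minimal usage sketch of the pattern the updated fixtures follow (the frames and column name below are hypothetical, not part of the test suite):

    import pandas as pd

    from tests.utils import assert_eq, convert_nullable_columns

    # nullable-integer result (e.g. from dask-sql) vs. plain float result (e.g. from SQLite)
    left = pd.DataFrame({"a": pd.array([1, None], dtype="Int64")})
    right = pd.DataFrame({"a": [1.0, float("nan")]})

    # downcast nullable columns in place so pd.NA becomes np.nan
    convert_nullable_columns(left)

    # dtypes can still legitimately differ between the two sources, so dtype checking stays off
    assert_eq(left, right, check_dtype=False, check_index=False)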