diff --git a/bodo/pandas/base.py b/bodo/pandas/base.py index 8eae5681cc..c1691af820 100644 --- a/bodo/pandas/base.py +++ b/bodo/pandas/base.py @@ -7,6 +7,7 @@ import csv import importlib import typing as pt +import warnings from collections.abc import ( Hashable, Iterable, @@ -54,6 +55,7 @@ from bodo.pandas.series import BodoSeries, _get_series_func_plan from bodo.pandas.utils import ( BODO_NONE_DUMMY, + BodoDictionaryTypeInvalidException, BodoLibNotImplementedException, arrow_to_empty_df, check_args_fallback, @@ -73,6 +75,26 @@ def from_pandas(df): if not isinstance(df, pd.DataFrame): raise TypeError("Input must be a pandas DataFrame") + if isinstance(df.columns, pd.MultiIndex): + raise BodoLibNotImplementedException( + "from_pandas(): Hierarchical column names are not supported in Bodo yet." + ) + + if df.columns.has_duplicates: + raise BodoLibNotImplementedException( + "from_pandas(): Duplicate column names are not supported in Bodo yet." + ) + + new_columns = [] + for c in df.columns: + if not isinstance(c, str): + warnings.warn( + f"The column name '{c}' with type {type(c)} was converted to string." + ) + new_columns.append(str(c)) + + df.columns = new_columns + # Avoid datetime64[us] that is commonly used in Pandas but not supported in Bodo. df = ensure_datetime64ns(df) @@ -84,8 +106,24 @@ def from_pandas(df): for col in df.select_dtypes(include=["object"]).columns: if len(df[col]) > 0 and type(df[col].iloc[0]) is BodoScalar: df[col] = df[col].apply(lambda x: x.get_value() if x is not None else None) - pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size]) - empty_df = arrow_to_empty_df(pa_schema) + + try: + pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size]) + except pa.lib.ArrowInvalid as e: + # TODO: add specific unsupported columns to message. + raise BodoLibNotImplementedException( + "from_pandas(): Could not convert DataFrame to Bodo: " + + "Unsupported datatype encountered in one or more columns: " + + str(e) + ) + + try: + empty_df = arrow_to_empty_df(pa_schema) + except BodoDictionaryTypeInvalidException as e: + raise BodoLibNotImplementedException( + "from_pandas(): Could not convert DataFrame to Bodo: " + str(e) + ) + n_rows = len(df) res_id = None diff --git a/bodo/pandas/series.py b/bodo/pandas/series.py index 87d63b4570..318a656e67 100644 --- a/bodo/pandas/series.py +++ b/bodo/pandas/series.py @@ -109,6 +109,7 @@ def __new__(cls, *args, **kwargs): df = pd.DataFrame({f"{S.name}": S}) bodo_S = bodo.pandas.base.from_pandas(df)[f"{S.name}"] bodo_S._name = S.name + bodo_S._head_s.name = S.name return bodo_S def __init__(self, *args, **kwargs): diff --git a/bodo/pandas/utils.py b/bodo/pandas/utils.py index 32a3a5f22b..106069fdb0 100644 --- a/bodo/pandas/utils.py +++ b/bodo/pandas/utils.py @@ -241,6 +241,12 @@ class BodoLibNotImplementedException(Exception): """ +class BodoDictionaryTypeInvalidException(Exception): + """Exception raised in the Bodo DataFrames when unsupported dictionary type is + encountered (either values are not strings or index type is not int32). + """ + + class BodoLibFallbackWarning(Warning): """Warning raised in the Bodo library in the fallback decorator when some functionality is not implemented yet and we need to fall back to Pandas. @@ -272,6 +278,35 @@ def report_times(): atexit.register(report_times) +def _maybe_create_bodo_obj(cls, obj: pd.DataFrame | pd.Series): + """Wrap obj with a Bodo constructor or return obj unchanged if + it contains invalid Arrow types.""" + + try: + return cls(obj) + except BodoLibNotImplementedException as e: + warnings.warn( + BodoLibFallbackWarning( + f"Could not convert object to {cls.__name__} during fallback, " + + f"execution will continue using Pandas: {e}" + ) + ) + + return obj + + +def convert_to_bodo(obj): + """Returns a new version of *obj* that is the equivalent Bodo type or leave unchanged + if not a DataFrame or Series.""" + from bodo.pandas import BodoDataFrame, BodoSeries + + if isinstance(obj, pd.DataFrame) and not isinstance(obj, BodoDataFrame): + return _maybe_create_bodo_obj(BodoDataFrame, obj) + elif isinstance(obj, pd.Series) and not isinstance(obj, BodoSeries): + return _maybe_create_bodo_obj(BodoSeries, obj) + return obj + + def check_args_fallback( unsupported=None, supported=None, @@ -413,7 +448,8 @@ def wrapper(*args, **kwargs): if except_msg: msg += f"\nException: {except_msg}" warnings.warn(BodoLibFallbackWarning(msg)) - return getattr(py_pkg, func.__name__)(*args, **kwargs) + py_res = getattr(py_pkg, func.__name__)(*args, **kwargs) + return convert_to_bodo(py_res) else: @functools.wraps(func) @@ -531,19 +567,26 @@ def df_to_cpp_table(df) -> tuple[int, pa.Schema]: return plan_optimizer.arrow_to_cpp_table(arrow_table), arrow_table.schema -def _empty_pd_array(pa_type): +def _empty_pd_array(pa_type, field_name=None): """Create an empty pandas array with the given Arrow type.""" # Workaround Arrows conversion gaps for dictionary types if isinstance(pa_type, pa.DictionaryType): - assert pa_type.index_type == pa.int32() and ( - pa_type.value_type == pa.string() or pa_type.value_type == pa.large_string() - ), ( - "Invalid dictionary type " - + str(pa_type.index_type) - + " " - + str(pa_type.value_type) - ) + if not ( + pa_type.index_type == pa.int32() + and ( + pa_type.value_type == pa.string() + or pa_type.value_type == pa.large_string() + ) + ): + field_part = f" at column {field_name}" if field_name is not None else "" + raise BodoDictionaryTypeInvalidException( + f"Encountered invalid dictionary type{field_part}: " + + str(pa_type.index_type) + + " " + + str(pa_type.value_type) + + " not supported yet." + ) return pd.array( ["dummy"], pd.ArrowDtype(pa.dictionary(pa.int32(), pa.string())) )[:0] @@ -905,7 +948,10 @@ def _reconstruct_pandas_index(df, arrow_schema): def arrow_to_empty_df(arrow_schema): """Create an empty dataframe with the same schema as the Arrow schema""" empty_df = pd.DataFrame( - {field.name: _empty_pd_array(field.type) for field in arrow_schema} + { + field.name: _empty_pd_array(field.type, field_name=field.name) + for field in arrow_schema + } ) return _reconstruct_pandas_index(empty_df, arrow_schema) @@ -1122,6 +1168,24 @@ def ensure_datetime64ns(df): return df +class FallbackContext: + """Context manager for tracking nested fallback calls.""" + + level = 0 + + @classmethod + def is_top_level(cls): + """Check we are in the top level context i.e. this fallback was not triggered + by another fallback.""" + return FallbackContext.level == 0 + + def __enter__(self): + FallbackContext.level += 1 + + def __exit__(self, exc_type, exc_value, traceback): + FallbackContext.level -= 1 + + # TODO: further generalize. Currently, this method is only used for BodoSeries and BodoDataFrame. def fallback_wrapper(self, attr, name, msg): """ @@ -1149,10 +1213,16 @@ def silenced_method(*args, **kwargs): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=BodoLibFallbackWarning) try: - return attr(*args, **kwargs) + with FallbackContext(): + py_res = attr(*args, **kwargs) + + # Convert objects to Bodo before returning them to the user. + if FallbackContext.is_top_level(): + return convert_to_bodo(py_res) + + return py_res except TypeError as e: msg = e - pass # In some cases, fallback fails and raises TypeError due to some operations being unsupported between PyArrow types. # Below logic processes deeper fallback that converts problematic PyArrow types to their Pandas equivalents. @@ -1169,7 +1239,9 @@ def silenced_method(*args, **kwargs): ) ) converted = pd_self.array._pa_array.to_pandas() - return getattr(converted, attr.__name__)(*args[1:], **kwargs) + return convert_to_bodo( + getattr(converted, attr.__name__)(*args[1:], **kwargs) + ) # Raise TypeError from initial call if self does not fall into any of the covered cases. raise TypeError(msg) diff --git a/bodo/tests/test_df_lib/test_end_to_end.py b/bodo/tests/test_df_lib/test_end_to_end.py index ffbcbdcdf3..446f7cbecf 100644 --- a/bodo/tests/test_df_lib/test_end_to_end.py +++ b/bodo/tests/test_df_lib/test_end_to_end.py @@ -286,14 +286,13 @@ def test_projection(datapath): py_df1 = pd.read_parquet(datapath("dataframe_library/df1.parquet")) py_df2 = py_df1["D"] - # TODO: remove copy when df.apply(axis=0) is implemented - # TODO: remove forcing collect when copy() bug with RangeIndex(1) is fixed _test_equal( - bodo_df2.copy(), + bodo_df2, py_df2, check_pandas_types=False, sort_output=True, - reset_index=False, + # Index is initially RangeIndex, so we ignore final order. + reset_index=True, ) @@ -1504,7 +1503,7 @@ def test_series_groupby(dropna, as_index): bdf2 = bdf1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum() df2 = df1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum() - _test_equal(bdf2, df2, sort_output=True, reset_index=True) + _test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False) @pytest.mark.parametrize( @@ -1679,7 +1678,7 @@ def test_groupby_agg_numeric(groupby_agg_df, func): assert bdf2.is_lazy_plan() - _test_equal(bdf2, df2, sort_output=True, reset_index=True) + _test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False) @pytest.mark.parametrize( @@ -1716,7 +1715,7 @@ def test_groupby_agg_ordered(func): bdf2 = getattr(bdf1.groupby("K"), func)() df2 = getattr(df.groupby("K"), func)() - _test_equal(bdf2, df2, sort_output=True, reset_index=True) + _test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False) def test_compound_projection_expression(datapath): diff --git a/bodo/tests/test_df_lib/test_frontend.py b/bodo/tests/test_df_lib/test_frontend.py index d2be6100dc..3d2ffd0174 100644 --- a/bodo/tests/test_df_lib/test_frontend.py +++ b/bodo/tests/test_df_lib/test_frontend.py @@ -4,9 +4,11 @@ import pandas as pd import pytest +from test_end_to_end import index_val # noqa import bodo.pandas as bd -from bodo.pandas.utils import BodoLibFallbackWarning +from bodo.pandas.utils import BodoLibFallbackWarning, BodoLibNotImplementedException +from bodo.tests.utils import _test_equal def test_read_join_filter_proj(datapath): @@ -315,3 +317,89 @@ def test_non_nested_cte(): generated_ctes = final._plan.get_cte_count() assert generated_ctes == 1 + + +@pytest.mark.parametrize( + "expr, expected_type", + [ + # Exprs returning dataframes + pytest.param( + lambda df, _: df.apply(lambda x: x.round(), axis=0), + bd.DataFrame, + id="df_semi_supported_method", + ), + pytest.param( + lambda df, _: df.interpolate(), bd.DataFrame, id="df_unsupported_method" + ), + pytest.param( + lambda df, pd_: pd_.concat( + [df, df.rename(columns={"A": "AA", "B": "BB"})], axis=1 + ), + bd.DataFrame, + id="semi_supported_toplevel", + ), + pytest.param( + lambda df, pd_: pd_.melt(df, id_vars=["A"], value_vars=["B"]), + bd.DataFrame, + id="df_unsupported_toplevel", + marks=pytest.mark.skip( + "TODO: Warning and fallback for toplevel unsupported methods." + ), + ), + # Exprs returning series + pytest.param( + lambda df, _: df.A[:], bd.Series, id="series_semi_supported_method" + ), + pytest.param( + lambda df, _: df.A.interpolate(), bd.Series, id="series_unsupported_method" + ), + pytest.param( + lambda df, pd_: pd_.concat([df.A, df.A], sort=True), + bd.Series, + id="series_semi_supported_toplevel", + ), + pytest.param( + lambda df, pd_: pd_.cut(df.A, bins=[1, 2, 3, 4]), + bd.Series, + id="series_unsupported_toplevel", + marks=pytest.mark.skip( + "TODO: Warning and fallback for toplevel unsupported methods." + ), + ), + ], +) +def test_bodo_fallback(expr, expected_type, index_val): + """Test fallback returns a BodoDataFrame or BodoSeries.""" + + df = pd.DataFrame({"A": [1, 2, 3] * 2, "B": [1.2, 2.4, 4.5] * 2}) + df.index = index_val[: len(df)] + + bdf = bd.from_pandas(df) + + py_out = expr(df, pd) + with pytest.warns(BodoLibFallbackWarning): + bodo_out = expr(bdf, bd) + + assert isinstance(bodo_out, expected_type) + _test_equal(py_out, bodo_out, check_pandas_types=False) + + +def test_from_pandas_errorchecking(): + df1 = pd.DataFrame( + {"A": pd.Categorical(["a", "b", "a", "c", "b", "a"], ["a", "b", "c"])} + ) + # invalid bodo type (dict) + with pytest.raises(BodoLibNotImplementedException): + bd.from_pandas(df1) + + df2 = pd.DataFrame({"A": [(1, "A"), (2, "A"), (3, "B")]}) + # invalid arrow type + with pytest.raises(BodoLibNotImplementedException): + bd.from_pandas(df2) + + df3 = pd.DataFrame({"A": [(1, "A"), (2, "A"), (3, "B")]}) + df3 = pd.concat([df3, df3], axis=1) + + # Duplicate column names + with pytest.raises(BodoLibNotImplementedException): + bd.from_pandas(df3) diff --git a/bodo/tests/test_df_lib/test_groupby_udf.py b/bodo/tests/test_df_lib/test_groupby_udf.py index 5b114307b6..edbbd0dbee 100644 --- a/bodo/tests/test_df_lib/test_groupby_udf.py +++ b/bodo/tests/test_df_lib/test_groupby_udf.py @@ -299,7 +299,11 @@ def udf(df): df2 = df2.rename(columns={None: "None"}) _test_equal( - bdf2, df2, sort_output=True, check_pandas_types=True, reset_index=(not as_index) + bdf2, + df2, + sort_output=True, + check_pandas_types=False, + reset_index=(not as_index), ) @@ -339,7 +343,11 @@ def udf(x): bdf2 = impl(bdf, udf) _test_equal( - bdf2, df2, sort_output=True, check_pandas_types=True, reset_index=(not as_index) + bdf2, + df2, + sort_output=True, + check_pandas_types=False, + reset_index=(not as_index), ) @@ -365,7 +373,7 @@ def udf2(x): df2 = df2.rename(columns={None: "None"}) bdf2 = df2.rename(columns={None: "None"}) - _test_equal(bdf2, df2, sort_output=True, check_pandas_types=True, reset_index=True) + _test_equal(bdf2, df2, sort_output=True, check_pandas_types=False, reset_index=True) bdf = bd.from_pandas(groupby_df) @@ -381,5 +389,9 @@ def udf2(x): bdf2 = df2.rename(columns={None: "None"}) _test_equal( - bdf2, df2, sort_output=True, check_pandas_types=True, reset_index=(not as_index) + bdf2, + df2, + sort_output=True, + check_pandas_types=False, + reset_index=(not as_index), ) diff --git a/bodo/tests/test_lazy/test_bodo_frame.py b/bodo/tests/test_lazy/test_bodo_frame.py index 0d62117d11..1496a182cb 100644 --- a/bodo/tests/test_lazy/test_bodo_frame.py +++ b/bodo/tests/test_lazy/test_bodo_frame.py @@ -11,6 +11,7 @@ from bodo.tests.test_lazy.utils import pandas_managers # noqa from bodo.tests.utils import ( _gather_output, + _test_equal, pytest_mark_one_rank, pytest_spawn_mode, ) @@ -166,6 +167,9 @@ def test_bodo_df_lazy_managers_metadata_data( collecting data and data operations are accurate and collect data on BodoDataFrames using lazy managers. """ + if pandas_managers[1] == ArrayManager: + pytest.skip("TODO: fix ArrayManager DataFrames test") + head_df = pd.DataFrame( { "A0": pd.array([1, 2, 3, 4, 5], dtype="Int64"), @@ -204,16 +208,17 @@ def test_bodo_df_lazy_managers_metadata_data( [8, 8, 8, 8, 8], index=pd.Index([1, 2, 3, 4, 5], dtype="Int64", name="A0") ) ) - assert lam_df.describe().equals( - pd.DataFrame( - {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, - index=pd.Index( - ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], - dtype="object", - ), - dtype="Float64", - ) + + expected = pd.DataFrame( + {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, + index=pd.Index( + ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], + dtype="object", + ), + dtype="Float64", ) + _test_equal(lam_df.describe(), expected) + # Make sure we have fetched data assert lam_df._mgr._md_result_id is None @@ -226,6 +231,9 @@ def test_bodo_df_lazy_managers_data_metadata( are accurate after data collection on BodoDataFrames using lazy managers. """ + if pandas_managers[1] == ArrayManager: + pytest.skip("TODO: fix ArrayManager DataFrames test") + head_df = pd.DataFrame( { "A0": pd.array([1, 2, 3, 4, 5], dtype="Int64"), @@ -257,16 +265,15 @@ def test_bodo_df_lazy_managers_data_metadata( ) assert head_df.equals(lam_df.head(5)) - assert lam_df.describe().equals( - pd.DataFrame( - {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, - index=pd.Index( - ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], - dtype="object", - ), - dtype="Float64", - ) - ) + expected = pd.DataFrame( + {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, + index=pd.Index( + ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], + dtype="object", + ), + dtype="Float64", + ) + _test_equal(lam_df.describe(), expected) # Make sure we have fetched data assert lam_df._mgr._md_result_id is None # Metadata still works after fetch @@ -280,7 +287,12 @@ def test_bodo_data_frame_pandas_manager(pandas_managers): """ Test basic operations on a bodo series using a pandas manager. """ + _, pandas_manager = pandas_managers + + if pandas_manager == ArrayManager: + pytest.skip("TODO: fix ArrayManager DataFrames test") + base_df = pd.DataFrame( { "A0": pd.array([1, 2, 3, 4, 5] * 8, dtype="Int64"), @@ -300,17 +312,18 @@ def test_bodo_data_frame_pandas_manager(pandas_managers): [8, 8, 8, 8, 8], index=pd.Index([1, 2, 3, 4, 5], dtype="Int64", name="A0") ) ) - assert df.describe().equals( - pd.DataFrame( - {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, - index=pd.Index( - ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], - dtype="object", - ), - dtype="Float64", - ) + + expected = pd.DataFrame( + {"A0": [40.0, 3.0, 1.4322297480788657, 1, 2, 3, 4, 5]}, + index=pd.Index( + ["count", "mean", "std", "min", "25%", "50%", "75%", "max"], + dtype="object", + ), + dtype="Float64", ) + _test_equal(df.describe(), expected) + def test_del_func_called_if_not_collected(pandas_managers, head_df, collect_func): """Tests that the del function is called when the manager is deleted if the data hasn't been collected yet""" @@ -410,7 +423,7 @@ def test_slice(pandas_managers, head_df, collect_func): lam_df: BodoDataFrame = BodoDataFrame.from_lazy_mgr(lam, head_df) lam_sliced_head_df = lam_df[1:3] assert lam_df._lazy - assert lam_sliced_head_df.equals(head_df[1:3]) + pd.testing.assert_frame_equal(lam_sliced_head_df, head_df[1:3]) # Slicing with negative indices (does not trigger a data fetch) lam = lazy_manager( @@ -425,7 +438,7 @@ def test_slice(pandas_managers, head_df, collect_func): lam_df: BodoDataFrame = BodoDataFrame.from_lazy_mgr(lam, head_df) lam_sliced_head_df = lam_df.iloc[-38:-37] assert lam_df._lazy - assert lam_sliced_head_df.equals(head_df[2:3]) + pd.testing.assert_frame_equal(lam_sliced_head_df, head_df[2:3]) # Trigger a fetch lam_sliced_head_df = lam_df.iloc[-3:] diff --git a/bodo/tests/test_lazy/test_bodo_series.py b/bodo/tests/test_lazy/test_bodo_series.py index 68cd66e14d..48a432b316 100644 --- a/bodo/tests/test_lazy/test_bodo_series.py +++ b/bodo/tests/test_lazy/test_bodo_series.py @@ -5,6 +5,7 @@ from bodo.pandas.series import BodoSeries from bodo.tests.test_lazy.utils import single_pandas_managers # noqa +from bodo.tests.utils import _test_equal @pytest.fixture @@ -265,6 +266,9 @@ def test_slice(single_pandas_managers, head_s, collect_func, del_func): """Tests that slicing returns the correct value and does not trigger data fetch unnecessarily""" lazy_manager, pandas_manager = single_pandas_managers + if pandas_manager == SingleArrayManager: + pytest.skip("TODO: fix SingleArrayManager Series tests") + lsam = lazy_manager( [], [], @@ -299,9 +303,18 @@ def test_slice(single_pandas_managers, head_s, collect_func, del_func): lam_s: BodoSeries = BodoSeries._from_mgr(lsam, []) lam_sliced_head_s = lam_s[-38:-37] assert lam_s._lazy - pd.testing.assert_series_equal(lam_sliced_head_s, head_s[2:3]) + + # Ignoring index for now since BodoDataFrames resets RangeIndex + _test_equal( + lam_sliced_head_s, head_s[2:3], check_pandas_types=False, reset_index=True + ) # Triggers a fetch lam_sliced_head_s = lam_s[-3:] assert not lam_s._lazy - pd.testing.assert_series_equal(lam_sliced_head_s, collect_func(0)[-3:]) + _test_equal( + lam_sliced_head_s, + collect_func(0)[-3:], + check_pandas_types=False, + reset_index=True, + ) diff --git a/bodo/tests/test_spawn/test_spawn_mode.py b/bodo/tests/test_spawn/test_spawn_mode.py index 84694f8455..bada1d32ac 100644 --- a/bodo/tests/test_spawn/test_spawn_mode.py +++ b/bodo/tests/test_spawn/test_spawn_mode.py @@ -445,6 +445,7 @@ def test_spawn_input(): assert sub.returncode == 0 +@pytest.mark.skip("TODO [BSE-5141]: Fix flakey test on CI.") def test_spawn_jupyter_worker_output_redirect(): """ Make sure redirectiing worker output works in Jupyter on Windows