Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions bodo/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import csv
import importlib
import typing as pt
import warnings
from collections.abc import (
Hashable,
Iterable,
Expand Down Expand Up @@ -54,6 +55,7 @@
from bodo.pandas.series import BodoSeries, _get_series_func_plan
from bodo.pandas.utils import (
BODO_NONE_DUMMY,
BodoDictionaryTypeInvalidException,
BodoLibNotImplementedException,
arrow_to_empty_df,
check_args_fallback,
Expand All @@ -73,6 +75,26 @@ def from_pandas(df):
if not isinstance(df, pd.DataFrame):
raise TypeError("Input must be a pandas DataFrame")

if isinstance(df.columns, pd.MultiIndex):
raise BodoLibNotImplementedException(
"from_pandas(): Hierarchical column names are not supported in Bodo yet."
)

if df.columns.has_duplicates:
raise BodoLibNotImplementedException(
"from_pandas(): Duplicate column names are not supported in Bodo yet."
)

new_columns = []
for c in df.columns:
if not isinstance(c, str):
warnings.warn(
f"The column name '{c}' with type {type(c)} was converted to string."
)
new_columns.append(str(c))

df.columns = new_columns

# Avoid datetime64[us] that is commonly used in Pandas but not supported in Bodo.
df = ensure_datetime64ns(df)

Expand All @@ -84,8 +106,24 @@ def from_pandas(df):
for col in df.select_dtypes(include=["object"]).columns:
if len(df[col]) > 0 and type(df[col].iloc[0]) is BodoScalar:
df[col] = df[col].apply(lambda x: x.get_value() if x is not None else None)
pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size])
empty_df = arrow_to_empty_df(pa_schema)

try:
pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size])
except pa.lib.ArrowInvalid as e:
# TODO: add specific unsupported columns to message.
raise BodoLibNotImplementedException(
"from_pandas(): Could not convert DataFrame to Bodo: "
+ "Unsupported datatype encountered in one or more columns: "
+ str(e)
)

try:
empty_df = arrow_to_empty_df(pa_schema)
except BodoDictionaryTypeInvalidException as e:
raise BodoLibNotImplementedException(
"from_pandas(): Could not convert DataFrame to Bodo: " + str(e)
)

n_rows = len(df)

res_id = None
Expand Down
1 change: 1 addition & 0 deletions bodo/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __new__(cls, *args, **kwargs):
df = pd.DataFrame({f"{S.name}": S})
bodo_S = bodo.pandas.base.from_pandas(df)[f"{S.name}"]
bodo_S._name = S.name
bodo_S._head_s.name = S.name
return bodo_S

def __init__(self, *args, **kwargs):
Expand Down
100 changes: 86 additions & 14 deletions bodo/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ class BodoLibNotImplementedException(Exception):
"""


class BodoDictionaryTypeInvalidException(Exception):
    """Exception raised in Bodo DataFrames when an unsupported Arrow dictionary
    type is encountered (either the value type is not string/large_string or
    the index type is not int32).
    """


class BodoLibFallbackWarning(Warning):
"""Warning raised in the Bodo library in the fallback decorator when some
functionality is not implemented yet and we need to fall back to Pandas.
Expand Down Expand Up @@ -272,6 +278,35 @@ def report_times():
atexit.register(report_times)


def _maybe_create_bodo_obj(cls, obj: pd.DataFrame | pd.Series):
"""Wrap obj with a Bodo constructor or return obj unchanged if
it contains invalid Arrow types."""

try:
return cls(obj)
except BodoLibNotImplementedException as e:
warnings.warn(
BodoLibFallbackWarning(
f"Could not convert object to {cls.__name__} during fallback, "
+ f"execution will continue using Pandas: {e}"
)
)

return obj


def convert_to_bodo(obj):
    """Return the Bodo equivalent of *obj* (BodoDataFrame/BodoSeries).

    Objects that are not plain pandas DataFrames/Series — including ones
    that are already Bodo objects — are returned unchanged.
    """
    from bodo.pandas import BodoDataFrame, BodoSeries

    # Map each plain pandas container onto its Bodo counterpart; skip
    # objects that are already the Bodo subclass.
    for pandas_cls, bodo_cls in (
        (pd.DataFrame, BodoDataFrame),
        (pd.Series, BodoSeries),
    ):
        if isinstance(obj, pandas_cls) and not isinstance(obj, bodo_cls):
            return _maybe_create_bodo_obj(bodo_cls, obj)
    return obj


def check_args_fallback(
unsupported=None,
supported=None,
Expand Down Expand Up @@ -413,7 +448,8 @@ def wrapper(*args, **kwargs):
if except_msg:
msg += f"\nException: {except_msg}"
warnings.warn(BodoLibFallbackWarning(msg))
return getattr(py_pkg, func.__name__)(*args, **kwargs)
py_res = getattr(py_pkg, func.__name__)(*args, **kwargs)
return convert_to_bodo(py_res)
else:

@functools.wraps(func)
Expand Down Expand Up @@ -531,19 +567,26 @@ def df_to_cpp_table(df) -> tuple[int, pa.Schema]:
return plan_optimizer.arrow_to_cpp_table(arrow_table), arrow_table.schema


def _empty_pd_array(pa_type):
def _empty_pd_array(pa_type, field_name=None):
"""Create an empty pandas array with the given Arrow type."""

# Workaround Arrows conversion gaps for dictionary types
if isinstance(pa_type, pa.DictionaryType):
assert pa_type.index_type == pa.int32() and (
pa_type.value_type == pa.string() or pa_type.value_type == pa.large_string()
), (
"Invalid dictionary type "
+ str(pa_type.index_type)
+ " "
+ str(pa_type.value_type)
)
if not (
pa_type.index_type == pa.int32()
and (
pa_type.value_type == pa.string()
or pa_type.value_type == pa.large_string()
)
):
field_part = f" at column {field_name}" if field_name is not None else ""
raise BodoDictionaryTypeInvalidException(
f"Encountered invalid dictionary type{field_part}: "
+ str(pa_type.index_type)
+ " "
+ str(pa_type.value_type)
+ " not supported yet."
)
return pd.array(
["dummy"], pd.ArrowDtype(pa.dictionary(pa.int32(), pa.string()))
)[:0]
Expand Down Expand Up @@ -905,7 +948,10 @@ def _reconstruct_pandas_index(df, arrow_schema):
def arrow_to_empty_df(arrow_schema):
"""Create an empty dataframe with the same schema as the Arrow schema"""
empty_df = pd.DataFrame(
{field.name: _empty_pd_array(field.type) for field in arrow_schema}
{
field.name: _empty_pd_array(field.type, field_name=field.name)
for field in arrow_schema
}
)
return _reconstruct_pandas_index(empty_df, arrow_schema)

Expand Down Expand Up @@ -1122,6 +1168,24 @@ def ensure_datetime64ns(df):
return df


class FallbackContext:
    """Context manager that tracks how deeply fallback calls are nested.

    The depth counter lives on the class itself, so every instance shares
    it; this lets a fallback detect whether it was triggered from inside
    another fallback.
    """

    # Number of currently active fallback contexts (0 == none active).
    level = 0

    @classmethod
    def is_top_level(cls):
        """Return True when no enclosing fallback context is active, i.e.
        this fallback was not triggered by another fallback."""
        return not FallbackContext.level

    def __enter__(self):
        FallbackContext.level = FallbackContext.level + 1

    def __exit__(self, exc_type, exc_value, traceback):
        FallbackContext.level = FallbackContext.level - 1


# TODO: further generalize. Currently, this method is only used for BodoSeries and BodoDataFrame.
def fallback_wrapper(self, attr, name, msg):
"""
Expand Down Expand Up @@ -1149,10 +1213,16 @@ def silenced_method(*args, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=BodoLibFallbackWarning)
try:
return attr(*args, **kwargs)
with FallbackContext():
py_res = attr(*args, **kwargs)

# Convert objects to Bodo before returning them to the user.
if FallbackContext.is_top_level():
return convert_to_bodo(py_res)
Copy link
Contributor Author

@scott-routledge2 scott-routledge2 Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My observation was that Pandas methods call a lot of internal functions we do not support (examples: xs, copy), so we can keep the DataFrame as Pandas for the internal calls and only convert when returning to the user.

This is also currently hiding a small bug in a lot of the tests that I haven't figured out yet, but seemed minor to me:

df = pd.DataFrame({"A": [1, 2, 3], "B": ["a", "b", "c"]}, index = [1,2,3])
bdf = bd.from_pandas(df)

bdf1 = bdf.rename_axis("index123")
bdf2 = bdf1.copy()
print("bodo result: ", bdf2.index.name)

pdf1 = df.rename_axis("index123")
pdf2 = pdf1.copy()
print("pandas result: ", pdf2.index.name)

Pandas result: "index123", Bodo result: None (index name doesn't propagate in some places)


return py_res
except TypeError as e:
msg = e
pass

# In some cases, fallback fails and raises TypeError due to some operations being unsupported between PyArrow types.
# Below logic processes deeper fallback that converts problematic PyArrow types to their Pandas equivalents.
Expand All @@ -1169,7 +1239,9 @@ def silenced_method(*args, **kwargs):
)
)
converted = pd_self.array._pa_array.to_pandas()
return getattr(converted, attr.__name__)(*args[1:], **kwargs)
return convert_to_bodo(
getattr(converted, attr.__name__)(*args[1:], **kwargs)
)

# Raise TypeError from initial call if self does not fall into any of the covered cases.
raise TypeError(msg)
Expand Down
13 changes: 6 additions & 7 deletions bodo/tests/test_df_lib/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,13 @@ def test_projection(datapath):
py_df1 = pd.read_parquet(datapath("dataframe_library/df1.parquet"))
py_df2 = py_df1["D"]

# TODO: remove copy when df.apply(axis=0) is implemented
# TODO: remove forcing collect when copy() bug with RangeIndex(1) is fixed
_test_equal(
bodo_df2.copy(),
bodo_df2,
py_df2,
check_pandas_types=False,
sort_output=True,
reset_index=False,
# Index is initially RangeIndex, so we ignore final order.
reset_index=True,
)


Expand Down Expand Up @@ -1504,7 +1503,7 @@ def test_series_groupby(dropna, as_index):
bdf2 = bdf1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum()
df2 = df1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1679,7 +1678,7 @@ def test_groupby_agg_numeric(groupby_agg_df, func):

assert bdf2.is_lazy_plan()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1716,7 +1715,7 @@ def test_groupby_agg_ordered(func):
bdf2 = getattr(bdf1.groupby("K"), func)()
df2 = getattr(df.groupby("K"), func)()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


def test_compound_projection_expression(datapath):
Expand Down
90 changes: 89 additions & 1 deletion bodo/tests/test_df_lib/test_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import pandas as pd
import pytest
from test_end_to_end import index_val # noqa

import bodo.pandas as bd
from bodo.pandas.utils import BodoLibFallbackWarning
from bodo.pandas.utils import BodoLibFallbackWarning, BodoLibNotImplementedException
from bodo.tests.utils import _test_equal


def test_read_join_filter_proj(datapath):
Expand Down Expand Up @@ -315,3 +317,89 @@ def test_non_nested_cte():

generated_ctes = final._plan.get_cte_count()
assert generated_ctes == 1


@pytest.mark.parametrize(
    "expr, expected_type",
    [
        # Exprs returning dataframes
        pytest.param(
            lambda df, _: df.apply(lambda x: x.round(), axis=0),
            bd.DataFrame,
            id="df_semi_supported_method",
        ),
        pytest.param(
            lambda df, _: df.interpolate(), bd.DataFrame, id="df_unsupported_method"
        ),
        pytest.param(
            lambda df, pd_: pd_.concat(
                [df, df.rename(columns={"A": "AA", "B": "BB"})], axis=1
            ),
            bd.DataFrame,
            id="semi_supported_toplevel",
        ),
        pytest.param(
            lambda df, pd_: pd_.melt(df, id_vars=["A"], value_vars=["B"]),
            bd.DataFrame,
            id="df_unsupported_toplevel",
            marks=pytest.mark.skip(
                "TODO: Warning and fallback for toplevel unsupported methods."
            ),
        ),
        # Exprs returning series
        pytest.param(
            lambda df, _: df.A[:], bd.Series, id="series_semi_supported_method"
        ),
        pytest.param(
            lambda df, _: df.A.interpolate(), bd.Series, id="series_unsupported_method"
        ),
        pytest.param(
            lambda df, pd_: pd_.concat([df.A, df.A], sort=True),
            bd.Series,
            id="series_semi_supported_toplevel",
        ),
        pytest.param(
            lambda df, pd_: pd_.cut(df.A, bins=[1, 2, 3, 4]),
            bd.Series,
            id="series_unsupported_toplevel",
            marks=pytest.mark.skip(
                "TODO: Warning and fallback for toplevel unsupported methods."
            ),
        ),
    ],
)
def test_bodo_fallback(expr, expected_type, index_val):
    """Test fallback returns a BodoDataFrame or BodoSeries.

    Each *expr* invokes functionality Bodo does not (fully) support, so a
    BodoLibFallbackWarning is expected; the object returned to the user
    should still be converted back to the Bodo type and match the result
    of running the same expression in plain Pandas.
    """

    # Small frame with an integer and a float column; the index comes from
    # the shared index_val fixture (imported from test_end_to_end).
    df = pd.DataFrame({"A": [1, 2, 3] * 2, "B": [1.2, 2.4, 4.5] * 2})
    df.index = index_val[: len(df)]

    bdf = bd.from_pandas(df)

    # Reference result computed with plain Pandas; the Bodo run must warn
    # that it is falling back.
    py_out = expr(df, pd)
    with pytest.warns(BodoLibFallbackWarning):
        bodo_out = expr(bdf, bd)

    assert isinstance(bodo_out, expected_type)
    _test_equal(py_out, bodo_out, check_pandas_types=False)


def test_from_pandas_errorchecking():
    """from_pandas raises BodoLibNotImplementedException on unsupported input."""
    # Categorical data maps to an Arrow dictionary whose index type is not
    # the int32 Bodo requires.
    categorical_df = pd.DataFrame(
        {"A": pd.Categorical(["a", "b", "a", "c", "b", "a"], ["a", "b", "c"])}
    )
    with pytest.raises(BodoLibNotImplementedException):
        bd.from_pandas(categorical_df)

    # Python tuples have no valid Arrow type at all.
    tuple_df = pd.DataFrame({"A": [(1, "A"), (2, "A"), (3, "B")]})
    with pytest.raises(BodoLibNotImplementedException):
        bd.from_pandas(tuple_df)

    # Column-wise self-concat produces two columns both named "A".
    dup_source = pd.DataFrame({"A": [(1, "A"), (2, "A"), (3, "B")]})
    dup_cols_df = pd.concat([dup_source, dup_source], axis=1)
    with pytest.raises(BodoLibNotImplementedException):
        bd.from_pandas(dup_cols_df)
Loading