Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bodo/libs/_distributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ std::shared_ptr<array_info> scatter_array(
std::shared_ptr<array_info> out_inds =
scatter_array(in_arr->child_arrays[1], send_counts_ptr, mpi_root,
n_pes, myrank, comm_ptr);
out_arr = create_dict_string_array(dict_arr, out_inds);
out_arr = create_dict_string_array(out_dict, out_inds);

} else if (arr_type == bodo_array_type::TIMESTAMPTZ) {
MPI_Datatype utc_mpi_typ = get_MPI_typ(dtype);
Expand Down
36 changes: 34 additions & 2 deletions bodo/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from bodo.pandas.series import BodoSeries, _get_series_func_plan
from bodo.pandas.utils import (
BODO_NONE_DUMMY,
BodoDictionaryTypeInvalidException,
BodoLibNotImplementedException,
arrow_to_empty_df,
check_args_fallback,
Expand All @@ -73,6 +74,21 @@ def from_pandas(df):
if not isinstance(df, pd.DataFrame):
raise TypeError("Input must be a pandas DataFrame")

if isinstance(df.columns, pd.MultiIndex):
raise BodoLibNotImplementedException(
"from_pandas(): Hierarchical column names are not supported in Bodo yet."
)

for c in df.columns:
if not isinstance(c, str):
raise BodoLibNotImplementedException(
f"from_pandas(): Expected column names to be type string, found: {type(c)}."
)
elif isinstance(df[c], pd.DataFrame):
raise BodoLibNotImplementedException(
f"from_pandas(): Duplicate column names are not supported: '{c}'."
)

# Avoid datetime64[us] that is commonly used in Pandas but not supported in Bodo.
df = ensure_datetime64ns(df)

Expand All @@ -84,8 +100,24 @@ def from_pandas(df):
for col in df.select_dtypes(include=["object"]).columns:
if len(df[col]) > 0 and type(df[col].iloc[0]) is BodoScalar:
df[col] = df[col].apply(lambda x: x.get_value() if x is not None else None)
pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size])
empty_df = arrow_to_empty_df(pa_schema)

try:
pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size])
except pa.lib.ArrowInvalid as e:
# TODO: add specific unsupported columns to message.
raise BodoLibNotImplementedException(
"from_pandas(): Could not convert DataFrame to Bodo: "
+ "Unsupported datatype encountered in one or more columns:"
+ str(e)
)

try:
empty_df = arrow_to_empty_df(pa_schema)
except BodoDictionaryTypeInvalidException as e:
raise BodoLibNotImplementedException(
"from_pandas(): Could not convert DataFrame to Bodo: " + str(e)
)

n_rows = len(df)

res_id = None
Expand Down
1 change: 1 addition & 0 deletions bodo/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __new__(cls, *args, **kwargs):
df = pd.DataFrame({f"{S.name}": S})
bodo_S = bodo.pandas.base.from_pandas(df)[f"{S.name}"]
bodo_S._name = S.name
bodo_S._head_s.name = S.name
return bodo_S

def __init__(self, *args, **kwargs):
Expand Down
100 changes: 86 additions & 14 deletions bodo/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ class BodoLibNotImplementedException(Exception):
"""


class BodoDictionaryTypeInvalidException(Exception):
"""Exception raised in the Bodo DataFrames when unsupported dictionary type is
encountered (either values are not strings or index type is not int32).
"""


class BodoLibFallbackWarning(Warning):
"""Warning raised in the Bodo library in the fallback decorator when some
functionality is not implemented yet and we need to fall back to Pandas.
Expand Down Expand Up @@ -272,6 +278,35 @@ def report_times():
atexit.register(report_times)


def _maybe_create_bodo_obj(cls, obj: pd.DataFrame | pd.Series):
"""Wrap obj with a Bodo constructor or return obj unchanged if
it contains invalid Arrow types."""

try:
return cls(obj)
except BodoLibNotImplementedException as e:
warnings.warn(
BodoLibFallbackWarning(
f"Could not convert object to {cls.__name__} during fallback, "
+ f"execution will continue using Pandas: {e}"
)
)

return obj


def convert_to_bodo(obj):
"""Returns a new version of *obj* that is the equivalent Bodo type or leave unchanged
if not a DataFrame or Series."""
from bodo.pandas import BodoDataFrame, BodoSeries

if isinstance(obj, pd.DataFrame) and not isinstance(obj, BodoDataFrame):
return _maybe_create_bodo_obj(BodoDataFrame, obj)
elif isinstance(obj, pd.Series) and not isinstance(obj, BodoSeries):
return _maybe_create_bodo_obj(BodoSeries, obj)
return obj


def check_args_fallback(
unsupported=None,
supported=None,
Expand Down Expand Up @@ -413,7 +448,8 @@ def wrapper(*args, **kwargs):
if except_msg:
msg += f"\nException: {except_msg}"
warnings.warn(BodoLibFallbackWarning(msg))
return getattr(py_pkg, func.__name__)(*args, **kwargs)
py_res = getattr(py_pkg, func.__name__)(*args, **kwargs)
return convert_to_bodo(py_res)
else:

@functools.wraps(func)
Expand Down Expand Up @@ -531,19 +567,26 @@ def df_to_cpp_table(df) -> tuple[int, pa.Schema]:
return plan_optimizer.arrow_to_cpp_table(arrow_table), arrow_table.schema


def _empty_pd_array(pa_type):
def _empty_pd_array(pa_type, field_name=None):
"""Create an empty pandas array with the given Arrow type."""

# Workaround Arrows conversion gaps for dictionary types
if isinstance(pa_type, pa.DictionaryType):
assert pa_type.index_type == pa.int32() and (
pa_type.value_type == pa.string() or pa_type.value_type == pa.large_string()
), (
"Invalid dictionary type "
+ str(pa_type.index_type)
+ " "
+ str(pa_type.value_type)
)
if not (
pa_type.index_type == pa.int32()
and (
pa_type.value_type == pa.string()
or pa_type.value_type == pa.large_string()
)
):
field_part = f" at column {field_name}" if field_name is not None else ""
raise BodoDictionaryTypeInvalidException(
f"Encountered invalid dictionary type{field_part}: "
+ str(pa_type.index_type)
+ " "
+ str(pa_type.value_type)
+ " not supported yet."
)
return pd.array(
["dummy"], pd.ArrowDtype(pa.dictionary(pa.int32(), pa.string()))
)[:0]
Expand Down Expand Up @@ -899,7 +942,10 @@ def _reconstruct_pandas_index(df, arrow_schema):
def arrow_to_empty_df(arrow_schema):
"""Create an empty dataframe with the same schema as the Arrow schema"""
empty_df = pd.DataFrame(
{field.name: _empty_pd_array(field.type) for field in arrow_schema}
{
field.name: _empty_pd_array(field.type, field_name=field.name)
for field in arrow_schema
}
)
return _reconstruct_pandas_index(empty_df, arrow_schema)

Expand Down Expand Up @@ -1116,6 +1162,24 @@ def ensure_datetime64ns(df):
return df


class FallbackContext:
"""Context manager for tracking nested fallback calls."""

level = 0

@classmethod
def is_top_level(cls):
"""Check we are in the top level context i.e. this fallback was not triggered
by another fallback."""
return FallbackContext.level == 0

def __enter__(self):
FallbackContext.level += 1

def __exit__(self, exc_type, exc_value, traceback):
FallbackContext.level -= 1


# TODO: further generalize. Currently, this method is only used for BodoSeries and BodoDataFrame.
def fallback_wrapper(self, attr, name, msg):
"""
Expand Down Expand Up @@ -1143,10 +1207,16 @@ def silenced_method(*args, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=BodoLibFallbackWarning)
try:
return attr(*args, **kwargs)
with FallbackContext():
py_res = attr(*args, **kwargs)

# Convert objects to Bodo before returning them to the user.
if FallbackContext.is_top_level():
return convert_to_bodo(py_res)
Copy link
Contributor Author

@scott-routledge2 scott-routledge2 Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My observation was that Pandas methods call a lot of internal functions we do not support (example: xs, copy), so we can keep the DataFrame as Pandas for the internal calls and only convert when returning back to the user.

This is also currently hiding a small bug in a lot of the tests that I haven't figured out yet, but seemed minor to me:

df = pd.DataFrame({"A": [1, 2, 3], "B": ["a", "b", "c"]}, index = [1,2,3])
bdf = bd.from_pandas(df)

bdf1 = bdf.rename_axis("index123")
bdf2 = bdf1.copy()
print("bodo result: ", bdf2.index.name)

pdf1 = df.rename_axis("index123")
pdf2 = pdf1.copy()
print("pandas result: ", pdf2.index.name)

Pandas result: "index123", Bodo result: None (index name doesn't propagate in some places)


return py_res
except TypeError as e:
msg = e
pass

# In some cases, fallback fails and raises TypeError due to some operations being unsupported between PyArrow types.
# Below logic processes deeper fallback that converts problematic PyArrow types to their Pandas equivalents.
Expand All @@ -1163,7 +1233,9 @@ def silenced_method(*args, **kwargs):
)
)
converted = pd_self.array._pa_array.to_pandas()
return getattr(converted, attr.__name__)(*args[1:], **kwargs)
return convert_to_bodo(
getattr(converted, attr.__name__)(*args[1:], **kwargs)
)

# Raise TypeError from initial call if self does not fall into any of the covered cases.
raise TypeError(msg)
Expand Down
14 changes: 7 additions & 7 deletions bodo/tests/test_df_lib/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,13 @@ def test_projection(datapath):
py_df1 = pd.read_parquet(datapath("dataframe_library/df1.parquet"))
py_df2 = py_df1["D"]

# TODO: remove copy when df.apply(axis=0) is implemented
# TODO: remove forcing collect when copy() bug with RangeIndex(1) is fixed
_test_equal(
bodo_df2.copy(),
bodo_df2,
py_df2,
check_pandas_types=False,
sort_output=True,
reset_index=False,
# Index is initially RangeIndex, so we ignore final order.
reset_index=True,
)


Expand Down Expand Up @@ -1110,6 +1109,7 @@ def test_set_df_column_extra_proj(datapath, index_val):
_test_equal(bdf2, pdf2, check_pandas_types=False)


@pytest.mark.skip("TODO")
def test_parquet_read_partitioned(datapath):
"""Test reading a partitioned parquet dataset."""
path = datapath("dataframe_library/example_partitioned.parquet")
Expand Down Expand Up @@ -1504,7 +1504,7 @@ def test_series_groupby(dropna, as_index):
bdf2 = bdf1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum()
df2 = df1.groupby("A", as_index=as_index, dropna=dropna)["E"].sum()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1679,7 +1679,7 @@ def test_groupby_agg_numeric(groupby_agg_df, func):

assert bdf2.is_lazy_plan()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1716,7 +1716,7 @@ def test_groupby_agg_ordered(func):
bdf2 = getattr(bdf1.groupby("K"), func)()
df2 = getattr(df.groupby("K"), func)()

_test_equal(bdf2, df2, sort_output=True, reset_index=True)
_test_equal(bdf2, df2, sort_output=True, reset_index=True, check_pandas_types=False)


def test_compound_projection_expression(datapath):
Expand Down
67 changes: 67 additions & 0 deletions bodo/tests/test_df_lib/test_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import pandas as pd
import pytest
from test_end_to_end import index_val # noqa

import bodo.pandas as bd
from bodo.pandas.utils import BodoLibFallbackWarning
from bodo.tests.utils import _test_equal


def test_read_join_filter_proj(datapath):
Expand Down Expand Up @@ -315,3 +317,68 @@ def test_non_nested_cte():

generated_ctes = final._plan.get_cte_count()
assert generated_ctes == 1


@pytest.mark.parametrize(
"expr, expected_type",
[
# Exprs returning dataframes
pytest.param(
lambda df, _: df.apply(lambda x: x.round(), axis=0),
bd.DataFrame,
id="df_semi_supported_method",
),
pytest.param(
lambda df, _: df.interpolate(), bd.DataFrame, id="df_unsupported_method"
),
pytest.param(
lambda df, pd_: pd_.concat(
[df, df.rename(columns={"A": "AA", "B": "BB"})], axis=1
),
bd.DataFrame,
id="semi_supported_toplevel",
),
pytest.param(
lambda df, pd_: pd_.melt(df, id_vars=["A"], value_vars=["B"]),
bd.DataFrame,
id="df_unsupported_toplevel",
marks=pytest.mark.skip(
"TODO: Warning and fallback for toplevel unsupported methods."
),
),
# Exprs returning series
pytest.param(
lambda df, _: df.A[:], bd.Series, id="series_semi_supported_method"
),
pytest.param(
lambda df, _: df.A.interpolate(), bd.Series, id="series_unsupported_method"
),
pytest.param(
lambda df, pd_: pd_.concat([df.A, df.A], sort=True),
bd.Series,
id="series_semi_supported_toplevel",
),
pytest.param(
lambda df, pd_: pd_.cut(df.A, bins=[1, 2, 3, 4]),
bd.Series,
id="series_unsupported_toplevel",
marks=pytest.mark.skip(
"TODO: Warning and fallback for toplevel unsupported methods."
),
),
],
)
def test_bodo_fallback(expr, expected_type, index_val):
"""Test fallback returns a BodoDataFrame."""

df = pd.DataFrame({"A": [1, 2, 3] * 2, "B": [1.2, 2.4, 4.5] * 2})
df.index = index_val[: len(df)]

bdf = bd.from_pandas(df)

py_out = expr(df, pd)
with pytest.warns(BodoLibFallbackWarning):
bodo_out = expr(bdf, bd)

assert isinstance(bodo_out, expected_type)
_test_equal(py_out, bodo_out, check_pandas_types=False)
Loading