-
Notifications
You must be signed in to change notification settings - Fork 15
Convert to BodoDataFrame/BodoSeries on fallback #855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
964094a
35edb21
53cdff4
abca6a3
36a45b1
fdeffcb
c5d792d
b491e9f
553e453
21c53cd
4580bc2
3da3fb8
ade9ee9
393303e
b5ebaad
dc25355
92490c8
8f9e8f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
| import csv | ||
| import importlib | ||
| import typing as pt | ||
| import warnings | ||
| from collections.abc import ( | ||
| Hashable, | ||
| Iterable, | ||
|
|
@@ -54,6 +55,7 @@ | |
| from bodo.pandas.series import BodoSeries, _get_series_func_plan | ||
| from bodo.pandas.utils import ( | ||
| BODO_NONE_DUMMY, | ||
| BodoDictionaryTypeInvalidException, | ||
| BodoLibNotImplementedException, | ||
| arrow_to_empty_df, | ||
| check_args_fallback, | ||
|
|
@@ -73,6 +75,24 @@ def from_pandas(df): | |
| if not isinstance(df, pd.DataFrame): | ||
| raise TypeError("Input must be a pandas DataFrame") | ||
|
|
||
| if isinstance(df.columns, pd.MultiIndex): | ||
| raise BodoLibNotImplementedException( | ||
| "from_pandas(): Hierarchical column names are not supported in Bodo yet." | ||
| ) | ||
| new_columns = [] | ||
| for c in df.columns: | ||
| if isinstance(df[c], pd.DataFrame): | ||
| raise BodoLibNotImplementedException( | ||
| f"from_pandas(): Duplicate column name: '{c}'." | ||
|
||
| ) | ||
| elif not isinstance(c, str): | ||
| warnings.warn( | ||
| f"The column name '{c}' with type {type(c)} was converted to string." | ||
| ) | ||
| new_columns.append(str(c)) | ||
|
|
||
| df.columns = new_columns | ||
|
|
||
| # Avoid datetime64[us] that is commonly used in Pandas but not supported in Bodo. | ||
| df = ensure_datetime64ns(df) | ||
|
|
||
|
|
@@ -84,8 +104,24 @@ def from_pandas(df): | |
| for col in df.select_dtypes(include=["object"]).columns: | ||
| if len(df[col]) > 0 and type(df[col].iloc[0]) is BodoScalar: | ||
| df[col] = df[col].apply(lambda x: x.get_value() if x is not None else None) | ||
| pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size]) | ||
| empty_df = arrow_to_empty_df(pa_schema) | ||
|
|
||
| try: | ||
| pa_schema = pa.Schema.from_pandas(df.iloc[:sample_size]) | ||
| except pa.lib.ArrowInvalid as e: | ||
| # TODO: add specific unsupported columns to message. | ||
| raise BodoLibNotImplementedException( | ||
| "from_pandas(): Could not convert DataFrame to Bodo: " | ||
| + "Unsupported datatype encountered in one or more columns: " | ||
| + str(e) | ||
| ) | ||
|
|
||
| try: | ||
| empty_df = arrow_to_empty_df(pa_schema) | ||
| except BodoDictionaryTypeInvalidException as e: | ||
| raise BodoLibNotImplementedException( | ||
| "from_pandas(): Could not convert DataFrame to Bodo: " + str(e) | ||
| ) | ||
|
|
||
| n_rows = len(df) | ||
|
|
||
| res_id = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,6 +241,12 @@ class BodoLibNotImplementedException(Exception): | |
| """ | ||
|
|
||
|
|
||
| class BodoDictionaryTypeInvalidException(Exception): | ||
| """Exception raised in the Bodo DataFrames when unsupported dictionary type is | ||
| encountered (either values are not strings or index type is not int32). | ||
| """ | ||
|
|
||
|
|
||
| class BodoLibFallbackWarning(Warning): | ||
| """Warning raised in the Bodo library in the fallback decorator when some | ||
| functionality is not implemented yet and we need to fall back to Pandas. | ||
|
|
@@ -272,6 +278,35 @@ def report_times(): | |
| atexit.register(report_times) | ||
|
|
||
|
|
||
| def _maybe_create_bodo_obj(cls, obj: pd.DataFrame | pd.Series): | ||
| """Wrap obj with a Bodo constructor or return obj unchanged if | ||
| it contains invalid Arrow types.""" | ||
|
|
||
| try: | ||
| return cls(obj) | ||
| except BodoLibNotImplementedException as e: | ||
| warnings.warn( | ||
| BodoLibFallbackWarning( | ||
| f"Could not convert object to {cls.__name__} during fallback, " | ||
| + f"execution will continue using Pandas: {e}" | ||
| ) | ||
| ) | ||
|
|
||
| return obj | ||
|
|
||
|
|
||
| def convert_to_bodo(obj): | ||
| """Returns a new version of *obj* that is the equivalent Bodo type or leave unchanged | ||
| if not a DataFrame or Series.""" | ||
| from bodo.pandas import BodoDataFrame, BodoSeries | ||
|
|
||
| if isinstance(obj, pd.DataFrame) and not isinstance(obj, BodoDataFrame): | ||
| return _maybe_create_bodo_obj(BodoDataFrame, obj) | ||
| elif isinstance(obj, pd.Series) and not isinstance(obj, BodoSeries): | ||
| return _maybe_create_bodo_obj(BodoSeries, obj) | ||
| return obj | ||
|
|
||
|
|
||
| def check_args_fallback( | ||
| unsupported=None, | ||
| supported=None, | ||
|
|
@@ -413,7 +448,8 @@ def wrapper(*args, **kwargs): | |
| if except_msg: | ||
| msg += f"\nException: {except_msg}" | ||
| warnings.warn(BodoLibFallbackWarning(msg)) | ||
| return getattr(py_pkg, func.__name__)(*args, **kwargs) | ||
| py_res = getattr(py_pkg, func.__name__)(*args, **kwargs) | ||
| return convert_to_bodo(py_res) | ||
| else: | ||
|
|
||
| @functools.wraps(func) | ||
|
|
@@ -531,19 +567,26 @@ def df_to_cpp_table(df) -> tuple[int, pa.Schema]: | |
| return plan_optimizer.arrow_to_cpp_table(arrow_table), arrow_table.schema | ||
|
|
||
|
|
||
| def _empty_pd_array(pa_type): | ||
| def _empty_pd_array(pa_type, field_name=None): | ||
| """Create an empty pandas array with the given Arrow type.""" | ||
|
|
||
| # Workaround Arrows conversion gaps for dictionary types | ||
| if isinstance(pa_type, pa.DictionaryType): | ||
| assert pa_type.index_type == pa.int32() and ( | ||
| pa_type.value_type == pa.string() or pa_type.value_type == pa.large_string() | ||
| ), ( | ||
| "Invalid dictionary type " | ||
| + str(pa_type.index_type) | ||
| + " " | ||
| + str(pa_type.value_type) | ||
| ) | ||
| if not ( | ||
| pa_type.index_type == pa.int32() | ||
| and ( | ||
| pa_type.value_type == pa.string() | ||
| or pa_type.value_type == pa.large_string() | ||
| ) | ||
| ): | ||
| field_part = f" at column {field_name}" if field_name is not None else "" | ||
| raise BodoDictionaryTypeInvalidException( | ||
| f"Encountered invalid dictionary type{field_part}: " | ||
| + str(pa_type.index_type) | ||
| + " " | ||
| + str(pa_type.value_type) | ||
| + " not supported yet." | ||
| ) | ||
| return pd.array( | ||
| ["dummy"], pd.ArrowDtype(pa.dictionary(pa.int32(), pa.string())) | ||
| )[:0] | ||
|
|
@@ -905,7 +948,10 @@ def _reconstruct_pandas_index(df, arrow_schema): | |
| def arrow_to_empty_df(arrow_schema): | ||
| """Create an empty dataframe with the same schema as the Arrow schema""" | ||
| empty_df = pd.DataFrame( | ||
| {field.name: _empty_pd_array(field.type) for field in arrow_schema} | ||
| { | ||
| field.name: _empty_pd_array(field.type, field_name=field.name) | ||
| for field in arrow_schema | ||
| } | ||
| ) | ||
| return _reconstruct_pandas_index(empty_df, arrow_schema) | ||
|
|
||
|
|
@@ -1122,6 +1168,24 @@ def ensure_datetime64ns(df): | |
| return df | ||
|
|
||
|
|
||
| class FallbackContext: | ||
| """Context manager for tracking nested fallback calls.""" | ||
|
|
||
| level = 0 | ||
|
|
||
| @classmethod | ||
| def is_top_level(cls): | ||
| """Check we are in the top level context i.e. this fallback was not triggered | ||
| by another fallback.""" | ||
| return FallbackContext.level == 0 | ||
|
|
||
| def __enter__(self): | ||
| FallbackContext.level += 1 | ||
|
|
||
| def __exit__(self, exc_type, exc_value, traceback): | ||
| FallbackContext.level -= 1 | ||
|
|
||
|
|
||
| # TODO: further generalize. Currently, this method is only used for BodoSeries and BodoDataFrame. | ||
| def fallback_wrapper(self, attr, name, msg): | ||
| """ | ||
|
|
@@ -1149,10 +1213,16 @@ def silenced_method(*args, **kwargs): | |
| with warnings.catch_warnings(): | ||
| warnings.simplefilter("ignore", category=BodoLibFallbackWarning) | ||
| try: | ||
| return attr(*args, **kwargs) | ||
| with FallbackContext(): | ||
| py_res = attr(*args, **kwargs) | ||
|
|
||
| # Convert objects to Bodo before returning them to the user. | ||
| if FallbackContext.is_top_level(): | ||
| return convert_to_bodo(py_res) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

My observation was that Pandas methods call a lot of internal functions we do not support (example: [lost in extraction]). This is also currently hiding a small bug in a lot of the tests that I haven't figured out yet, but it seemed minor to me:

    df = pd.DataFrame({"A": [1, 2, 3], "B": ["a", "b", "c"]}, index=[1, 2, 3])
    bdf = bd.from_pandas(df)
    bdf1 = bdf.rename_axis("index123")
    bdf2 = bdf1.copy()
    print("bodo result: ", bdf2.index.name)
    pdf1 = df.rename_axis("index123")
    pdf2 = pdf1.copy()
    print("pandas result: ", pdf2.index.name)

Pandas result: "index123"; Bodo result: [value lost in extraction — the two differ] |
||
|
|
||
| return py_res | ||
| except TypeError as e: | ||
| msg = e | ||
| pass | ||
|
|
||
| # In some cases, fallback fails and raises TypeError due to some operations being unsupported between PyArrow types. | ||
| # Below logic processes deeper fallback that converts problematic PyArrow types to their Pandas equivalents. | ||
|
|
@@ -1169,7 +1239,9 @@ def silenced_method(*args, **kwargs): | |
| ) | ||
| ) | ||
| converted = pd_self.array._pa_array.to_pandas() | ||
| return getattr(converted, attr.__name__)(*args[1:], **kwargs) | ||
| return convert_to_bodo( | ||
| getattr(converted, attr.__name__)(*args[1:], **kwargs) | ||
| ) | ||
|
|
||
| # Raise TypeError from initial call if self does not fall into any of the covered cases. | ||
| raise TypeError(msg) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
`df.columns.has_duplicates` is simpler and more reliable. `columns` is an Index, which is sort of a set and should have this info internally, I think.