[SPARK-23380][PYTHON] Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame #20678
Closed
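For context, the behavior this PR adds is driven by two SQL confs. A minimal usage sketch (the session setup here is illustrative, not part of the PR):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Enable the Arrow-based conversion path for toPandas().
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# New in this PR: when the Arrow path cannot be used (e.g. an unsupported
# schema, or PyArrow missing/too old), fall back to the non-Arrow path with
# a warning instead of raising. Set to "false" to raise an error instead.
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")

pdf = spark.range(10).toPandas()  # uses Arrow when possible, falls back otherwise
```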
Commits (8)
- 7f87d25 Adds a conf for Arrow fallback in toPandas/createDataFrame with Panda… (HyukjinKwon)
- 7641fd0 Address comments (HyukjinKwon)
- cfb08a1 Fix some comments (HyukjinKwon)
- 229a5f7 Address comments (HyukjinKwon)
- ed30c20 Address comments (HyukjinKwon)
- af60cb7 Fix a nit (HyukjinKwon)
- b5bea82 Fix a nit (HyukjinKwon)
- 4ccaa81 Fix nits (HyukjinKwon)
Diff for `def toPandas(self)`:

```diff
@@ -1986,55 +1986,91 @@ def toPandas(self):
             timezone = None

         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
+            use_arrow = True
             try:
-                from pyspark.sql.types import _check_dataframe_convert_date, \
-                    _check_dataframe_localize_timestamps, to_arrow_schema
+                from pyspark.sql.types import to_arrow_schema
                 from pyspark.sql.utils import require_minimum_pyarrow_version

                 require_minimum_pyarrow_version()
-                import pyarrow
                 to_arrow_schema(self.schema)
-                tables = self._collectAsArrow()
-                if tables:
-                    table = pyarrow.concat_tables(tables)
-                    pdf = table.to_pandas()
-                    pdf = _check_dataframe_convert_date(pdf, self.schema)
-                    return _check_dataframe_localize_timestamps(pdf, timezone)
-                else:
-                    return pd.DataFrame.from_records([], columns=self.columns)
             except Exception as e:
-                msg = (
-                    "Note: toPandas attempted Arrow optimization because "
-                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
-                    "to disable this.")
-                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
-        else:
-            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
-
-            dtype = {}
-            for field in self.schema:
-                pandas_type = _to_corrected_pandas_type(field.dataType)
-                # SPARK-21766: if an integer field is nullable and has null values, it can be
-                # inferred by pandas as float column. Once we convert the column with NaN back
-                # to integer type e.g., np.int16, we will hit exception. So we use the inferred
-                # float type, not the corrected type from the schema in this case.
-                if pandas_type is not None and \
-                    not(isinstance(field.dataType, IntegralType) and field.nullable and
-                        pdf[field.name].isnull().any()):
-                    dtype[field.name] = pandas_type
-
-            for f, t in dtype.items():
-                pdf[f] = pdf[f].astype(t, copy=False)
-
-            if timezone is None:
-                return pdf
-            else:
-                from pyspark.sql.types import _check_series_convert_timestamps_local_tz
-                for field in self.schema:
-                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
-                    if isinstance(field.dataType, TimestampType):
-                        pdf[field.name] = \
-                            _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
-                return pdf
+                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
+                        .lower() == "true":
+                    msg = (
+                        "toPandas attempted Arrow optimization because "
+                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
+                        "failed by the reason below:\n  %s\n"
+                        "Attempts non-optimization as "
+                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
+                        "true." % _exception_message(e))
+                    warnings.warn(msg)
+                    use_arrow = False
+                else:
+                    msg = (
+                        "toPandas attempted Arrow optimization because "
+                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
+                        "failed by the reason below:\n  %s\n"
+                        "For fallback to non-optimization automatically, please set true to "
+                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
+                    raise RuntimeError(msg)
+
+            # Try to use Arrow optimization when the schema is supported and the required version
+            # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
+            if use_arrow:
+                try:
+                    from pyspark.sql.types import _check_dataframe_convert_date, \
+                        _check_dataframe_localize_timestamps
+                    import pyarrow
+
+                    tables = self._collectAsArrow()
+                    if tables:
+                        table = pyarrow.concat_tables(tables)
+                        pdf = table.to_pandas()
+                        pdf = _check_dataframe_convert_date(pdf, self.schema)
+                        return _check_dataframe_localize_timestamps(pdf, timezone)
+                    else:
+                        return pd.DataFrame.from_records([], columns=self.columns)
+                except Exception as e:
+                    # We might have to allow fallback here as well but multiple Spark jobs can
+                    # be executed. So, simply fail in this case for now.
+                    msg = (
+                        "toPandas attempted Arrow optimization because "
+                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
+                        "failed unexpectedly:\n  %s\n"
+                        "Note that 'spark.sql.execution.arrow.fallback.enabled' does "
+                        "not have an effect in such failure in the middle of "
+                        "computation." % _exception_message(e))
+                    raise RuntimeError(msg)
+
+        # Below is toPandas without Arrow optimization.
+        pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
+
+        dtype = {}
+        for field in self.schema:
+            pandas_type = _to_corrected_pandas_type(field.dataType)
+            # SPARK-21766: if an integer field is nullable and has null values, it can be
+            # inferred by pandas as float column. Once we convert the column with NaN back
+            # to integer type e.g., np.int16, we will hit exception. So we use the inferred
+            # float type, not the corrected type from the schema in this case.
+            if pandas_type is not None and \
+                not(isinstance(field.dataType, IntegralType) and field.nullable and
+                    pdf[field.name].isnull().any()):
+                dtype[field.name] = pandas_type
+
+        for f, t in dtype.items():
+            pdf[f] = pdf[f].astype(t, copy=False)
+
+        if timezone is None:
+            return pdf
+        else:
+            from pyspark.sql.types import _check_series_convert_timestamps_local_tz
+            for field in self.schema:
+                # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                if isinstance(field.dataType, TimestampType):
+                    pdf[field.name] = \
+                        _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
+            return pdf

     def _collectAsArrow(self):
         """
```

Inline review comment (Contributor), on the note that the fallback conf has no effect once computation has started: +1 good job having this explanation in the exception
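To see the new fallback in action, a schema the Arrow path could not convert at the time of this PR (for example, MapType) should take the warning-and-fall-back branch above rather than raising. A hedged sketch; the exact set of unsupported types depends on the Spark and PyArrow versions:

```python
import warnings

# Assumes `spark` is an active SparkSession with both confs set to "true"
# as in the sketch near the top. MapType was not supported by the Arrow
# conversion at the time of this PR, so to_arrow_schema(self.schema) raises,
# the warning branch fires, and the non-Arrow path produces the result.
df = spark.createDataFrame([({"a": 1},)], "m map<string,int>")

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    pdf = df.toPandas()  # falls back; pdf still contains the data

print([str(w.message) for w in caught])  # includes the fallback warning
```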
Review comment: `toPandas attempted Arrow optimization because...` repeats three times here; maybe we can dedup it.

Reply: Hm ... I tried to make a `"toPandas attempted Arrow optimization because ... %s"` template and reuse it, but it seemed a little bit overkill.