
Commit d34590c
[SPARK-31441][PYSPARK][SQL][2.4] Support duplicated column names for toPandas with arrow execution
### What changes were proposed in this pull request?

This backports #28210, adding support for duplicated column names in `toPandas` with Arrow execution.

### Why are the changes needed?

When `toPandas()` is executed with Arrow execution enabled, it fails if the column names contain duplicates:

```py
>>> spark.sql("select 1 v, 1 v").toPandas()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 2132, in toPandas
    pdf = table.to_pandas()
  File "pyarrow/array.pxi", line 441, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 653, in table_to_blockmanager
    columns = _deserialize_column_index(table, all_columns, column_indexes)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 704, in _deserialize_column_index
    columns = _flatten_single_level_multiindex(columns)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 937, in _flatten_single_level_multiindex
    raise ValueError('Found non-unique column index')
ValueError: Found non-unique column index
```

### Does this PR introduce any user-facing change?

Yes. Previously the call above raised the error shown; after this PR, it returns the expected result:

```py
>>> spark.sql("select 1 v, 1 v").toPandas()
   v  v
0  1  1
```

### How was this patch tested?

Added and modified related tests.

Closes #28221 from ueshin/issues/SPARK-31441/2.4/to_pandas.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
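A practical note on the new behavior: once duplicated column names reach pandas, label-based selection returns every matching column, so positional access is needed to pick just one. A quick illustration (ordinary pandas behavior, not specific to this patch; `spark` is an active SparkSession with Arrow execution enabled):

```py
>>> pdf = spark.sql("select 1 v, 1 v").toPandas()
>>> pdf['v']        # label-based selection matches both 'v' columns
   v  v
0  1  1
>>> pdf.iloc[:, 0]  # positional selection picks a single column
0    1
Name: v, dtype: int32
```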
1 parent 49abdc4 commit d34590c

2 files changed: +26 -7 lines changed

python/pyspark/sql/dataframe.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -2127,10 +2127,14 @@ def toPandas(self):
                 from pyspark.sql.types import _check_dataframe_convert_date, \
                     _check_dataframe_localize_timestamps
                 import pyarrow
-                batches = self._collectAsArrow()
+                # Rename columns to avoid duplicated column names.
+                tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                batches = self.toDF(*tmp_column_names)._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
                     pdf = table.to_pandas()
+                    # Rename back to the original column names.
+                    pdf.columns = self.columns
                     pdf = _check_dataframe_convert_date(pdf, self.schema)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
```
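The core of the fix is a rename-and-restore round trip: the pyarrow versions targeted by Spark 2.4 reject conversion when the column index would be non-unique, so the patch collects the Arrow batches under unique temporary names (via `toDF`) and reassigns the original names on the resulting pandas DataFrame. A minimal sketch of the same trick at the pyarrow level, with a toy table standing in for the collected batches (illustrative only, not the patch's actual code path):

```py
import pyarrow as pa

# A table with duplicated column names, as produced by "select 1 v, 1 v".
# Older pyarrow releases raise "ValueError: Found non-unique column index"
# when converting such a table to pandas directly.
table = pa.Table.from_arrays([pa.array([1]), pa.array([1])], names=['v', 'v'])

# Convert under unique temporary names ...
original_names = table.column_names
tmp_names = ['col_{}'.format(i) for i in range(len(original_names))]
pdf = table.rename_columns(tmp_names).to_pandas()

# ... then restore the original (possibly duplicated) names.
pdf.columns = original_names
print(pdf)
#    v  v
# 0  1  1
```

Renaming on the Spark side before collection, as the patch does, means the Arrow batches themselves never carry duplicated names, so the conversion succeeds regardless of which pyarrow version performs it.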

python/pyspark/sql/tests.py

Lines changed: 21 additions & 6 deletions
```diff
@@ -3296,6 +3296,19 @@ def test_to_pandas(self):
         self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')
 
+    @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
     @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
@@ -3307,12 +3320,14 @@ def test_to_pandas_on_cross_join(self):
             select explode(sequence(1, 3)) v
           ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(_have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
```
