[SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance #22275
Changes (new test added after `test_timestamp_dst`):

@@ -4923,6 +4923,34 @@ def test_timestamp_dst(self):

```python
        self.assertPandasEqual(pdf, df_from_python.toPandas())
        self.assertPandasEqual(pdf, df_from_pandas.toPandas())

    def test_toPandas_batch_order(self):

        def delay_first_part(partition_index, iterator):
            if partition_index == 0:
                time.sleep(0.1)
            return iterator

        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
        def run_test(num_records, num_parts, max_records, use_delay=False):
            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
            if use_delay:
                df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
                self.assertPandasEqual(pdf, pdf_arrow)

        cases = [
            (1024, 512, 2),    # Use large num partitions for more likely collecting out of order
            (64, 8, 2, True),  # Use delay in first partition to force collecting out of order
            (64, 64, 1),       # Test single batch per partition
            (64, 1, 64),       # Test single partition, single batch
            (64, 1, 8),        # Test single partition, multiple batches
            (30, 7, 2),        # Test different sized partitions
        ]

        for case in cases:
            run_test(*case)


class EncryptionArrowTests(ArrowTests):
```
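The idea being tested above is that the driver JVM can forward Arrow record batches to Python in whatever order partitions finish, as long as the ordering information is sent afterward so Python can restore partition order before building the pandas DataFrame. A minimal, self-contained sketch of that re-ordering step (the names `reorder_batches` and `batch_order` are illustrative, not the actual PySpark internals):

```python
def reorder_batches(batches, batch_order):
    """Rearrange batches received in completion order back into partition order.

    `batches` arrive as partitions complete (possibly out of order);
    `batch_order[i]` gives the correct final position of `batches[i]`.
    """
    ordered = [None] * len(batches)
    for batch, pos in zip(batches, batch_order):
        ordered[pos] = batch
    return ordered

# Example: batches for partitions 0..3 arrive in the order 2, 0, 3, 1
arrived = ["batch2", "batch0", "batch3", "batch1"]
order = [2, 0, 3, 1]
print(reorder_batches(arrived, order))  # ['batch0', 'batch1', 'batch2', 'batch3']
```

Because the ordering indices are tiny compared to the batches themselves, sending batches eagerly and fixing the order at the end lets the driver start streaming results without waiting for slow partitions, which is where the performance win in this PR comes from.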
Review comment: I like this :)