# [SPARK-54134][SQL] Optimize Arrow memory usage #52747
## Conversation
**dongjoon-hyun** left a comment:
+1, LGTM. Thank you, @viirya and @cloud-fan.
Merged to master/4.1.
### What changes were proposed in this pull request?

This patch proposes some changes to optimize Arrow memory usage in Spark. It compresses Arrow IPC data when serializing.

### Why are the changes needed?

We have encountered OOMs when loading data and processing it in PySpark through `toArrow` or `toPandas`. The same data can be loaded by PyArrow directly, but fails to load into PySpark through `toArrow` or `toPandas` due to OOM issues.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests. Manually tested locally.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.0.13

Closes #52747 from viirya/release_buffers.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 02ba89f)
Signed-off-by: Dongjoon Hyun <[email protected]>
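For context, Arrow's Java IPC writers accept a compression factory and codec type, which is the mechanism this description refers to. Below is a minimal sketch of compressing a batch with zstd, not the patch's actual code: `writeCompressed`, the null dictionary provider, and the specific `ArrowStreamWriter` constructor used are illustrative assumptions.

```scala
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels

import org.apache.arrow.compression.ZstdCompressionCodec
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.compression.CompressionCodec
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.ipc.message.IpcOption

// `root` is assumed to hold the record batch being serialized.
def writeCompressed(root: VectorSchemaRoot): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  // Same codec plumbing as the diff snippets later in this thread: resolve
  // the zstd codec type and hand the factory to the IPC writer.
  val factory = CompressionCodec.Factory.INSTANCE
  val codecType = new ZstdCompressionCodec().getCodecType()
  val writer = new ArrowStreamWriter(
    root, /* dictionaryProvider */ null, Channels.newChannel(out),
    IpcOption.DEFAULT, factory, codecType)
  writer.start()
  writer.writeBatch() // record-batch body buffers are compressed on write
  writer.end()
  out.toByteArray
}
```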
Thank you @cloud-fan @dongjoon-hyun
**allisonwang-db** left a comment:
Thanks for adding this!
```scala
        val codecType = new Lz4CompressionCodec().getCodecType()
        factory.createCodec(codecType)
      case other =>
        throw new IllegalArgumentException(
```
Should be `SparkException.internalError`.
Yea, that would be better.
I will change it to `SparkException` when I extend this to pandas UDFs.
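A hedged sketch of what the suggested change could look like (`createCodec` and `codecName` are illustrative names, not the PR's actual code):

```scala
import org.apache.arrow.compression.{Lz4CompressionCodec, ZstdCompressionCodec}
import org.apache.arrow.vector.compression.CompressionCodec
import org.apache.spark.SparkException

def createCodec(codecName: String): CompressionCodec = codecName match {
  case "lz4" =>
    CompressionCodec.Factory.INSTANCE.createCodec(
      new Lz4CompressionCodec().getCodecType())
  case "zstd" =>
    CompressionCodec.Factory.INSTANCE.createCodec(
      new ZstdCompressionCodec().getCodecType())
  case other =>
    // Reviewer suggestion: an unexpected codec name means conf validation
    // upstream is broken, so report it as an internal error.
    throw SparkException.internalError(
      s"Unsupported Arrow compression codec: $other")
}
```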
```scala
  val ARROW_EXECUTION_COMPRESSION_CODEC =
    buildConf("spark.sql.execution.arrow.compressionCodec")
      .doc("Compression codec used to compress Arrow IPC data when transferring data " +
```
does this optimization take effect in pandas udf?
I think not; it is currently applied to `toArrow` and `toPandas`, which is where the reported issue occurred. It should also be made available to Arrow UDFs and Pandas UDFs. I will try to extend this to those cases.
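For reference, a minimal usage sketch: the session setup below is illustrative, but the conf key comes from the diff above and `"zstd"` matches one of the codec cases in this PR.

```scala
import org.apache.spark.sql.SparkSession

// Set the codec once on the session; driver-side collection paths
// (PySpark's toArrow/toPandas) then ship compressed Arrow IPC batches.
val spark = SparkSession.builder()
  .appName("arrow-compression-demo") // illustrative app name
  .master("local[*]")
  .config("spark.sql.execution.arrow.compressionCodec", "zstd")
  .getOrCreate()
```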
| case "zstd" => | ||
| val factory = CompressionCodec.Factory.INSTANCE | ||
| val codecType = new ZstdCompressionCodec().getCodecType() | ||
| factory.createCodec(codecType) |
It would be great if we could have an option to set the compression level.
Okay, we can add a compression level option together.
I am going to add the option in #52925 along with Pandas UDF support.
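A small sketch of why a level option is feasible, assuming Arrow Java's level-taking `ZstdCompressionCodec` constructor; any Spark conf name for the level would be hypothetical until #52925 lands.

```scala
import org.apache.arrow.compression.ZstdCompressionCodec

// ZstdCompressionCodec also has a constructor taking an explicit compression
// level, so a level conf could be threaded through to it. The value 3 is
// illustrative only.
val codec = new ZstdCompressionCodec(3)
```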
### What changes were proposed in this pull request?

This is an extension to #52747. In #52747, we added support for Arrow compression to `toArrow` and `toPandas` to reduce memory usage. We would like to extend the memory optimization to the Pandas UDF case.

### Why are the changes needed?

To optimize memory usage for the Pandas UDF case.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.0.14

Closes #52925 from viirya/arrow_compress_udf.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>