diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 5c4c053293e0..73f36fb161c6 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -15,6 +15,7 @@ antlr4-runtime/4.13.1//antlr4-runtime-4.13.1.jar
aopalliance-repackaged/3.0.6//aopalliance-repackaged-3.0.6.jar
arpack/3.0.4//arpack-3.0.4.jar
arpack_combined_all/0.1//arpack_combined_all-0.1.jar
+arrow-compression/18.3.0//arrow-compression-18.3.0.jar
arrow-format/18.3.0//arrow-format-18.3.0.jar
arrow-memory-core/18.3.0//arrow-memory-core-18.3.0.jar
arrow-memory-netty-buffer-patch/18.3.0//arrow-memory-netty-buffer-patch-18.3.0.jar
diff --git a/pom.xml b/pom.xml
index 118573b79061..9373952fc7f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2517,6 +2517,25 @@
+      <dependency>
+        <groupId>org.apache.arrow</groupId>
+        <artifactId>arrow-compression</artifactId>
+        <version>${arrow.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>org.apache.arrow</groupId>
         <artifactId>arrow-memory-netty</artifactId>
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py b/python/pyspark/sql/tests/arrow/test_arrow.py
index be7dd2febc94..af08f8c8c101 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -1810,6 +1810,81 @@ def test_createDataFrame_arrow_fixed_size_list(self):
df = self.spark.createDataFrame(t)
self.assertIsInstance(df.schema["fsl"].dataType, ArrayType)
+ def test_toPandas_with_compression_codec(self):
+ # Test toPandas() with different compression codec settings
+ df = self.spark.createDataFrame(self.data, schema=self.schema)
+ expected = self.create_pandas_data_frame()
+
+ for codec in ["none", "zstd", "lz4"]:
+ with self.subTest(compressionCodec=codec):
+ with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ pdf = df.toPandas()
+ assert_frame_equal(expected, pdf)
+
+ def test_toArrow_with_compression_codec(self):
+ # Test toArrow() with different compression codec settings
+ import pyarrow.compute as pc
+
+ t_in = self.create_arrow_table()
+
+ # Convert timezone-naive local timestamp column in input table to UTC
+ # to enable comparison to UTC timestamp column in output table
+ timezone = self.spark.conf.get("spark.sql.session.timeZone")
+ t_in = t_in.set_column(
+ t_in.schema.get_field_index("8_timestamp_t"),
+ "8_timestamp_t",
+ pc.assume_timezone(t_in["8_timestamp_t"], timezone),
+ )
+ t_in = t_in.cast(
+ t_in.schema.set(
+ t_in.schema.get_field_index("8_timestamp_t"),
+ pa.field("8_timestamp_t", pa.timestamp("us", tz="UTC")),
+ )
+ )
+
+ df = self.spark.createDataFrame(self.data, schema=self.schema)
+
+ for codec in ["none", "zstd", "lz4"]:
+ with self.subTest(compressionCodec=codec):
+ with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ t_out = df.toArrow()
+ self.assertTrue(t_out.equals(t_in))
+
+ def test_toPandas_with_compression_codec_large_dataset(self):
+        # Test compression correctness on a larger dataset built from
+        # repetitive data that compresses well
+ from pyspark.sql.functions import lit, col
+
+ df = self.spark.range(10000).select(
+ col("id"),
+ lit("test_string_value_" * 10).alias("str_col"),
+ (col("id") % 100).alias("mod_col"),
+ )
+
+ for codec in ["none", "zstd", "lz4"]:
+ with self.subTest(compressionCodec=codec):
+ with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ pdf = df.toPandas()
+ self.assertEqual(len(pdf), 10000)
+ self.assertEqual(pdf.columns.tolist(), ["id", "str_col", "mod_col"])
+
+ def test_toArrow_with_compression_codec_large_dataset(self):
+ # Test compression with a larger dataset for toArrow
+ from pyspark.sql.functions import lit, col
+
+ df = self.spark.range(10000).select(
+ col("id"),
+ lit("test_string_value_" * 10).alias("str_col"),
+ (col("id") % 100).alias("mod_col"),
+ )
+
+ for codec in ["none", "zstd", "lz4"]:
+ with self.subTest(compressionCodec=codec):
+ with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ t = df.toArrow()
+ self.assertEqual(t.num_rows, 10000)
+ self.assertEqual(t.column_names, ["id", "str_col", "mod_col"])
+
@unittest.skipIf(
not have_pandas or not have_pyarrow,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 330a6499c5c7..5f02d2e24f10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3958,6 +3958,20 @@ object SQLConf {
"than zero and less than INT_MAX.")
.createWithDefaultString("64MB")
+ val ARROW_EXECUTION_COMPRESSION_CODEC =
+ buildConf("spark.sql.execution.arrow.compressionCodec")
+      .doc("Compression codec used to compress Arrow IPC data when transferring data " +
+        "between the JVM and Python processes (e.g., toPandas, toArrow). Compression can " +
+        "significantly reduce memory usage and network traffic when transferring large " +
+        "datasets. Supported codecs: 'none' (no compression), 'zstd' (Zstandard), and " +
+        "'lz4' (LZ4). Compression may add CPU overhead but can provide substantial " +
+        "memory savings, especially for data that compresses well.")
+ .version("4.1.0")
+ .stringConf
+ .transform(_.toLowerCase(java.util.Locale.ROOT))
+ .checkValues(Set("none", "zstd", "lz4"))
+ .createWithDefault("none")
+
val ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH =
buildConf("spark.sql.execution.arrow.transformWithStateInPySpark.maxStateRecordsPerBatch")
.doc("When using TransformWithState in PySpark (both Python Row and Pandas), limit " +
@@ -7287,6 +7301,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def arrowMaxBytesPerBatch: Long = getConf(ARROW_EXECUTION_MAX_BYTES_PER_BATCH)
+ def arrowCompressionCodec: String = getConf(ARROW_EXECUTION_COMPRESSION_CODEC)
+
def arrowTransformWithStateInPySparkMaxStateRecordsPerBatch: Int =
getConf(ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH)
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 39d8c3995441..cf7494212cd5 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -279,6 +279,10 @@
       <artifactId>bcpkix-jdk18on</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-compression</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 3072a12e3d58..7f260bd2efd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -23,9 +23,11 @@ import java.nio.channels.{Channels, ReadableByteChannel}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
+import org.apache.arrow.compression.{Lz4CompressionCodec, ZstdCompressionCodec}
import org.apache.arrow.flatbuf.MessageHeader
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector._
+import org.apache.arrow.vector.compression.{CompressionCodec, NoCompressionCodec}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter, ReadChannel, WriteChannel}
import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer}
@@ -37,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
@@ -92,8 +95,25 @@ private[sql] object ArrowConverters extends Logging {
ArrowUtils.rootAllocator.newChildAllocator(
s"to${this.getClass.getSimpleName}", 0, Long.MaxValue)
- private val root = VectorSchemaRoot.create(arrowSchema, allocator)
- protected val unloader = new VectorUnloader(root)
+ protected val root = VectorSchemaRoot.create(arrowSchema, allocator)
+
+    // Create the compression codec selected by spark.sql.execution.arrow.compressionCodec
+ private val compressionCodecName = SQLConf.get.arrowCompressionCodec
+ private val codec = compressionCodecName match {
+ case "none" => NoCompressionCodec.INSTANCE
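+      // The zstd and lz4 implementations come from the arrow-compression module (added as a
+      // dependency by this change); the shared factory resolves the concrete codec at runtime.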
+ case "zstd" =>
+ val factory = CompressionCodec.Factory.INSTANCE
+ val codecType = new ZstdCompressionCodec().getCodecType()
+ factory.createCodec(codecType)
+ case "lz4" =>
+ val factory = CompressionCodec.Factory.INSTANCE
+ val codecType = new Lz4CompressionCodec().getCodecType()
+ factory.createCodec(codecType)
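+      // Defensive fallback: the SQL conf restricts values to none/zstd/lz4 via checkValues,
+      // so this branch is not expected to be reached.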
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unsupported Arrow compression codec: $other. Supported values: none, zstd, lz4")
+ }
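+    // VectorUnloader arguments: include null counts, compress buffers with the configured
+    // codec, and align buffers, so record batches are compressed as they are unloaded.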
+ protected val unloader = new VectorUnloader(root, true, codec, true)
protected val arrowWriter = ArrowWriter.create(root)
Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>