1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -15,6 +15,7 @@ antlr4-runtime/4.13.1//antlr4-runtime-4.13.1.jar
aopalliance-repackaged/3.0.6//aopalliance-repackaged-3.0.6.jar
arpack/3.0.4//arpack-3.0.4.jar
arpack_combined_all/0.1//arpack_combined_all-0.1.jar
arrow-compression/18.3.0//arrow-compression-18.3.0.jar
arrow-format/18.3.0//arrow-format-18.3.0.jar
arrow-memory-core/18.3.0//arrow-memory-core-18.3.0.jar
arrow-memory-netty-buffer-patch/18.3.0//arrow-memory-netty-buffer-patch-18.3.0.jar
19 changes: 19 additions & 0 deletions pom.xml
@@ -2517,6 +2517,25 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
<version>${arrow.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
75 changes: 75 additions & 0 deletions python/pyspark/sql/tests/arrow/test_arrow.py
@@ -1810,6 +1810,81 @@ def test_createDataFrame_arrow_fixed_size_list(self):
df = self.spark.createDataFrame(t)
self.assertIsInstance(df.schema["fsl"].dataType, ArrayType)

def test_toPandas_with_compression_codec(self):
# Test toPandas() with different compression codec settings
df = self.spark.createDataFrame(self.data, schema=self.schema)
expected = self.create_pandas_data_frame()

for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
pdf = df.toPandas()
assert_frame_equal(expected, pdf)

def test_toArrow_with_compression_codec(self):
# Test toArrow() with different compression codec settings
import pyarrow.compute as pc

t_in = self.create_arrow_table()

# Convert timezone-naive local timestamp column in input table to UTC
# to enable comparison to UTC timestamp column in output table
timezone = self.spark.conf.get("spark.sql.session.timeZone")
t_in = t_in.set_column(
t_in.schema.get_field_index("8_timestamp_t"),
"8_timestamp_t",
pc.assume_timezone(t_in["8_timestamp_t"], timezone),
)
t_in = t_in.cast(
t_in.schema.set(
t_in.schema.get_field_index("8_timestamp_t"),
pa.field("8_timestamp_t", pa.timestamp("us", tz="UTC")),
)
)

df = self.spark.createDataFrame(self.data, schema=self.schema)

for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
t_out = df.toArrow()
self.assertTrue(t_out.equals(t_in))

def test_toPandas_with_compression_codec_large_dataset(self):
# Test compression with a larger dataset to verify memory savings
# Create a dataset with repetitive data that compresses well
from pyspark.sql.functions import lit, col

df = self.spark.range(10000).select(
col("id"),
lit("test_string_value_" * 10).alias("str_col"),
(col("id") % 100).alias("mod_col"),
)

for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
pdf = df.toPandas()
self.assertEqual(len(pdf), 10000)
self.assertEqual(pdf.columns.tolist(), ["id", "str_col", "mod_col"])

def test_toArrow_with_compression_codec_large_dataset(self):
# Test compression with a larger dataset for toArrow
from pyspark.sql.functions import lit, col

df = self.spark.range(10000).select(
col("id"),
lit("test_string_value_" * 10).alias("str_col"),
(col("id") % 100).alias("mod_col"),
)

for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
t = df.toArrow()
self.assertEqual(t.num_rows, 10000)
self.assertEqual(t.column_names, ["id", "str_col", "mod_col"])


@unittest.skipIf(
not have_pandas or not have_pyarrow,
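The large-dataset tests above only assert row counts and column names, not the size reduction itself. As a rough, Spark-independent illustration of the savings they allude to, the following sketch uses plain pyarrow IPC (the same buffer-compression mechanism this PR enables on the JVM writer side) to compare serialized stream sizes; exact ratios depend on the data and the pyarrow build.

import pyarrow as pa

# Repetitive data, similar in spirit to the large-dataset tests above.
table = pa.table({
    "id": list(range(10_000)),
    "str_col": ["test_string_value_" * 10] * 10_000,
    "mod_col": [i % 100 for i in range(10_000)],
})

def serialized_size(table, codec):
    # codec is None (uncompressed), "zstd", or "lz4"
    sink = pa.BufferOutputStream()
    options = pa.ipc.IpcWriteOptions(compression=codec)
    with pa.ipc.new_stream(sink, table.schema, options=options) as writer:
        writer.write_table(table)
    return sink.getvalue().size

for codec in (None, "zstd", "lz4"):
    print(codec or "none", serialized_size(table, codec))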
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3958,6 +3958,20 @@ object SQLConf {
"than zero and less than INT_MAX.")
.createWithDefaultString("64MB")

val ARROW_EXECUTION_COMPRESSION_CODEC =
buildConf("spark.sql.execution.arrow.compressionCodec")
.doc("Compression codec used to compress Arrow IPC data when transferring data " +
[Review comment — Contributor]
Does this optimization take effect in Pandas UDFs?

[Reply — viirya (Member, Author), Nov 5, 2025]
I think not; it is currently applied to toArrow and toPandas, which is what the reported issue covers. It should also be made available to Arrow UDFs and Pandas UDFs; I will try to extend it to those cases.
"between JVM and Python processes (e.g., toPandas, toArrow). This can significantly " +
"reduce memory usage and network bandwidth when transferring large datasets. " +
"Supported codecs: 'none' (no compression), 'zstd' (Zstandard), 'lz4' (LZ4). " +
"Note that compression may add CPU overhead but can provide substantial memory savings " +
"especially for datasets with high compression ratios.")
.version("4.1.0")
.stringConf
.transform(_.toLowerCase(java.util.Locale.ROOT))
.checkValues(Set("none", "zstd", "lz4"))
.createWithDefault("none")

val ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH =
buildConf("spark.sql.execution.arrow.transformWithStateInPySpark.maxStateRecordsPerBatch")
.doc("When using TransformWithState in PySpark (both Python Row and Pandas), limit " +
@@ -7287,6 +7301,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def arrowMaxBytesPerBatch: Long = getConf(ARROW_EXECUTION_MAX_BYTES_PER_BATCH)

def arrowCompressionCodec: String = getConf(ARROW_EXECUTION_COMPRESSION_CODEC)

def arrowTransformWithStateInPySparkMaxStateRecordsPerBatch: Int =
getConf(ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH)

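For reference, a minimal usage sketch of the config added above, written against PySpark. It assumes a running SparkSession named "spark" on a build that includes this change; the existing spark.sql.execution.arrow.pyspark.enabled flag still controls whether toPandas() takes the Arrow path at all.

# Hypothetical session-level usage; "none" (the default), "zstd", and "lz4"
# are the accepted values per the conf definition above.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.compressionCodec", "zstd")

df = spark.range(100_000).selectExpr("id", "repeat('x', 100) AS payload")

pdf = df.toPandas()  # Arrow batches sent to the Python driver are compressed in transit
tbl = df.toArrow()   # the same codec applies to toArrow()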
4 changes: 4 additions & 0 deletions sql/core/pom.xml
@@ -279,6 +279,10 @@
<artifactId>bcpkix-jdk18on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -23,9 +23,11 @@ import java.nio.channels.{Channels, ReadableByteChannel}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.arrow.compression.{Lz4CompressionCodec, ZstdCompressionCodec}
import org.apache.arrow.flatbuf.MessageHeader
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector._
import org.apache.arrow.vector.compression.{CompressionCodec, NoCompressionCodec}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter, ReadChannel, WriteChannel}
import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer}

@@ -37,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
@@ -92,8 +95,25 @@ private[sql] object ArrowConverters extends Logging {
ArrowUtils.rootAllocator.newChildAllocator(
s"to${this.getClass.getSimpleName}", 0, Long.MaxValue)

private val root = VectorSchemaRoot.create(arrowSchema, allocator)
protected val unloader = new VectorUnloader(root)
protected val root = VectorSchemaRoot.create(arrowSchema, allocator)

// Create compression codec based on config
private val compressionCodecName = SQLConf.get.arrowCompressionCodec
private val codec = compressionCodecName match {
case "none" => NoCompressionCodec.INSTANCE
case "zstd" =>
val factory = CompressionCodec.Factory.INSTANCE
val codecType = new ZstdCompressionCodec().getCodecType()
factory.createCodec(codecType)
[Review comment — Member]
It would be great to also have an option for the compression level.

[Reply — viirya (Member, Author)]
Okay, we can add a compression level option together.

[Reply — viirya (Member, Author)]
I am going to add that option in #52925, along with Pandas UDF support.
case "lz4" =>
val factory = CompressionCodec.Factory.INSTANCE
val codecType = new Lz4CompressionCodec().getCodecType()
factory.createCodec(codecType)
case other =>
throw new IllegalArgumentException(
[Review comment — Contributor]
This should be SparkException.internalError.

[Reply — viirya (Member, Author)]
Yeah, that would be better.

[Reply — viirya (Member, Author)]
I will change it to SparkException when I extend this to Pandas UDFs.
s"Unsupported Arrow compression codec: $other. Supported values: none, zstd, lz4")
}
protected val unloader = new VectorUnloader(root, true, codec, true)
protected val arrowWriter = ArrowWriter.create(root)

Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>
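A closing note on why only the JVM writer side selects a codec: Arrow IPC record batches carry their buffer compression in the message metadata, and pyarrow's IPC reader decompresses transparently (the official wheels ship with zstd and lz4 support), so the Python-side deserializers presumably need no changes. A small Spark-independent round-trip sketch:

import pyarrow as pa

table = pa.table({"id": list(range(1_000)), "payload": ["x" * 100] * 1_000})

# Write an IPC stream with zstd buffer compression...
sink = pa.BufferOutputStream()
options = pa.ipc.IpcWriteOptions(compression="zstd")
with pa.ipc.new_stream(sink, table.schema, options=options) as writer:
    writer.write_table(table)

# ...and read it back with the plain reader: no codec has to be specified.
roundtripped = pa.ipc.open_stream(sink.getvalue()).read_all()
assert roundtripped.equals(table)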