Skip to content

Commit 39542bb

Browse files
hezuojiao authored and dongjoon-hyun committed
[SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io encryption is enabled
### What changes were proposed in this pull request? This patch proposes to disable fetching shuffle blocks in batch when io encryption is enabled. Adaptive Query Execution fetches contiguous shuffle blocks for the same map task in batch to reduce IO and improve performance. However, we found that batch fetching is incompatible with io encryption. ### Why are the changes needed? Before this patch, if we set `spark.io.encryption.enabled` to true and then ran queries whose partitions were coalesced by AQE, we might get the following error message: ```14:05:52.638 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3) (11.240.37.88 executor driver): FetchFailed(BlockManagerId(driver, 11.240.37.88, 63574, None), shuffleId=0, mapIndex=0, mapId=0, reduceId=2, message= org.apache.spark.shuffle.FetchFailedException: Stream is corrupted at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772) at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) at scala.collection.Iterator$$anon$11.next(Iterator.scala:494) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is corrupted at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841) ... 25 more ) ``` ### Does this PR introduce any user-facing change? No ### How was this patch tested? New tests. Closes apache#31898 from hezuojiao/fetch_shuffle_in_batch. Authored-by: hezuojiao <hezuojiao@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 31da907 commit 39542bb

3 files changed

Lines changed: 30 additions & 5 deletions

File tree

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
5151
true
5252
}
5353
val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
54+
// SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
55+
val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)
5456

5557
val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
56-
(!compressed || codecConcatenation) && !useOldFetchProtocol
58+
(!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
5759
if (shouldBatchFetch && !doBatchFetch) {
5860
logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
5961
"we can not enable the feature because other conditions are not satisfied. " +
6062
s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
6163
s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
62-
s"$useOldFetchProtocol.")
64+
s"$useOldFetchProtocol, io encryption: $ioEncryption.")
6365
}
6466
doBatchFetch
6567
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,8 @@ object SQLConf {
500500
"reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
501501
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
502502
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
503-
"on a relocatable serializer, the concatenation support codec in use and the new version " +
504-
"shuffle fetch protocol.")
503+
"on a relocatable serializer, the concatenation support codec in use, the new version " +
504+
"shuffle fetch protocol and io encryption is disabled.")
505505
.version("3.0.0")
506506
.booleanConf
507507
.createWithDefault(true)

sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
2020
import org.scalatest.BeforeAndAfterAll
2121

2222
import org.apache.spark.{SparkConf, SparkFunSuite}
23+
import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
2324
import org.apache.spark.internal.config.UI.UI_ENABLED
2425
import org.apache.spark.sql._
2526
import org.apache.spark.sql.execution.adaptive._
@@ -57,15 +58,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
5758
def withSparkSession(
5859
f: SparkSession => Unit,
5960
targetPostShuffleInputSize: Int,
60-
minNumPostShufflePartitions: Option[Int]): Unit = {
61+
minNumPostShufflePartitions: Option[Int],
62+
enableIOEncryption: Boolean = false): Unit = {
6163
val sparkConf =
6264
new SparkConf(false)
6365
.setMaster("local[*]")
6466
.setAppName("test")
6567
.set(UI_ENABLED, false)
68+
.set(IO_ENCRYPTION_ENABLED, enableIOEncryption)
6669
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
6770
.set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
6871
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
72+
.set(SQLConf.FETCH_SHUFFLE_BLOCKS_IN_BATCH.key, "true")
6973
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
7074
.set(
7175
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
@@ -408,6 +412,25 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
408412
}
409413
withSparkSession(test, 100, None)
410414
}
415+
416+
test("SPARK-34790: enable IO encryption in AQE partition coalescing") {
417+
val test: SparkSession => Unit = { spark: SparkSession =>
418+
val ds = spark.range(0, 100, 1, numInputPartitions)
419+
val resultDf = ds.repartition(ds.col("id"))
420+
resultDf.collect()
421+
422+
val finalPlan = resultDf.queryExecution.executedPlan
423+
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
424+
assert(
425+
finalPlan.collect {
426+
case r @ CoalescedShuffleReader() => r
427+
}.isDefinedAt(0))
428+
}
429+
Seq(true, false).foreach { enableIOEncryption =>
430+
// Before SPARK-34790, this would throw an exception when io encryption was enabled.
431+
withSparkSession(test, Int.MaxValue, None, enableIOEncryption)
432+
}
433+
}
411434
}
412435

413436
object CoalescedShuffleReader {

0 commit comments

Comments
 (0)