File tree Expand file tree Collapse file tree 2 files changed +14
-0
lines changed
core/src/main/scala/org/apache/spark Expand file tree Collapse file tree 2 files changed +14
-0
lines changed Original file line number Diff line number Diff line change @@ -2564,4 +2564,11 @@ package object config {
25642564 .version(" 3.5.0" )
25652565 .stringConf
25662566 .createOptional
2567+
// Config entry keyed `spark.carmel.stage.max.partitions`: an upper bound on a partition count.
// The value is an Int that must be strictly positive (enforced by checkValue below) and
// defaults to Int.MaxValue.
// NOTE(review): the doc string talks about "constructing a RDD" in general and the key says
// "stage", but the only consumer visible in this change is the cartesian-product guard in
// CartesianRDD.getPartitions -- confirm the intended scope and align the key/doc wording.
2568+ private [spark] val RDD_MAX_PARTITIONS = ConfigBuilder (" spark.carmel.stage.max.partitions" )
2569+ .doc(" The maximum number of partitions allowed when constructing a RDD." )
2570+ .version(" 3.5.0" )
2571+ .intConf
2572+ .checkValue(_ > 0 , " The maximum number of partitions must be greater than 0." )
2573+ .createWithDefault(Int .MaxValue )
25672574}
Original file line number Diff line number Diff line change @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
2222import scala .reflect .ClassTag
2323
2424import org .apache .spark ._
25+ import org .apache .spark .internal .config .RDD_MAX_PARTITIONS
2526import org .apache .spark .util .Utils
2627
2728private [spark]
@@ -55,8 +56,14 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
5556
5657 val numPartitionsInRdd2 = rdd2.partitions.length
5758
// Partition-count ceiling for this RDD, read from the RDD_MAX_PARTITIONS config entry via `conf`
// when this member is initialized.
// NOTE(review): the guard in getPartitions compares the Int product
// `rdd1.partitions.length * rdd2.partitions.length` against this value. An Int can never be
// greater than the default Int.MaxValue, and a product that overflows wraps around (possibly to
// a negative number) and passes the check -- consider widening to Long before multiplying.
// NOTE(review): this is a public, upper-snake-case instance member; presumably it should be
// private -- confirm before changing, since getPartitions references it by this name.
59+ val MAX_PARTITIONS_IN_STAGE = conf.get(RDD_MAX_PARTITIONS )
60+
5861 override def getPartitions : Array [Partition ] = {
5962 // create the cross product split
63+ if (rdd1.partitions.length * rdd2.partitions.length > MAX_PARTITIONS_IN_STAGE ) {
64+ throw new SparkException (s " Cartesian operation's partitions have " +
65+ s " reached max limitation $MAX_PARTITIONS_IN_STAGE" )
66+ }
6067 val array = new Array [Partition ](rdd1.partitions.length * rdd2.partitions.length)
6168 for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
6269 val idx = s1.index * numPartitionsInRdd2 + s2.index
You can’t perform that action at this time.
0 commit comments