diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 57108dcedcf0..88607d508de2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -72,8 +72,13 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    // Pair the partitions chunk by chunk: rdd2's partition is re-iterated once per chunk
+    // of rdd1 instead of once per element. The chunks themselves are walked through
+    // iterators so the pairs of a chunk pair are produced lazily, not buffered.
+    val groupSize = 500
+    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
+         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
+         i <- x.iterator; j <- y.iterator) yield (i, j)
   }
 
   override def getDependencies: Seq[Dependency[_]] = List(