Skip to content

Commit e17dd66

Browse files
committed
Sampling-based RDD with unordered input should be INDETERMINATE.
1 parent 68e29ba commit e17dd66

5 files changed

Lines changed: 56 additions & 12 deletions

File tree

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
265265
} else {
266266
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
267267
}
268-
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
268+
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
269269
}
270270

271271
/**
@@ -295,7 +295,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
295295
} else {
296296
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
297297
}
298-
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
298+
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
299299
}
300300

301301
/**

core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
6767
thisSampler.setSeed(split.seed)
6868
thisSampler.sample(firstParent[T].iterator(split.prev, context))
6969
}
70+
71+
override protected def getOutputDeterministicLevel = {
72+
if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
73+
DeterministicLevel.INDETERMINATE
74+
} else {
75+
super.getOutputDeterministicLevel
76+
}
77+
}
7078
}

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ abstract class RDD[T: ClassTag](
541541
val sampler = new BernoulliCellSampler[T](lb, ub)
542542
sampler.setSeed(seed + index)
543543
sampler.sample(partition)
544-
}, preservesPartitioning = true)
544+
}, isOrderSensitive = true, preservesPartitioning = true)
545545
}
546546

547547
/**
@@ -854,6 +854,29 @@ abstract class RDD[T: ClassTag](
854854
preservesPartitioning)
855855
}
856856

857+
/**
858+
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
859+
* of the original partition.
860+
*
861+
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
862+
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
863+
*
864+
* `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
865+
* sensitive, it may return totally different result when the input order
866+
* is changed. Mostly stateful functions are order-sensitive.
867+
*/
868+
private[spark] def mapPartitionsWithIndex[U: ClassTag](
869+
f: (Int, Iterator[T]) => Iterator[U],
870+
preservesPartitioning: Boolean,
871+
isOrderSensitive: Boolean): RDD[U] = withScope {
872+
val cleanedF = sc.clean(f)
873+
new MapPartitionsRDD(
874+
this,
875+
(_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
876+
preservesPartitioning,
877+
isOrderSensitive = isOrderSensitive)
878+
}
879+
857880
/**
858881
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
859882
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2713,6 +2713,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
27132713
.contains("Spark cannot rollback the ShuffleMapStage 1"))
27142714
}
27152715

2716+
test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
2717+
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
2718+
2719+
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
2720+
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
2721+
2722+
assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
2723+
2724+
val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
2725+
assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
2726+
}
2727+
27162728
private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
27172729
val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
27182730
val shuffleId = shuffleDep.shuffleId

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
103103
(part, (e.srcId, e.dstId, e.attr))
104104
}
105105
.partitionBy(new HashPartitioner(numPartitions))
106-
.mapPartitionsWithIndex( { (pid, iter) =>
107-
val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
108-
iter.foreach { message =>
109-
val data = message._2
110-
builder.add(data._1, data._2, data._3)
111-
}
112-
val edgePartition = builder.toEdgePartition
113-
Iterator((pid, edgePartition))
114-
}, preservesPartitioning = true)).cache()
106+
.mapPartitionsWithIndex(
107+
{ (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
108+
val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
109+
iter.foreach { message =>
110+
val data = message._2
111+
builder.add(data._1, data._2, data._3)
112+
}
113+
val edgePartition = builder.toEdgePartition
114+
Iterator((pid, edgePartition))
115+
}, preservesPartitioning = true)).cache()
115116
GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
116117
}
117118

0 commit comments

Comments
 (0)