[SPARK-23243][Core] Fix RDD.repartition() data correctness issue #22112
**core/src/main/scala/org/apache/spark/rdd/RDD.scala**

```diff
@@ -462,8 +462,9 @@ abstract class RDD[T: ClassTag](
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
-          new HashPartitioner(numPartitions)),
+        new ShuffledRDD[Int, T, T](
+          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
+          new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
     } else {
```
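Why must `distributePartition` be marked order-sensitive? The round-robin key assignment used by `repartition()` gives each record a key derived from its position in the partition iterator, not from its value. The following self-contained sketch paraphrases that logic (the real Spark version emits the raw `position` as the key and lets `HashPartitioner` take the modulo); it is illustrative, not the exact PR code:

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

// Round-robin key assignment: the target partition of a record depends on
// where the record sits in the input iterator.
def distributePartition[T](
    index: Int,
    items: Iterator[T],
    numPartitions: Int): Iterator[(Int, T)] = {
  var position = new Random(byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t)
  }
}

val data = Seq("a", "b", "c", "d")
val run1 = distributePartition(0, data.iterator, 2).map(_.swap).toMap
val run2 = distributePartition(0, data.reverse.iterator, 2).map(_.swap).toMap
// run1("a") != run2("a"): rerunning the function over the same records in a
// different order redistributes them differently, which is exactly what
// isOrderSensitive = true declares.
```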
```diff
@@ -807,16 +808,21 @@ abstract class RDD[T: ClassTag](
    * serializable and don't require closure cleaning.
    *
    * @param preservesPartitioning indicates whether the input function preserves the partitioner,
-   * which should be `false` unless this is a pair RDD and the input function doesn't modify
-   * the keys.
+   *                              which should be `false` unless this is a pair RDD and the input
+   *                              function doesn't modify the keys.
+   * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
+   *                         sensitive, it may return a totally different result when the input
+   *                         order is changed. Mostly, stateful functions are order-sensitive.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+      preservesPartitioning: Boolean = false,
+      isOrderSensitive: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
-      preservesPartitioning)
+      preservesPartitioning = preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
   }

   /**
```
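A function is order-sensitive when its output depends on the order in which records arrive, not just on their values. A hedged illustration using the public `mapPartitionsWithIndex` (the `Internal` variant above is `private[spark]`), assuming some `rdd: RDD[String]`:

```scala
// Order-insensitive: each output record depends only on its input record,
// so any arrival order produces the same output data set.
val lengths = rdd.mapPartitionsWithIndex { (_, iter) =>
  iter.map(_.length)
}

// Order-sensitive: the emitted index depends on each record's position in
// the iterator, so a different arrival order produces different output.
// Inside Spark, such functions are now passed isOrderSensitive = true.
val numbered = rdd.mapPartitionsWithIndex { (partIndex, iter) =>
  iter.zipWithIndex.map { case (value, i) => (partIndex, i, value) }
}
```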
```diff
@@ -1636,6 +1642,16 @@ abstract class RDD[T: ClassTag](
     }
   }

+  /**
+   * Return whether this RDD is reliably checkpointed and materialized.
+   */
+  private[rdd] def isReliablyCheckpointed: Boolean = {
+    checkpointData match {
+      case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
+      case _ => false
+    }
+  }
+
   /**
    * Gets the name of the directory to which this RDD was checkpointed.
    * This is not defined if the RDD is checkpointed locally.
```
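For readers less used to Scala pattern matching, the new helper is equivalent to the following `Option#exists` formulation; this is only a clarifying paraphrase, the explicit match above is what the PR adds:

```scala
private[rdd] def isReliablyCheckpointed: Boolean =
  checkpointData.exists {
    case reliable: ReliableRDDCheckpointData[_] => reliable.isCheckpointed
    case _ => false
  }
```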
```diff
@@ -1865,6 +1881,63 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
+   * for the definition.
+   *
+   * By default, a reliably checkpointed RDD, or an RDD without parents (a root RDD), is
+   * DETERMINATE. For RDDs with parents, we generate a deterministic level candidate per parent
+   * according to the dependency. The deterministic level of the current RDD is the least
+   * deterministic of those candidates. Please override [[getOutputDeterministicLevel]] to
+   * provide custom logic for calculating the output deterministic level.
+   */
+  // TODO: make it public so users can set deterministic level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different deterministic level for
+  // different partitions.
+  private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
+    if (isReliablyCheckpointed) {
+      DeterministicLevel.DETERMINATE
+    } else {
+      getOutputDeterministicLevel
+    }
+  }
+
+  @DeveloperApi
+  protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
+    val deterministicLevelCandidates = dependencies.map {
+      // The shuffle is not really happening; treat it like a narrow dependency and assume the
+      // output deterministic level of the current RDD is the same as its parent's.
+      case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
+        dep.rdd.outputDeterministicLevel
+
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
+          // If the map output was indeterminate, the shuffle output will be indeterminate too.
+          DeterministicLevel.INDETERMINATE
+        } else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
+          // If an aggregator is specified (so keys are unique) and a key ordering is specified,
+          // the output ordering is consistent.
+          DeterministicLevel.DETERMINATE
+        } else {
+          // In Spark, the reducer fetches multiple remote shuffle blocks at the same time, and
+          // the arrival order of these shuffle blocks is totally random. Even if the parent map
+          // RDD is DETERMINATE, the reduce RDD is always UNORDERED.
+          DeterministicLevel.UNORDERED
+        }
+
+      // For a narrow dependency, assume the output deterministic level of the current RDD is
+      // the same as its parent's.
+      case dep => dep.rdd.outputDeterministicLevel
+    }
+
+    if (deterministicLevelCandidates.isEmpty) {
+      // By default we assume the root RDD is determinate.
+      DeterministicLevel.DETERMINATE
+    } else {
+      deterministicLevelCandidates.maxBy(_.id)
+    }
+  }
 }
```
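Combined with the `MapPartitionsRDD` change made elsewhere in this PR (an order-sensitive function over an UNORDERED parent yields INDETERMINATE output), the levels propagate through a typical pipeline roughly as follows. A sketch assuming a `SparkContext` named `sc`:

```scala
val base = sc.parallelize(1 to 100, 4)
// Root RDD with no parents: DETERMINATE.

val counts = base.map(i => (i % 10, 1)).reduceByKey(_ + _)
// Shuffle with an aggregator but no key ordering: the output data set is
// stable across reruns, but shuffle blocks arrive in random order, so the
// level is UNORDERED.

val repartitioned = counts.repartition(2)
// Round-robin keys are computed over an UNORDERED parent by an
// order-sensitive function, so the map side becomes INDETERMINATE and the
// shuffle propagates it: a rerun of a lost map task can send different
// records to each reducer. This is the data-correctness bug SPARK-23243
// fixes.
```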
```diff
@@ -1918,3 +1991,18 @@ object RDD {
     new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
   }
 }
+
+/**
+ * The deterministic level of an RDD's output (i.e. what `RDD#compute` returns). This explains
+ * how the output can differ when Spark reruns the tasks for the RDD. There are 3 deterministic
+ * levels:
+ * 1. DETERMINATE: The RDD output is always the same data set in the same order after a rerun.
+ * 2. UNORDERED: The RDD output is always the same data set but the order can be different
+ *    after a rerun.
+ * 3. INDETERMINATE: The RDD output can be different after a rerun.
+ *
+ * Note that the output of an RDD usually relies on the parent RDDs. When a parent RDD's output
+ * is INDETERMINATE, it's very likely the RDD's output is also INDETERMINATE.
+ */
+private[spark] object DeterministicLevel extends Enumeration {
+  val DETERMINATE, UNORDERED, INDETERMINATE = Value
+}
```
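A small point about the declaration order above: `Enumeration` assigns ids in declaration order, which is what lets `getOutputDeterministicLevel` pick the least deterministic candidate via `maxBy(_.id)`. Conceptually (the object itself is `private[spark]`):

```scala
// DETERMINATE.id == 0, UNORDERED.id == 1, INDETERMINATE.id == 2
val candidates = Seq(DeterministicLevel.UNORDERED, DeterministicLevel.DETERMINATE)
candidates.maxBy(_.id)  // UNORDERED, the less deterministic of the two
```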
**core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala**

```diff
@@ -40,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
```
```diff
@@ -1487,6 +1487,63 @@ private[spark] class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            // If the map stage is INDETERMINATE, which means the map tasks may return a
+            // different result when retried, we need to re-try all the tasks of the failed
+            // stage and its succeeding stages, because the input data will be changed after
+            // the map tasks are re-tried.
+            // Note that if the map stage is UNORDERED, we are fine. The shuffle partitioner
+            // is guaranteed to be determinate, so the input data of the reducers will not
+            // change even if the map tasks are re-tried.
+            if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
+              // It's a little tricky to find all the succeeding stages of `failedStage`,
+              // because each stage only knows its parents, not its children. Here we
+              // traverse the stages from the leaf nodes (the result stages of active jobs),
+              // and rollback all the stages in the stage chains that connect to the
+              // `failedStage`. To speed up the stage traversing, we collect the stages to
+              // rollback first. If a stage needs to rollback, all its succeeding stages
+              // need to rollback too.
+              val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
+
+              def collectStagesToRollback(stageChain: List[Stage]): Unit = {
+                if (stagesToRollback.contains(stageChain.head)) {
+                  stageChain.drop(1).foreach(s => stagesToRollback += s)
+                } else {
+                  stageChain.head.parents.foreach { s =>
+                    collectStagesToRollback(s :: stageChain)
+                  }
+                }
+              }
+
+              def generateErrorMessage(stage: Stage): String = {
+                "A shuffle map stage with indeterminate output was failed and retried. " +
+                  s"However, Spark cannot rollback the $stage to re-process the input data, " +
+                  "and has to fail this job. Please eliminate the indeterminacy by " +
+                  "checkpointing the RDD before repartition and try again."
+              }
+
+              activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
+
+              stagesToRollback.foreach {
+                case mapStage: ShuffleMapStage =>
+                  val numMissingPartitions = mapStage.findMissingPartitions().length
+                  if (numMissingPartitions < mapStage.numTasks) {
+                    // TODO: support to rollback shuffle files.
+                    // Currently the shuffle writing is "first write wins", so we can't
+                    // re-run a shuffle map stage and overwrite existing shuffle files.
+                    // We have to finish SPARK-8029 first.
+                    abortStage(mapStage, generateErrorMessage(mapStage), None)
+                  }
+
+                case resultStage: ResultStage if resultStage.activeJob.isDefined =>
+                  val numMissingPartitions = resultStage.findMissingPartitions().length
+                  if (numMissingPartitions < resultStage.numTasks) {
+                    // TODO: support to rollback result tasks.
+                    abortStage(resultStage, generateErrorMessage(resultStage), None)
+                  }
+
+                case _ =>
+              }
+            }
+
             // We expect one executor failure to trigger many FetchFailures in rapid succession,
             // but all of those task failures can typically be handled by a single resubmission of
             // the failed stage. We avoid flooding the scheduler's event queue with resubmit
```
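The `collectStagesToRollback` traversal is easier to follow on a toy DAG. Below is a self-contained model using a hypothetical `Node` class in place of Spark's `Stage` (the names are illustrative, not Spark APIs): starting from each leaf, it walks parent chains upward, and once a chain reaches an already-marked node, everything downstream of it on that chain is marked for rollback.

```scala
case class Node(id: Int, parents: List[Node])

def stagesToRollback(leaves: Seq[Node], failed: Node): Set[Node] = {
  val marked = scala.collection.mutable.HashSet(failed)
  def collect(chain: List[Node]): Unit = {
    if (marked.contains(chain.head)) {
      // Everything after the marked node in this chain is a succeeding stage.
      chain.drop(1).foreach(marked += _)
    } else {
      chain.head.parents.foreach(p => collect(p :: chain))
    }
  }
  leaves.foreach(leaf => collect(leaf :: Nil))
  marked.toSet
}

// DAG: a -> b -> d and a -> c, with d and c as leaves (final stages).
val a = Node(0, Nil)
val b = Node(1, List(a))
val c = Node(2, List(a))
val d = Node(3, List(b))
assert(stagesToRollback(Seq(d, c), a) == Set(a, b, c, d))
```

As the PR comment notes, collecting the set first lets a chain short-circuit as soon as it hits an already-marked stage, instead of rolling stages back during the walk.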
Review discussion:

> This will mean all RDDs which are directly or indirectly reading from an unsorted shuffle output are not 'idempotent'.

Yes, that is expected, unless the computing function sorts the input data. For that case, we can override `isIdempotent`.