
Conversation

@suyanNone
Contributor

When Spark traverses the RDD graph to generate the stage DAG, one code path skips the check of whether a stage was already added to shuffleToMapStage: the shuffle deps obtained from getAncestorShuffleDependencies each get a new stage created unconditionally.

Before: (stage DAG screenshots: before1, before2)

After: (stage DAG screenshots: after, after2)

@suyanNone suyanNone changed the title [SPARK][SPARK-10842]Eliminate create duplicate stage while generate job dag [SPARK][SPARK-10842]Eliminate creating duplicate stage while generate job dag Sep 26, 2015
@SparkQA

SparkQA commented Sep 26, 2015

Test build #43051 has finished for PR 8923 at commit 315887a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Sep 29, 2015

Test build #43084 has finished for PR 8923 at commit 315887a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Oct 14, 2015

Test build #43709 has finished for PR 8923 at commit 315887a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 14, 2015

Test build #43714 has finished for PR 8923 at commit 6a68113.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Oct 15, 2015

Test build #43772 has finished for PR 8923 at commit 6a68113.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Oct 15, 2015

This looks good, but can you add a test case? Mostly I'd like to understand the dependency structure that leads to this (perhaps there are other corner cases related to it we should look at). I was expecting to see one stage as a dependency for multiple later stages, or something like that, but I don't see that even in your "After" image. A test case will also help prevent future regressions.

@markhamstra
Contributor

I prefer to fix this by changing the result type of getAncestorShuffleDependencies(rdd: RDD[_]) to Set[ShuffleDependency[_, _, _]] and the returned value from parents to parents.toSet. It was never the intent, nor is it useful, for getAncestorShuffleDependencies to produce duplicates.

@suyanNone
Contributor Author

@markhamstra @squito
I need to reconstruct and re-run the test case to confirm the problem...

R1 --|
     |--> R3 --+--> R4 --|
R2 --|         |         |--> R6 --> R7
               +--> R5 --|

We get the final RDD R7's shuffle deps; the first one is R6.

R6 is not yet in shuffleToMapStage, so we fetch all of its ancestor deps that are also not in shuffleToMapStage: (R4(R3(R1, R2)), R5(R3(R1, R2))). In the Stack this looks like (R4, R3, R1, R2, R5, R3, R1, R2), and a new ShuffleMapStage is created for each dep, so stages for the deps R1 -> R3 and R2 -> R3 are created twice.

Eh... @markhamstra, changing the return type from Stack to Set sounds good, but I'm not sure the ShuffleDependency object is unique for each shuffleId; I need to check that. If I can confirm it is unique, I'd like to change val parents = new Stack[ShuffleDependency[_, _, _]] to a Set.
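The duplicate accumulation described above can be sketched with toy types (the names and graph below mirror the diagram, but they are illustrative stand-ins, not the real DAGScheduler structures): a depth-first walk over the diamond that never dedupes shuffle dependencies yields R3's subtree once per branch.

```scala
import scala.collection.mutable

// A toy model of the diamond above: R1,R2 feed R3; R3 feeds R4 and R5;
// R4 and R5 feed R6. Each map entry lists a node's parent shuffle deps.
val shuffleParents: Map[String, List[String]] = Map(
  "R6" -> List("R4", "R5"),
  "R4" -> List("R3"), "R5" -> List("R3"),
  "R3" -> List("R1", "R2"),
  "R1" -> Nil, "R2" -> Nil
)

// Collect ancestor deps the way the buggy path does: nothing dedupes the
// shared R3 subtree, so it is walked once per branch of the diamond.
def ancestorDeps(root: String): List[String] = {
  val parents = mutable.ListBuffer[String]()
  def visit(node: String): Unit =
    shuffleParents(node).foreach { p => parents += p; visit(p) }
  visit(root)
  parents.toList
}

val walked = ancestorDeps("R6")
// walked == List(R4, R3, R1, R2, R5, R3, R1, R2): R3, R1, R2 appear twice,
// so creating one ShuffleMapStage per entry creates those stages twice.
```

Deduplicating the result (e.g. `walked.toSet`) collapses the repeats to five distinct deps, which is the essence of both proposed fixes.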

@markhamstra
Contributor

@suyanNone Yes, do check uniqueness, and if that looks fine, then there isn't really a reason to use a Stack instead of Set for parents.

@squito
Contributor

squito commented Nov 3, 2015

I see, so this comes from a "diamond" dependency. I'm not seeing that in your "after" image though -- am I just missing it with too many criss-crossing lines? I'm wondering if there is another bug w/ either the listener events or the visualization.

I'm actually wondering if there is any point in building up the parents structure at all -- you could also just directly add to shuffleToMapStage inside getAncestorShuffleDependencies, e.g. just inline this https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L286 into https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L417. That is the only use of getAncestorShuffleDependencies, so what do you think about just renaming it to registerAncestorShuffleDependencies and having it do both? I think it would actually be easier to follow.
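The register-during-the-walk idea can be sketched with toy types (a hypothetical miniature for illustration only; the real types and method bodies live in DAGScheduler.scala): each unseen shuffle is registered bottom-up as the walk visits it, so no separate collect-then-register pass is needed.

```scala
import scala.collection.mutable

// Toy stand-in: a shuffle dep has an id and parent shuffle-dep ids.
case class Shuffle(id: Int, parentIds: List[Int])

val deps: Map[Int, Shuffle] = Map(
  1 -> Shuffle(1, Nil),
  2 -> Shuffle(2, Nil),
  3 -> Shuffle(3, List(1, 2)), // 3 depends on 1 and 2
  4 -> Shuffle(4, List(3)),
  5 -> Shuffle(5, List(3))     // 4 and 5 share ancestor 3
)

val shuffleToMapStage = mutable.Map[Int, String]()

// Register ancestors before the shuffle itself; the contains-check both
// dedupes the shared subtree and replaces the caller-side registration.
def registerAncestorShuffleDependencies(id: Int): Unit = {
  if (!shuffleToMapStage.contains(id)) {
    deps(id).parentIds.foreach(registerAncestorShuffleDependencies)
    shuffleToMapStage(id) = s"stage-for-shuffle-$id"
  }
}

registerAncestorShuffleDependencies(4)
registerAncestorShuffleDependencies(5)
// shuffleToMapStage now has exactly one entry per shuffle id: 1..5
```

The design point is that the membership check and the registration happen at the same place, so a shared ancestor can never be turned into two stages.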

@markhamstra
Contributor

@squito I guess the only issue with that is whether we will ever have need in the future to get the ancestor shuffle dependencies without registering them. I doubt it, so I think I'm in favor of simplifying and inlining the code. @kayousterhout ?

@squito
Contributor

squito commented Nov 4, 2015

I spent a bit of time reproducing this. It is actually rather tricky: (a) the shared dependency has to be a shared ShuffleDependency, but not a shared RDD, since the RDDs are tracked in visited: Set[RDD]; (b) you need one more stage after the end of the diamond, because getShuffleMapStage "unrolls" the loop one level, handling the final stage directly without passing it off to getAncestorShuffleDependencies. (So it's a diamond with a little tail ... a lollipop dependency?) In any case, here is what I came up with. On master this does produce an extra stage; with the fix here it avoids the duplication, and the SparkListenerJobStart does have the correct relationship between everything. This is really just the repro, though -- I didn't write the checks (just took a look with some printlns ...)

  test("shared shuffle stages with diamond dependencies", ActiveTag) {
    val rddA = new MyRDD(sc, 3, Nil).setName("rddA")
    val diamondInput = new ShuffleDependency(rddA, new HashPartitioner(2))
    val rddTop = new MyRDD(sc, 2, List(diamondInput)).setName("rddTop")
    val diamondTop = new ShuffleDependency(rddTop, new HashPartitioner(10))
    val rddBottom = new MyRDD(sc, 2, List(diamondInput)).setName("rddBottom")
    val diamondBottom = new ShuffleDependency(rddBottom, new HashPartitioner(10))
    val rddC = new MyRDD(sc, 2, List(diamondTop, diamondBottom)).setName("rddC")
    val lastDep = new ShuffleDependency(rddC, new HashPartitioner(3))
    val rddD = new MyRDD(sc, 3, List(lastDep)).setName("rddD")
    submit(rddD, Array(0, 1, 2))
  }

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45850 has finished for PR 8923 at commit 6a68113.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45861 has started for PR 8923 at commit a1aa8be.

@shaneknapp
Contributor

I will retrigger the build after Jenkins maintenance is done.

@shaneknapp
Contributor

jenkins, test this please

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45866 has finished for PR 8923 at commit a1aa8be.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45867 has finished for PR 8923 at commit a1aa8be.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Contributor

Please return an immutable Set.

@SparkQA

SparkQA commented Nov 16, 2015

Test build #45975 has finished for PR 8923 at commit d747d51.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2015

Test build #47042 has finished for PR 8923 at commit 66ae245.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Dec 2, 2015

@suyanNone still needs the test case

@suyanNone
Contributor Author

Reverted the Set back to a Stack, and added the test case.
Reverted to a Stack because we should build map stages from the bottom up (Stack order), not in a random order (Set structure).
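As a side note on this design choice (not what the PR ended up doing -- it reverted to a Stack): ordering and deduplication are not mutually exclusive. A mutable.LinkedHashSet from the Scala standard library dedupes while preserving first-insertion order, so whatever traversal order produced the entries survives deduplication.

```scala
import scala.collection.mutable

// The duplicate-heavy traversal order from the diamond example above.
val walked = List("R4", "R3", "R1", "R2", "R5", "R3", "R1", "R2")

// LinkedHashSet drops repeats but keeps first-insertion order, so the
// consumer can still process (or reverse) the entries deterministically.
val ordered = mutable.LinkedHashSet[String]()
walked.foreach(ordered += _)

ordered.toList // List(R4, R3, R1, R2, R5): order kept, repeats dropped
```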

@SparkQA

SparkQA commented Apr 26, 2016

Test build #56984 has finished for PR 8923 at commit 513d0d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ueshin added a commit to ueshin/apache-spark that referenced this pull request May 4, 2016
@suyanNone
Contributor Author

Already merged into #12655; marking this closed.

@suyanNone suyanNone closed this May 12, 2016