@sarutak
Member

@sarutak sarutak commented Jul 5, 2020

What changes were proposed in this pull request?

This PR changes the order of ExecutorPlugin initialization and heartbeat thread startup in Executor, so that the heartbeat thread starts first.

Why are the changes needed?

On current master, the heartbeat thread in an executor starts after plugin initialization, so if initialization takes a long time, no heartbeat is sent to the driver and the executor is removed from the cluster.

Does this PR introduce any user-facing change?

Yes. Executor plugins are now allowed to take a long time to initialize.

How was this patch tested?

New test case.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124964 has finished for PR 29002 at commit d724179.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 6, 2020

retest this please.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124993 has finished for PR 29002 at commit d724179.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 6, 2020

retest this please.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124994 has finished for PR 29002 at commit d724179.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 6, 2020

retest this please.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124999 has finished for PR 29002 at commit d724179.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 6, 2020

retest this please.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125110 has finished for PR 29002 at commit d724179.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

Is this from a real issue?

Shall we do the same thing for DriverPlugin or we already did?

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125316 has finished for PR 29002 at commit 1ccb1b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Jul 8, 2020

As soon as we heartbeat to the driver, we will start getting task assignments, etc. How does that interact with a plugin which has not yet completed initialization?
Instead of changing the order, why not document that inordinate delays in plugin initialization should be avoided, and that long-running work should be deferred to a separate thread managed by the plugin?
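
For illustration, the "defer to a separate thread managed by the plugin" option could look roughly like the sketch below. `SimplePlugin` and `DeferredInitPlugin` are hypothetical stand-ins invented for this example, not Spark's `ExecutorPlugin` API; the only point is that `init()` returns immediately while the slow setup runs on a thread the plugin owns.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}

// Hypothetical stand-in for a plugin interface; NOT Spark's ExecutorPlugin API.
trait SimplePlugin {
  def init(): Unit
  def shutdown(): Unit
}

// Runs the slow setup on a daemon thread owned by the plugin, so init() returns at once.
class DeferredInitPlugin(slowSetup: () => Unit) extends SimplePlugin {
  private val finished = new CountDownLatch(1)
  private val worker = Executors.newSingleThreadExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "plugin-init")
      t.setDaemon(true)
      t
    }
  })

  override def init(): Unit =
    worker.execute(new Runnable {
      // Count down even if setup throws, so waiters are never stuck forever.
      override def run(): Unit = try slowSetup() finally finished.countDown()
    })

  // Bounded wait for code that needs the setup to have completed (or failed).
  def awaitFinished(timeoutMs: Long): Boolean =
    finished.await(timeoutMs, TimeUnit.MILLISECONDS)

  override def shutdown(): Unit = {
    worker.shutdownNow()
    ()
  }
}
```

The trade-off of this approach is that anything depending on the setup has to call something like `awaitFinished` itself, since the host no longer waits for it.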

@sarutak
Member Author

sarutak commented Jul 8, 2020

@Ngone51

Is this from a real issue?

Shall we do the same thing for DriverPlugin or we already did?

It did not happen in production, but in an experimental environment.
I confirmed that we don't need to reorder the initialization for DriverPlugin, because executors send heartbeats to the driver but the driver doesn't send heartbeats to itself.

@sarutak
Member Author

sarutak commented Jul 8, 2020

@mridulm

As soon as we heartbeat to the driver, we will start getting task assignments, etc. How does that interact with a plugin which has not yet completed initialization?

Yeah that's right.

Instead of changing the order, why not document that inordinate delays in plugin initialization should be avoided, and that long-running work should be deferred to a separate thread managed by the plugin?

This can be a reasonable compromise. Another solution can be to perform initialization before registering executors and then it's guaranteed that plugins are initialized when tasks start running.
What do you think?

@mridulm
Contributor

mridulm commented Jul 8, 2020

This can be a reasonable compromise. Another solution can be to perform initialization before registering executors and then it's guaranteed that plugins are initialized when tasks start running.
What do you think?

Isn't that what is implicitly happening now? And isn't that what causes issues if a plugin takes a long time to initialize?

@Ngone51
Member

Ngone51 commented Jul 9, 2020

As soon as we heartbeat to the driver, we will start getting task assignments, etc. How does that interact with a plugin which has not yet completed initialization?

I wonder if this is correct. The executor only gets task assignments after it sends LaunchedExecutor to the driver:

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
  resources = _resources)
driver.get.send(LaunchedExecutor(executorId))

Once the heartbeater has started, the executor can send heartbeats to the driver asynchronously while plugin initialization continues. It doesn't matter if initialization takes a long time, because heartbeats are now being sent but no tasks are assigned, since LaunchedExecutor hasn't been sent yet.
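
The ordering described here can be mimicked with plain JDK threads. The sketch below is a toy model, not Spark code: `StartupOrderSketch` and its event strings are made up for illustration, and the "plugin init" is just a wait that ends once a given number of heartbeats have gone out.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

object StartupOrderSketch {
  // Returns (ordered startup events, heartbeats sent before "LaunchedExecutor").
  def run(heartbeatsDuringInit: Int, intervalMs: Long): (Seq[String], Int) = {
    val events = ArrayBuffer.empty[String] // only touched by the calling thread
    val sent = new AtomicInteger(0)
    val seen = new CountDownLatch(heartbeatsDuringInit)
    val heartbeater = Executors.newSingleThreadScheduledExecutor()

    // 1. The heartbeater starts first and keeps ticking in the background.
    heartbeater.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = { sent.incrementAndGet(); seen.countDown() }
    }, 0L, intervalMs, TimeUnit.MILLISECONDS)
    events += "heartbeater started"

    // 2. A "slow" plugin init: it blocks until several heartbeats have gone out.
    events += "plugin init begin"
    val finished = seen.await(10, TimeUnit.SECONDS)
    events += (if (finished) "plugin init end" else "plugin init timed out")

    // 3. Only now would the backend announce the executor and start receiving tasks.
    val before = sent.get()
    events += "LaunchedExecutor sent"
    heartbeater.shutdownNow()
    (events.toSeq, before)
  }
}
```

Calling `StartupOrderSketch.run(3, 10L)` should show at least three heartbeats counted before the final event, which is the property the reordering relies on.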

@sarutak
Member Author

sarutak commented Jul 9, 2020

ExecutorRegistered is regarded as the first heartbeat message, although it's not a real Heartbeat.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala#L114

So if we reorder the initialization, it should be placed before the registration, not after the heartbeater has started, right?
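
To make the "registration counts as the first heartbeat" point concrete, here is a hypothetical miniature of a last-seen tracker. `LastSeenTracker` and its methods are invented for this sketch and are much simpler than `HeartbeatReceiver`; for instance, a heartbeat from an unknown executor is simply rejected here, which is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical miniature of a heartbeat tracker; all names are illustrative only.
class LastSeenTracker(timeoutMs: Long) {
  private val lastSeen = mutable.Map.empty[String, Long]

  // Registration is recorded as the first sign of life, just like a heartbeat.
  def register(executorId: String, nowMs: Long): Unit =
    lastSeen.update(executorId, nowMs)

  // Refreshes a known executor; returns false for an executor that is not tracked.
  def heartbeat(executorId: String, nowMs: Long): Boolean =
    if (lastSeen.contains(executorId)) {
      lastSeen.update(executorId, nowMs)
      true
    } else {
      false
    }

  // Drops and returns every executor that has been silent for longer than the timeout.
  def expire(nowMs: Long): Set[String] = {
    val dead = lastSeen.collect { case (id, t) if nowMs - t > timeoutMs => id }.toSet
    lastSeen --= dead
    dead
  }
}
```

In this model, an executor that registers and then stays silent past the timeout is dropped, whereas one that keeps calling `heartbeat` is retained however long its startup takes.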

@mridulm
Contributor

mridulm commented Jul 9, 2020

You are right @Ngone51, I misread the proposed change.

@Ngone51
Member

Ngone51 commented Jul 9, 2020

So if we reorder the initialization, it should be placed before the registration, not after the heartbeater has started, right?

Ideally, that way should work as well. However, it seems plugin initialization relies on some internal instances of Executor. So, if we want to initialize the plugin outside of the executor, I'm afraid we need more changes.

BTW, I think your current implementation can already work as I mentioned above.

@sarutak
Member Author

sarutak commented Jul 9, 2020

Ideally, that way should work as well. However, it seems plugin initialization relies on some internal instances of Executor. So, if we want to initialize the plugin outside of the executor, I'm afraid we need more changes.

Yes, if we reordered the initialization another way, it would have to be placed in executor backends like CoarseGrainedExecutorBackend, and we might need more changes, as you say.

BTW, I think your current implementation can already work as I mentioned above.

Yeah, I misunderstood. LaunchedExecutor is sent after Executor is instantiated, so the current implementation works.

@sarutak
Member Author

sarutak commented Jul 10, 2020

retest this please.

@SparkQA

SparkQA commented Jul 10, 2020

Test build #125591 has finished for PR 29002 at commit 1ccb1b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

test("SPARK-32175: Plugin initialization should start after heartbeater started") {
val tempDir = Utils.createTempDir()
Member

clean it up at the end of the test? (though I know it will be cleaned by shutdown hook anyway.)

Member Author

I'll try to use withTempDir.
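
A helper like withTempDir follows the loan pattern: the directory is created, lent to the test body, and removed afterwards even if the body throws. A self-contained approximation is sketched below; the `TempDirs` object and the `"plugin-test"` prefix are hypothetical, and this is not Spark's implementation.

```scala
import java.io.File
import java.nio.file.Files

object TempDirs {
  // Recursively deletes a file or a whole directory tree.
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Loan pattern: create the directory, hand it to the body, always clean up.
  def withTempDir[A](body: File => A): A = {
    val dir = Files.createTempDirectory("plugin-test").toFile
    try body(dir) finally deleteRecursively(dir)
  }
}
```

With this shape the test no longer depends on a shutdown hook for cleanup, which is what the review comment above asks for.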

val importStatements =
"""
|import java.util.Map;
|import org.apache.spark.api.plugin.*;
Member

What about using the qualified class name to avoid adding the new parameter preClassDefinitionBlock?

Member Author

Yeah, it's better.

implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText = implementsClasses.map(", " + _).mkString
Member

maybe?

Suggested change
- val implementsText = implementsClasses.map(", " + _).mkString
+ val implementsText =
+   s"implements ${implementsClasses ++ Seq("java.io.Serializable").mkString(", ")}"

Member Author

@sarutak sarutak Jul 13, 2020

Thanks. To keep it simpler, I'll make it (implementsClasses :+ "java.io.Serializable").mkString(", ").
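
The parentheses matter because a dotted method call binds tighter than an infix operator: without them, `.mkString(", ")` would apply only to the right-hand operand rather than to the combined sequence. A minimal sketch of the parenthesized form follows; the `ImplementsClause` object and its method names are hypothetical, and the leading `" implements "` text is an assumption about how the clause is spliced into the generated class.

```scala
object ImplementsClause {
  // Builds the clause after the class name, always ending with java.io.Serializable.
  // The parentheses make mkString apply to the whole sequence, not just the last element.
  def implementsText(implementsClasses: Seq[String]): String =
    " implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")

  // Mirrors the extendsText line quoted above: empty when there is no base class.
  def extendsText(baseClass: String): String =
    Option(baseClass).map(c => s" extends $c").getOrElse("")
}
```

For example, `ImplementsClause.implementsText(Seq("org.apache.spark.api.plugin.SparkPlugin"))` yields `" implements org.apache.spark.api.plugin.SparkPlugin, java.io.Serializable"`.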


metricsPoller.start()

// Plugins need to load using a class loader that includes the executor's user classpath
Member

Could you add a comment explaining why we need to initialize plugins after the heartbeater?

@sarutak
Member Author

sarutak commented Jul 13, 2020

@Ngone51 Thanks for the feedback. I'll fix them.

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125789 has finished for PR 29002 at commit dba4c91.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • |public class $className $extendsText $implementsText

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126105 has finished for PR 29002 at commit d768385.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 21, 2020

cc: @vanzin @tgravescs

@tgravescs
Contributor

Yeah, I think this is OK, especially since normally, if your plugin's initialization takes less than the heartbeat interval, there shouldn't be any difference. I would like to test this with a couple of scenarios though.

When we first put this in, I brought up that the plugin could block other executor functionality by running for a long time. One thought was that we could put the plugin into another thread so it didn't block the executor. I think the thought at the time was that if you have a long-running executor plugin, the user should handle that by doing the work in another thread or something. Now you could make the argument that you have something long-running in the plugin and you want it to block the executor's other initialization. The heartbeat interval is 10 seconds by default, and I thought the timeout was the network timeout by default (120s), which seems like a really long time. I guess it could matter if you change those timeouts, but even then it seems like a long time.
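
As a rough back-of-the-envelope model of those numbers, the sketch below compares the two orderings. It is deliberately simplified: `HeartbeatBudget` is a hypothetical name, the defaults are the 10 s and 120 s figures quoted in the comment above, and the model ignores the heartbeater's initial delay and how often the driver checks for expired executors.

```scala
object HeartbeatBudget {
  // Figures quoted in the comment above: 10 s heartbeat interval, 120 s timeout.
  val DefaultIntervalMs: Long = 10000L
  val DefaultTimeoutMs: Long = 120000L

  // Heartbeater started only after init: the driver hears nothing for roughly initMs.
  def survivesInitFirst(initMs: Long, timeoutMs: Long = DefaultTimeoutMs): Boolean =
    initMs <= timeoutMs

  // Heartbeater started first: the longest silence is about one interval,
  // regardless of how long init runs.
  def survivesHeartbeaterFirst(
      intervalMs: Long = DefaultIntervalMs,
      timeoutMs: Long = DefaultTimeoutMs): Boolean =
    intervalMs <= timeoutMs
}
```

Under these assumptions a 60-second init survives either way, while a 5-minute init survives only when the heartbeater is started first, or when the timeout is raised.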

@tgravescs
Contributor

sorry had stuff come up internally, I'll look more today.

@tgravescs
Contributor

OK, I looked at this a bit more and tested a few scenarios. I think this is fine. I would like it placed right after the heartbeater unless we have a reason not to, and it would be nice to get the test to run faster.

@SparkQA

SparkQA commented Jul 25, 2020

Test build #126520 has finished for PR 29002 at commit 0b8da96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member Author

sarutak commented Jul 25, 2020

retest this please.

@SparkQA

SparkQA commented Jul 25, 2020

Test build #126533 has finished for PR 29002 at commit 0b8da96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tgravescs tgravescs left a comment

Looks good pending Jenkins

@tgravescs
Contributor

It looks like the Arrow tests are failing, which shouldn't be related. Kicked it once more.

@sarutak
Member Author

sarutak commented Jul 29, 2020

Strangely, the R test failure indicates that a deprecated function is used, but that is resolved in #29252.
I'll rebase this change.

2020-07-26T03:10:50.2710895Z test_sparkSQL_arrow.R:39: error: createDataFrame/collect Arrow optimization
2020-07-26T03:10:50.2711900Z (converted from warning) Use 'read_ipc_stream' or 'read_feather' instead.
2020-07-26T03:10:50.2712135Z Backtrace:
2020-07-26T03:10:50.2712459Z   1. base::tryCatch(...) tests/fulltests/test_sparkSQL_arrow.R:39:2
2020-07-26T03:10:50.2712724Z   7. SparkR::collect(createDataFrame(mtcars))
2020-07-26T03:10:50.2712988Z   8. SparkR:::.local(x, ...)
2020-07-26T03:10:50.2713221Z  11. arrow::read_arrow(readRaw(conn))
2020-07-26T03:10:50.2713946Z  12. base::.Deprecated(msg = "Use 'read_ipc_stream' or 'read_feather' instead.")
2020-07-26T03:10:50.2714178Z  13. base::warning(...)
2020-07-26T03:10:50.2714462Z  14. base::withRestarts(...)
2020-07-26T03:10:50.2714735Z  15. base:::withOneRestart(expr, restarts[[1L]])
2020-07-26T03:10:50.2714999Z  16. base:::doWithOneRestart(return(expr), restart)

@SparkQA

SparkQA commented Jul 29, 2020

Test build #126742 has finished for PR 29002 at commit f106fe9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member

Ngone51 commented Jul 29, 2020

LGTM

@asfgit asfgit closed this in 9be0883 Jul 29, 2020
asfgit pushed a commit that referenced this pull request Jul 29, 2020
…Plugin and starting heartbeat thread

### What changes were proposed in this pull request?

This PR changes the order of ExecutorPlugin initialization and heartbeat thread startup in Executor, so that the heartbeat thread starts first.

### Why are the changes needed?

On current master, the heartbeat thread in an executor starts after plugin initialization, so if initialization takes a long time, no heartbeat is sent to the driver and the executor is removed from the cluster.

### Does this PR introduce _any_ user-facing change?

Yes. Executor plugins are now allowed to take a long time to initialize.

### How was this patch tested?

New test case.

Closes #29002 from sarutak/fix-heartbeat-issue.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit 9be0883)
Signed-off-by: Thomas Graves <[email protected]>
@tgravescs
Contributor

finally, tests pass, merged to master and branch-3.0. thanks @sarutak

@HyukjinKwon
Member

Would you mind taking a look at the flaky test added here? It seems to be failing in GitHub Actions, as below:

[info] - SPARK-32175: Plugin initialization should start after heartbeater started *** FAILED *** (32 seconds, 514 milliseconds)
[info]   The code passed to failAfter did not complete within 30000000000 nanoseconds. (SparkSubmitSuite.scala:1438)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at java.lang.Thread.getStackTrace(Thread.java:1559)
[info]   at org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
[info]   at org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
[info]   at org.apache.spark.deploy.SparkSubmitSuite$.failAfterImpl(SparkSubmitSuite.scala:1419)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
[info]   at org.apache.spark.deploy.SparkSubmitSuite$.failAfter(SparkSubmitSuite.scala:1419)
[info]   at org.apache.spark.deploy.SparkSubmitSuite$.runSparkSubmit(SparkSubmitSuite.scala:1438)
[info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$17(ExecutorSuite.scala:469)
[info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$17$adapted(ExecutorSuite.scala:407)
[info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:170)
[info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$16(ExecutorSuite.scala:407)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)

https://github.com/apache/spark/runs/928145740
https://github.com/apache/spark/runs/928147695
https://github.com/apache/spark/runs/928059788
https://github.com/apache/spark/runs/927624834
https://github.com/apache/spark/runs/927483940
https://github.com/apache/spark/runs/925705459
https://github.com/apache/spark/runs/925169167
https://github.com/apache/spark/runs/925406406
https://github.com/apache/spark/runs/923441051

The failure seems pretty frequent.

@sarutak
Member Author

sarutak commented Jul 30, 2020

@HyukjinKwon I'll look into that. Should we ignore the test first?

@tgravescs
Contributor

Personally I would be fine with just removing the test. I'm not sure how much benefit it really adds for the time it takes to run, and if it's flaky I'm guessing you are going to have to increase the timeouts.

@sarutak
Member Author

sarutak commented Jul 30, 2020

Yeah, I suspect we would need to increase the timeouts. The test case is meant to detect future regressions, but I've left a comment about the ordering, so removing the test is fine.
@HyukjinKwon , what do you think?

@HyukjinKwon
Member

Sure, removing sounds fine and simpler to me.

@sarutak
Member Author

sarutak commented Jul 31, 2020

OK, I'll make a PR to remove it.

asfgit pushed a commit that referenced this pull request Jul 31, 2020
### What changes were proposed in this pull request?

This PR removes a test added in SPARK-32175 (#29002).

### Why are the changes needed?

That test is flaky. It could be mitigated by increasing the timeout, but it is simpler to remove the test.
See also the [discussion](#29002 (comment)).

### Does this PR introduce _any_ user-facing change?

No.

Closes #29314 from sarutak/remove-flaky-test.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
asfgit pushed a commit that referenced this pull request Jul 31, 2020
### What changes were proposed in this pull request?

This PR removes a test added in SPARK-32175 (#29002).

### Why are the changes needed?

That test is flaky. It could be mitigated by increasing the timeout, but it is simpler to remove the test.
See also the [discussion](#29002 (comment)).

### Does this PR introduce _any_ user-facing change?

No.

Closes #29314 from sarutak/remove-flaky-test.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
(cherry picked from commit 9d7b1d9)
Signed-off-by: Kousuke Saruta <[email protected]>