[SPARK-5006][Deploy]spark.port.maxRetries doesn't work #3841
```diff
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
   /**
    * Return whether the given config is a Spark port config.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
 }
```

Review comments:
- Contributor (on the doc comment): Then you'll need to update the docs here to say something like: …
- Contributor (on the changed line): You are misunderstanding what this method does. It looks for …
- Contributor (Author): We should send …
- Contributor: I understand that, but this currently matches something like …
- Contributor (Author): OK, I changed it, but used a more obvious way.
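The author's "more obvious way" is not shown in this hunk. A minimal sketch of a stricter predicate, assuming the goal is to match both `spark.*.port` keys (like `spark.ui.port`) and `spark.port.*` keys (like `spark.port.maxRetries`) without catching unrelated names such as a hypothetical `spark.portability`:

```scala
// Sketch only: matches "spark.*.port" and "spark.port.*" exactly,
// so a key like "spark.portability" no longer slips through.
def isSparkPortConf(name: String): Boolean = {
  (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
```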
```diff
@@ -1692,15 +1692,12 @@ private[spark] object Utils extends Logging {
   /**
    * Default maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
+  def portMaxRetries(conf: SparkConf): Int = {
     if (sys.props.contains("spark.testing")) {
       // Set a higher number of retries for tests...
       sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
     }
   }
```

Review comments:
- Contributor (on the scaladoc): Not "Default" anymore.
- Contributor (on the `sys.props` branch): Now that this takes in a conf, is it possible to replace these …
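The truncated comment above presumably asks whether the remaining `sys.props` lookups can also go through the conf. A sketch of that idea, assuming `spark.testing` is visible on the `SparkConf` (plausible, since `SparkConf` loads `spark.*` system properties by default, but an assumption here):

```scala
// Sketch of the reviewer's suggestion: read everything from the conf
// instead of sys.props, assuming "spark.testing" reaches the conf too.
def portMaxRetries(conf: SparkConf): Int = {
  val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
  if (conf.contains("spark.testing")) {
    // Tests bind many ports concurrently, so allow far more retries.
    maxRetries.getOrElse(100)
  } else {
    maxRetries.getOrElse(16)
  }
}
```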
```diff
@@ -1709,17 +1706,20 @@ private[spark] object Utils extends Logging {
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf Used to get maximum number of retries.
    * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""
+    ): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
+    logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
```

Review comments:
- Contributor (on the dangling `): (T, Int) = {`): Move this up one line.
- Contributor (on the new log line): Typo: need extra space in …
- Contributor: I think it's better to not log this. It'll get fairly noisy if there are many retries.
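A hedged usage sketch of the new signature: the caller now threads its own `SparkConf` through, so a `spark.port.maxRetries` set on that conf is actually honored (the bug this PR fixes). The `ServerSocket`-based starter below is illustrative only, not Spark code:

```scala
import java.net.ServerSocket

// Illustrative caller: binds a plain ServerSocket, letting
// startServiceOnPort retry on java.net.BindException.
val conf = new SparkConf().set("spark.port.maxRetries", "32")
val (socket, boundPort) = Utils.startServiceOnPort[ServerSocket](
  startPort = 4040,
  startService = (port: Int) => {
    val s = new ServerSocket(port) // throws BindException on collision
    (s, s.getLocalPort)
  },
  conf = conf,
  serviceName = "example")
```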
```diff
@@ -148,8 +148,9 @@ class ExecutorRunnable(
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+    sparkConf.getAll.filter { case (k, v) =>
+      k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
+    }.
       foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     sparkConf.getAkkaConf.
```

Review comments:
- Contributor (on the new filter): Not your code, but shouldn't this just be the following, for code reuse? …
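The suggested replacement is cut off above. Presumably it reuses the shared predicate from `SparkConf` rather than hard-coding the key, along the lines of this sketch (assuming `SparkConf.isSparkPortConf` is the intended helper):

```scala
// Sketch of the code-reuse suggestion: one shared predicate instead of
// an inline k.equals("spark.port.maxRetries") check.
sparkConf.getAll.filter { case (k, v) =>
  k.startsWith("spark.auth") || k.startsWith("spark.akka") || SparkConf.isSparkPortConf(k)
}.foreach { case (k, v) =>
  javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
```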
Review comment:
We should pass this a conf that already exists instead of creating a new one. This requires changing the constructors of `HttpServer` and `HttpFileServer`. I believe there is already a conf everywhere else.
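A minimal sketch of what that constructor change could look like, assuming `HttpServer` simply gains a `conf` parameter and threads it into `Utils.startServiceOnPort`; the other parameters and the `doStart` helper follow the pre-change class and are assumptions here:

```scala
import java.io.File
import org.eclipse.jetty.server.Server

// Sketch only: HttpServer accepts the caller's existing SparkConf instead of
// constructing a fresh one, so spark.port.maxRetries on that conf is honored.
private[spark] class HttpServer(
    conf: SparkConf,
    resourceBase: File,
    securityManager: SecurityManager,
    serverName: String = "HTTP server") extends Logging {

  def start(): Unit = {
    // Port 0 lets the OS pick; the retry count comes from conf
    // via portMaxRetries inside startServiceOnPort.
    val (_, boundPort) = Utils.startServiceOnPort[Server](0, doStart, conf, serverName)
    logInfo(s"Started $serverName on port $boundPort")
  }

  // Assumed existing helper: starts Jetty on the given port and returns
  // the server together with the port it actually bound.
  private def doStart(startPort: Int): (Server, Int) = ???
}
```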