2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
/**
* Return whether the given config is a Spark port config.
Contributor:
Then you'll need to update the docs here to say something like:

Return true if the given config matches either `spark.*.port` or `spark.port.*`.

*/
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
Contributor:
You are misunderstanding what this method does. It intentionally looks for `spark.*.port` and should not match `spark.port.maxRetries`.

Contributor Author:
We should send `spark.port.maxRetries` to the executor before it is launched, so that the config is actually used.

Contributor:
I understand that, but this currently matches something like `spark.portable.mushroom`, which has nothing to do with Spark ports. Maybe instead you want to do something like:

name.matches("spark\\..*\\.port") | name.startsWith("spark.port.")

Contributor Author:
OK, I changed it, but used a more obvious way.
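The updated implementation is not shown in this diff; a minimal sketch of what a "more obvious" check could look like, assuming the intent is to cover both the `spark.*.port` and `spark.port.*` forms without a regex:

```scala
/**
 * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
 */
def isSparkPortConf(name: String): Boolean = {
  // Hypothetical sketch: accept the `spark.*.port` suffix form and the
  // `spark.port.*` prefix form explicitly, rather than via a regex.
  (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
```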

}
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo(s"Running Spark version $SPARK_VERSION")

private[spark] val conf = config.clone()
val portRetriesConf = conf.getOption("spark.port.maxRetries")
Contributor:
You could use `conf.getOption(...).foreach { portRetriesConf => [...] }`, but I'm not sure that it's a huge win.

if (portRetriesConf.isDefined) {
System.setProperty("spark.port.maxRetries", portRetriesConf.get)
Contributor:
Won't changing from SparkConf to system properties break the ability to set this configuration via SparkConf?

Contributor Author:
I am not sure I understand your point. Users can still set this configuration via SparkConf; we just read its value from SparkConf here and set it as a system property.

}
conf.validateSettings()
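A sketch of the reviewer's `foreach` suggestion, assuming the same intent (copy `spark.port.maxRetries` from the SparkConf into a system property before any ports are bound); this is an alternative formulation, not the code merged in this PR:

```scala
// Only set the system property when the key is actually present in the SparkConf.
conf.getOption("spark.port.maxRetries").foreach { portRetries =>
  System.setProperty("spark.port.maxRetries", portRetries)
}
```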

/**
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1691,15 +1691,12 @@ private[spark] object Utils extends Logging {
/**
* Default maximum number of retries when binding to a port before giving up.
Contributor:
Not "Default" anymore

*/
val portMaxRetries: Int = {
lazy val portMaxRetries: Int = {
Contributor:
Why is this lazy?

Contributor Author:
We should not initialize this val until it is used, rather than at the very beginning of the initialization of the Utils object. Defining portMaxRetries as a function would also solve this issue.

Contributor:
If we just switch this to a def, then do we need the system properties changes, too? Or will just making this a def be sufficient? I'm just a bit wary / skeptical of code that uses system properties because I just spent a bunch of time cleaning up uses of properties in tests: #3739.

Contributor Author:
Switching to a def is not enough. Some ports used in SparkEnv, for instance the BlockManager's port and the HTTP file server's port, will not be able to read `spark.port.maxRetries` before `env` in SparkEnv is set.

if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
Contributor:
Now that this takes in a conf, is it possible to replace these sys.props with the conf? Will the tests still pass? (I think they should.)

} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
Contributor:
I'm worried that changing this to read from System properties rather than the SparkEnv will break standalone mode here. Have you confirmed that spark.port.maxRetries in standalone mode still works with this change?

In general we should prefer using SparkEnv over system properties because they have all kinds of gotchas. There's a reason we created SparkEnv in the first place.

Contributor:
Yes, we should read from the conf, not from sys.props directly.

}
}
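Based on the review feedback (switch to a `def` and read from a conf rather than sys.props), a hedged sketch of what the helper could look like; the signature and defaults here are assumptions, not the code shown in this diff:

```scala
// Hypothetical variant: evaluated on each call, reading from an explicit SparkConf.
def portMaxRetries(conf: SparkConf): Int = {
  val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
  if (conf.contains("spark.testing")) {
    // Use a higher default number of retries when running tests.
    maxRetries.getOrElse(100)
  } else {
    maxRetries.getOrElse(16)
  }
}
```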

@@ -1719,6 +1716,7 @@
serviceName: String = "",
maxRetries: Int = portMaxRetries): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
Contributor:
Typo: need an extra space in `service$serviceString`.

Contributor Author:
serviceString is already prefixed with a space:

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
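A quick check of that interpolation, with a hypothetical service name; the leading space lives inside serviceString itself, so no extra space is needed in the log format:

```scala
// Hypothetical example: serviceString carries its own leading space.
val serviceName = "shuffle"
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
println(s"Starting service$serviceString on port 7337 with maximum 16 retries.")
// prints: Starting service 'shuffle' on port 7337 with maximum 16 retries.
```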

Contributor:
I think it's better to not log this. It'll get fairly noisy if there are many retries.

for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
@@ -76,7 +76,9 @@ trait ExecutorRunnableUtil extends Logging {
// uses Akka to connect to the scheduler, the akka settings are needed as well as the
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
filter { case (k, v) =>
k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
Contributor:
This line is underindented relative to `filter`; I'd move the `filter { case (k, v) =>` to the previous line, and the matching brace to the next line.

}.
foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
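A sketch of the layout the reviewer is suggesting, assuming the surrounding names (sparkConf, javaOpts, YarnSparkHadoopUtil) from this file; only the formatting changes, not the filter condition:

```scala
// Suggested layout: open `filter { case (k, v) =>` on the call line and close the
// brace on its own line so the predicate body is indented consistently.
sparkConf.getAll.filter { case (k, v) =>
  k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
}.foreach { case (k, v) =>
  javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
```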

sparkConf.getAkkaConf.