Skip to content

Commit 75a06aa

Browse files
author
Marcelo Vanzin
committed
[SPARK-16272][CORE] Allow config values to reference conf, env, system props.
This allows configuration to be more flexible, for example, when the cluster does not have a homogeneous configuration (e.g. packages are installed on different paths in different nodes). By allowing one to reference the environment from the conf, it becomes possible to work around those in certain cases.

As part of the implementation, ConfigEntry now keeps track of all "known" configs (i.e. those created through the use of ConfigBuilder), since that list is used by the resolution code. This duplicates some code in SQLConf, which could potentially be merged with this now. It will also make it simpler to implement some missing features such as filtering which configs show up in the UI or in event logs - which are not part of this change.

Another change is in the way ConfigEntry reads config data; it now takes a string map and a function that reads env variables, so that it can be called both from SparkConf and SQLConf. This makes it so both places follow the same read path, instead of having to replicate certain logic in SQLConf. There are still a couple of methods in SQLConf that peek into fields of ConfigEntry directly, though.

Tested via unit tests, and by using the new variable expansion functionality in a shell session with a custom spark.sql.hive.metastore.jars value.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14022 from vanzin/SPARK-16272.
1 parent e651900 commit 75a06aa

5 files changed

Lines changed: 236 additions & 38 deletions

File tree

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
248248
* - This will throw an exception if the config is not optional and the value is not set.
249249
*/
250250
private[spark] def get[T](entry: ConfigEntry[T]): T = {
251-
entry.readFrom(this)
251+
entry.readFrom(settings, getenv)
252252
}
253253

254254
/**

core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,20 +116,25 @@ private[spark] class TypedConfigBuilder[T](
116116

117117
/** Creates a [[ConfigEntry]] that has a default value. */
118118
def createWithDefault(default: T): ConfigEntry[T] = {
119-
val transformedDefault = converter(stringConverter(default))
120-
val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
121-
stringConverter, parent._doc, parent._public)
122-
parent._onCreate.foreach(_(entry))
123-
entry
119+
// Treat "String" as a special case, so that both createWithDefault and createWithDefaultString
120+
// behave the same w.r.t. variable expansion of default values.
121+
if (default.isInstanceOf[String]) {
122+
createWithDefaultString(default.asInstanceOf[String])
123+
} else {
124+
val transformedDefault = converter(stringConverter(default))
125+
val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
126+
stringConverter, parent._doc, parent._public)
127+
parent._onCreate.foreach(_(entry))
128+
entry
129+
}
124130
}
125131

126132
/**
127133
* Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
128134
* [[String]] and must be a valid value for the entry.
129135
*/
130136
def createWithDefaultString(default: String): ConfigEntry[T] = {
131-
val typedDefault = converter(default)
132-
val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
137+
val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
133138
parent._doc, parent._public)
134139
parent._onCreate.foreach(_(entry))
135140
entry

core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

Lines changed: 123 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,35 @@
1717

1818
package org.apache.spark.internal.config
1919

20+
import java.util.{Map => JMap}
21+
22+
import scala.util.matching.Regex
23+
2024
import org.apache.spark.SparkConf
2125

2226
/**
2327
* An entry contains all meta information for a configuration.
2428
*
29+
* Config options created using this feature support variable expansion. If the config value
30+
* contains variable references of the form "${prefix:variableName}", the reference will be replaced
31+
* with the value of the variable depending on the prefix. The prefix can be one of:
32+
*
33+
* - no prefix: if the config key starts with "spark", looks for the value in the Spark config
34+
* - system: looks for the value in the system properties
35+
* - env: looks for the value in the environment
36+
*
37+
* So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
38+
* configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
39+
* environment variable.
40+
*
41+
* For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
42+
* will also consider the default value when it exists.
43+
*
44+
* If the reference cannot be resolved, the original string will be retained.
45+
*
46+
* Variable expansion is also applied to the default values of config entries that have a default
47+
* value declared as a string.
48+
*
2549
* @param key the key for the configuration
2650
* @param defaultValue the default value for the configuration
2751
* @param valueConverter how to convert a string to the value. It should throw an exception if the
@@ -42,17 +66,27 @@ private[spark] abstract class ConfigEntry[T] (
4266
val doc: String,
4367
val isPublic: Boolean) {
4468

69+
import ConfigEntry._
70+
71+
registerEntry(this)
72+
4573
def defaultValueString: String
4674

47-
def readFrom(conf: SparkConf): T
75+
def readFrom(conf: JMap[String, String], getenv: String => String): T
4876

49-
// This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
50-
// use readFrom().
5177
def defaultValue: Option[T] = None
5278

5379
override def toString: String = {
5480
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
5581
}
82+
83+
protected def readAndExpand(
84+
conf: JMap[String, String],
85+
getenv: String => String,
86+
usedRefs: Set[String] = Set()): Option[String] = {
87+
Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
88+
}
89+
5690
}
5791

5892
private class ConfigEntryWithDefault[T] (
@@ -68,12 +102,36 @@ private class ConfigEntryWithDefault[T] (
68102

69103
override def defaultValueString: String = stringConverter(_defaultValue)
70104

71-
override def readFrom(conf: SparkConf): T = {
72-
conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
105+
def readFrom(conf: JMap[String, String], getenv: String => String): T = {
106+
readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
73107
}
74108

75109
}
76110

111+
private class ConfigEntryWithDefaultString[T] (
112+
key: String,
113+
_defaultValue: String,
114+
valueConverter: String => T,
115+
stringConverter: T => String,
116+
doc: String,
117+
isPublic: Boolean)
118+
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
119+
120+
override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
121+
122+
override def defaultValueString: String = _defaultValue
123+
124+
def readFrom(conf: JMap[String, String], getenv: String => String): T = {
125+
Option(conf.get(key))
126+
.orElse(Some(_defaultValue))
127+
.map(ConfigEntry.expand(_, conf, getenv, Set()))
128+
.map(valueConverter)
129+
.get
130+
}
131+
132+
}
133+
134+
77135
/**
78136
* A config entry that does not have a default value.
79137
*/
@@ -88,7 +146,9 @@ private[spark] class OptionalConfigEntry[T](
88146

89147
override def defaultValueString: String = "<undefined>"
90148

91-
override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
149+
override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
150+
readAndExpand(conf, getenv).map(rawValueConverter)
151+
}
92152

93153
}
94154

@@ -99,13 +159,67 @@ private class FallbackConfigEntry[T] (
99159
key: String,
100160
doc: String,
101161
isPublic: Boolean,
102-
private val fallback: ConfigEntry[T])
162+
private[config] val fallback: ConfigEntry[T])
103163
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
104164

105165
override def defaultValueString: String = s"<value of ${fallback.key}>"
106166

107-
override def readFrom(conf: SparkConf): T = {
108-
conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
167+
override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
168+
Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
169+
}
170+
171+
}
172+
173+
private object ConfigEntry {
174+
175+
private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
176+
177+
private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
178+
179+
def registerEntry(entry: ConfigEntry[_]): Unit = {
180+
val existing = knownConfigs.putIfAbsent(entry.key, entry)
181+
require(existing == null, s"Config entry ${entry.key} already registered!")
182+
}
183+
184+
def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
185+
186+
/**
187+
* Expand the `value` according to the rules explained in ConfigEntry.
188+
*/
189+
def expand(
190+
value: String,
191+
conf: JMap[String, String],
192+
getenv: String => String,
193+
usedRefs: Set[String]): String = {
194+
REF_RE.replaceAllIn(value, { m =>
195+
val prefix = m.group(1)
196+
val name = m.group(2)
197+
val replacement = prefix match {
198+
case null =>
199+
require(!usedRefs.contains(name), s"Circular reference in $value: $name")
200+
if (name.startsWith("spark.")) {
201+
Option(findEntry(name))
202+
.flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
203+
.orElse(Option(conf.get(name)))
204+
.orElse(defaultValueString(name))
205+
} else {
206+
None
207+
}
208+
case "system" => sys.props.get(name)
209+
case "env" => Option(getenv(name))
210+
case _ => None
211+
}
212+
Regex.quoteReplacement(replacement.getOrElse(m.matched))
213+
})
214+
}
215+
216+
private def defaultValueString(key: String): Option[String] = {
217+
findEntry(key) match {
218+
case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
219+
case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
220+
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
221+
case _ => None
222+
}
109223
}
110224

111225
}

0 commit comments

Comments
 (0)