9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.internal.config._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

private val settings = new ConcurrentHashMap[String, String]()

private val reader = new ConfigReader(new SparkConfigProvider(settings))
reader.bindEnv(new ConfigProvider {
override def get(key: String): Option[String] = Option(getenv(key))
})

if (loadDefaults) {
loadFromSystemProperties(false)
}
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
* - This will throw an exception if the config is not optional and the value is not set.
*/
private[spark] def get[T](entry: ConfigEntry[T]): T = {
entry.readFrom(settings, getenv)
entry.readFrom(reader)
}

/**
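For context, the net effect of the change above is that `SparkConf.get(entry)` now resolves values through a `ConfigReader` whose default namespace is the conf's own settings map, with environment lookups delegated to `getenv`. A minimal sketch of the behavior, using a made-up entry key; since `ConfigBuilder` and the typed `get` are `private[spark]`, this would have to live inside Spark (e.g. in a test):

```scala
package org.apache.spark

import org.apache.spark.internal.config.ConfigBuilder

object SparkConfSubstitutionExample {
  // Hypothetical entry, for illustration only.
  private val LOG_DIR = ConfigBuilder("spark.test.logDir")
    .stringConf
    .createWithDefault("/tmp/spark")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
    conf.set("spark.test.logDir", "${env:HOME}/spark-logs")

    // The "${env:HOME}" reference is expanded by the reader wired up in SparkConf;
    // if HOME were unset, the literal reference would be kept.
    println(conf.get(LOG_DIR))
  }
}
```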
core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf
/**
* An entry contains all meta information for a configuration.
*
* Config options created using this feature support variable expansion. If the config value
* contains variable references of the form "${prefix:variableName}", the reference will be replaced
* with the value of the variable depending on the prefix. The prefix can be one of:
*
* - no prefix: if the config key starts with "spark", looks for the value in the Spark config
* - system: looks for the value in the system properties
* - env: looks for the value in the environment
*
* So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
* configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
* environment variable.
*
* For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
* will also consider the default value when it exists.
*
* If the reference cannot be resolved, the original string will be retained.
* When applying variable substitution to config values, only references starting with "spark." are
* considered in the default namespace. For known Spark configuration keys (i.e. those created using
* `ConfigBuilder`), references will also consider the default value when it exists.
*
* Variable expansion is also applied to the default values of config entries that have a default
* value declared as a string.
@@ -72,21 +59,14 @@ private[spark] abstract class ConfigEntry[T] (

def defaultValueString: String

def readFrom(conf: JMap[String, String], getenv: String => String): T
def readFrom(reader: ConfigReader): T

def defaultValue: Option[T] = None

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}

protected def readAndExpand(
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String] = Set()): Option[String] = {
Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
}

}

private class ConfigEntryWithDefault[T] (
@@ -102,8 +82,8 @@ private class ConfigEntryWithDefault[T] (

override def defaultValueString: String = stringConverter(_defaultValue)

def readFrom(conf: JMap[String, String], getenv: String => String): T = {
readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultValue)
}

}
@@ -121,12 +101,9 @@ private class ConfigEntryWithDefaultString[T] (

override def defaultValueString: String = _defaultValue

def readFrom(conf: JMap[String, String], getenv: String => String): T = {
Option(conf.get(key))
.orElse(Some(_defaultValue))
.map(ConfigEntry.expand(_, conf, getenv, Set()))
.map(valueConverter)
.get
def readFrom(reader: ConfigReader): T = {
val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
valueConverter(value)
}

}
@@ -146,8 +123,8 @@ private[spark] class OptionalConfigEntry[T](

override def defaultValueString: String = "<undefined>"

override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
readAndExpand(conf, getenv).map(rawValueConverter)
override def readFrom(reader: ConfigReader): Option[T] = {
reader.get(key).map(rawValueConverter)
}

}
@@ -164,62 +141,21 @@ private class FallbackConfigEntry[T] (

override def defaultValueString: String = s"<value of ${fallback.key}>"

override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
override def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
}

}

private object ConfigEntry {
private[spark] object ConfigEntry {

private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
}

def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)

/**
* Expand the `value` according to the rules explained in ConfigEntry.
*/
def expand(
value: String,
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String]): String = {
REF_RE.replaceAllIn(value, { m =>
val prefix = m.group(1)
val name = m.group(2)
val replacement = prefix match {
case null =>
require(!usedRefs.contains(name), s"Circular reference in $value: $name")
if (name.startsWith("spark.")) {
Option(findEntry(name))
.flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
.orElse(Option(conf.get(name)))
.orElse(defaultValueString(name))
} else {
None
}
case "system" => sys.props.get(name)
case "env" => Option(getenv(name))
case _ => None
}
Regex.quoteReplacement(replacement.getOrElse(m.matched))
})
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
case _ => None
}
}

}
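To illustrate the new `readFrom(reader)` contract, here is a rough sketch of how a fallback entry resolves after this change. The entry keys are made up, and the snippet assumes the `org.apache.spark.internal.config` package (the config API is `private[spark]`) plus the existing `ConfigBuilder.fallbackConf` helper, which is not part of this diff:

```scala
package org.apache.spark.internal.config

object FallbackReadExample {
  // Hypothetical entries, for illustration only.
  val PARENT = ConfigBuilder("spark.test.parent.timeout")
    .stringConf
    .createWithDefault("120s")

  // When "spark.test.child.timeout" is unset, readFrom(reader) delegates to
  // PARENT.readFrom(reader), which in turn falls back to its "120s" default.
  val CHILD = ConfigBuilder("spark.test.child.timeout").fallbackConf(PARENT)

  def main(args: Array[String]): Unit = {
    val reader = new ConfigReader(new java.util.HashMap[String, String]())
    println(CHILD.readFrom(reader)) // prints "120s"
  }
}
```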
core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala (new file)
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

import java.util.{Map => JMap}

/**
* A source of configuration values.
*/
private[spark] trait ConfigProvider {

def get(key: String): Option[String]

}

private[spark] class EnvProvider extends ConfigProvider {

override def get(key: String): Option[String] = sys.env.get(key)

}

private[spark] class SystemProvider extends ConfigProvider {

override def get(key: String): Option[String] = sys.props.get(key)

}

private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {

override def get(key: String): Option[String] = Option(conf.get(key))

}

/**
* A config provider that only reads Spark config keys, and considers default values for known
* configs when fetching configuration values.
*/
private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {

import ConfigEntry._

override def get(key: String): Option[String] = {
if (key.startsWith("spark.")) {
Option(conf.get(key)).orElse(defaultValueString(key))
} else {
None
}
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
case _ => None
}
}

}
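Since `ConfigProvider` is a single-method trait, any key/value source can back a substitution namespace. A hedged sketch of an additional provider (the class name is made up and not part of this patch):

```scala
package org.apache.spark.internal.config

// Illustrative provider backed by an immutable Scala Map; any key/value
// source can be adapted the same way and then bound to a prefix via
// ConfigReader.bind (see the next file).
private[spark] class ScalaMapProvider(values: Map[String, String]) extends ConfigProvider {
  override def get(key: String): Option[String] = values.get(key)
}
```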
core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala (new file)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

import java.util.{Map => JMap}
import java.util.regex.Pattern

import scala.collection.mutable.HashMap
import scala.util.matching.Regex

private object ConfigReader {

private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

}

/**
* A helper class for reading config entries and performing variable substitution.
*
* If a config value contains variable references of the form "${prefix:variableName}", the
* reference will be replaced with the value of the variable depending on the prefix. By default,
* the following prefixes are handled:
*
* - no prefix: use the default config provider
* - system: looks for the value in the system properties
* - env: looks for the value in the environment
*
* Different prefixes can be bound to a `ConfigProvider`, which is used to read configuration
* values from the data source for the prefix, and both the system and env providers can be
* overridden.
*
* If the reference cannot be resolved, the original string will be retained.
*
* @param conf The config provider for the default namespace (no prefix).
*/
private[spark] class ConfigReader(conf: ConfigProvider) {

def this(conf: JMap[String, String]) = this(new MapProvider(conf))

private val bindings = new HashMap[String, ConfigProvider]()
bind(null, conf)
bindEnv(new EnvProvider())
bindSystem(new SystemProvider())

/**
* Binds a prefix to a provider. This method is not thread-safe and should be called
* before the instance is used to expand values.
*/
def bind(prefix: String, provider: ConfigProvider): ConfigReader = {
bindings(prefix) = provider
this
}

def bind(prefix: String, values: JMap[String, String]): ConfigReader = {
bind(prefix, new MapProvider(values))
}

def bindEnv(provider: ConfigProvider): ConfigReader = bind("env", provider)

def bindSystem(provider: ConfigProvider): ConfigReader = bind("system", provider)

/**
* Reads a configuration key from the default provider and applies variable substitution.
*/
def get(key: String): Option[String] = conf.get(key).map(substitute)

/**
* Perform variable substitution on the given input string.
*/
def substitute(input: String): String = substitute(input, Set())

private def substitute(input: String, usedRefs: Set[String]): String = {
if (input != null) {
ConfigReader.REF_RE.replaceAllIn(input, { m =>
val prefix = m.group(1)
val name = m.group(2)
val ref = if (prefix == null) name else s"$prefix:$name"
require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")

val replacement = bindings.get(prefix)
.flatMap(_.get(name))
.map { v => substitute(v, usedRefs + ref) }
.getOrElse(m.matched)
Regex.quoteReplacement(replacement)
})
} else {
input
}
}

}
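A rough end-to-end usage sketch of the reader, with made-up keys and values; it assumes the `org.apache.spark.internal.config` package since the class is `private[spark]`:

```scala
package org.apache.spark.internal.config

import java.util.{HashMap => JHashMap}

object ConfigReaderExample {
  def main(args: Array[String]): Unit = {
    val settings = new JHashMap[String, String]()
    settings.put("spark.test.greeting", "hello")
    settings.put("spark.test.message", "${spark.test.greeting} from ${system:user.name}")

    // Default namespace backed by the map; env and system bindings are
    // installed automatically by the constructor.
    val reader = new ConfigReader(settings)
    println(reader.get("spark.test.message")) // e.g. Some(hello from jdoe)

    // Unresolvable references are kept verbatim.
    println(reader.substitute("${env:PROBABLY_NOT_SET_12345}"))

    // Custom prefixes can be bound to any provider.
    val cloud = new JHashMap[String, String]()
    cloud.put("region", "us-west-2")
    reader.bind("cloud", cloud)
    println(reader.substitute("running in ${cloud:region}"))
  }
}
```

Note that a reference with an unbound prefix (e.g. `${foo:bar}`) is also left as-is, since `bindings.get(prefix)` returns `None` and the match falls back to the original text.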