From 409a6438585c79e20648a23849560ead09743325 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jan 2017 11:31:17 -0800 Subject: [PATCH 1/6] [SPARK-17874][core] Add SSL port configuration. Make the SSL port configuration explicit, instead of deriving it from the non-SSL port, but retain the existing functionality in case anyone depends on it. The change starts the HTTPS and HTTP connectors separately, so that it's possible to use independent ports for each. For that to work, the initialization of the server needs to be shuffled around a bit. The change also makes it so the initialization of both connectors is similar, and end up using the same Scheduler - previously only the HTTP connector would use the correct one. Also removed some outdated documentation about a SSL config namespace that was removed a long time ago. Tested with unit tests and by running spark-shell with SSL configs. --- .../scala/org/apache/spark/SSLOptions.scala | 9 ++ .../org/apache/spark/ui/JettyUtils.scala | 153 +++++++++++------- .../scala/org/apache/spark/util/Utils.scala | 11 +- .../org/apache/spark/SSLOptionsSuite.scala | 2 + .../scala/org/apache/spark/ui/UISuite.scala | 28 +++- docs/configuration.md | 16 +- docs/security.md | 4 - 7 files changed, 153 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 5f14102c3c366..49f0476793dd1 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -34,6 +34,8 @@ import org.apache.spark.internal.Logging * * @param enabled enables or disables SSL; if it is set to false, the rest of the * settings are disregarded + * @param port the port where to bind the SSL server; if not defined, it will be + * based on the non-SSL port for the same service. * @param keyStore a path to the key-store file * @param keyStorePassword a password to access the key-store file * @param keyPassword a password to access the private key in the key-store @@ -47,6 +49,7 @@ import org.apache.spark.internal.Logging */ private[spark] case class SSLOptions( enabled: Boolean = false, + port: Option[Int] = None, keyStore: Option[File] = None, keyStorePassword: Option[String] = None, keyPassword: Option[String] = None, @@ -164,6 +167,11 @@ private[spark] object SSLOptions extends Logging { def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) + val port = conf.getOption(s"$ns.port").map(_.toInt) + port.foreach { p => + require(p >= 0, "Port number must be a positive value.") + } + val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_)) .orElse(defaults.flatMap(_.keyStore)) @@ -198,6 +206,7 @@ private[spark] object SSLOptions extends Logging { new SSLOptions( enabled, + port, keyStore, keyStorePassword, keyPassword, diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 35c3c8d00f99b..7c117151505c7 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -27,7 +27,7 @@ import scala.xml.Node import org.eclipse.jetty.client.api.Response import org.eclipse.jetty.proxy.ProxyServlet -import org.eclipse.jetty.server.{HttpConnectionFactory, Request, Server, ServerConnector} +import org.eclipse.jetty.server._ import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.servlets.gzip.GzipHandler @@ -283,81 +283,110 @@ private[spark] object JettyUtils extends Logging { gzipHandler } - // Bind to the given port, or throw a java.net.BindException if the port is occupied - def connect(currentPort: Int): (Server, Int) = { - val pool = new QueuedThreadPool - if (serverName.nonEmpty) { - pool.setName(serverName) - } - pool.setDaemon(true) - - val server = new Server(pool) - val connectors = new ArrayBuffer[ServerConnector] - // Create a connector on port currentPort to listen for HTTP requests - val httpConnector = new ServerConnector( - server, - null, - // Call this full constructor to set this, which forces daemon threads: - new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true), - null, - -1, - -1, - new HttpConnectionFactory()) - httpConnector.setPort(currentPort) - connectors += httpConnector - - sslOptions.createJettySslContextFactory().foreach { factory => - // If the new port wraps around, do not try a privileged port. - val securePort = - if (currentPort != 0) { - (currentPort + 400 - 1024) % (65536 - 1024) + 1024 - } else { - 0 - } - val scheme = "https" - // Create a connector on port securePort to listen for HTTPS requests - val connector = new ServerConnector(server, factory) - connector.setPort(securePort) + // Start the server first, with no connectors. + val pool = new QueuedThreadPool + if (serverName.nonEmpty) { + pool.setName(serverName) + } + pool.setDaemon(true) - connectors += connector + val server = new Server(pool) - // redirect the HTTP requests to HTTPS port - collection.addHandler(createRedirectHttpsHandler(securePort, scheme)) - } + val errorHandler = new ErrorHandler() + errorHandler.setShowStacks(true) + errorHandler.setServer(server) + server.addBean(errorHandler) + + server.setHandler(collection) + + // Executor used to create daemon threads for the Jetty connectors. + val serverExecutor = new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true) + + try { + server.start() - gzipHandlers.foreach(collection.addHandler) // As each acceptor and each selector will use one thread, the number of threads should at // least be the number of acceptors and selectors plus 1. (See SPARK-13776) var minThreads = 1 - connectors.foreach { connector => + + def newConnector( + connectionFactories: Array[ConnectionFactory], + port: Int): (ServerConnector, Int) = { + val connector = new ServerConnector( + server, + null, + serverExecutor, + null, + -1, + -1, + connectionFactories: _*) + connector.setPort(port) + connector.start() + // Currently we only use "SelectChannelConnector" // Limit the max acceptor number to 8 so that we don't waste a lot of threads connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8)) connector.setHost(hostName) // The number of selectors always equals to the number of acceptors minThreads += connector.getAcceptors * 2 + + (connector, connector.getLocalPort()) } - server.setConnectors(connectors.toArray) - pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) - val errorHandler = new ErrorHandler() - errorHandler.setShowStacks(true) - errorHandler.setServer(server) - server.addBean(errorHandler) - server.setHandler(collection) - try { - server.start() - (server, httpConnector.getLocalPort) - } catch { - case e: Exception => - server.stop() - pool.stop() - throw e + // If SSL is configured, create the secure connector first. + val securePort = sslOptions.createJettySslContextFactory().map { factory => + val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0) + val secureServerName = if (serverName.nonEmpty) s"$serverName (HTTPS)" else serverName + + def sslConnect(currentPort: Int): (ServerConnector, Int) = { + val connectionFactories = AbstractConnectionFactory.getFactories(factory, + new HttpConnectionFactory()) + newConnector(connectionFactories, currentPort) + } + + val (connector, boundPort) = Utils.startServiceOnPort[ServerConnector](securePort, + sslConnect, conf, secureServerName) + server.addConnector(connector) + boundPort + } + + // Bind the HTTP port. + def httpConnect(currentPort: Int): (ServerConnector, Int) = { + newConnector(Array(new HttpConnectionFactory()), currentPort) + } + + val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect, + conf, serverName) + + // If SSL is configured, then configure redirection in the HTTP connector. + securePort.foreach { p => + val redirector = createRedirectHttpsHandler(p, "https") + collection.addHandler(redirector) + redirector.start() } - } - val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName) - ServerInfo(server, boundPort, collection) + // Add all the known handlers and install the connectors. + handlers.foreach { h => + val gzipHandler = new GzipHandler() + gzipHandler.setHandler(h) + collection.addHandler(gzipHandler) + gzipHandler.start() + } + server.addConnector(httpConnector) + + pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) + ServerInfo(server, httpPort, securePort, collection) + } catch { + case e: Exception => + server.stop() + if (serverExecutor.isStarted()) { + serverExecutor.stop() + } + if (pool.isStarted()) { + pool.stop() + } + throw e + } } private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { @@ -375,8 +404,7 @@ private[spark] object JettyUtils extends Logging { val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) - response.encodeRedirectURL(httpsURI) - response.sendRedirect(httpsURI) + response.sendRedirect(response.encodeRedirectURL(httpsURI)) baseRequest.setHandled(true) } }) @@ -442,6 +470,7 @@ private[spark] object JettyUtils extends Logging { private[spark] case class ServerInfo( server: Server, boundPort: Int, + securePort: Option[Int], rootHandler: ContextHandlerCollection) { def stop(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2c1d331b9ab18..c225e1a0cc1bf 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2202,6 +2202,14 @@ private[spark] object Utils extends Logging { } } + /** + * Returns the user port to try when trying to bind a service. Handles wrapping and skipping + * privileged ports. + */ + def userPort(base: Int, offset: Int): Int = { + (base + offset - 1024) % (65536 - 1024) + 1024 + } + /** * Attempt to start a service on the given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). @@ -2229,8 +2237,7 @@ private[spark] object Utils extends Logging { val tryPort = if (startPort == 0) { startPort } else { - // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 + userPort(startPort, offset) } try { val (service, port) = startService(tryPort) diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala index 2b8b1805bc83f..6fc7cea6ee94a 100644 --- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala @@ -103,6 +103,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll { val conf = new SparkConf conf.set("spark.ssl.enabled", "true") conf.set("spark.ssl.ui.enabled", "false") + conf.set("spark.ssl.ui.port", "4242") conf.set("spark.ssl.keyStore", keyStorePath) conf.set("spark.ssl.keyStorePassword", "password") conf.set("spark.ssl.ui.keyStorePassword", "12345") @@ -118,6 +119,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll { val opts = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaultOpts)) assert(opts.enabled === false) + assert(opts.port === Some(4242)) assert(opts.trustStore.isDefined === true) assert(opts.trustStore.get.getName === "truststore") assert(opts.trustStore.get.getAbsolutePath === trustStorePath) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 68c7657cb315b..105be0de40412 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.LocalSparkContext._ +import org.apache.spark.util.Utils class UISuite extends SparkFunSuite { @@ -52,13 +53,16 @@ class UISuite extends SparkFunSuite { (conf, new SecurityManager(conf).getSSLOptions("ui")) } - private def sslEnabledConf(): (SparkConf, SSLOptions) = { + private def sslEnabledConf(sslPort: Option[Int] = None): (SparkConf, SSLOptions) = { val keyStoreFilePath = getTestResourcePath("spark.keystore") val conf = new SparkConf() .set("spark.ssl.ui.enabled", "true") .set("spark.ssl.ui.keyStore", keyStoreFilePath) .set("spark.ssl.ui.keyStorePassword", "123456") .set("spark.ssl.ui.keyPassword", "123456") + sslPort.foreach { p => + conf.set("spark.ssl.ui.port", p.toString) + } (conf, new SecurityManager(conf).getSSLOptions("ui")) } @@ -227,6 +231,28 @@ class UISuite extends SparkFunSuite { assert(newHeader === null) } + test("specify both http and https ports separately") { + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + try { + socket = new ServerSocket(0) + + // Make sure the SSL port lies way outside the "http + 400" range used as the default. + val baseSslPort = Utils.userPort(socket.getLocalPort(), 10000) + val (conf, sslOptions) = sslEnabledConf(sslPort = Some(baseSslPort)) + + serverInfo = JettyUtils.startJettyServer("0.0.0.0", socket.getLocalPort() + 1, + sslOptions, Seq[ServletContextHandler](), conf, "server1") + + val notAllowed = Utils.userPort(serverInfo.boundPort, 400) + assert(serverInfo.securePort.isDefined) + assert(serverInfo.securePort.get != Utils.userPort(serverInfo.boundPort, 400)) + } finally { + stopServer(serverInfo) + closeSocket(socket) + } + } + def stopServer(info: ServerInfo): Unit = { if (info != null && info.server != null) info.server.stop } diff --git a/docs/configuration.md b/docs/configuration.md index 7a11a983d5972..73476c08bef57 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1780,12 +1780,26 @@ Apart from these, the following properties are also available, and may be useful Configuration for details on hierarchical SSL configuration for services. + + spark.ssl.[namespace].port + None + + The port where the SSL service will listen on. + +
The port must be defined within a namespace configuration; see + SSL Configuration for the available + namespaces. + +
When not set, the SSL port will be derived from the non-SSL port for the + same service. A value of "0" will make the service bind to an ephemeral port. + + spark.ssl.enabledAlgorithms Empty A comma separated list of ciphers. The specified ciphers must be supported by JVM. - The reference list of protocols one can find on + The reference list of protocols one can found on this page. Note: If not set, it will use the default cipher suites of JVM. diff --git a/docs/security.md b/docs/security.md index baadfefbec826..f99e9f005a4ae 100644 --- a/docs/security.md +++ b/docs/security.md @@ -48,10 +48,6 @@ component-specific configuration namespaces used to override the default setting Config Namespace Component - - spark.ssl.fs - HTTP file server and broadcast server - spark.ssl.ui Spark application Web UI From 6b7ff08f8e0930566324d6877b18253d3952ac3c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jan 2017 16:14:58 -0800 Subject: [PATCH 2/6] Undo broken doc change. --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 73476c08bef57..06d9eb847478e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1799,7 +1799,7 @@ Apart from these, the following properties are also available, and may be useful Empty A comma separated list of ciphers. The specified ciphers must be supported by JVM. - The reference list of protocols one can found on + The reference list of protocols one can find on this page. Note: If not set, it will use the default cipher suites of JVM. From a429135e1db940d95bed95c8357a80ba3a79de62 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jan 2017 17:58:39 -0800 Subject: [PATCH 3/6] Comment fix. --- core/src/main/scala/org/apache/spark/SSLOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 49f0476793dd1..29163e7f30546 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -169,7 +169,7 @@ private[spark] object SSLOptions extends Logging { val port = conf.getOption(s"$ns.port").map(_.toInt) port.foreach { p => - require(p >= 0, "Port number must be a positive value.") + require(p >= 0, "Port number must be a non-negative value.") } val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_)) From ced09ced6bf3c85543b717bce2023705aaf5cc6e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 18 Jan 2017 15:47:50 -0800 Subject: [PATCH 4/6] Minor cleanup. --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 7c117151505c7..36668e435775f 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -337,10 +337,10 @@ private[spark] object JettyUtils extends Logging { val securePort = sslOptions.createJettySslContextFactory().map { factory => val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0) val secureServerName = if (serverName.nonEmpty) s"$serverName (HTTPS)" else serverName + val connectionFactories = AbstractConnectionFactory.getFactories(factory, + new HttpConnectionFactory()) def sslConnect(currentPort: Int): (ServerConnector, Int) = { - val connectionFactories = AbstractConnectionFactory.getFactories(factory, - new HttpConnectionFactory()) newConnector(connectionFactories, currentPort) } @@ -365,14 +365,15 @@ private[spark] object JettyUtils extends Logging { redirector.start() } - // Add all the known handlers and install the connectors. + server.addConnector(httpConnector) + + // Add all the known handlers now that connectors are configured. handlers.foreach { h => val gzipHandler = new GzipHandler() gzipHandler.setHandler(h) collection.addHandler(gzipHandler) gzipHandler.start() } - server.addConnector(httpConnector) pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) ServerInfo(server, httpPort, securePort, collection) From f6825d1ee93926fc3eb14b028a5d883caf0784ae Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 26 Jan 2017 14:58:10 -0800 Subject: [PATCH 5/6] Updates after merge with SPARK-19220. --- .../scala/org/apache/spark/ui/JettyUtils.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 4d22082703539..7909821db954b 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -279,14 +279,6 @@ private[spark] object JettyUtils extends Logging { addFilters(handlers, conf) - val gzipHandlers = handlers.map { h => - h.setVirtualHosts(Array("@" + SPARK_CONNECTOR_NAME)) - - val gzipHandler = new GzipHandler - gzipHandler.setHandler(h) - gzipHandler - } - // Start the server first, with no connectors. val pool = new QueuedThreadPool if (serverName.nonEmpty) { @@ -301,6 +293,7 @@ private[spark] object JettyUtils extends Logging { errorHandler.setServer(server) server.addBean(errorHandler) + val collection = new ContextHandlerCollection server.setHandler(collection) // Executor used to create daemon threads for the Jetty connectors. @@ -379,6 +372,7 @@ private[spark] object JettyUtils extends Logging { // Add all the known handlers now that connectors are configured. handlers.foreach { h => + h.setVirtualHosts(toVirtualHosts(SPARK_CONNECTOR_NAME)) val gzipHandler = new GzipHandler() gzipHandler.setHandler(h) collection.addHandler(gzipHandler) @@ -403,7 +397,7 @@ private[spark] object JettyUtils extends Logging { private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { val redirectHandler: ContextHandler = new ContextHandler redirectHandler.setContextPath("/") - redirectHandler.setVirtualHosts(Array("@" + REDIRECT_CONNECTOR_NAME)) + redirectHandler.setVirtualHosts(toVirtualHosts(REDIRECT_CONNECTOR_NAME)) redirectHandler.setHandler(new AbstractHandler { override def handle( target: String, @@ -477,6 +471,8 @@ private[spark] object JettyUtils extends Logging { new URI(scheme, authority, path, query, null).toString } + def toVirtualHosts(connectors: String*): Array[String] = connectors.map("@" + _).toArray + } private[spark] case class ServerInfo( @@ -486,7 +482,7 @@ private[spark] case class ServerInfo( private val rootHandler: ContextHandlerCollection) { def addHandler(handler: ContextHandler): Unit = { - handler.setVirtualHosts(Array("@" + JettyUtils.SPARK_CONNECTOR_NAME)) + handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME)) rootHandler.addHandler(handler) if (!handler.isStarted()) { handler.start() From a3f551b7e5d58b0f2933a9a48e7e928171e152b2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 6 Feb 2017 11:51:03 -0800 Subject: [PATCH 6/6] Clarify what "spark.ssl.fs" means. --- docs/security.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/security.md b/docs/security.md index 38365486476fa..42a09a9148d40 100644 --- a/docs/security.md +++ b/docs/security.md @@ -48,6 +48,10 @@ component-specific configuration namespaces used to override the default setting Config Namespace Component + + spark.ssl.fs + File download client (used to download jars and files from HTTPS-enabled servers). + spark.ssl.ui Spark application Web UI