Skip to content

Commit 0e8e4bb

Browse files
author
Marcelo Vanzin
committed
Allow separate configs for all network opts in file download client.
But use the rpc config as the default. Also now properly tested on a real cluster, and verified idle connections are closed.
1 parent cfd01bd commit 0e8e4bb

1 file changed

Lines changed: 11 additions & 3 deletions

File tree

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,20 @@ private[netty] class NettyRpcEnv(
343343
private def downloadClient(host: String, port: Int): TransportClient = {
344344
if (fileDownloadFactory == null) synchronized {
345345
if (fileDownloadFactory == null) {
346+
val module = "files"
347+
val prefix = "spark.rpc.io."
346348
val clone = conf.clone()
347-
conf.getOption("spark.files.maxDownloadClients").foreach { v =>
348-
clone.set("spark.rpc.io.numConnectionsPerPeer", v)
349+
350+
// Copy any RPC configuration that is not overridden in the spark.files namespace.
351+
conf.getAll.foreach { case (key, value) =>
352+
if (key.startsWith(prefix)) {
353+
val opt = key.substring(prefix.length())
354+
clone.setIfMissing(s"spark.$module.io.$opt", value)
355+
}
349356
}
357+
350358
val ioThreads = clone.getInt("spark.files.io.threads", 1)
351-
val downloadConf = SparkTransportConf.fromSparkConf(clone, "rpc", ioThreads)
359+
val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)
352360
val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)
353361
fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())
354362
}

0 commit comments

Comments
 (0)