From 786f24a6b633121792eb991d816ac763ed9c239f Mon Sep 17 00:00:00 2001 From: Alexander Bessonov Date: Tue, 17 Sep 2019 10:14:25 -0400 Subject: [PATCH 1/4] Define internal configuration option spark.shuffle.service.name Allow creation of YarnShuffleService with custom service name --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 6 +++++- .../scala/org/apache/spark/internal/config/package.scala | 3 +++ .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3 ++- .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 5 +++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c170f99b112c0..04475e4d3300b 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -136,7 +136,11 @@ public class YarnShuffleService extends AuxiliaryService { private DB db; public YarnShuffleService() { - super("spark_shuffle"); + this("spark_shuffle"); + } + + public YarnShuffleService(String serviceName) { + super(serviceName); logger.info("Initializing YARN shuffle service for Spark"); instance = this; } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d142d22929728..8b567a03f52e7 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -443,6 +443,9 @@ package object config { private[spark] val SHUFFLE_SERVICE_PORT = ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337) + private[spark] val SHUFFLE_SERVICE_NAME = + ConfigBuilder("spark.shuffle.service.name").stringConf.createWithDefault("spark_shuffle") + private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab") .doc("Location of user's keytab.") .stringConf.createOptional diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7046ad74056fc..d3c69804fbf75 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -115,7 +115,8 @@ private[yarn] class ExecutorRunnable( // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } - ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) } // Send the start request to the ContainerManager diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 8c62069a8dd67..326621e6b9b8c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -42,8 +42,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { override def newYarnConfig(): YarnConfiguration = { val yarnConfig = new YarnConfiguration() - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark2_shuffle") + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark2_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0") yarnConfig @@ -55,6 +55,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { logInfo("Shuffle service port = " + shuffleServicePort) Map( + SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle", SHUFFLE_SERVICE_ENABLED.key -> "true", SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString, MAX_EXECUTOR_FAILURES.key -> "1" From 30f8c1d79b5b0514455b33bf0e5dd532cff9555a Mon Sep 17 00:00:00 2001 From: Alexander Bessonov Date: Wed, 2 Oct 2019 09:27:14 -0400 Subject: [PATCH 2/4] Add spark.shuffle.service.name to the documentation --- docs/configuration.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 729b1ba7ed2ca..af0d844fd088c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -865,6 +865,13 @@ Apart from these, the following properties are also available, and may be useful configuration and setup documentation for more information. + + spark.shuffle.service.name + spark_shuffle + + Name of the external shuffle service. + + spark.shuffle.service.port 7337 From 27e5c87dfd0d87468ed52d9495709255dfad3956 Mon Sep 17 00:00:00 2001 From: Alexander Bessonov Date: Thu, 3 Oct 2019 08:30:38 -0400 Subject: [PATCH 3/4] Rename spark.shuffle.service.name to spark.yarn.shuffle.service.name and move that option to appropriate place --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 2 +- .../scala/org/apache/spark/internal/config/package.scala | 3 --- docs/configuration.md | 7 ------- docs/running-on-yarn.md | 7 +++++++ .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3 ++- .../main/scala/org/apache/spark/deploy/yarn/config.scala | 4 ++++ .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 2 +- 7 files changed, 15 insertions(+), 13 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 04475e4d3300b..73e65c09736f6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -139,7 +139,7 @@ public YarnShuffleService() { this("spark_shuffle"); } - public YarnShuffleService(String serviceName) { + protected YarnShuffleService(String serviceName) { super(serviceName); logger.info("Initializing YARN shuffle service for Spark"); instance = this; diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8b567a03f52e7..d142d22929728 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -443,9 +443,6 @@ package object config { private[spark] val SHUFFLE_SERVICE_PORT = ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337) - private[spark] val SHUFFLE_SERVICE_NAME = - ConfigBuilder("spark.shuffle.service.name").stringConf.createWithDefault("spark_shuffle") - private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab") .doc("Location of user's keytab.") .stringConf.createOptional diff --git a/docs/configuration.md b/docs/configuration.md index af0d844fd088c..729b1ba7ed2ca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -865,13 +865,6 @@ Apart from these, the following properties are also available, and may be useful configuration and setup documentation for more information. - - spark.shuffle.service.name - spark_shuffle - - Name of the external shuffle service. - - spark.shuffle.service.port 7337 diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 418db41216cdb..bb82bf3f23455 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -492,6 +492,13 @@ To use a custom metrics.properties for the application master and executors, upd If it is not set then the YARN application ID is used. + + spark.yarn.shuffle.service.name + spark_shuffle + + Name of the external shuffle service. + + #### Available patterns for SHS custom executor log URL diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index d3c69804fbf75..5853bef5a7af8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils @@ -115,7 +116,7 @@ private[yarn] class ExecutorRunnable( // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } - val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + val serviceName = sparkConf.get(YARN_SHUFFLE_SERVICE_NAME) ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 4c187b2cc68e7..f298c41130a04 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -320,4 +320,8 @@ package object config { private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource." private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource." + + private[yarn] val YARN_SHUFFLE_SERVICE_NAME = ConfigBuilder("spark.yarn.shuffle.service.name") + .stringConf + .createWithDefault("spark_shuffle") } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 326621e6b9b8c..97e0f258f1468 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -55,7 +55,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { logInfo("Shuffle service port = " + shuffleServicePort) Map( - SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle", + YARN_SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle", SHUFFLE_SERVICE_ENABLED.key -> "true", SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString, MAX_EXECUTOR_FAILURES.key -> "1" From 60795d4edd89aeddd4a864246fae845789be6345 Mon Sep 17 00:00:00 2001 From: Alexander Bessonov Date: Fri, 11 Oct 2019 10:25:33 -0400 Subject: [PATCH 4/4] Clarify Spark Shuffle Service name option --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 8 +++++++- docs/running-on-yarn.md | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 73e65c09736f6..76cbc6051f783 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -139,9 +139,15 @@ public YarnShuffleService() { this("spark_shuffle"); } + /** + * Instantiate YarnShuffleService with arbitrary service name. + * Used for tests. + * YARN doesn't pass service name or any parameters to AuxiliaryServices. + * When instantiated by YARN, constructor without arguments would be called. + */ protected YarnShuffleService(String serviceName) { super(serviceName); - logger.info("Initializing YARN shuffle service for Spark"); + logger.info("Initializing YARN shuffle service \"{}\" for Spark", serviceName); instance = this; } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index bb82bf3f23455..35ab29de45c56 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -496,7 +496,8 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.shuffle.service.name spark_shuffle - Name of the external shuffle service. + The name of the external shuffle service. + The external shuffle service itself is configured and started by YARN (see [Configuring the External Shuffle Service](#configuring-the-external-shuffle-service) for details). The name specified here must match the name used in YARN service implementation.