diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index e12f05bbc71d..d11fdfbf7d69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -132,6 +132,20 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(1000)
 
+  val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to fetch and broadcast the table. " +
+        "If we encounter memory issue like frequently full GC or OOM when broadcast table " +
+        "we can decrease this number in order to reduce memory usage. " +
+        "Notice the number should be carefully chosen since decreasing parallelism might " +
+        "cause longer waiting for other broadcasting. Also, increasing parallelism may " +
+        "cause memory problem.")
+      .intConf
+      // Keep the message in sync with the predicate: 0 is excluded, 128 is included.
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(128)
+
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
     .doc("Threshold of SQL length beyond which it will be truncated before adding to " +
       "event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index b9972b8cf017..aa0dd1d62840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.HashedRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.util.{SparkFatalException, ThreadUtils}
 
 /**
@@ -157,5 +157,7 @@ case class BroadcastExchangeExec(
 
 object BroadcastExchangeExec {
+  // The pool size is read from the static conf once, when this object is initialized.
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
+      SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
 }