Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ object StaticSQLConf {
.intConf
.createWithDefault(1000)

val BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD -> BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry for this error,i have fixed that

buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.doc("The maximum degree of parallelism to fetch and broadcast the table. " +
"If we encounter memory issue like frequently full GC or OOM when broadcast table " +
"we can decrease this number in order to reduce memory usage. " +
"Notice the number should be carefully chosen since decreasing parallelism might " +
"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0, "The threshold should be positive.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this number cannot be more than 128, let's add that condition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for checking thres <= 128, too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it .internal() so that we can remove this away if there's anything wrong found with this configuration later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just want to comment like that. I also think this is advanced and should not be exposed to users in general. +1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

.createWithDefault(128)

val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.HashedRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.util.{SparkFatalException, ThreadUtils}

/**
Expand Down Expand Up @@ -157,5 +157,6 @@ case class BroadcastExchangeExec(

object BroadcastExchangeExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the previous indentation was correct.

SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Indentation.

As SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD) is the 2nd parameter of ThreadUtils.newDaemonCachedThreadPool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @attilapiros done!

}