Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ object StaticSQLConf {
.intConf
.createWithDefault(1000)

val BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD -> BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry for this error,i have fixed that

buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.internal()
.doc("The maximum degree of parallelism to fetch and broadcast the table. " +
"If we encounter memory issue like frequently full GC or OOM when broadcast table " +
"we can decrease this number in order to reduce memory usage. " +
"Notice the number should be carefully chosen since decreasing parallelism might " +
"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold should be positive.")
Copy link
Member

@viirya viirya May 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a maximum limit (128) for this config. The error message doesn't reflect that. Once an invalid value like 200 is set, it can't figure out what goes wrong with it, if not reading this code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea let's improve this one before getting this in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @HyukjinKwon @viirya Thanks!

.createWithDefault(128)

val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.HashedRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.util.{SparkFatalException, ThreadUtils}

/**
Expand Down Expand Up @@ -157,5 +157,6 @@ case class BroadcastExchangeExec(

object BroadcastExchangeExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the previous indentation was correct.

SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @caneGuy seems like the last nit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @HyukjinKwon @kiszk thanks!

}