-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable #23519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
985d53c
51a6ba0
d9e4cf6
50ebf3a
ae382c9
19e17ac
e00ebd5
3587a9a
5b37092
3bd77aa
4ff2b94
c01152d
09b0548
985f966
3f80071
115fecf
27759b7
9669569
ac2ec82
bafc7ac
abc937b
33b5039
a77505d
b45ff02
5ca45e8
7296999
8a54492
1b75f3b
954ef96
2ebb79b
cf133e6
819e5ea
e92088d
670bc55
06d5b17
190814e
8f17078
01301d0
dc3b35c
272428d
38f0307
4915cb3
06af625
47fbe49
b08805e
650b879
d89aa38
c0632ce
6f8c0e5
1b575ef
ede35c8
0b3abef
c2d0d70
e341864
30d94ff
34db5f5
8503aa3
64cc9e5
ace2364
6d9c54b
6c18d8d
9a30e23
421227a
00d144f
0b6e954
df5c075
dcaaebf
06b857c
057c46e
b0c16d2
121def8
869cd14
708b248
bbeffc1
ad4f649
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,4 +126,13 @@ object StaticSQLConf { | |
| .intConf | ||
| .createWithDefault(1000) | ||
|
|
||
| val MAX_BROADCAST_EXCHANGE_THREADNUMBER = | ||
| buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber") | ||
| .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " + | ||
|
||
| "encounter memory issue when broadcast table we can decrease this number." + | ||
|
||
| "Notice the number should be carefully chosen since decrease parallelism will " + | ||
|
||
| "cause longer waiting for other broadcasting.And increase parallelism may " + | ||
|
||
| "cause memory problem.") | ||
| .intConf | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz check |
||
| .createWithDefault(128) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,15 +17,18 @@ | |
|
|
||
| package org.apache.spark.sql.execution | ||
|
|
||
| import scala.concurrent.{Future, TimeoutException} | ||
| import scala.concurrent.duration._ | ||
| import scala.util.Random | ||
|
|
||
| import org.apache.spark.sql.{Dataset, Row} | ||
| import org.apache.spark.sql.{Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} | ||
| import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, IdentityBroadcastMode, SinglePartition} | ||
| import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} | ||
| import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
| import org.apache.spark.sql.test.SharedSQLContext | ||
| import org.apache.spark.util.ThreadUtils | ||
|
|
||
| class ExchangeSuite extends SparkPlanTest with SharedSQLContext { | ||
| import testImplicits._ | ||
|
|
@@ -132,4 +135,33 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext { | |
| val projection2 = cached.select("_1", "_3").queryExecution.executedPlan | ||
| assert(!projection1.sameResult(projection2)) | ||
| } | ||
|
|
||
| test("SPARK-26601: Make broadcast-exchange thread pool configurable") { | ||
| val previousNumber = SparkSession.getActiveSession.get.sparkContext.conf | ||
| .get(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER) | ||
|
|
||
| SparkSession.getActiveSession.get.sparkContext.conf. | ||
| set(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER, 1) | ||
| assert(SQLConf.get.getConf(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER) === 1) | ||
|
|
||
| Future { | ||
| Thread.sleep(5*1000) | ||
| } (BroadcastExchangeExec.executionContext) | ||
|
|
||
| val f = Future {} (BroadcastExchangeExec.executionContext) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't have to test Java's thread executors. Can you just check if |
||
| intercept[TimeoutException] { | ||
| ThreadUtils.awaitResult(f, 3 seconds) | ||
| } | ||
|
|
||
| var executed = false | ||
| val ef = Future { | ||
| executed = true | ||
| } (BroadcastExchangeExec.executionContext) | ||
| ThreadUtils.awaitResult(ef, 3 seconds) | ||
| assert(executed) | ||
|
|
||
| // for other test | ||
| SparkSession.getActiveSession.get.sparkContext.conf. | ||
| set(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER, previousNumber) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok @maropu Thanks