Skip to content

Commit fbe4216

Browse files
ioana-delaneycloud-fan
authored andcommitted
[SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration
## What changes were proposed in this pull request? Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph: ``` T1 D1 - T2 - T3 \ / F1 | D2 star-join: {F1, D1, D2} non-star: {T1, T2, T3} ``` The following join combinations will be generated: ``` level 0: (F1), (D1), (D2), (T1), (T2), (T3) level 1: {F1, D1}, {F1, D2}, {T2, T3} level 2: {F1, D1, D2} level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2} level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 } level 6: {F1, D1, D2, T1, T2, T3} ``` ## How was this patch tested? New test suite ```StarJOinCostBasedReorderSuite.scala```. Author: Ioana Delaney <ioanamdelaney@gmail.com> Closes #17546 from ioana-delaney/starSchemaCBOv3.
1 parent a4293c2 commit fbe4216

4 files changed

Lines changed: 571 additions & 9 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
5454

5555
private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
5656
val (items, conditions) = extractInnerJoins(plan)
57-
// TODO: Compute the set of star-joins and use them in the join enumeration
58-
// algorithm to prune un-optimal plan choices.
5957
val result =
6058
// Do reordering if the number of items is appropriate and join conditions exist.
6159
// We also need to check if costs of all items can be evaluated.
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
150148
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
151149
}.toMap)
152150

151+
// Build filters from the join graph to be used by the search algorithm.
152+
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
153+
153154
// Build plans for next levels until the last level has only one plan. This plan contains
154155
// all items that can be joined, so there's no need to continue.
155156
val topOutputSet = AttributeSet(output)
156-
while (foundPlans.size < items.length && foundPlans.last.size > 1) {
157+
while (foundPlans.size < items.length) {
157158
// Build plans for the next level.
158-
foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
159+
foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
159160
}
160161

161162
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
179180
existingLevels: Seq[JoinPlanMap],
180181
conf: SQLConf,
181182
conditions: Set[Expression],
182-
topOutput: AttributeSet): JoinPlanMap = {
183+
topOutput: AttributeSet,
184+
filters: Option[JoinGraphInfo]): JoinPlanMap = {
183185

184186
val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
185187
var k = 0
@@ -200,7 +202,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
200202
}
201203

202204
otherSideCandidates.foreach { otherSidePlan =>
203-
buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
205+
buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
204206
case Some(newJoinPlan) =>
205207
// Check if it's the first plan for the item set, or it's a better plan than
206208
// the existing one due to lower cost.
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
218220
}
219221

220222
/**
221-
* Builds a new JoinPlan when both conditions hold:
223+
* Builds a new JoinPlan if the following conditions hold:
222224
* - the sets of items contained in left and right sides do not overlap.
223225
* - there exists at least one join condition involving references from both sides.
226+
* - if star-join filter is enabled, allow the following combinations:
227+
* 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
228+
* 2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
229+
* 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
230+
*
224231
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
225232
* @param otherJoinPlan The other side JoinPlan for building a new join node.
226233
* @param conf SQLConf for statistics computation.
227234
* @param conditions The overall set of join conditions.
228235
* @param topOutput The output attributes of the final plan.
236+
* @param filters Join graph info to be used as filters by the search algorithm.
229237
* @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
230238
*/
231239
private def buildJoin(
232240
oneJoinPlan: JoinPlan,
233241
otherJoinPlan: JoinPlan,
234242
conf: SQLConf,
235243
conditions: Set[Expression],
236-
topOutput: AttributeSet): Option[JoinPlan] = {
244+
topOutput: AttributeSet,
245+
filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
237246

238247
if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
239248
// Should not join two overlapping item sets.
240249
return None
241250
}
242251

252+
if (filters.isDefined) {
253+
// Apply star-join filter, which ensures that tables in a star schema relationship
254+
// are planned together. The star-filter will eliminate joins among star and non-star
255+
// tables until the star joins are built. The following combinations are allowed:
256+
// 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
257+
// 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
258+
// 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
259+
val isValidJoinCombination =
260+
JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
261+
filters.get)
262+
if (!isValidJoinCombination) return None
263+
}
264+
243265
val onePlan = oneJoinPlan.plan
244266
val otherPlan = otherJoinPlan.plan
245267
val joinConds = conditions
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
327349
case class Cost(card: BigInt, size: BigInt) {
328350
def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
329351
}
352+
353+
/**
354+
* Implements optional filters to reduce the search space for join enumeration.
355+
*
356+
* 1) Star-join filters: Plan star-joins together since they are assumed
357+
* to have an optimal execution based on their RI relationship.
358+
* 2) Cartesian products: Defer their planning later in the graph to avoid
359+
* large intermediate results (expanding joins, in general).
360+
* 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
361+
* intermediate results.
362+
*
363+
* Filters (2) and (3) are not implemented.
364+
*/
365+
object JoinReorderDPFilters extends PredicateHelper {
366+
/**
367+
* Builds join graph information to be used by the filtering strategies.
368+
* Currently, it builds the sets of star/non-star joins.
369+
* It can be extended with the sets of connected/unconnected joins, which
370+
* can be used to filter Cartesian products.
371+
*/
372+
def buildJoinGraphInfo(
373+
conf: SQLConf,
374+
items: Seq[LogicalPlan],
375+
conditions: Set[Expression],
376+
itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
377+
378+
if (conf.joinReorderDPStarFilter) {
379+
// Compute the tables in a star-schema relationship.
380+
val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
381+
val nonStarJoin = items.filterNot(starJoin.contains(_))
382+
383+
if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
384+
val itemMap = itemIndex.toMap
385+
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
386+
} else {
387+
// Nothing interesting to return.
388+
None
389+
}
390+
} else {
391+
// Star schema filter is not enabled.
392+
None
393+
}
394+
}
395+
396+
/**
397+
* Applies the star-join filter that eliminates join combinations among star
398+
* and non-star tables until the star join is built.
399+
*
400+
* Given the oneSideJoinPlan/otherSideJoinPlan, which represent all the plan
401+
* permutations generated by the DP join enumeration, and the star/non-star plans,
402+
* the following plan combinations are allowed:
403+
* 1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of star-join
404+
* 2. star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
405+
* 3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of non star-join
406+
*
407+
* It assumes the sets are disjoint.
408+
*
409+
* Example query graph:
410+
*
411+
* t1 d1 - t2 - t3
412+
* \ /
413+
* f1
414+
* |
415+
* d2
416+
*
417+
* star: {d1, f1, d2}
418+
* non-star: {t2, t1, t3}
419+
*
420+
* level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
421+
* level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
422+
* level 2: {d2 f1 d1 }
423+
* level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
424+
* level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
425+
* level 5: {d1 t3 t2 f1 t1 d2 }
426+
*
427+
* @param oneSideJoinPlan One side of the join represented as a set of plan ids.
428+
* @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
429+
* @param filters Star and non-star plans represented as sets of plan ids
430+
*/
431+
def starJoinFilter(
432+
oneSideJoinPlan: Set[Int],
433+
otherSideJoinPlan: Set[Int],
434+
filters: JoinGraphInfo) : Boolean = {
435+
val starJoins = filters.starJoins
436+
val nonStarJoins = filters.nonStarJoins
437+
val join = oneSideJoinPlan.union(otherSideJoinPlan)
438+
439+
// Disjoint sets
440+
oneSideJoinPlan.intersect(otherSideJoinPlan).isEmpty &&
441+
// Either star or non-star is empty
442+
(starJoins.isEmpty || nonStarJoins.isEmpty ||
443+
// Join is a subset of the star-join
444+
join.subsetOf(starJoins) ||
445+
// Star-join is a subset of join
446+
starJoins.subsetOf(join) ||
447+
// Join is a subset of non-star
448+
join.subsetOf(nonStarJoins))
449+
}
450+
}
451+
452+
/**
453+
* Helper class that keeps information about the join graph as sets of item/plan ids.
454+
* It currently stores the star/non-star plans. It can be
455+
* extended with the set of connected/unconnected plans.
456+
*/
457+
case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
7676

7777
val emptyStarJoinPlan = Seq.empty[LogicalPlan]
7878

79-
if (!conf.starSchemaDetection || input.size < 2) {
79+
if (input.size < 2) {
8080
emptyStarJoinPlan
8181
} else {
8282
// Find if the input plans are eligible for star join detection.

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,12 @@ object SQLConf {
736736
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
737737
.createWithDefault(0.7)
738738

739+
val JOIN_REORDER_DP_STAR_FILTER =
740+
buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
741+
.doc("Applies star-join filter heuristics to cost based join enumeration.")
742+
.booleanConf
743+
.createWithDefault(false)
744+
739745
val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
740746
.doc("When true, it enables join reordering based on star schema detection. ")
741747
.booleanConf
@@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {
10111017

10121018
def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
10131019

1020+
def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
1021+
10141022
def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
10151023

10161024
def sortMergeJoinExecBufferSpillThreshold: Int =

0 commit comments

Comments
 (0)