[SPARK-15616][SQL] Hive table supports partition pruning in JoinSelection #25919
Conversation
cc @cloud-fan.

ok to test

add to whitelist
  predicate.references.subsetOf(partitionSet)
}
val conf = session.sessionState.conf
if (pruningPredicates.nonEmpty && conf.fallBackToHdfsForStatsEnabled &&
Why do we need to check conf.fallBackToHdfsForStatsEnabled?
We should only get the size from HDFS if conf.fallBackToHdfsForStatsEnabled is enabled, since it could be a time-consuming operation. Though, this condition should probably be pushed down to just before the CommandUtils.calculateLocationSize call.
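For illustration, a minimal sketch of that reordering, assuming the names from the diff above (the helper itself is hypothetical, not the PR's code); it only gates the expensive HDFS walk behind the flag:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.command.CommandUtils

// Hypothetical helper: compute the size of the pruned partitions, but only
// touch the file system when spark.sql.statistics.fallBackToHdfs is enabled.
def prunedPartitionsSizeInBytes(
    session: SparkSession,
    table: TableIdentifier,
    partitions: Seq[CatalogTablePartition]): BigInt = {
  val conf = session.sessionState.conf
  if (conf.fallBackToHdfsForStatsEnabled) {
    // Potentially slow: one file-system listing per pruned partition.
    partitions.map { p =>
      BigInt(CommandUtils.calculateLocationSize(
        session.sessionState, table, p.storage.locationUri))
    }.sum
  } else {
    // Skip the HDFS scan entirely and keep the cheap default estimate.
    BigInt(conf.defaultSizeInBytes)
  }
}
```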
Test build #111297 has finished for PR 25919 at commit
c054d22 to e744da5
@cloud-fan Now pruned partitions are cached in HiveTableRelation; what do you think about the current approach?
Test build #112331 has finished for PR 25919 at commit

Test build #112332 has finished for PR 25919 at commit

Test build #112334 has finished for PR 25919 at commit

Test build #112337 has finished for PR 25919 at commit

Test build #112356 has finished for PR 25919 at commit
    tableStats: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
    tableStats: Option[Statistics] = None,
    @transient normalizedFilters: Seq[Expression] = Nil,
    @transient prunedPartitions: Seq[CatalogTablePartition] = Nil)
How can we distinguish between 0 partitions after pruning and not being partition-pruned at all?
We have another field called normalizedFilters. When it's empty (Nil), the prunedPartitions have not been pruned; otherwise, prunedPartitions = Nil means there are 0 partitions left after pruning.
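As an illustration of that convention (field names taken from the diff above; the helper itself is hypothetical):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical helper: an empty normalizedFilters means the relation was never
// partition-pruned, so an empty prunedPartitions is only meaningful when
// pruning filters are present.
def partitionsAfterPruning(
    normalizedFilters: Seq[Expression],
    prunedPartitions: Seq[CatalogTablePartition]): Option[Seq[CatalogTablePartition]] = {
  if (normalizedFilters.isEmpty) {
    None // not pruned at all
  } else {
    Some(prunedPartitions) // may legitimately be empty: pruned down to 0 partitions
  }
}
```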
case class PruneHiveTablePartitions(
    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned =>
Can we follow PruneFileSourcePartitions? I think we should also support Filter(Project(HiveScan)).
Will do.
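For reference, a rough sketch of the match shape PruneFileSourcePartitions uses, adapted to HiveTableRelation. The actual pruning and stats update from this PR are elided, so this rule is a no-op that only demonstrates how Filter(Project(HiveTableRelation)) would also be covered:

```scala
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object PruneHiveTablePartitionsSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    // PhysicalOperation collapses adjacent Project/Filter nodes, so both
    // Filter(HiveTableRelation) and Filter(Project(HiveTableRelation)) match here.
    case op @ PhysicalOperation(_, filters, relation: HiveTableRelation)
        if relation.isPartitioned && filters.nonEmpty =>
      val partitionSet = AttributeSet(relation.partitionCols)
      val pruningPredicates = filters.filter { p =>
        !p.references.isEmpty && p.references.subsetOf(partitionSet)
      }
      logDebug(s"Partition pruning predicates: $pruningPredicates")
      // The real rule would prune partitions and update stats here.
      op
  }
}
```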
  normalizedFilters)
val isFiltersEqual = normalizedFilters.zip(relation.normalizedFilters)
  .forall { case (e1, e2) => e1.semanticEquals(e2) }
if (isFiltersEqual) {
What are we doing here?
Only when the pruning filters match exactly can we simply get the partitions from HiveTableRelation.
val withStats = relation.tableMeta.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
val prunedHiveTableRelation = relation.copy(tableMeta = withStats,
  normalizedFilters = pruningPredicates, prunedPartitions = prunedPartitions)
Why do we need to keep pruningPredicates? IIUC the approach should be very simple:
- this rule only changes HiveTableRelation to hold an optional partition list.
- HiveTableScanExec will get the partition list from HiveTableRelation or call listPartitionsByFilter.
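A rough sketch of the lookup HiveTableScanExec would do under that simpler proposal (all names here are illustrative, not the code in this PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

// If the optimizer rule cached a pruned partition list on the relation, use it;
// otherwise fall back to asking the metastore via listPartitionsByFilter.
def partitionsForScan(
    session: SparkSession,
    table: TableIdentifier,
    cachedPartitions: Option[Seq[CatalogTablePartition]],
    partitionPruningPred: Seq[Expression]): Seq[CatalogTablePartition] = {
  cachedPartitions.getOrElse {
    session.sessionState.catalog.listPartitionsByFilter(table, partitionPruningPred)
  }
}
```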
Due to SPARK-24085, the pruningPredicates (we eliminate the subquery) could be different from the filters passed to HiveTableScan. So I keep the pruningPredicates, and only retrieve the prunedPartitions when HiveTableScanExec's partitionPruningPred matches exactly with HiveTableRelation's normalizedFilters.
The simplified solution occurred to me first, but then I thought the filters could be different for some reason, and SPARK-24085 is an example, hence the proposed solution here.
1. Don't store pruningFilters in HiveTableRelation.
2. Follow PruneFileSourcePartitions' style to extract projections, predicates and the Hive relation.
3. Skip partition pruning if a scalar subquery is involved.
Test build #112679 has finished for PR 25919 at commit

Test build #112707 has finished for PR 25919 at commit
retest it please

Gently ping @cloud-fan

retest this please

still

Test build #114331 has finished for PR 25919 at commit

I think it's ready for review.
val normalizedFilters = partitionPruningPred.map(_.transform {
  case a: AttributeReference => originalAttributes(a)
})
sparkSession.sessionState.catalog.listPartitionsByFilter(
@cloud-fan @maropu @advancedxy
Since rawPartitions is consumed by prunePartitions(rawPartitions) in the doExecute method, it seems prunePartitions will filter out all irrelevant partitions using boundPruningPred. Then why do we still need to call listPartitionsByFilter here?
Could you please help me understand this? Thanks a lot in advance.
  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
}
// SPARK-24085: scalar subquery should be skipped for partition pruning
val hasScalarSubquery = pruningPredicates.exists(SubqueryExpression.hasSubquery)
This skips all subqueries, not only scalar subqueries.
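For comparison, a hedged sketch of how the two checks differ (ScalarSubquery and SubqueryExpression are Catalyst expression classes; whether only scalar subqueries should be skipped is exactly the point raised here):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalarSubquery, SubqueryExpression}

// Matches any subquery expression, which is what the code in the diff above does.
def hasAnySubquery(e: Expression): Boolean =
  SubqueryExpression.hasSubquery(e)

// Matches only scalar subqueries, i.e. the narrower SPARK-24085 case the
// comment in the diff refers to.
def hasScalarSubquery(e: Expression): Boolean =
  e.find(_.isInstanceOf[ScalarSubquery]).isDefined
```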
  rawDataSize.get
} else if (totalSize.isDefined && totalSize.get > 0L) {
  totalSize.get
} else if (conf.fallBackToHdfsForStatsEnabled) {
Per the doc of the conf "spark.sql.statistics.fallBackToHdfs", it is only for non-partitioned Hive tables:
"This flag is effective only for non-partitioned Hive tables."
closed in favor of #26805
### What changes were proposed in this pull request?
Add an optimizer rule, PruneHiveTablePartitions, that prunes Hive table partitions based on filters on partition columns. Doing so, the total size of the pruned partitions may be small enough for a broadcast join in the JoinSelection strategy.

### Why are the changes needed?
In the JoinSelection strategy, Spark uses "plan.stats.sizeInBytes" to decide whether a plan is suitable for a broadcast join. Currently, "plan.stats.sizeInBytes" does not take pruned partitions into account, so it may miss some broadcast joins and take sort-merge joins instead, which will definitely impact join performance. This PR aims at taking pruned partitions into account for Hive tables in "plan.stats.sizeInBytes" and thereby improving performance by using broadcast joins where possible.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added unit tests.

This is based on #25919; credits should go to lianhuiwang and advancedxy.

Closes #26805 from fuwhu/SPARK-15616.

Authored-by: fuwhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
A new optimizer strategy called PruneHiveTablePartitions is added, which calculates the table size as the total size of the pruned partitions. Thus, the Spark planner can pick up BroadcastJoin if the size of the pruned partitions is under the broadcast join threshold.

### Why are the changes needed?
This is a performance improvement.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added unit tests.

This is based on #18193; credits should go to @lianhuiwang.
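As a hedged, end-to-end illustration of the intended effect (table and column names below are made up; the exact outcome depends on partition sizes and your broadcast threshold):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partition-pruning-broadcast-demo")
  .enableHiveSupport()
  .getOrCreate()

// 10 MB is the default broadcast threshold; set explicitly here only for clarity.
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=10485760")

// With pruned partitions reflected in the size estimate, the filtered fact
// table can fall under the threshold and be broadcast instead of sort-merge
// joined. Expect BroadcastHashJoin in the printed plan when it does.
spark.sql(
  """
    |SELECT f.*, d.name
    |FROM fact_events f            -- large Hive table partitioned by dt
    |JOIN dim_users d ON f.user_id = d.id
    |WHERE f.dt = '2019-10-01'     -- partition filter: prunes most of fact_events
  """.stripMargin).explain()
```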