Conversation

fuwhu (Contributor) commented Dec 9, 2019

What changes were proposed in this pull request?

Add an optimizer rule, PruneHiveTablePartitions, that prunes Hive table partitions based on filters on partition columns.
After pruning, the total size of the remaining partitions may be small enough to qualify for a broadcast join in the JoinSelection strategy.

Why are the changes needed?

In the JoinSelection strategy, Spark uses "plan.stats.sizeInBytes" to decide whether a plan is suitable for a broadcast join.
Currently, "plan.stats.sizeInBytes" does not take pruned partitions into account, so Spark may miss broadcast-join opportunities and fall back to sort-merge join, which hurts join performance.
This PR makes "plan.stats.sizeInBytes" account for pruned partitions of Hive tables, improving performance by enabling broadcast joins where possible.
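The decision this PR targets can be sketched as follows. This is a minimal illustration in Python, not Spark's actual JoinSelection code; the 10 MB value is the default of spark.sql.autoBroadcastJoinThreshold, and the partition sizes are made up:

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024

def choose_join_strategy(size_in_bytes: int) -> str:
    """Pick broadcast-hash join when one side's estimated size is small enough."""
    if size_in_bytes <= AUTO_BROADCAST_JOIN_THRESHOLD:
        return "broadcast-hash join"
    return "sort-merge join"

# Without partition pruning, sizeInBytes covers every partition:
all_partitions = [8 * 1024 * 1024] * 4            # four 8 MB partitions (illustrative)
print(choose_join_strategy(sum(all_partitions)))  # 32 MB > 10 MB -> sort-merge join

# After pruning to the single matching partition, broadcast becomes possible:
print(choose_join_strategy(all_partitions[0]))    # 8 MB <= 10 MB -> broadcast-hash join
```

The rule does not change how JoinSelection decides; it only makes the size estimate reflect the partitions that actually survive the partition filters.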

Does this PR introduce any user-facing change?

no

How was this patch tested?

Added unit tests.

This is based on #25919; credit should go to @lianhuiwang and @advancedxy.

fuwhu (Contributor Author) commented Dec 9, 2019

@wangyum @cloud-fan
Could you please help review?

cloud-fan (Contributor): ok to test

SparkQA commented Dec 10, 2019

Test build #115119 has finished for PR 26805 at commit 0b60978.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PruneHiveTablePartitions(session: SparkSession)

fuwhu (Contributor Author) commented Dec 13, 2019

gently cc: @cloud-fan @maropu

SparkQA commented Dec 30, 2019

Test build #115934 has finished for PR 26805 at commit 4e1aba9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PruneHiveTablePartitions(session: SparkSession)

fuwhu (Contributor Author) commented Dec 30, 2019

retest this please

SparkQA commented Dec 30, 2019

Test build #115946 has finished for PR 26805 at commit 4e1aba9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PruneHiveTablePartitions(session: SparkSession)

fuwhu (Contributor Author) commented Dec 31, 2019

@cloud-fan @maropu
Could you help review? Or I can close it if this change is not needed.

Contributor: let's move it to a separate file.

Contributor Author: sure, thanks.

Contributor: nit:

def func(
    para1: T,
    para2: T...

Contributor Author: sure, thanks.

Contributor: use 2-space indentation.

Contributor Author: sure, thanks.

Contributor: does the data source table have the same problem?

fuwhu (Contributor Author), Jan 4, 2020: Yes, PruneFileSourcePartitions may also end up computing the sizes of a large number of partitions through HDFS.
I will create a follow-up PR to refine it after this PR is finished.

Contributor: If this is a common problem, let's leave it here and open a new PR to fix it completely later.

Contributor Author: yes, will create a new PR for it; it is already removed from this PR. Thanks.

Contributor: We should call SessionCatalog.listPartitionsByFilter

Contributor Author: yea, updated.

Contributor: shall we do it in DetermineTableStats?

fuwhu (Contributor Author), Jan 4, 2020: DetermineTableStats is an Analyzer rule, while the pruned partitions and their total size must be calculated after the filter push-down optimizer rules have run, so we cannot put this logic in DetermineTableStats for now.
But I will check whether DetermineTableStats can be moved into the optimization phase and placed after PruneHiveTablePartitions. If you have any ideas or suggestions, please share. Thanks.
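The ordering constraint exists because the rule consumes predicates that filter push-down has placed next to the relation: only the predicates on partition columns can prune. A rough sketch of that step (hypothetical data model in Python, not Catalyst):

```python
# Hedged sketch of the rule's core idea (illustrative structures only):
# a partition is a dict mapping partition column -> value, and a
# predicate is a (column, op, value) triple. Only predicates on
# partition columns participate in pruning; the rest stay in the plan.

def prune_partitions(partitions, partition_cols, predicates):
    """Return the partitions that satisfy every partition-column predicate."""
    partition_preds = [pred for pred in predicates if pred[0] in partition_cols]
    ops = {"=": lambda a, b: a == b, ">": lambda a, b: a > b}

    def matches(part):
        return all(ops[op](part[col], val) for col, op, val in partition_preds)

    return [part for part in partitions if matches(part)]

parts = [{"p": i} for i in range(1, 5)]
# The predicate on non-partition column "i" is ignored for pruning:
print(prune_partitions(parts, {"p"}, [("p", "=", 1), ("i", ">", 0)]))
```

This is why the rule must run after push-down: before push-down, the partition-column predicates may still sit above joins or projections where the relation cannot see them.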

SparkQA commented Jan 4, 2020

Test build #116107 has finished for PR 26805 at commit a51e946.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

fuwhu (Contributor Author) commented Jan 4, 2020

retest this please

SparkQA commented Jan 4, 2020

Test build #116111 has finished for PR 26805 at commit a51e946.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 6, 2020

Test build #116130 has finished for PR 26805 at commit d715a04.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 6, 2020

Test build #116141 has finished for PR 26805 at commit 4b8d39d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fuwhu fuwhu changed the title [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions [SPARK-15616][SQL][WIP] Add optimizer rule PruneHiveTablePartitions Jan 8, 2020
Contributor: is it safe to keep other stats after we prune partitions?

fuwhu (Contributor Author), Jan 9, 2020: They could become inconsistent; e.g. rowCount and sizeInBytes may disagree after this rule runs.
So I restored creating a new CatalogStatistics instance. Some statistics may be lost by doing so, but that should not affect accuracy.
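The resolution can be sketched like this (hypothetical structures in Python, not CatalogStatistics itself): recompute sizeInBytes from the retained partitions, and drop table-level stats such as rowCount, which described the unpruned table and would now disagree with the recomputed size:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Stats:
    size_in_bytes: int
    row_count: Optional[int] = None  # None means "unknown"

def stats_after_pruning(pruned_partition_sizes: List[int]) -> Stats:
    # Recompute the size from the partitions that survived pruning, and
    # deliberately leave row_count unset: the old table-level rowCount
    # counted rows across all partitions and would be inconsistent with
    # the new, smaller size.
    return Stats(size_in_bytes=sum(pruned_partition_sizes))

s = stats_after_pruning([1024, 2048])
print(s.size_in_bytes, s.row_count)  # 3072 None
```

Losing rowCount only weakens later estimates (it becomes "unknown"); keeping a stale one could make them actively wrong.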

@fuwhu fuwhu changed the title [SPARK-15616][SQL][WIP] Add optimizer rule PruneHiveTablePartitions [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions Jan 9, 2020
fuwhu (Contributor Author) commented Jan 9, 2020

Did some refining; @cloud-fan, please help review again. Thanks.

Contributor: does it have to be an external table?

fuwhu (Contributor Author), Jan 21, 2020: actually it is not necessary; already changed to a managed table. Thanks.

Contributor: checking the exact file size can be flaky. Can we just check that the first size is more than 3 times larger than the second size?

Contributor Author: sure, will change it.
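The suggested change amounts to asserting a ratio rather than an exact byte count (illustrative sketch in Python with made-up sizes, not the suite's Scala):

```python
# Sketch of the flake-resistant assertion: exact file sizes can vary a
# little across environments (compression, file-format metadata), but
# the ratio between "all 4 partitions" and "1 pruned partition" stays
# comfortably above 3, so the test remains stable.

def relative_size_check(size_all_partitions: int, size_one_partition: int) -> bool:
    # "the first size is more than 3 times larger than the second size"
    return size_all_partitions > 3 * size_one_partition

print(relative_size_check(4 * 1033, 1033))  # True (hypothetical sizes)
```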

fuwhu added 14 commits January 21, 2020 10:27
…ions based on filters on partition columns.

Doing so, the total size of pruned partitions may be small enough for broadcast join in JoinSelection strategy.
…firstly

if HIVE_METASTORE_PARTITION_PRUNING enabled, and then prune again using partition filters.
… any more

since HiveExternalCatalog.listPartitionsByFilter can already return exactly what we want.
SparkQA commented Jan 21, 2020

Test build #117147 has finished for PR 26805 at commit ce20439.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) left a comment: LGTM except a few code style issues.

0L
}
}
if (sizeOfPartitions.forall(s => s>0)) {
Contributor: nit: forall(_ > 0)


for (part <- Seq(1, 2, 3, 4)) {
sql(s"""
|INSERT OVERWRITE TABLE test PARTITION (p='$part')
Contributor Author: updated to use two-space indentation like PruneFileSourcePartitionsSuite.

|INSERT OVERWRITE TABLE test PARTITION (p='$part')
|select col from temp""".stripMargin)
}
val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
Contributor: nit: p > 0

|select col from temp""".stripMargin)
}
val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed
Contributor: ditto

}
val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed
assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 ===
Contributor: ditto

Contributor Author: updated the code style, thanks a lot. :)

SparkQA commented Jan 21, 2020

Test build #117166 has finished for PR 26805 at commit b1798d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor): thanks, merging to master!

@cloud-fan cloud-fan closed this in cfb1706 Jan 21, 2020
fuwhu (Contributor Author) commented Jan 22, 2020

Thank you all for review and help.

@fuwhu fuwhu deleted the SPARK-15616 branch January 22, 2020 02:31
@fuwhu fuwhu restored the SPARK-15616 branch January 22, 2020 02:31
@fuwhu fuwhu deleted the SPARK-15616 branch January 22, 2020 02:31
import org.apache.spark.sql.internal.SQLConf

/**
* TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
Member: @fuwhu We need a description of the rule. Could you submit a follow-up PR to add descriptions to both PruneHiveTablePartitions and PruneFileSourcePartitions?

Contributor Author: sure, do you mean just adding a class description in the PruneHiveTablePartitions.scala and PruneFileSourcePartitions.scala files, or do we also need to add a comment in some doc?

Contributor: classdoc is good enough

Contributor Author: @gatorsmile @cloud-fan classdoc added in #27535, please help review, thanks.

fuwhu added a commit to fuwhu/spark that referenced this pull request Oct 26, 2020
Closes apache#26805 from fuwhu/SPARK-15616.

Authored-by: fuwhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>