[SPARK-15616] [SQL] CatalogRelation should fallback to HDFS size of partitions that are involved in Query for JoinSelection. #18193
Conversation
Test build #77711 has finished for PR 18193 at commit
```scala
fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
val withStats = logicalRelation.catalogTable.map(_.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
```
Good catch! I think this is a bug and worth a separate PR to fix.
Yes, I have created SPARK-20986. Thanks.
```scala
  customCheckRules
}

/**
```
nit: Indentation.
Yes, thanks.
Test build #77742 has finished for PR 18193 at commit
Test build #77745 has finished for PR 18193 at commit
Test build #77764 has finished for PR 18193 at commit
```scala
    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case filter @ Filter(condition, relation: CatalogRelation)
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.isPartitioned =>
```
Is this only for Hive tables? What about data source tables?
I think PruneFileSourcePartitions already handles data source tables now.
```scala
  }
}

case class DeterminePartitionedTableStats(
```
Is it kind of a PruneFileSourcePartitions rule for Hive tables?
Yes, DeterminePartitionedTableStats is kind of a PruneFileSourcePartitions rule for Hive tables.
Then shall we give it a better name, like PruneHiveTablePartitions?
Yes, I will rename it to PruneHiveTablePartitions.
```scala
  session.sessionState.conf.sessionLocalTimeZone)
val hiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
val partitions = prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveTable))
val sizeInBytes = try {
```
What if we already have partition-level statistics in the Hive metastore?
Even if we already have partition-level statistics, we cannot know the total number of partitions, so we cannot compute the statistics for only the pruned partitions from them.
```scala
partitions.map { partition =>
  val fs: FileSystem = partition.getDataLocation.getFileSystem(hadoopConf)
  fs.getContentSummary(partition.getDataLocation).getLength
}.sum
```
If there are too many partitions, this will be very slow. Can you add a check on whether the running sum is already larger than the threshold, and break early if it is?
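The suggested early exit might look like this minimal sketch (a hypothetical helper, not the PR's actual code; it assumes the broadcast threshold is passed in, since once the running total exceeds it the exact size no longer changes the join choice):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: sum the HDFS sizes of the pruned partitions'
// locations, but stop scanning once the total already exceeds the
// broadcast-join threshold.
def sizeInBytesWithEarlyExit(
    locations: Seq[Path],
    threshold: Long,
    hadoopConf: Configuration): Long = {
  var total = 0L
  val it = locations.iterator
  while (it.hasNext && total <= threshold) {
    val location = it.next()
    val fs = location.getFileSystem(hadoopConf)        // resolve the filesystem for this path
    total += fs.getContentSummary(location).getLength  // on-disk bytes under this location
  }
  total
}
```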
The logic looks similar to
ping @lianhuiwang
Test build #80991 has finished for PR 18193 at commit
@cloud-fan PruneFileSourcePartitions is a rule for data source tables, but we cannot yet treat Hive as a data source.
Test build #80993 has finished for PR 18193 at commit
retest it please.
Test build #80994 has finished for PR 18193 at commit
```scala
    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case filter @ Filter(condition, relation: HiveTableRelation)
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.isPartitioned =>
```
DDLUtils.isHiveTable(relation.tableMeta) is no longer needed, since matching on HiveTableRelation already guarantees a Hive table.
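A minimal sketch of the simplified rule skeleton, assuming the match is on HiveTableRelation (the pruning body itself is elided; this is an illustration, not the PR's final code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch: matching on HiveTableRelation already guarantees a Hive table,
// so no DDLUtils.isHiveTable guard is needed.
case class PruneHiveTablePartitions(session: SparkSession)
  extends Rule[LogicalPlan] with PredicateHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case filter @ Filter(_, relation: HiveTableRelation) if relation.isPartitioned =>
      filter // partition pruning and statistics computation elided
  }
}
```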
```scala
  }
}

case class PruneHiveTablePartitions(
```
Add a TODO that we should merge this rule with PruneFileSourcePartitions once we completely make Hive a data source.
```scala
  pruningPredicates,
  session.sessionState.conf.sessionLocalTimeZone)
val hiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
val partitions = prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveTable))
```
Do we need to do this? All we need is the partition data location, and we can get it from CatalogTablePartition.storage.locationUri.
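A sketch of that suggestion (hypothetical helper): the location comes straight from the catalog metadata, with no Hive client conversion.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Hypothetical helper: read the partition's data location directly from
// the catalog partition metadata instead of converting to a Hive partition.
def dataLocation(part: CatalogTablePartition): Option[Path] =
  part.storage.locationUri.map(uri => new Path(uri))
```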
@cloud-fan I have addressed your comments. Thanks.
Test build #81134 has finished for PR 18193 at commit
```scala
  pruningPredicates,
  session.sessionState.conf.sessionLocalTimeZone)
val sizeInBytes = try {
  prunedPartitions.map { part =>
```
I think we should first check whether partition.parameters contains StatsSetupConst.RAW_DATA_SIZE or StatsSetupConst.TOTAL_SIZE. If partition.parameters contains the size of the partition, use it instead of calling getContentSummary on HDFS.
@cenyuhai Yes, good idea. I will add it. Thanks.
Test build #81937 has finished for PR 18193 at commit
Test build #81939 has finished for PR 18193 at commit
```scala
val sizeInBytes = try {
  prunedPartitions.map { part =>
    val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
    val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
```
I think we should use rawDataSize first, because a 1MB ORC file can hold as much data as a 5MB text file...
@cenyuhai Yes, I think what you said is right. Thanks.
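The agreed-on lookup order could be sketched like this (hypothetical helper; `fallbackToHdfsSize` stands in for the getContentSummary scan above): prefer rawDataSize because it is uncompressed and therefore comparable across file formats, fall back to totalSize, and only touch HDFS when neither statistic is present.

```scala
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Hypothetical helper sketching the discussed priority:
// rawDataSize > totalSize > HDFS content summary.
def partitionSizeInBytes(
    part: CatalogTablePartition,
    fallbackToHdfsSize: CatalogTablePartition => Long): Long = {
  val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
  val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
  rawDataSize.filter(_ > 0)              // uncompressed size, format-independent
    .orElse(totalSize.filter(_ > 0))     // on-disk (compressed) size
    .getOrElse(fallbackToHdfsSize(part)) // last resort: scan HDFS
}
```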
Test build #81968 has finished for PR 18193 at commit
Test build #88712 has finished for PR 18193 at commit
@cloud-fan @lianhuiwang @gatorsmile This fix is useful; is there any update on this?
@advancedxy do you want to take it over? The PR looks good, but we probably need to fix some tests.
OK, I will add it to my backlog. I will take a look at this once #18324 is resolved.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Currently, if only some partitions of a partitioned table are involved in a join, we still rely on the Metastore-reported size of the whole table to decide whether the join can be converted to a broadcast join.
When a Filter can prune partitions, Hive prunes them before deciding whether to use a broadcast join, based on the HDFS size of only the partitions involved in the query. Spark SQL needs the same behavior, which can improve join performance for partitioned tables.
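To illustrate why this matters (a sketch of the decision, not code from this PR): JoinSelection broadcasts a side only when its estimated size is at or below spark.sql.autoBroadcastJoinThreshold, so estimating with just the pruned partitions' size instead of the whole table's size can enable broadcast joins that would otherwise be missed.

```scala
// Sketch of the broadcast check JoinSelection effectively performs:
// with pruned-partition statistics, estimatedSizeInBytes reflects only
// the partitions the query actually touches.
def canBroadcast(estimatedSizeInBytes: BigInt, autoBroadcastJoinThreshold: Long): Boolean =
  estimatedSizeInBytes >= 0 && estimatedSizeInBytes <= autoBroadcastJoinThreshold
```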
How was this patch tested?
Added unit tests.