[SPARK-17729] [SQL] Enable creating hive bucketed tables #17644

Conversation
Test build #75829 has finished for PR 17644 at commit

Test build #75838 has finished for PR 17644 at commit

cc @cloud-fan @hvanhovell @sameeragarwal for review

I'll review it after branch 2.2 is cut

Test build #76298 has started for PR 17644 at commit

Jenkins test this please

Test build #76312 has finished for PR 17644 at commit

@cloud-fan : ping !!
I don't think we need to do this: according to the documentation of ExternalCatalog.alterTableSchema, the caller guarantees that the new schema still contains partition columns and bucket columns.
I have to do this because one of the unit tests was failing:
test("alter table schema") at line 246 of spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala (as of 703c42c)
In that test case, the table has bucketing and the alter removes the bucketing column. The way I interpret things: if the alter command does not remove any bucketing / sorting columns, we should keep the original bucketing spec. If any of the bucketing / sorting columns is removed by the alter, then the bucketing spec needs to be reset to empty (else we might refer to dangling columns).
Does hive allow this? i.e. altering the schema so it no longer contains the bucket/sort columns.
Unfortunately, YES. Below I alter a bucketed table to remove the bucketing column from its schema; the modified table still points to the old bucket spec. This is bad.
hive> desc formatted bucketed_partitioned_1;
# col_name data_type comment
user_id bigint
name string
# Storage Information
....
Num Buckets: 8
Bucket Columns: [user_id]
Sort Columns: [Order(col:user_id, order:1)]
hive> ALTER TABLE bucketed_partitioned_1 REPLACE COLUMNS ( eid INT, name STRING);
OK
hive> desc formatted bucketed_partitioned_1;
# col_name data_type comment
eid int
name string
# Storage Information
.....
Num Buckets: 8
Bucket Columns: [user_id]
Sort Columns: [Order(col:user_id, order:1)]
Do you mean hive has wrong behavior here? Maybe we can diverge from hive and disallow altering the schema to drop bucket/sort columns.
Yeah, hive's behavior is absurd. If the alter table statement removes bucketing columns, we should either disallow such an operation (seems harsh) OR remove the bucketing spec (which is what I am doing in this PR).
... disallow altering schema without bucket/sort columns.
I am not sure if I understand this right. Does the current approach in this PR match this, or are you proposing something else?
Let's disallow removing bucketing columns in SessionCatalog.alterTableSchema to keep the logic simpler; we can change the behavior later if we want to support this.
Updated the PR with this change
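(For illustration, a minimal sketch of what such a guard could look like; the helper name and structure are assumptions, not the PR's actual code.)

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.types.StructType

// Hypothetical guard: reject a new schema that drops any column the bucket
// spec refers to, so the spec never ends up pointing at dangling columns.
def assertNoBucketingColumnsRemoved(
    newSchema: StructType, bucketSpec: BucketSpec): Unit = {
  val newColumns = newSchema.fieldNames.toSet
  val referenced = bucketSpec.bucketColumnNames ++ bucketSpec.sortColumnNames
  val dangling = referenced.filterNot(newColumns.contains)
  if (dangling.nonEmpty) {
    throw new AnalysisException(
      s"ALTER TABLE cannot drop bucketing/sorting columns: ${dangling.mkString(", ")}")
  }
}
```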
is it safe to do so? I think this PR only covers the write path?
I don't think this could cause problems (correct me if I am missing anything). Even if the bucketing spec is set over here, this PR does not make use of that information in scan operators.
Here we are reading a hive table as a data source table, so the scan operator is actually FileSourceScanExec, which recognizes bucketing.
You are right. I have corrected this. Thanks for pointing this out !!
Why do we need to check this? Doesn't the hive metastore already guarantee this?
I had added this as a sanity check. As you mentioned, the hive metastore already guarantees this; I will remove it.
A high-level question: if we propagate the bucketing information of a hive table, can we correctly read this table with Spark SQL? Shall we have a config to change Spark's shuffle hash function to use hive hash?
@cloud-fan : Thanks for reviewing !!
With this PR, reads are unaffected: the scan path does not yet use the propagated bucketing information, so the table is still read like a regular table.
Yeah. We need to add that, but I will do it in a separate PR. I had a PR from last year, but given that there are many changes in SQL land, I couldn't rebase it. I am working on creating a new PR instead, after studying the changes since last year.
Force-pushed from e3c41bf to 6e6e767
Test build #76462 has finished for PR 17644 at commit

Test build #76460 has finished for PR 17644 at commit

Jenkins test this please

Test build #76506 has finished for PR 17644 at commit

Jenkins test this please

Test build #76515 has started for PR 17644 at commit

Jenkins test this please

Test build #76529 has finished for PR 17644 at commit
Force-pushed from 6e6e767 to ab12943
Test build #76588 has finished for PR 17644 at commit
Force-pushed from ab12943 to 7a0be86
Jenkins test this please

Test build #76595 has finished for PR 17644 at commit
Force-pushed from 7a0be86 to 49040e8
Force-pushed from 239beee to 10a9fd0
Test build #76882 has finished for PR 17644 at commit
Force-pushed from 10a9fd0 to d6ce8b5
Force-pushed from d6ce8b5 to 0aa8539
Test build #76884 has finished for PR 17644 at commit

Test build #76887 has finished for PR 17644 at commit
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.AnalysisException
please remove these unnecessary changes in this PR.
done
- // We can not populate bucketing information for Hive tables as Spark SQL has a different
- // implementation of hash function from Hive.
- bucketSpec = None,
+ bucketSpec = bucketSpec,
please add a comment to say that, for data source tables, we will always overwrite the bucket spec in HiveExternalCatalog with the bucketing information in table properties.
done
  throw new AnalysisException(message)
} else {
  logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
    s"$enforceSortingConfig are set to false.")
shall we remove the bucket properties of the table in this case? what does hive do?
- In Hive 1.x:
  - If the enforcing configs are set: data is populated respecting the table's bucketing spec.
  - If the enforcing configs are NOT set: data is populated without conforming to the table's bucketing spec.
- In Hive 2.x, these configs are gone and hive always populates data conformant with the table's bucketing.

With Spark, currently the data would be written out in a non-conformant way regardless of whether those configs are set. This PR moves to the Hive 1.x model; I am working on a follow-up PR which would make things behave like Hive 2.x.
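(As an illustration of the Hive 1.x model this PR adopts, a hypothetical spark-shell session; `bucketed_tbl` and `src` are made-up table names, not from the PR.)

```scala
// Under this PR's semantics, inserting into a hive bucketed table throws by
// default. Setting both enforcing configs to false opts out of bucketing
// guarantees: the insert proceeds, but the written files are NOT guaranteed
// to conform to the table's bucket spec.
spark.conf.set("hive.enforce.bucketing", "false")
spark.conf.set("hive.enforce.sorting", "false")
spark.sql("INSERT OVERWRITE TABLE bucketed_tbl SELECT user_id, name FROM src")
```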
So after insertion (if not enforcing), the table is still a bucketed table, but reading it will cause wrong results?
In hive: it would lead to wrong results.
In spark (over master and also after this PR): the table scan operation does not take bucketing into account, so it would be read as a regular table. So it won't be read "wrong"; it's just that we won't take advantage of bucketing.
Force-pushed from efc8c8b to bf306da
Test build #76909 has finished for PR 17644 at commit

Test build #76910 has finished for PR 17644 at commit
- // We can not populate bucketing information for Hive tables as Spark SQL has a different
- // implementation of hash function from Hive.
- bucketSpec = None,
+ // For data source tables, we will always overwrite the bucket spec in
Sorry, I made a mistake. This is true even for hive tables.
If the table is written by Spark, we will put bucketing information in table properties, and will always overwrite the bucket spec in the hive metastore with the bucketing information in table properties. This means, if we have a bucket spec in both the hive metastore and table properties, we will trust the one in table properties.
Let's update this document.
updated
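(A minimal sketch of the precedence rule just described, with hypothetical names; not the PR's actual code.)

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// The spec restored from Spark's table properties, when present, wins over
// whatever the hive metastore reports for the same table.
def resolveBucketSpec(
    fromTableProperties: Option[BucketSpec],
    fromMetastore: Option[BucketSpec]): Option[BucketSpec] =
  fromTableProperties.orElse(fromMetastore)
```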
}

table.bucketSpec match {
  case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
After thinking about it more, I think this one is less important.
Our main goal is to allow spark to read bucketed tables written by hive, not to allow hive to read bucketed tables written by spark.
How about we remove it for now and add it back later after more discussion?
... but not to allow hive to read bucketed tables written by spark.
If Hive is NOT able to read data source tables which are bucketed, that's OK, as those are not compatible with hive anyway. But for hive native tables, interoperability between spark and hive is what I want.
ok makes sense
LGTM except 2 comments

Test build #76941 has finished for PR 17644 at commit

thanks, merging to master!
val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
  StructField("new_field_1", IntegerType),
  StructField("col1", IntegerType),
@tejasapatil do you still remember why we updated it?
Yes. This was done because, before this PR, the test case was removing bucketed columns in the alter operation. We decided to disallow removing bucketing columns and to support it in the future if needed.
Here is the discussion we had about this: #17644 (comment)
Huh. If I revert the change to this test case, it does not fail anymore. This is bad, because the table properties still say that it's bucketed over col1, but col1 is not in the modified table schema. I am taking a look to see what changed.
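(For context, a hedged sketch of the kind of regression check being discussed; identifiers mirror the test snippet above, but this is illustrative, not the suite's actual code, and it assumes a ScalaTest suite with a `sessionCatalog` in scope.)

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Altering the schema so that bucketing column "col1" disappears should be
// rejected rather than leaving the bucket spec pointing at a missing column.
intercept[AnalysisException] {
  sessionCatalog.alterTableSchema(
    TableIdentifier("tbl1", Some("db2")),
    StructType(Seq(StructField("new_field_1", IntegerType))))
}
```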
## What changes were proposed in this pull request?

Hive allows inserting data to a bucketed table without guaranteeing bucketed-ness and sorted-ness, based on these two configs: `hive.enforce.bucketing` and `hive.enforce.sorting`.

What does this PR achieve?
- Spark will disallow users from writing outputs to hive bucketed tables by default (given that the output won't adhere to Hive's semantics).
- If a user still wants to write to a hive bucketed table, the only resort is to use `hive.enforce.bucketing=false` and `hive.enforce.sorting=false`, which means the user does NOT care about bucketing guarantees.

Changes done in this PR:
- Extract the table's bucketing information in `HiveClientImpl`
- While writing table info to the metastore, `HiveClientImpl` now populates the bucketing information in the hive `Table` object
- `InsertIntoHiveTable` allows inserts to a bucketed table only if both `hive.enforce.bucketing` and `hive.enforce.sorting` are `false`

The ability to create bucketed tables will enable adding test cases to Spark while I add more changes related to hive bucketing support. Design doc for hive bucketing support: https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit#

## How was this patch tested?

- Added a test for creating a bucketed and sorted table.
- Added a test to ensure that INSERTs fail if strict bucket / sort is enforced
- Added a test to ensure that INSERTs can go through if strict bucket / sort is NOT enforced
- Added a test to validate that bucketing information shows up in the output of DESC FORMATTED
- Added a test to ensure that `SHOW CREATE TABLE` works for hive bucketed tables
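(To make the headline feature concrete, a hypothetical spark-shell snippet; the table and column names are made up, and the DDL uses standard hive bucketing syntax.)

```scala
// With this PR, Spark can create a hive bucketed + sorted table...
spark.sql("""
  CREATE TABLE bucketed_tbl (user_id BIGINT, name STRING)
  CLUSTERED BY (user_id) SORTED BY (user_id ASC) INTO 8 BUCKETS
  STORED AS ORC
""")

// ...and the bucketing spec now shows up in DESC FORMATTED output.
spark.sql("DESC FORMATTED bucketed_tbl").show(100, truncate = false)
```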