Conversation
Test build #104987 has finished for PR 24486 at commit

retest this please

Test build #104994 has finished for PR 24486 at commit

Test build #105125 has finished for PR 24486 at commit

retest this please

Test build #105126 has finished for PR 24486 at commit
```scala
// Our bucketing is incompatible with Hive's (different hash function), but
// downstream systems (Hive/Presto) can still read it as a non-bucketed table.
// We set the SerDe correctly and bucketing_version to spark; the downstream
// system decides how to read it by itself. A similar implementation:
```
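As background for the hash mismatch the comment refers to: Spark computes bucket ids with Murmur3 (the same hash the built-in `hash` function uses), while Hive hashes primitives differently (for an int, Hive's hash is the value itself). A minimal, hedged sketch of the disagreement, assuming a local `SparkSession`:

```scala
import org.apache.spark.sql.SparkSession

object BucketHashSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("bucket-hash").getOrCreate()
    // Spark's bucket id for value 1 with 2 buckets: pmod(murmur3_hash(1), 2).
    // The built-in `hash` function is the same Murmur3 hash bucketing uses.
    spark.sql("SELECT pmod(hash(1), 2) AS spark_bucket_id").show()
    // Hive would compute pmod(1, 2) instead (its hash of an int is the int
    // itself), so the two engines place the same row in different buckets.
    spark.stop()
  }
}
```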
What is the impact if the downstream makes a wrong decision?
Sorry, it's not a bucketed table on the Hive side. Related code:
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
Lines 444 to 459 in f9776e3
```scala
if (bucketSpec.isDefined) {
  val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
  properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
  properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
  bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
    properties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
  }
  if (sortColumnNames.nonEmpty) {
    properties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
    sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
      properties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
    }
  }
}
```
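For a concrete picture of what lands in the metastore, here is a minimal sketch of the property layout the snippet above produces, assuming a spec of 2 buckets on `c1` sorted by `c1`. The literal `spark.sql.sources.schema.*` keys stand in for the `DATASOURCE_SCHEMA_*` constants, and `BucketSpec` is redefined locally so the sketch is self-contained:

```scala
import scala.collection.mutable

// Stand-in for org.apache.spark.sql.catalyst.catalog.BucketSpec.
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

object BucketSpecProperties {
  def main(args: Array[String]): Unit = {
    val spec = BucketSpec(2, Seq("c1"), Seq("c1"))
    val properties = mutable.LinkedHashMap.empty[String, String]
    properties.put("spark.sql.sources.schema.numBuckets", spec.numBuckets.toString)
    properties.put("spark.sql.sources.schema.numBucketCols",
      spec.bucketColumnNames.length.toString)
    spec.bucketColumnNames.zipWithIndex.foreach { case (col, i) =>
      properties.put(s"spark.sql.sources.schema.bucketCol.$i", col)
    }
    if (spec.sortColumnNames.nonEmpty) {
      properties.put("spark.sql.sources.schema.numSortCols",
        spec.sortColumnNames.length.toString)
      spec.sortColumnNames.zipWithIndex.foreach { case (col, i) =>
        properties.put(s"spark.sql.sources.schema.sortCol.$i", col)
      }
    }
    // The bucket spec survives only as table properties; the metastore's own
    // bucketing fields stay empty, so Hive sees a non-bucketed table.
    properties.foreach { case (k, v) => println(s"$k = $v") }
  }
}
```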
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
Lines 976 to 990 in 33f3c48
```scala
table.bucketSpec match {
  case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
    hiveTable.setNumBuckets(bucketSpec.numBuckets)
    hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
    if (bucketSpec.sortColumnNames.nonEmpty) {
      hiveTable.setSortCols(
        bucketSpec.sortColumnNames
          .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
          .toList
          .asJava
      )
    }
  case _ =>
}
```
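The guard that makes this a no-op for data source tables is `DDLUtils.isHiveTable`, which boils down to checking the table's provider. A hedged simplification:

```scala
import java.util.Locale

// Simplified from org.apache.spark.sql.execution.command.DDLUtils:
// a table is a Hive table iff its provider is "hive".
def isHiveTable(provider: Option[String]): Boolean =
  provider.exists(_.toLowerCase(Locale.ROOT) == "hive")

assert(!isHiveTable(Some("parquet"))) // data source table: falls into `case _`
assert(isHiveTable(Some("hive")))     // Hive table: bucket spec is applied
```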
Test build #105253 has finished for PR 24486 at commit
viirya left a comment:
Have you manually tested it, i.e., reading the Spark bucketed table on the Hive side as a non-bucketed table?
Yes, I have tested it. Note that we should set
```scala
(None, message)
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
"But Hive can read it as not bucketed table."
(Some(newHiveCompatibleMetastoreTable(serde)), message)
```
Should we set bucketSpec = None?
Not necessary: as shown below, HiveClientImpl applies the bucket spec only when the table is a Hive table, so a data source table falls through to `case _` and no bucketing metadata is set on the Hive side:
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
Lines 1009 to 1023 in fee695d
```scala
table.bucketSpec match {
  case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
    hiveTable.setNumBuckets(bucketSpec.numBuckets)
    hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
    if (bucketSpec.sortColumnNames.nonEmpty) {
      hiveTable.setSortCols(
        bucketSpec.sortColumnNames
          .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
          .toList
          .asJava
      )
    }
  case _ =>
}
```
```scala
// It's not a bucketed table at Hive side
val client =
  spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
val hiveSide = client.runSqlHive("DESC FORMATTED t")
```
Also check the results of the read path.
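A hedged sketch of what checking the read path could look like, reusing `client` from the snippet above; the table name `t`, the row values, and Hive's tab-separated row format are assumptions for illustration, not the PR's actual assertions:

```scala
// Read the table back through Hive's own engine rather than Spark.
// runSqlHive returns one string per row, columns tab-separated (assumed).
val hiveRows = client.runSqlHive("SELECT c1, c2 FROM t")
assert(hiveRows === Seq("1\t2"))
```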
Test build #105723 has finished for PR 24486 at commit

Test build #105725 has finished for PR 24486 at commit

ping @cloud-fan
```scala
test("Set the bucketed data source table SerDe correctly") {
```
Let's include the JIRA id in the test name.
LGTM

Test build #109147 has finished for PR 24486 at commit

retest this please

Test build #109149 has finished for PR 24486 at commit

thanks, merging to master!
```scala
|CLUSTERED BY (c1)
|SORTED BY (c1)
|INTO 2 BUCKETS
|AS SELECT 1 AS c1, 2 AS c2
```
Only one row is hard to prove Hive can read it correctly; could you improve the tests?
In addition, try to create a partitioned and bucketed table and see whether it is readable by Hive.
You can create a separate test suite for it.
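A hedged sketch of that suggestion, assuming the `withTable`/`spark` helpers of a Spark SQL test suite; the table name `t_part` and the assertions are illustrative, not the PR's final test code:

```scala
withTable("t_part") {
  spark.sql(
    """
      |CREATE TABLE t_part(c1 INT, c2 INT)
      |USING parquet
      |PARTITIONED BY (c2)
      |CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS
    """.stripMargin)
  spark.sql("INSERT INTO t_part VALUES (1, 1), (2, 1), (3, 2)")

  val client =
    spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
  // Hive should see a plain partitioned table (no buckets) and read the
  // partition's rows; tab-separated output is assumed, as above.
  assert(client.runSqlHive("SELECT c1, c2 FROM t_part WHERE c2 = 2") === Seq("3\t2"))
}
```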
What changes were proposed in this pull request?
Hive uses an incorrect InputFormat (org.apache.hadoop.mapred.SequenceFileInputFormat) to read Spark's Parquet bucketed data source tables.
Spark side: [screenshot]
Hive side: [screenshot]
So it is a non-bucketed table on the Hive side. This PR sets the SerDe correctly so Hive can read these tables (a hedged sketch of the SerDe lookup follows the code references below). Related code:
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
Lines 976 to 990 in 33f3c48
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
Lines 444 to 459 in f9776e3
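To make the fix concrete: a minimal sketch, assuming the change amounts to looking up the Hive-compatible SerDe for the data source provider and attaching it to the table's storage instead of leaving Hive's SequenceFile default. `HiveSerDe.sourceToSerDe` is an existing Spark helper; the printed fields are whatever it maps `parquet` to:

```scala
import org.apache.spark.sql.internal.HiveSerDe

object SerDeLookupSketch {
  def main(args: Array[String]): Unit = {
    // Map the data source provider name to its Hive SerDe info; for parquet
    // this yields MapredParquetInputFormat rather than SequenceFileInputFormat.
    HiveSerDe.sourceToSerDe("parquet").foreach { s =>
      println(s"inputFormat  = ${s.inputFormat.getOrElse("<none>")}")
      println(s"outputFormat = ${s.outputFormat.getOrElse("<none>")}")
      println(s"serde        = ${s.serde.getOrElse("<none>")}")
    }
  }
}
```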
How was this patch tested?
Unit tests.