
Commit bec0d0e

liancheng authored and pwendell committed
[SPARK-3007][SQL] Adds dynamic partitioning support
PR #2226 was reverted because it broke Jenkins builds for unknown reason. This debugging PR aims to fix the Jenkins build. This PR also fixes two bugs: 1. Compression configurations in `InsertIntoHiveTable` are disabled by mistake The `FileSinkDesc` object passed to the writer container doesn't have compression related configurations. These configurations are not taken care of until `saveAsHiveFile` is called. This PR moves compression code forward, right after instantiation of the `FileSinkDesc` object. 1. `PreInsertionCasts` doesn't take table partitions into account In `castChildOutput`, `table.attributes` only contains non-partition columns, thus for partitioned table `childOutputDataTypes` never equals to `tableOutputDataTypes`. This results funny analyzed plan like this: ``` == Analyzed Logical Plan == InsertIntoTable Map(partcol1 -> None, partcol2 -> None), false MetastoreRelation default, dynamic_part_table, None Project [c_0#1164,c_1#1165,c_2#1166] Project [c_0#1164,c_1#1165,c_2#1166] Project [c_0#1164,c_1#1165,c_2#1166] ... (repeats 99 times) ... Project [c_0#1164,c_1#1165,c_2#1166] Project [c_0#1164,c_1#1165,c_2#1166] Project [1 AS c_0#1164,1 AS c_1#1165,1 AS c_2#1166] Filter (key#1170 = 150) MetastoreRelation default, src, None ``` Awful though this logical plan looks, it's harmless because all projects will be eliminated by optimizer. Guess that's why this issue hasn't been caught before. 
Author: Cheng Lian <[email protected]>
Author: baishuo(白硕) <[email protected]>
Author: baishuo <[email protected]>

Closes #2616 from liancheng/dp-fix and squashes the following commits:

21935b6 [Cheng Lian] Adds back deleted trailing space
f471c4b [Cheng Lian] PreInsertionCasts should take table partitions into account
a132c80 [Cheng Lian] Fixes output compression
9c6eb2d [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
0eed349 [Cheng Lian] Addresses @yhuai's comments
26632c3 [Cheng Lian] Adds more tests
9227181 [Cheng Lian] Minor refactoring
c47470e [Cheng Lian] Refactors InsertIntoHiveTable to a Command
6fb16d7 [Cheng Lian] Fixes typo in test name, regenerated golden answer files
d53daa5 [Cheng Lian] Refactors dynamic partitioning support
b821611 [baishuo] pass check style
997c990 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
761ecf2 [baishuo] modify according micheal's advice
207c6ac [baishuo] modify for some bad indentation
caea6fb [baishuo] modify code to pass scala style checks
b660e74 [baishuo] delete a empty else branch
cd822f0 [baishuo] do a little modify
8e7268c [baishuo] update file after test
3f91665 [baishuo(白硕)] Update Cast.scala
8ad173c [baishuo(白硕)] Update InsertIntoHiveTable.scala
051ba91 [baishuo(白硕)] Update Cast.scala
d452eb3 [baishuo(白硕)] Update HiveQuerySuite.scala
37c603b [baishuo(白硕)] Update InsertIntoHiveTable.scala
98cfb1f [baishuo(白硕)] Update HiveCompatibilitySuite.scala
6af73f4 [baishuo(白硕)] Update InsertIntoHiveTable.scala
adf02f1 [baishuo(白硕)] Update InsertIntoHiveTable.scala
1867e23 [baishuo(白硕)] Update SparkHadoopWriter.scala
6bb5880 [baishuo(白硕)] Update HiveQl.scala
1 parent fbe8e98 · commit bec0d0e

15 files changed: +450 −306 lines

sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala

Lines changed: 17 additions & 0 deletions

```diff
@@ -220,6 +220,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    */
   override def whiteList = Seq(
     "add_part_exist",
+    "dynamic_partition_skip_default",
+    "infer_bucket_sort_dyn_part",
+    "load_dyn_part1",
+    "load_dyn_part2",
+    "load_dyn_part3",
+    "load_dyn_part4",
+    "load_dyn_part5",
+    "load_dyn_part6",
+    "load_dyn_part7",
+    "load_dyn_part8",
+    "load_dyn_part9",
+    "load_dyn_part10",
+    "load_dyn_part11",
+    "load_dyn_part12",
+    "load_dyn_part13",
+    "load_dyn_part14",
+    "load_dyn_part14_win",
     "add_part_multiple",
     "add_partition_no_whitelist",
     "add_partition_with_whitelist",
```

sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 0 additions & 195 deletions
This file was deleted.

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -144,7 +144,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val childOutputDataTypes = child.output.map(_.dataType)
         // Only check attributes, not partitionKeys since they are always strings.
         // TODO: Fully support inserting into partitioned tables.
-        val tableOutputDataTypes = table.attributes.map(_.dataType)
+        val tableOutputDataTypes =
+          table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)

         if (childOutputDataTypes == tableOutputDataTypes) {
           p
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 0 additions & 5 deletions

```diff
@@ -806,11 +806,6 @@ private[hive] object HiveQl {
           cleanIdentifier(key.toLowerCase) -> None
         }.toMap).getOrElse(Map.empty)

-        if (partitionKeys.values.exists(p => p.isEmpty)) {
-          throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
-            s"dynamic partitioning.")
-        }
-
         InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)

       case a: ASTNode =>
```
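The removed guard keyed off how `HiveQl` encodes a partition spec: a static partition column maps to `Some(value)`, while a dynamic one maps to `None` (as in `Map(partcol1 -> None, partcol2 -> None)` from the plan above). A minimal sketch of that encoding and of the condition the deleted guard tested (the helper names are hypothetical, not Spark's actual API):

```scala
// Hypothetical sketch of the partition-spec encoding used by HiveQl:
// static partitions carry a value (Some), dynamic partitions carry None.
// Before this commit, any spec containing a None was rejected with
// NotImplementedError; afterwards both kinds are accepted.
object PartitionSpecSketch {
  // e.g. PARTITION (ds = '2014-10-01', hr)  =>  ds is static, hr is dynamic
  def parseSpec(pairs: (String, Option[String])*): Map[String, Option[String]] =
    pairs.toMap

  // The exact condition the removed guard threw on.
  def hasDynamicPartitions(spec: Map[String, Option[String]]): Boolean =
    spec.values.exists(_.isEmpty)

  def main(args: Array[String]): Unit = {
    val mixed = parseSpec("ds" -> Some("2014-10-01"), "hr" -> None)
    assert(hasDynamicPartitions(mixed))  // previously: NotImplementedError

    val static = parseSpec("ds" -> Some("2014-10-01"))
    assert(!hasDynamicPartitions(static)) // always accepted
    println("ok")
  }
}
```

With the guard gone, the spec (including its `None` entries) flows straight into `InsertIntoTable`, and the downstream writer container is responsible for resolving the dynamic partition values at write time.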
