Skip to content

Commit 8c91ff9

Browse files
committed
requiredOrdering for InsertIntoHiveTable to only consider static partitions
1 parent 1008b2e commit 8c91ff9

1 file changed

Lines changed: 9 additions & 4 deletions

File tree

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,19 @@ case class InsertIntoHiveTable(
7171
ifPartitionNotExists: Boolean,
7272
outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
7373

74+
7475
/**
75-
* For partitioned tables, `requiredOrdering` is over partition columns of table
76+
* For partitioned tables, `requiredOrdering` is over static partition columns of table
7677
*/
7778
override def requiredOrdering: Seq[Seq[SortOrder]] = {
7879
if (table.partitionColumnNames.nonEmpty) {
79-
val partitionAttributes = table.partitionColumnNames.map { name =>
80-
query.resolve(name :: Nil, SparkSession.getActiveSession.get.sessionState.analyzer.resolver)
81-
.getOrElse {
80+
val numDynamicPartitions = partition.values.count(_.isEmpty)
81+
val partitionAttributes = table.partitionColumnNames.takeRight(numDynamicPartitions).map {
82+
name =>
83+
query.resolve(
84+
name :: Nil,
85+
SparkSession.getActiveSession.get.sessionState.analyzer.resolver
86+
).getOrElse {
8287
throw new AnalysisException(
8388
s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
8489
}.asInstanceOf[Attribute]

0 commit comments

Comments
 (0)