Skip to content

Commit 13bd3ac

Browse files
committed
[SPARK-15743][SQL] Prevent saving with all-column partitioning
1 parent ca70ab2 commit 13bd3ac

File tree

5 files changed

+61
-21
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
8-
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10-
*
11-
* Unless required by applicable law or agreed to in writing, software
12-
* distributed under the License is distributed on an "AS IS" BASIS,
13-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
* See the License for the specific language governing permissions and
15-
* limitations under the License.
16-
*/
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
1717

1818
package org.apache.spark.sql.execution.datasources
1919

@@ -432,7 +432,7 @@ case class DataSource(
432432
}
433433

434434
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
435-
PartitioningUtils.validatePartitionColumnDataTypes(
435+
PartitioningUtils.validatePartitionColumnDataTypesAndCount(
436436
data.schema, partitionColumns, caseSensitive)
437437

438438
// If we are appending to a table that already exists, make sure the partitioning matches

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
339339
private val upCastingOrder: Seq[DataType] =
340340
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
341341

342-
def validatePartitionColumnDataTypes(
342+
def validatePartitionColumnDataTypesAndCount(
343343
schema: StructType,
344344
partitionColumns: Seq[String],
345345
caseSensitive: Boolean): Unit = {
@@ -350,6 +350,10 @@ private[sql] object PartitioningUtils {
350350
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
351351
}
352352
}
353+
354+
if (partitionColumns.size == schema.fields.size) {
355+
throw new AnalysisException(s"Cannot use all columns for partition columns")
356+
}
353357
}
354358

355359
def partitionColumnsSchema(
@@ -359,7 +363,7 @@ private[sql] object PartitioningUtils {
359363
val equality = columnNameEquality(caseSensitive)
360364
StructType(partitionColumns.map { col =>
361365
schema.find(f => equality(f.name, col)).getOrElse {
362-
throw new RuntimeException(s"Partition column $col not found in schema $schema")
366+
throw new AnalysisException(s"Partition column $col not found in schema $schema")
363367
}
364368
}).asNullable
365369
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
154154
// OK
155155
}
156156

157-
PartitioningUtils.validatePartitionColumnDataTypes(
157+
PartitioningUtils.validatePartitionColumnDataTypesAndCount(
158158
r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
159159

160160
// Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
205205
// OK
206206
}
207207

208-
PartitioningUtils.validatePartitionColumnDataTypes(
208+
PartitioningUtils.validatePartitionColumnDataTypesAndCount(
209209
c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
210210

211211
for {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
9191
hadoopConf: Configuration,
9292
options: Map[String, String]) extends Serializable with Logging {
9393

94-
PartitioningUtils.validatePartitionColumnDataTypes(
94+
PartitioningUtils.validatePartitionColumnDataTypesAndCount(
9595
data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
9696

9797
private val serializableConf = new SerializableConfiguration(hadoopConf)
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources
19+
20+
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.test.SharedSQLContext
22+
23+
class PartitioningUtilsSuite extends SharedSQLContext {
24+
25+
test("prevent all column partitioning") {
26+
withTempDir { dir =>
27+
val path = dir.getCanonicalPath
28+
intercept[AnalysisException] {
29+
spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
30+
}
31+
intercept[AnalysisException] {
32+
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
33+
}
34+
}
35+
}
36+
}

0 commit comments

Comments (0)