Skip to content

Commit 8399d77

Browse files
committed
address comments
1 parent 15cc872 commit 8399d77

5 files changed

Lines changed: 59 additions & 21 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ object SQLConf {
582582
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
583583

584584
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
585+
.internal()
585586
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
586587
"to produce the partition columns instead of table scans. It applies when all the columns " +
587588
"scanned are partition columns and the query has an aggregate operator that satisfies " +

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
7272
})
7373
}
7474
if (isAllDistinctAgg) {
75+
logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
76+
"Spark will scan partition-level metadata without scanning data files. " +
77+
"This could result in wrong results when with empty partition data."
78+
)
7579
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
7680
} else {
7781
a

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,7 +2422,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
24222422
Row(s"$expected") :: Nil)
24232423
}
24242424

2425-
ignore("SPARK-15752 optimize metadata only query for datasource table") {
2425+
test("SPARK-15752 optimize metadata only query for datasource table") {
24262426
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
24272427
withTable("srcpart_15752") {
24282428
val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
@@ -2968,12 +2968,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
29682968
}
29692969

29702970
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
2971-
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
2972-
withTable("t") {
2973-
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
2974-
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
2975-
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
2976-
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
2971+
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
2972+
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
2973+
withTable("t") {
2974+
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
2975+
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
2976+
if (enableOptimizeMetadataOnlyQuery) {
2977+
// The result is wrong if we enable the configuration.
2978+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
2979+
} else {
2980+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
2981+
}
2982+
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
2983+
}
2984+
2985+
withTempPath { path =>
2986+
val tabLocation = path.getCanonicalPath
2987+
val partLocation1 = tabLocation + "/p=3"
2988+
val partLocation2 = tabLocation + "/p=1"
2989+
// SPARK-23271: an empty RDD, when saved, should write a metadata-only file
2990+
val df = spark.emptyDataFrame.select(lit(1).as("col"))
2991+
df.write.parquet(partLocation1)
2992+
val df2 = spark.range(10).toDF("col")
2993+
df2.write.parquet(partLocation2)
2994+
val readDF = spark.read.parquet(tabLocation)
2995+
if (enableOptimizeMetadataOnlyQuery) {
2996+
// The result is wrong if we enable the configuration.
2997+
checkAnswer(readDF.selectExpr("max(p)"), Row(3))
2998+
} else {
2999+
checkAnswer(readDF.selectExpr("max(p)"), Row(1))
3000+
}
3001+
checkAnswer(readDF.selectExpr("max(col)"), Row(9))
3002+
}
29773003
}
29783004
}
29793005
}

sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
5858
}
5959

6060
private def testMetadataOnly(name: String, sqls: String*): Unit = {
61-
ignore(name) {
61+
test(name) {
6262
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
6363
sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
6464
}
@@ -69,7 +69,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
6969
}
7070

7171
private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
72-
ignore(name) {
72+
test(name) {
7373
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
7474
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
7575
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
8686
assert(message.contains("Table or view not found"))
8787
}
8888

89-
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
90-
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
91-
withTable("t") {
92-
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
93-
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
94-
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
95-
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
96-
}
97-
}
98-
}
99-
10089
test("script") {
10190
assume(TestUtils.testCommandAvailable("/bin/bash"))
10291
assume(TestUtils.testCommandAvailable("echo | sed"))
@@ -1781,7 +1770,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
17811770
}
17821771
}
17831772

1784-
ignore("SPARK-15752 optimize metadata only query for hive table") {
1773+
test("SPARK-15752 optimize metadata only query for hive table") {
17851774
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
17861775
withTable("data_15752", "srcpart_15752", "srctext_15752") {
17871776
val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
@@ -2341,4 +2330,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
23412330
}
23422331
}
23432332

2333+
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
2334+
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
2335+
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
2336+
withTable("t") {
2337+
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
2338+
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
2339+
if (enableOptimizeMetadataOnlyQuery) {
2340+
// The result is wrong if we enable the configuration.
2341+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
2342+
} else {
2343+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
2344+
}
2345+
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
2346+
}
2347+
}
2348+
}
2349+
}
2350+
23442351
}

0 commit comments

Comments
 (0)