Commit 72bde20

Use the already-resolved Attribute instead of creating a new one.
1 parent 5e4d738 commit 72bde20

8 files changed
Lines changed: 56 additions & 98 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ package object expressions {
   // Collect matching attributes given a name and a lookup.
   def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
     candidates.toSeq.flatMap(_.collect {
-      case a if resolver(a.name, name) => a.withName(name)
+      case a if resolver(a.name, name) => a
     })
   }
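
The effect of this one-line change is easiest to see on a single attribute:
with a case-insensitive resolver, a.withName(name) rebuilt the matched
attribute using the lookup's case, whereas returning a keeps the case the
relation was resolved with. A minimal standalone sketch (assuming
spark-catalyst on the classpath; the values are illustrative, not part of
the commit):

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// The attribute as resolved from the relation: lower-case "col1".
val col1 = AttributeReference("col1", LongType)()

// A query referencing it as "COL1" still matches under the resolver...
assert(caseInsensitiveResolution(col1.name, "COL1"))

// ...but the old code rewrote the match to the lookup's case, while the
// new code hands back the already-resolved attribute untouched.
val renamed = col1.withName("COL1") // old behavior: name becomes "COL1"
val kept = col1                     // new behavior: name stays "col1"
assert(renamed.name == "COL1" && kept.name == "col1")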

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 22 additions & 40 deletions
@@ -326,33 +326,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     case a => a
   }
 
-  private def removeSpecialRedundantAliases(
-      plan: LogicalPlan,
-      currentNextAttrPairs: mutable.Buffer[(Attribute, Attribute)],
-      newNode: LogicalPlan,
-      blacklist: AttributeSet): LogicalPlan = {
-    // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
-    // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
-    // case we use the the first mapping (which should be provided by the first child).
-    val mapping = AttributeMap(currentNextAttrPairs)
-
-    // Create a an expression cleaning function for nodes that can actually produce redundant
-    // aliases, use identity otherwise.
-    val clean: Expression => Expression = plan match {
-      case _: Project => removeRedundantAlias(_, blacklist)
-      case _: Aggregate => removeRedundantAlias(_, blacklist)
-      case _: Window => removeRedundantAlias(_, blacklist)
-      case _ => identity[Expression]
-    }
-
-    // Transform the expressions.
-    newNode.mapExpressions { expr =>
-      clean(expr.transform {
-        case a: Attribute => mapping.getOrElse(a, a)
-      })
-    }
-  }
-
   /**
    * Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
    * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
@@ -374,23 +347,12 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
         val mapping = AttributeMap(
           createAttributeMapping(left, newLeft) ++
-          createAttributeMapping(right, newRight))
+            createAttributeMapping(right, newRight))
         val newCondition = condition.map(_.transform {
           case a: Attribute => mapping.getOrElse(a, a)
         })
         Join(newLeft, newRight, joinType, newCondition)
 
-      case command: Command =>
-        // Add child.outputSet to blacklist otherwise
-        // the schema written in the file may not match the schema of the table.
-        val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
-        val newNode = command.mapChildren { child =>
-          val newChild = removeRedundantAliases(child, blacklist ++ child.outputSet)
-          currentNextAttrPairs ++= createAttributeMapping(child, newChild)
-          newChild
-        }
-        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
-
       case _ =>
         // Remove redundant aliases in the subtree(s).
         val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
@@ -399,7 +361,27 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           currentNextAttrPairs ++= createAttributeMapping(child, newChild)
           newChild
         }
-        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
+
+        // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
+        // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
+        // case we use the the first mapping (which should be provided by the first child).
+        val mapping = AttributeMap(currentNextAttrPairs)
+
+        // Create a an expression cleaning function for nodes that can actually produce redundant
+        // aliases, use identity otherwise.
+        val clean: Expression => Expression = plan match {
+          case _: Project => removeRedundantAlias(_, blacklist)
+          case _: Aggregate => removeRedundantAlias(_, blacklist)
+          case _: Window => removeRedundantAlias(_, blacklist)
+          case _ => identity[Expression]
+        }
+
+        // Transform the expressions.
+        newNode.mapExpressions { expr =>
+          clean(expr.transform {
+            case a: Attribute => mapping.getOrElse(a, a)
+          })
+        }
     }
   }
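
For context, removeRedundantAlias, which the rule applies above but which sits
outside these hunks, strips aliases that merely rename an attribute to its own
name. A hedged sketch of that idea (a standalone re-implementation under
assumed imports, not the rule's private method copied verbatim):

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression}
import org.apache.spark.sql.types.Metadata

// An alias is redundant when it carries no metadata, renames an attribute to
// its own name, and the attribute is not in the blacklist used to keep
// (self-)join inputs distinguishable.
def stripRedundantAlias(e: Expression, blacklist: AttributeSet): Expression = e match {
  case a @ Alias(attr: Attribute, name)
      if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) =>
    attr
  case other => other
}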

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val parsedPlan = AppendData.byName(table, query)
     val expectedPlan = AppendData.byName(table,
       Project(Seq(
-        Alias(Cast(toLower(X), FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(X, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
       query))
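
This expected-plan update follows from the resolver change above: matching no
longer rewrites an attribute to the lookup's case, so byName resolution casts
X as it was resolved and the suite's toLower wrapper drops out of the expected
plan.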

sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.SerializableConfiguration
 
 /**

sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out

Lines changed: 4 additions & 4 deletions
@@ -102,7 +102,7 @@ struct<col1:int,col2:int,col3:int,sum_col2:bigint>
 -- !query 6
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 ASC NULLS FIRST, COL2
 -- !query 6 schema
-struct<COL1:int,COL2:int,COL3:int>
+struct<col1:int,col2:int,col3:int>
 -- !query 6 output
 6 10 NULL
 6 13 NULL
@@ -118,7 +118,7 @@ struct<COL1:int,COL2:int,COL3:int>
 -- !query 7
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 NULLS LAST, COL2
 -- !query 7 schema
-struct<COL1:int,COL2:int,COL3:int>
+struct<col1:int,col2:int,col3:int>
 -- !query 7 output
 6 7 4
 6 11 4
@@ -134,7 +134,7 @@ struct<COL1:int,COL2:int,COL3:int>
 -- !query 8
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS FIRST, COL2
 -- !query 8 schema
-struct<COL1:int,COL2:int,COL3:int>
+struct<col1:int,col2:int,col3:int>
 -- !query 8 output
 6 10 NULL
 6 13 NULL
@@ -150,7 +150,7 @@ struct<COL1:int,COL2:int,COL3:int>
 -- !query 9
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS LAST, COL2
 -- !query 9 schema
-struct<COL1:int,COL2:int,COL3:int>
+struct<col1:int,col2:int,col3:int>
 -- !query 9 output
 6 9 10
 6 12 10
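
These golden-file updates, like the ones in query_regex_column.sql.out below,
show the user-visible effect of the resolver change: the result schema now
reports each column with the case it carries in the resolved relation rather
than the case written in the SELECT list.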

sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out

Lines changed: 2 additions & 2 deletions
@@ -183,7 +183,7 @@ struct<>
 -- !query 20
 SELECT p.`(KEY)?+.+`, b, testdata2.`(b)?+.+` FROM testData p join testData2 ON p.key = testData2.a WHERE key < 3
 -- !query 20 schema
-struct<value1:string,value2:string,b:int,A:int,c:int,d:int>
+struct<value1:string,value2:string,B:int,A:int,c:int,d:int>
 -- !query 20 output
 1 11 1 1 1 2
 1 11 2 1 1 2
@@ -194,7 +194,7 @@ struct<value1:string,value2:string,b:int,A:int,c:int,d:int>
 -- !query 21
 SELECT p.`(key)?+.+`, b, testdata2.`(b)?+.+` FROM testData p join testData2 ON p.key = testData2.a WHERE key < 3
 -- !query 21 schema
-struct<value1:string,value2:string,b:int,A:int,c:int,d:int>
+struct<value1:string,value2:string,B:int,A:int,c:int,d:int>
 -- !query 21 output
 1 11 1 1 1 2
 1 11 2 1 1 2

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 49 deletions
@@ -2853,55 +2853,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-25135: insert table may all null when select from view") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val cnt = 30
-      spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id as bigint) as col2")
-        .write.mode(SaveMode.Overwrite).parquet(path)
-      withTable("table1", "table2", "table3", "table4") {
-        spark.sql(s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$path'")
-
-        withView("view1", "view2") {
-          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
-
-          spark.sql("CREATE TABLE table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view1")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1"), spark.table("table2"))
-
-          spark.sql("CREATE TABLE table3 (COL1 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table3 select COL1 from view1")
-          assert(spark.table("table3").count() === cnt)
-          checkAnswer(spark.table("table1").select("COL1"), spark.table("table3"))
-
-          spark.sql("CREATE TABLE table4 (COL1 BIGINT, COL2 BIGINT, COL3 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL1, COL2 from view1")
-          assert(spark.table("table4").count() === cnt)
-          checkAnswer(spark.table("table1").select("col1", "col1", "col2"), spark.table("table4"))
-
-          spark.sql("INSERT OVERWRITE TABLE table4 select 1, COL1, COL2 from view1")
-          assert(spark.table("table4").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("1", "col1", "col2"), spark.table("table4"))
-
-          assertThrows[AnalysisException] {
-            spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL3, COL2 from view1")
-          }
-
-          spark.sql("CREATE TEMP VIEW view2 as select col1, 1 as col2 from view1")
-
-          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view2")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
-
-          spark.sql("INSERT OVERWRITE TABLE table2 select col1, COL2 from view2")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
-        }
-      }
-    }
-  }
-
   test("SPARK-25144 'distinct' causes memory leak") {
     val ds = List(Foo(Some("bar"))).toDS
     val result = ds.flatMap(_.bar).distinct
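
The broad SPARK-25135 regression test is dropped here in favor of the narrower
version added to ParquetQuerySuite below, which checks the written Parquet
schema directly.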

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 25 additions & 0 deletions
@@ -891,6 +891,31 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-25135: insert parquet table may all null when select from view") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val cnt = 30
+      val table1Path = s"$path/table1"
+      val table2Path = s"$path/table2"
+      spark.range(cnt).selectExpr("cast(id as bigint) as col1")
+        .write.mode(SaveMode.Overwrite).parquet(table1Path)
+      withTable("table1", "table2") {
+        spark.sql(s"CREATE TABLE table1(col1 bigint) using parquet location '$table1Path/'")
+        spark.sql(s"CREATE TABLE table2(COL1 bigint) using parquet location '$table2Path/'")
+
+        withView("view1") {
+          spark.sql("CREATE VIEW view1 as select col1 from table1 where col1 > -20")
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1 from view1")
+          assert(spark.table("table2").count() === cnt)
+          spark.read.parquet(table2Path).schema.zip(
+            spark.table("table2").schema).foreach { case (actual, table) =>
+            assert(actual.name.equals(table.name))
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {

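The new test's final assertion compares the on-disk Parquet field names with
the catalog schema because Parquet field lookup can be case-sensitive. A
minimal sketch of the symptom being guarded against (the local session and
temp path are assumptions, and the all-null read applies only on Spark
versions where Parquet field resolution is case-sensitive):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("case-demo").getOrCreate()

// Write a file whose Parquet schema names the column "col1"...
spark.range(3).selectExpr("id as col1").write.mode("overwrite").parquet("/tmp/case_demo")

// ...then read it back asking for "COL1". Where the reader matches fields
// case-sensitively, the column comes back all null, which is the
// "may all null" symptom named in the test title.
val df = spark.read.schema("COL1 bigint").parquet("/tmp/case_demo")
df.show()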