Skip to content

Commit 5e0ab9a

Browse files
committed
address comments
1 parent 92095b7 commit 5e0ab9a

2 files changed

Lines changed: 8 additions & 7 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.trees
1919

2020
import java.util.UUID
2121

22-
import scala.collection.{mutable, Map}
22+
import scala.collection.Map
2323
import scala.reflect.ClassTag
2424

2525
import org.apache.commons.lang3.ClassUtils
@@ -88,18 +88,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
8888
* A mutable map for holding auxiliary information of this tree node. It will be carried over
8989
* when this node is copied via `makeCopy`, or transformed via `transformUp`/`transformDown`.
9090
*/
91-
private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
91+
private val tags = new java.util.concurrent.ConcurrentHashMap[TreeNodeTag[_], Any]()
9292

9393
protected def copyTagsFrom(other: BaseType): Unit = {
94-
tags ++= other.tags
94+
tags.putAll(other.tags)
9595
}
9696

9797
def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = {
98-
tags(tag) = value
98+
tags.put(tag, value)
9999
}
100100

101101
def getTagValue[T](tag: TreeNodeTag[T]): Option[T] = {
102-
tags.get(tag).map(_.asInstanceOf[T])
102+
Option(tags.get(tag)).map(_.asInstanceOf[T])
103103
}
104104

105105
/**
@@ -287,7 +287,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
287287
mapChildren(_.transformDown(rule))
288288
} else {
289289
// If the transform function replaces this node with a new one, carry over the tags.
290-
afterRule.tags ++= this.tags
290+
afterRule.copyTagsFrom(this)
291291
afterRule.mapChildren(_.transformDown(rule))
292292
}
293293
}
@@ -311,7 +311,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
311311
}
312312
}
313313
// If the transform function replaces this node with a new one, carry over the tags.
314-
newNode.tags ++= this.tags
314+
newNode.copyTagsFrom(this)
315315
newNode
316316
}
317317

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class QueryExecution(
6060

6161
lazy val analyzed: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.ANALYSIS) {
6262
SparkSession.setActiveSession(sparkSession)
63+
// We can't clone `logical` here, which will reset the `_analyzed` flag.
6364
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
6465
}
6566

0 commit comments

Comments
 (0)