Skip to content

Commit a65ac4f

Browse files
committed
SPARK-20872: make ShuffleExchange.nodeName handle null coordinator
1 parent f72ad30 commit a65ac4f

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ case class ShuffleExchange(
4040
child: SparkPlan,
4141
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
4242

43+
// NOTE: coordinator can be null after serialization/deserialization,
44+
// e.g. it can be null on the Executor side
45+
4346
override lazy val metrics = Map(
4447
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
4548

4649
override def nodeName: String = {
4750
val extraInfo = coordinator match {
4851
case Some(exchangeCoordinator) =>
4952
s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
50-
case None => ""
53+
case _ => ""
5154
}
5255

5356
val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
7073
// the plan.
7174
coordinator match {
7275
case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
73-
case None =>
76+
case _ =>
7477
}
7578
}
7679

@@ -117,7 +120,7 @@ case class ShuffleExchange(
117120
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
118121
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
119122
shuffleRDD
120-
case None =>
123+
case _ =>
121124
val shuffleDependency = prepareShuffleDependency()
122125
preparePostShuffleRDD(shuffleDependency)
123126
}

0 commit comments

Comments
 (0)