Commit 91cd06b

Ngone51 authored and cloud-fan committed
[SPARK-8981][CORE][FOLLOW-UP] Clean up MDC properties after running a task
### What changes were proposed in this pull request?

This PR is a follow-up of #26624. It cleans up MDC properties if the original value is empty. It also adds a warning and ignores the value when the user tries to override the value of `taskName`.

### Why are the changes needed?

Before this PR, running the following jobs:

```
sc.setLocalProperty("mdc.my", "ABC")
sc.parallelize(1 to 100).count()
sc.setLocalProperty("mdc.my", null)
sc.parallelize(1 to 100).count()
```

still left the MDC value "ABC" in the log of the second count job, even though the value had been unset.

### Does this PR introduce _any_ user-facing change?

Yes. Users will 1) no longer see MDC values after unsetting them; 2) see a warning if they try to override the value of `taskName`.

### How was this patch tested?

Tested manually.

Closes #28756 from Ngone51/followup-8981.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
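The stale "ABC" value described above is consistent with MDC being per-thread state while executor worker threads are reused across tasks. The following is a minimal sketch of that effect, not Spark code: `FakeMDC` is a hypothetical stand-in for `org.slf4j.MDC`, `StaleMdcDemo` and `secondJobSees` are illustrative names, and a single-thread pool is assumed in place of the executor's task threads. The key `mdc.my` is borrowed from the example above purely for illustration.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}
import scala.collection.mutable

// Hypothetical stand-in for org.slf4j.MDC: one mutable map per thread.
object FakeMDC {
  private val context = new ThreadLocal[mutable.Map[String, String]] {
    override def initialValue(): mutable.Map[String, String] =
      mutable.Map.empty[String, String]
  }
  def put(key: String, value: String): Unit = context.get().update(key, value)
  def get(key: String): Option[String] = context.get().get(key)
  def clear(): Unit = context.get().clear()
}

object StaleMdcDemo {
  // Runs one simulated task on the pool and returns the "mdc.my" value it observes.
  private def runTask(
      pool: ExecutorService,
      props: Seq[(String, String)],
      clearFirst: Boolean): Option[String] =
    pool.submit(new Callable[Option[String]] {
      override def call(): Option[String] = {
        if (clearFirst) FakeMDC.clear() // models the MDC.clear() added by this commit
        props.foreach { case (k, v) => FakeMDC.put(k, v) }
        FakeMDC.get("mdc.my")
      }
    }).get()

  // Simulates two consecutive jobs on one reused worker thread: the first
  // carries "mdc.my", the second carries no properties because the user unset it.
  def secondJobSees(clearFirst: Boolean): Option[String] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      runTask(pool, Seq("mdc.my" -> "ABC"), clearFirst)
      runTask(pool, Seq.empty, clearFirst)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(secondJobSees(clearFirst = false)) // Some(ABC): the stale value leaks
    println(secondJobSees(clearFirst = true))  // None: cleared before the task runs
  }
}
```

Without the clear step, nothing in the second task touches the key, so the value written by the first task is still visible on the reused thread.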
1 parent 912d45d commit 91cd06b

1 file changed

Lines changed: 4 additions & 8 deletions

core/src/main/scala/org/apache/spark/executor/Executor.scala

@@ -401,9 +401,7 @@ private[spark] class Executor(
     }

     override def run(): Unit = {
-
       setMDCForTask(taskName, mdcProperties)
-
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
       val threadMXBean = ManagementFactory.getThreadMXBean
@@ -703,11 +701,11 @@ private[spark] class Executor(
   }

   private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
+    // make sure we run the task with the user-specified mdc properties only
+    MDC.clear()
+    mdc.foreach { case (key, value) => MDC.put(key, value) }
+    // avoid overriding the takName by the user
     MDC.put("taskName", taskName)
-
-    mdc.foreach { case (key, value) =>
-      MDC.put(key, value)
-    }
   }

   /**
@@ -750,9 +748,7 @@ private[spark] class Executor(
     private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

     override def run(): Unit = {
-
       setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
-
       val startTimeNs = System.nanoTime()
       def elapsedTimeNs = System.nanoTime() - startTimeNs
       def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
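
The reordering in `setMDCForTask` can be modelled as a pure function to show its two effects: leftover entries are dropped, and a user-supplied `taskName` entry is overwritten because the real task name is written last. This is a rough sketch under stated assumptions, not the Spark implementation: `MdcOrderingSketch`, `mdcAfterSetup`, and the `previous` parameter (standing for whatever the thread's MDC held before the task) are hypothetical names, and a plain mutable map replaces `org.slf4j.MDC`.

```scala
import scala.collection.mutable

object MdcOrderingSketch {
  // Hypothetical pure model of the new setMDCForTask ordering.
  // `previous` plays the role of the thread's MDC content before the task starts.
  def mdcAfterSetup(
      previous: Map[String, String],
      taskName: String,
      mdc: Seq[(String, String)]): Map[String, String] = {
    val ctx = mutable.Map.empty[String, String]
    ctx ++= previous
    ctx.clear()                                  // drop everything left by earlier tasks
    mdc.foreach { case (k, v) => ctx.put(k, v) } // apply the user-specified properties
    ctx.put("taskName", taskName)                // written last, so it always wins
    ctx.toMap
  }

  def main(args: Array[String]): Unit = {
    val result = mdcAfterSetup(
      previous = Map("mdc.my" -> "ABC"),
      taskName = "task 0.0 in stage 1.0",
      mdc = Seq("taskName" -> "spoofed", "mdc.job" -> "etl"))
    println(result("taskName"))        // task 0.0 in stage 1.0
    println(result.contains("mdc.my")) // false
  }
}
```

In the hunks shown here, the ordering of the `MDC.put` calls is what protects `taskName`; no logging call appears in the changed lines.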
