diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3edba45ef89f..68c38fb6179f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -95,6 +95,13 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-48131: Unify MDC key mdc.taskName and task_name in Spark 4.0 release.
+  private[executor] val taskNameMDCKey = if (conf.get(LEGACY_TASK_NAME_MDC_ENABLED)) {
+    "mdc.taskName"
+  } else {
+    LogKeys.TASK_NAME.name
+  }
+
   // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
   // so that tasks can exit quickly if they are interrupted while waiting on another task to
   // finish downloading dependencies.
@@ -914,7 +921,7 @@ private[spark] class Executor(
     try {
       mdc.foreach { case (key, value) => MDC.put(key, value) }
       // avoid overriding the takName by the user
-      MDC.put(LogKeys.TASK_NAME.name, taskName)
+      MDC.put(taskNameMDCKey, taskName)
     } catch {
       case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
@@ -923,7 +930,7 @@ private[spark] class Executor(
   private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
     try {
       mdc.foreach { case (key, _) => MDC.remove(key) }
-      MDC.remove(LogKeys.TASK_NAME.name)
+      MDC.remove(taskNameMDCKey)
     } catch {
       case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2e207422ae06..d11e9fdea0f3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -151,6 +151,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val LEGACY_TASK_NAME_MDC_ENABLED =
+    ConfigBuilder("spark.log.legacyTaskNameMdc.enabled")
+      .doc("When true, the MDC (Mapped Diagnostic Context) key `mdc.taskName` will be set in the " +
+        "log output, which is the behavior of Spark version 3.1 through Spark 3.5 releases. " +
+        "When false, the logging framework will use `task_name` as the MDC key, " +
+        "aligning it with the naming convention of newer MDC keys introduced in Spark 4.0 release.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val DRIVER_LOG_LOCAL_DIR =
     ConfigBuilder("spark.driver.log.localDir")
       .doc("Specifies a local directory to write driver logs and enable Driver Log UI Tab.")
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 597900630b3f..00a6690492ea 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -44,6 +44,8 @@ license: |
 - Since Spark 4.0, the default log4j output of `spark-submit` has shifted from plain text to JSON lines to enhance analyzability. To revert to plain text output, you can rename the file `conf/log4j2.properties.pattern-layout-template` as `conf/log4j2.properties`, or use a custom log4j configuration file.
 
+- Since Spark 4.0, the MDC (Mapped Diagnostic Context) key for Spark task names in Spark logs has been changed from `mdc.taskName` to `task_name`. To use the key `mdc.taskName`, you can set `spark.log.legacyTaskNameMdc.enabled` to `true`.
+
 - Since Spark 4.0, Spark performs speculative executions less agressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`.
   To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
 ## Upgrading from Core 3.4 to 3.5
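
For context on what the key change means for log output, below is a minimal, self-contained Scala sketch (not part of this patch): it sets an MDC key around a unit of work the same way `setMDCForTask`/`cleanMDCForTask` do, so that a pattern layout referencing `%X{task_name}` (or `%X{mdc.taskName}` with the legacy flag) would print the task name. The object name, the example task-name string, and the logging backend are assumptions for illustration only; it relies on nothing beyond the SLF4J API.

```scala
// Standalone illustration, assuming SLF4J plus a logging backend (e.g. Log4j 2) on the classpath.
import org.slf4j.{Logger, LoggerFactory, MDC}

object TaskNameMdcExample {
  private val log: Logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // "task_name" is the Spark 4.0 default key; with spark.log.legacyTaskNameMdc.enabled=true
    // Spark would use "mdc.taskName" instead.
    val taskNameMDCKey = "task_name"
    MDC.put(taskNameMDCKey, "task 0.0 in stage 0.0 (TID 0)") // illustrative task name
    try {
      // A pattern layout containing %X{task_name} would include the value in this line.
      log.info("running task body")
    } finally {
      // Mirror cleanMDCForTask: always clear the key so it does not leak to unrelated log lines.
      MDC.remove(taskNameMDCKey)
    }
  }
}
```

Users who keep log patterns or log-parsing rules built around the old key can opt back in by passing `--conf spark.log.legacyTaskNameMdc.enabled=true` to `spark-submit`, as described in the migration-guide entry above.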