11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -95,6 +95,13 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-48131: Unify MDC key mdc.taskName and task_name in Spark 4.0 release.
+  private[executor] val taskNameMDCKey = if (conf.get(LEGACY_TASK_NAME_MDC_ENABLED)) {
+    "mdc.taskName"
+  } else {
+    LogKeys.TASK_NAME.name
+  }
+
   // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
   // so that tasks can exit quickly if they are interrupted while waiting on another task to
   // finish downloading dependencies.
@@ -914,7 +921,7 @@ private[spark] class Executor(
     try {
       mdc.foreach { case (key, value) => MDC.put(key, value) }
       // avoid overriding the taskName by the user
-      MDC.put(LogKeys.TASK_NAME.name, taskName)
+      MDC.put(taskNameMDCKey, taskName)
     } catch {
       case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
@@ -923,7 +930,7 @@
   private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
     try {
       mdc.foreach { case (key, _) => MDC.remove(key) }
-      MDC.remove(LogKeys.TASK_NAME.name)
+      MDC.remove(taskNameMDCKey)
     } catch {
       case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
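
A minimal, self-contained sketch of the pattern this change implements may be useful: resolve the MDC key once from configuration, then use that same key for both `put` and `remove` so the two can never disagree. Names like `legacyEnabled` and `withTaskMDC` are illustrative, not Spark's actual members:

```scala
import org.slf4j.MDC

object MdcKeySketch {
  // Illustrative stand-in for conf.get(LEGACY_TASK_NAME_MDC_ENABLED).
  val legacyEnabled: Boolean =
    sys.props.getOrElse("spark.log.legacyTaskNameMdc.enabled", "false").toBoolean

  // Resolve the key once, at startup.
  val taskNameMDCKey: String = if (legacyEnabled) "mdc.taskName" else "task_name"

  def withTaskMDC[T](taskName: String)(body: => T): T = {
    MDC.put(taskNameMDCKey, taskName)
    try body
    finally MDC.remove(taskNameMDCKey) // remove with the same resolved key
  }
}
```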
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -151,6 +151,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val LEGACY_TASK_NAME_MDC_ENABLED =
+    ConfigBuilder("spark.log.legacyTaskNameMdc.enabled")
+      .doc("When true, the MDC (Mapped Diagnostic Context) key `mdc.taskName` will be set in the " +
+        "log output, which is the behavior of Spark version 3.1 through Spark 3.5 releases. " +
+        "When false, the logging framework will use `task_name` as the MDC key, " +
+        "aligning it with the naming convention of newer MDC keys introduced in Spark 4.0 release.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val DRIVER_LOG_LOCAL_DIR =
     ConfigBuilder("spark.driver.log.localDir")
       .doc("Specifies a local directory to write driver logs and enable Driver Log UI Tab.")
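
As a usage sketch (assuming nothing beyond the config key added above): an application that wants the pre-4.0 key back sets the flag before launch. The executor reads it once at startup, so it cannot be toggled mid-application:

```scala
import org.apache.spark.SparkConf

// Restore the legacy `mdc.taskName` key; the default (false) keeps `task_name`.
val conf = new SparkConf()
  .setAppName("mdc-key-demo")
  .set("spark.log.legacyTaskNameMdc.enabled", "true")
```

The same flag can equally be set in `spark-defaults.conf` or via `--conf` on `spark-submit`.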
2 changes: 2 additions & 0 deletions docs/core-migration-guide.md
@@ -44,6 +44,8 @@ license: |
 
 - Since Spark 4.0, the default log4j output of `spark-submit` has shifted from plain text to JSON lines to enhance analyzability. To revert to plain text output, you can rename the file `conf/log4j2.properties.pattern-layout-template` as `conf/log4j2.properties`, or use a custom log4j configuration file.
 
+- Since Spark 4.0, the MDC (Mapped Diagnostic Context) key for Spark task names in Spark logs has been changed from `mdc.taskName` to `task_name`. To use the key `mdc.taskName`, you can set `spark.log.legacyTaskNameMdc.enabled` to `true`.
+
 - Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
 ## Upgrading from Core 3.4 to 3.5
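
Either MDC key only shows up in output if the logging layout references it. With a plain-text PatternLayout, that is done via `%X{...}`; the fragment below is a hypothetical `log4j2.properties` excerpt (the appender name and pattern are illustrative, not one of Spark's shipped templates):

```properties
# Include the task name in every console line. Use %X{task_name} with the
# Spark 4.0 default, or %X{mdc.taskName} when
# spark.log.legacyTaskNameMdc.enabled=true.
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c: %X{task_name} %m%n
```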