Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -598,13 +597,23 @@ private[yarn] class YarnAllocator(
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
val diag = VMEM_EXCEEDED_PATTERN.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val additional = if (conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't answer my question. How can you even get here if this config is disabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED)) {
s"or disabling ${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714"
} else {
s"or boosting ${YarnConfiguration.NM_VMEM_PMEM_RATIO}"
}
val message = "Container killed by YARN for exceeding virtual memory limits." +
s" $diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} $additional."
(true, message)
case PMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
val diag = PMEM_EXCEEDED_PATTERN.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding physical memory limits." +
s" $diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
case _ =>
// all the failures which not covered above, like:
// disk failure, kill by app master or resource manager, ...
Expand Down Expand Up @@ -735,18 +744,8 @@ private[yarn] class YarnAllocator(

private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val PMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
val VMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
val PMEM_EXCEEDED_PATTERN = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still inline these patterns right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will do it.

val VMEM_EXCEEDED_PATTERN = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
val VMEM_EXCEEDED_EXIT_CODE = -103
val PMEM_EXCEEDED_EXIT_CODE = -104

def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
val matcher = pattern.matcher(diagnostics)
val diag = if (matcher.find()) " " + matcher.group() + "." else ""
s"Container killed by YARN for exceeding memory limits. $diag " +
"Consider boosting spark.yarn.executor.memoryOverhead or " +
"disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714."
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.RpcEndpointRef
Expand Down Expand Up @@ -377,17 +376,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
}

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}

test("window based failure executor counting") {
sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
val handler = createAllocator(4)
Expand Down