Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -598,13 +597,21 @@ private[yarn] class YarnAllocator(
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding virtual memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} or boosting " +
s"${YarnConfiguration.NM_VMEM_PMEM_RATIO} or disabling " +
s"${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you removed the other config option I asked you to add... :-/

(true, message)
case PMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding physical memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
case _ =>
// all the failures which not covered above, like:
// disk failure, kill by app master or resource manager, ...
Expand Down Expand Up @@ -735,18 +742,6 @@ private[yarn] class YarnAllocator(

private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val PMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
val VMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
val VMEM_EXCEEDED_EXIT_CODE = -103
val PMEM_EXCEEDED_EXIT_CODE = -104

def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
val matcher = pattern.matcher(diagnostics)
val diag = if (matcher.find()) " " + matcher.group() + "." else ""
s"Container killed by YARN for exceeding memory limits. $diag " +
"Consider boosting spark.yarn.executor.memoryOverhead or " +
"disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714."
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.RpcEndpointRef
Expand Down Expand Up @@ -377,17 +376,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
}

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}

test("window based failure executor counting") {
sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
val handler = createAllocator(4)
Expand Down