diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 2dff241900e82..a7bd4b3799a25 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -88,7 +88,7 @@ public void spill() throws IOException {
    * `LongArray` is too large to fit in a single page. The caller side should take care of these
    * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
@@ -154,6 +154,6 @@ private void throwOom(final MemoryBlock page, final long required) {
       taskMemoryManager.freePage(page, this);
     }
     taskMemoryManager.showMemoryUsage();
-    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
   }
 }
diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
new file mode 100644
index 0000000000000..ca00ca58e9713
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * This exception is thrown when a task cannot acquire memory from the memory manager.
+ * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
+ * we should throw this exception, which just kills the current task.
+ */
+@Private
+public final class SparkOutOfMemoryError extends OutOfMemoryError {
+
+  public SparkOutOfMemoryError(String s) {
+    super(s);
+  }
+
+  public SparkOutOfMemoryError(OutOfMemoryError e) {
+    super(e.getMessage());
+  }
+}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index f6b5ea3c0ad26..e8d3730daa7a4 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -192,7 +192,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
             throw new RuntimeException(e.getMessage());
           } catch (IOException e) {
             logger.error("error while calling spill() on " + c, e);
-            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
               + e.getMessage());
           }
         }
@@ -213,7 +213,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           throw new RuntimeException(e.getMessage());
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
-          throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
+          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
             + e.getMessage());
         }
       }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index e80f9734ecf7b..c3a07b2abf896 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -33,6 +33,7 @@
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
@@ -337,7 +338,7 @@ private void growPointerArrayIfNecessary() throws IOException {
         // The pointer array is too big to fix in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b8e15e3f78ed..66118f454159b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -25,6 +25,7 @@
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,7 +350,7 @@ private void growPointerArrayIfNecessary() throws IOException {
         // The pointer array is too big to fix in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 3bb87a6ed653d..951d076420ee6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -24,6 +24,7 @@
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
@@ -212,7 +213,7 @@ public boolean hasSpaceForAnotherRecord() {
 
   public void expandPointerArray(LongArray newArray) {
     if (newArray.size() < array.size()) {
-      throw new OutOfMemoryError("Not enough memory to grow pointer array");
+      throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
     Platform.copyMemory(
       array.getBaseObject(),
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index af0a0ab656564..2c3a8ef74800b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,7 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
 import org.apache.spark.shuffle.FetchFailedException
@@ -553,10 +553,9 @@
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
-          if (Utils.isFatalError(t)) {
+          if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
             uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
           }
-
     } finally {
       runningTasks.remove(taskId)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 756eeb642e2d0..9dc334c1ead3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -205,7 +206,7 @@ class TungstenAggregationIterator(
         buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
         if (buffer == null) {
           // failed to allocate the first page
-          throw new OutOfMemoryError("No enough memory for aggregation")
+          throw new SparkOutOfMemoryError("Not enough memory for aggregation")
        }
       }
       processRow(buffer, newInput)
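
Not part of the patch: a minimal, self-contained sketch of the contract the new class establishes. Because SparkOutOfMemoryError extends OutOfMemoryError, existing catch sites keep working, but callers can now distinguish a per-task allocation failure from genuine JVM heap exhaustion. The class name SparkOomDemo and the allocateOrThrow helper below are illustrative only; the sketch assumes a spark-core build containing this patch is on the classpath.

import org.apache.spark.memory.SparkOutOfMemoryError;

public final class SparkOomDemo {

  // Illustrative stand-in for an allocation that fails at the task level,
  // as the patched MemoryConsumer.throwOom() now does.
  private static void allocateOrThrow() {
    throw new SparkOutOfMemoryError("Unable to acquire 4096 bytes of memory, got 0");
  }

  public static void main(String[] args) {
    try {
      allocateOrThrow();
    } catch (SparkOutOfMemoryError e) {
      // Task-level failure: callers such as growPointerArrayIfNecessary()
      // can now recover (e.g. spill and retry) instead of dying.
      System.out.println("recoverable task-level OOM: " + e.getMessage());
    }
    // A plain OutOfMemoryError (real JVM exhaustion) would not match the
    // catch above; it still reaches the uncaught-exception handler and
    // kills the executor, which is the behavior this patch preserves for
    // genuine out-of-memory conditions.
  }
}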