Commit e2546c2

[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory.

## User-facing changes

This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit.

## Internals changes

This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies.

There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances).

I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take a look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution.

## TODOs

- [x] Fix handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
  - [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.

Author: Josh Rosen <[email protected]>

Closes #9344 from JoshRosen/offheap-memory-accounting.

(cherry picked from commit 30b706b)
Signed-off-by: Josh Rosen <[email protected]>
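To make the pool idea in the commit message concrete, here is a minimal Java sketch of pools that can be grown and shrunk, with execution borrowing unused space from storage. Everything in it (`PoolSketch`, `Pool`, `acquireExecution`, and the borrowing rule itself) is a hypothetical illustration, not the `MemoryPool` or `UnifiedMemoryManager` code from this patch, whose actual policy may differ.

```java
/**
 * Hypothetical, simplified stand-in for the pool bookkeeping described in the
 * commit message. This is an illustration only, not Spark's implementation.
 */
public class PoolSketch {

  /** Tracks a resizable budget of bytes and how many of them are in use. */
  public static class Pool {
    private long poolSize;
    private long used;

    public Pool(long initialSize) {
      this.poolSize = initialSize;
    }

    public long size() { return poolSize; }

    public long free() { return poolSize - used; }

    /** Enlarges the pool by delta bytes. */
    public void grow(long delta) {
      if (delta < 0) throw new IllegalArgumentException("negative delta");
      poolSize += delta;
    }

    /** Shrinks the pool by delta bytes; only free space can be given away. */
    public void shrink(long delta) {
      if (delta < 0 || delta > free()) {
        throw new IllegalArgumentException("cannot shrink by " + delta);
      }
      poolSize -= delta;
    }

    /** Grants as many of the requested bytes as are currently free. */
    public long acquire(long requested) {
      long granted = Math.max(0, Math.min(requested, free()));
      used += granted;
      return granted;
    }

    public void release(long bytes) {
      if (bytes < 0 || bytes > used) {
        throw new IllegalArgumentException("cannot release " + bytes);
      }
      used -= bytes;
    }
  }

  /**
   * Assumed policy for this sketch: if the execution pool is short, move
   * free (unused) storage space into it before acquiring.
   */
  public static long acquireExecution(Pool storage, Pool execution, long requested) {
    long shortfall = Math.max(0, requested - execution.free());
    long borrowed = Math.min(shortfall, storage.free());
    storage.shrink(borrowed);
    execution.grow(borrowed);
    return execution.acquire(requested);
  }

  public static void main(String[] args) {
    Pool storage = new Pool(100);
    Pool execution = new Pool(50);
    storage.acquire(30); // 70 bytes of storage remain free
    long granted = acquireExecution(storage, execution, 80);
    System.out.println(granted + " granted; storage=" + storage.size()
        + ", execution=" + execution.size());
  }
}
```

Because each pool owns its own size and usage counters, the manager-level policy reduces to a few `shrink` / `grow` calls, which is the simplification the commit message argues for.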
1 parent 68c1d9f commit e2546c2

21 files changed

Lines changed: 828 additions & 465 deletions

core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 4 additions & 3 deletions
@@ -17,15 +17,15 @@
 
 package org.apache.spark.memory;
 
-
 import java.io.IOException;
 
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
-
 /**
  * An memory consumer of TaskMemoryManager, which support spilling.
+ *
+ * Note: this only supports allocation / spilling of Tungsten memory.
  */
 public abstract class MemoryConsumer {
 
@@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
     this.taskMemoryManager = taskMemoryManager;
     this.pageSize = pageSize;
-    this.used = 0;
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
@@ -67,6 +66,8 @@ public void spill() throws IOException {
    *
    * Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
    *
+   * Note: today, this only frees Tungsten-managed pages.
+   *
    * @param size the amount of memory should be released
    * @param trigger the MemoryConsumer that trigger this spilling
    * @return the amount of released memory in bytes
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+@Private
+public enum MemoryMode {
+  ON_HEAP,
+  OFF_HEAP
+}
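The commit message says the MemoryManager keeps one execution pool for on-heap memory and one for off-heap memory. As a rough sketch of how an enum like `MemoryMode` can select between two separately tracked budgets, consider the following. The class `ModeDispatchSketch`, its methods, and the nested copy of the enum are hypothetical names for illustration; they are not part of this patch.

```java
import java.util.EnumMap;
import java.util.Map;

/**
 * Hypothetical sketch: one byte budget per memory mode, selected by an enum.
 * The nested enum mirrors the MemoryMode added in this commit so that the
 * example is self-contained.
 */
public class ModeDispatchSketch {

  public enum MemoryMode { ON_HEAP, OFF_HEAP }

  private final Map<MemoryMode, Long> capacity = new EnumMap<>(MemoryMode.class);
  private final Map<MemoryMode, Long> used = new EnumMap<>(MemoryMode.class);

  public ModeDispatchSketch(long onHeapBytes, long offHeapBytes) {
    capacity.put(MemoryMode.ON_HEAP, onHeapBytes);
    capacity.put(MemoryMode.OFF_HEAP, offHeapBytes);
    used.put(MemoryMode.ON_HEAP, 0L);
    used.put(MemoryMode.OFF_HEAP, 0L);
  }

  /** Grants up to `requested` bytes from the budget that matches `mode`. */
  public synchronized long acquire(long requested, MemoryMode mode) {
    long free = capacity.get(mode) - used.get(mode);
    long granted = Math.max(0, Math.min(requested, free));
    used.put(mode, used.get(mode) + granted);
    return granted;
  }

  /** Returns bytes to the budget that matches `mode`. */
  public synchronized void release(long bytes, MemoryMode mode) {
    long current = used.get(mode);
    if (bytes < 0 || bytes > current) {
      throw new IllegalStateException("releasing more than was acquired");
    }
    used.put(mode, current - bytes);
  }

  public synchronized long usedBytes(MemoryMode mode) {
    return used.get(mode);
  }

  public static void main(String[] args) {
    ModeDispatchSketch manager = new ModeDispatchSketch(100, 40);
    // The off-heap budget is exhausted independently of the on-heap one.
    System.out.println(manager.acquire(60, MemoryMode.OFF_HEAP));
    System.out.println(manager.acquire(60, MemoryMode.ON_HEAP));
  }
}
```

Keying the bookkeeping on the enum means a request in one mode can never be satisfied from the other mode's budget, which is why the mode has to be threaded through every acquire and release call.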

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 48 additions & 24 deletions
@@ -103,10 +103,10 @@ public class TaskMemoryManager {
    * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
    * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
    */
-  private final boolean inHeap;
+  final MemoryMode tungstenMemoryMode;
 
   /**
-   * The size of memory granted to each consumer.
+   * Tracks spillable memory consumers.
    */
   @GuardedBy("this")
   private final HashSet<MemoryConsumer> consumers;
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
    * Construct a new TaskMemoryManager.
    */
   public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
-    this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
+    this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
     this.memoryManager = memoryManager;
     this.taskAttemptId = taskAttemptId;
     this.consumers = new HashSet<>();
@@ -127,23 +127,30 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(
+      long required,
+      MemoryMode mode,
+      MemoryConsumer consumer) {
     assert(required >= 0);
+    // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
+    // memory here, then it may not make sense to spill since that would only end up freeing
+    // off-heap memory. This is subject to change, though, so it may be risky to make this
+    // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
-      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
 
-      // try to release memory from other consumers first, then we can reduce the frequency of
+      // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0) {
             try {
               long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.info("Task {} released {} from {} for {}", taskAttemptId,
+              if (released > 0 && mode == tungstenMemoryMode) {
+                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
                   Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
                 if (got >= required) {
                   break;
                 }
@@ -161,10 +168,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       if (got < required && consumer != null) {
         try {
           long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.info("Task {} released {} from itself ({})", taskAttemptId,
+          if (released > 0 && mode == tungstenMemoryMode) {
+            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
               Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
           }
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
@@ -184,9 +191,9 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
   /**
    * Release N bytes of execution memory for a MemoryConsumer.
    */
-  public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+  public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
     logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
-    memoryManager.releaseExecutionMemory(size, taskAttemptId);
+    memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
   }
 
   /**
@@ -195,11 +202,19 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
   public void showMemoryUsage() {
     logger.info("Memory used in task " + taskAttemptId);
     synchronized (this) {
+      long memoryAccountedForByConsumers = 0;
       for (MemoryConsumer c: consumers) {
-        if (c.getUsed() > 0) {
-          logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
+        long totalMemUsage = c.getUsed();
+        memoryAccountedForByConsumers += totalMemUsage;
+        if (totalMemUsage > 0) {
+          logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
         }
       }
+      long memoryNotAccountedFor =
+        memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
+      logger.info(
+        "{} bytes of memory were used by task {} but are not associated with specific consumers",
+        memoryNotAccountedFor, taskAttemptId);
     }
   }
 
@@ -214,15 +229,16 @@ public long pageSizeBytes() {
    * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
    * intended for allocating large blocks of Tungsten memory that will be shared between operators.
    *
-   * Returns `null` if there was not enough memory to allocate the page.
+   * Returns `null` if there was not enough memory to allocate the page. May return a page that
+   * contains fewer bytes than requested, so callers should verify the size of returned pages.
    */
   public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
       throw new IllegalArgumentException(
         "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
     }
 
-    long acquired = acquireExecutionMemory(size, consumer);
+    long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
     if (acquired <= 0) {
       return null;
     }
@@ -231,7 +247,7 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     synchronized (this) {
       pageNumber = allocatedPages.nextClearBit(0);
       if (pageNumber >= PAGE_TABLE_SIZE) {
-        releaseExecutionMemory(acquired, consumer);
+        releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
         throw new IllegalStateException(
           "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
       }
@@ -262,7 +278,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
     }
     long pageSize = page.size();
     memoryManager.tungstenMemoryAllocator().free(page);
-    releaseExecutionMemory(pageSize, consumer);
+    releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
   }
 
   /**
@@ -276,7 +292,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
    * @return an encoded page address.
    */
   public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
-    if (!inHeap) {
+    if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
       // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
       // encode. Due to our page size limitation, though, we can convert this into an offset that's
       // relative to the page's base offset; this relative offset will fit in 51 bits.
@@ -305,7 +321,7 @@ private static long decodeOffset(long pagePlusOffsetAddress) {
    * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
    */
   public Object getPage(long pagePlusOffsetAddress) {
-    if (inHeap) {
+    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
       final MemoryBlock page = pageTable[pageNumber];
@@ -323,7 +339,7 @@ public Object getPage(long pagePlusOffsetAddress) {
    */
   public long getOffsetInPage(long pagePlusOffsetAddress) {
     final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
-    if (inHeap) {
+    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
       return offsetInPage;
     } else {
       // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
@@ -351,11 +367,19 @@ public long cleanUpAllAllocatedMemory() {
       }
       consumers.clear();
     }
+
+    for (MemoryBlock page : pageTable) {
+      if (page != null) {
+        memoryManager.tungstenMemoryAllocator().free(page);
+      }
+    }
+    Arrays.fill(pageTable, null);
+
     return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
   }
 
   /**
-   * Returns the memory consumption, in bytes, for the current task
+   * Returns the memory consumption, in bytes, for the current task.
    */
   public long getMemoryConsumptionForThisTask() {
     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
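The retry logic in `acquireExecutionMemory` above can be hard to follow in diff form, so here is a stripped-down Java sketch of the same idea: spill other consumers, but only retry the acquisition when the requested mode matches the mode that spilling actually frees. All types here (`SpillRetrySketch`, `Manager`, `Consumer`) are hypothetical stand-ins. The sketch omits synchronization, logging, `IOException` handling, and the final step in which the requesting consumer spills itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the acquire / spill / re-acquire loop. */
public class SpillRetrySketch {

  public enum MemoryMode { ON_HEAP, OFF_HEAP }

  /** Tracks free bytes per mode. */
  public static class Manager {
    private final long[] free = new long[MemoryMode.values().length];

    public Manager(long onHeapFree, long offHeapFree) {
      free[MemoryMode.ON_HEAP.ordinal()] = onHeapFree;
      free[MemoryMode.OFF_HEAP.ordinal()] = offHeapFree;
    }

    public long acquire(long requested, MemoryMode mode) {
      long granted = Math.max(0, Math.min(requested, free[mode.ordinal()]));
      free[mode.ordinal()] -= granted;
      return granted;
    }

    public void release(long bytes, MemoryMode mode) {
      free[mode.ordinal()] += bytes;
    }
  }

  /** A consumer whose pages all live in one mode and can be spilled. */
  public static class Consumer {
    private long used;
    private final Manager manager;
    private final MemoryMode pageMode;

    public Consumer(Manager manager, MemoryMode pageMode, long used) {
      this.manager = manager;
      this.pageMode = pageMode;
      this.used = used;
    }

    public long getUsed() { return used; }

    /** Frees up to `wanted` bytes and hands them back to the manager. */
    public long spill(long wanted) {
      long freed = Math.min(used, wanted);
      used -= freed;
      manager.release(freed, pageMode);
      return freed;
    }
  }

  /** Acquire, then spill other consumers and retry only if the mode matches. */
  public static long acquireWithSpill(Manager manager, MemoryMode tungstenMode,
      List<Consumer> others, long required, MemoryMode mode) {
    long got = manager.acquire(required, mode);
    for (Consumer c : others) {
      if (got >= required) break;
      if (c.getUsed() == 0) continue;
      long released = c.spill(required - got);
      // Spilling frees memory in tungstenMode only, so a retry in another
      // mode could not succeed.
      if (released > 0 && mode == tungstenMode) {
        got += manager.acquire(required - got, mode);
      }
    }
    return got;
  }

  public static void main(String[] args) {
    Manager manager = new Manager(0, 10);
    List<Consumer> others = new ArrayList<>();
    others.add(new Consumer(manager, MemoryMode.OFF_HEAP, 50));
    System.out.println(acquireWithSpill(
        manager, MemoryMode.OFF_HEAP, others, 40, MemoryMode.OFF_HEAP));
    System.out.println(acquireWithSpill(
        manager, MemoryMode.OFF_HEAP, others, 40, MemoryMode.ON_HEAP));
  }
}
```

In the second call the consumer still spills, but nothing is granted. This is the situation the new code comment in the diff describes: an on-heap request that triggers spilling only ends up freeing off-heap memory.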

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -341,7 +341,7 @@ object SparkEnv extends Logging {
       if (useLegacyMemoryManager) {
         new StaticMemoryManager(conf, numUsableCores)
       } else {
-        new UnifiedMemoryManager(conf, numUsableCores)
+        UnifiedMemoryManager(conf, numUsableCores)
       }
 
     val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
