@@ -103,7 +103,7 @@ public class TaskMemoryManager {
103103 * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
104104 * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
105105 */
106- private final boolean inHeap ;
106+ final MemoryMode tungstenMemoryMode ;
107107
108108 /**
109109 * The size of memory granted to each consumer.
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
115115 * Construct a new TaskMemoryManager.
116116 */
117117 public TaskMemoryManager (MemoryManager memoryManager , long taskAttemptId ) {
118- this .inHeap = memoryManager .tungstenMemoryIsAllocatedInHeap ();
118+ this .tungstenMemoryMode = memoryManager .tungstenMemoryMode ();
119119 this .memoryManager = memoryManager ;
120120 this .taskAttemptId = taskAttemptId ;
121121 this .consumers = new HashSet <>();
@@ -127,23 +127,33 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
127127 *
128128 * @return number of bytes successfully granted (<= N).
129129 */
130- public long acquireExecutionMemory (long required , MemoryConsumer consumer ) {
130+ public long acquireExecutionMemory (
131+ long required ,
132+ MemoryMode mode ,
133+ MemoryConsumer consumer ) {
131134 assert (required >= 0 );
135+ // TODO(josh): handle spill differently based on type of request (on-heap vs off-heap).
136+ // If we are allocating tungsten pages off-heap and receive a request to allocate on-heap
137+ // memory here, then it may not make sense to spill since that would only end up freeing
138+ // off-heap memory. This is subject to change, though, so it may be risky to make this
139+ // optimization now in case we forget to undo it late when making changes.
132140 synchronized (this ) {
133- long got = memoryManager .acquireExecutionMemory (required , taskAttemptId );
141+ long got = memoryManager .acquireExecutionMemory (required , taskAttemptId , mode );
134142
135143 // try to release memory from other consumers first, then we can reduce the frequency of
136144 // spilling, avoid to have too many spilled files.
137145 if (got < required ) {
138146 // Call spill() on other consumers to release memory
139147 for (MemoryConsumer c : consumers ) {
140- if (c != null && c != consumer && c .getUsed ( ) > 0 ) {
148+ if (c != null && c != consumer && c .getMemoryUsed ( mode ) > 0 ) {
141149 try {
150+ // TODO(josh): subtlety / implementation detail: today, spill() happens to only
151+ // release Tungsten pages.
142152 long released = c .spill (required - got , consumer );
143- if (released > 0 ) {
153+ if (released > 0 && mode == tungstenMemoryMode ) {
144154 logger .info ("Task {} released {} from {} for {}" , taskAttemptId ,
145155 Utils .bytesToString (released ), c , consumer );
146- got += memoryManager .acquireExecutionMemory (required - got , taskAttemptId );
156+ got += memoryManager .acquireExecutionMemory (required - got , taskAttemptId , mode );
147157 if (got >= required ) {
148158 break ;
149159 }
@@ -161,10 +171,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
161171 if (got < required && consumer != null ) {
162172 try {
163173 long released = consumer .spill (required - got , consumer );
164- if (released > 0 ) {
174+ if (released > 0 && mode == tungstenMemoryMode ) {
165175 logger .info ("Task {} released {} from itself ({})" , taskAttemptId ,
166176 Utils .bytesToString (released ), consumer );
167- got += memoryManager .acquireExecutionMemory (required - got , taskAttemptId );
177+ got += memoryManager .acquireExecutionMemory (required - got , taskAttemptId , mode );
168178 }
169179 } catch (IOException e ) {
170180 logger .error ("error while calling spill() on " + consumer , e );
@@ -182,9 +192,9 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
182192 /**
183193 * Release N bytes of execution memory for a MemoryConsumer.
184194 */
185- public void releaseExecutionMemory (long size , MemoryConsumer consumer ) {
195+ public void releaseExecutionMemory (long size , MemoryMode mode , MemoryConsumer consumer ) {
186196 logger .debug ("Task {} release {} from {}" , taskAttemptId , Utils .bytesToString (size ), consumer );
187- memoryManager .releaseExecutionMemory (size , taskAttemptId );
197+ memoryManager .releaseExecutionMemory (size , taskAttemptId , mode );
188198 }
189199
190200 /**
@@ -194,8 +204,10 @@ public void showMemoryUsage() {
194204 logger .info ("Memory used in task " + taskAttemptId );
195205 synchronized (this ) {
196206 for (MemoryConsumer c : consumers ) {
197- if (c .getUsed () > 0 ) {
198- logger .info ("Acquired by " + c + ": " + Utils .bytesToString (c .getUsed ()));
207+ long totalMemUsage =
208+ c .getMemoryUsed (MemoryMode .OFF_HEAP ) + c .getMemoryUsed (MemoryMode .ON_HEAP );
209+ if (totalMemUsage > 0 ) {
210+ logger .info ("Acquired by " + c + ": " + Utils .bytesToString (totalMemUsage ));
199211 }
200212 }
201213 }
@@ -212,15 +224,16 @@ public long pageSizeBytes() {
212224 * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
213225 * intended for allocating large blocks of Tungsten memory that will be shared between operators.
214226 *
215- * Returns `null` if there was not enough memory to allocate the page.
227+ * Returns `null` if there was not enough memory to allocate the page. May return a page that
228+ * contains fewer bytes than requested, so callers should verify the size of returned pages.
216229 */
217230 public MemoryBlock allocatePage (long size , MemoryConsumer consumer ) {
218231 if (size > MAXIMUM_PAGE_SIZE_BYTES ) {
219232 throw new IllegalArgumentException (
220233 "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes" );
221234 }
222235
223- long acquired = acquireExecutionMemory (size , consumer );
236+ long acquired = acquireExecutionMemory (size , tungstenMemoryMode , consumer );
224237 if (acquired <= 0 ) {
225238 return null ;
226239 }
@@ -229,29 +242,12 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
229242 synchronized (this ) {
230243 pageNumber = allocatedPages .nextClearBit (0 );
231244 if (pageNumber >= PAGE_TABLE_SIZE ) {
232- releaseExecutionMemory (acquired , consumer );
245+ releaseExecutionMemory (acquired , tungstenMemoryMode , consumer );
233246 throw new IllegalStateException (
234247 "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages" );
235248 }
236249 allocatedPages .set (pageNumber );
237250 }
238- final long acquiredExecutionMemory ;
239- if (memoryManager .tungstenMemoryIsAllocatedInHeap ()) {
240- acquiredExecutionMemory = acquireOnHeapExecutionMemory (size );
241- } else {
242- acquiredExecutionMemory = acquireOffHeapExecutionMemory (size );
243- }
244- if (acquiredExecutionMemory != size ) {
245- if (memoryManager .tungstenMemoryIsAllocatedInHeap ()) {
246- releaseOnHeapExecutionMemory (acquiredExecutionMemory );
247- } else {
248- releaseOffHeapExecutionMemory (acquiredExecutionMemory );
249- }
250- synchronized (this ) {
251- allocatedPages .clear (pageNumber );
252- }
253- return null ;
254- }
255251 final MemoryBlock page = memoryManager .tungstenMemoryAllocator ().allocate (size );
256252 page .pageNumber = pageNumber ;
257253 pageTable [pageNumber ] = page ;
@@ -275,13 +271,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
275271 if (logger .isTraceEnabled ()) {
276272 logger .trace ("Freed page number {} ({} bytes)" , page .pageNumber , page .size ());
277273 }
278- long pageSize = page .size ();
279- memoryManager .tungstenMemoryAllocator ().free (page );
280- if (memoryManager .tungstenMemoryIsAllocatedInHeap ()) {
281- releaseOnHeapExecutionMemory (pageSize );
282- } else {
283- releaseOffHeapExecutionMemory (pageSize );
284- }
274+ consumer .freePage (page );
285275 }
286276
287277 /**
@@ -295,7 +285,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
295285 * @return an encoded page address.
296286 */
297287 public long encodePageNumberAndOffset (MemoryBlock page , long offsetInPage ) {
298- if (! inHeap ) {
288+ if (tungstenMemoryMode == MemoryMode . OFF_HEAP ) {
299289 // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
300290 // encode. Due to our page size limitation, though, we can convert this into an offset that's
301291 // relative to the page's base offset; this relative offset will fit in 51 bits.
@@ -324,7 +314,7 @@ private static long decodeOffset(long pagePlusOffsetAddress) {
324314 * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
325315 */
326316 public Object getPage (long pagePlusOffsetAddress ) {
327- if (inHeap ) {
317+ if (tungstenMemoryMode == MemoryMode . ON_HEAP ) {
328318 final int pageNumber = decodePageNumber (pagePlusOffsetAddress );
329319 assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE );
330320 final MemoryBlock page = pageTable [pageNumber ];
@@ -342,7 +332,7 @@ public Object getPage(long pagePlusOffsetAddress) {
342332 */
343333 public long getOffsetInPage (long pagePlusOffsetAddress ) {
344334 final long offsetInPage = decodeOffset (pagePlusOffsetAddress );
345- if (inHeap ) {
335+ if (tungstenMemoryMode == MemoryMode . ON_HEAP ) {
346336 return offsetInPage ;
347337 } else {
348338 // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
@@ -363,9 +353,15 @@ public long cleanUpAllAllocatedMemory() {
363353 synchronized (this ) {
364354 Arrays .fill (pageTable , null );
365355 for (MemoryConsumer c : consumers ) {
366- if (c != null && c .getUsed () > 0 ) {
356+ if (c != null && c .getMemoryUsed (MemoryMode .ON_HEAP ) > 0 ) {
357+ // In case of failed task, it's normal to see leaked memory
358+ logger .warn ("leak " + Utils .bytesToString (c .getMemoryUsed (MemoryMode .ON_HEAP )) +
359+ " of on-heap memory from " + c );
360+ }
361+ if (c != null && c .getMemoryUsed (MemoryMode .OFF_HEAP ) > 0 ) {
367362 // In case of failed task, it's normal to see leaked memory
368- logger .warn ("leak " + Utils .bytesToString (c .getUsed ()) + " memory from " + c );
363+ logger .warn ("leak " + Utils .bytesToString (c .getMemoryUsed (MemoryMode .OFF_HEAP )) +
364+ " of off-heap memory from " + c );
369365 }
370366 }
371367 consumers .clear ();
0 commit comments