Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.memory;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -25,27 +26,37 @@

public class TaskMemoryManagerSuite {

static TaskMemoryManager manager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not need to be static. Also should be private.


@After
public void after() {
Assert.assertEquals(0, manager.getMemoryConsumptionForThisTask());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Assert.assertNotNull(manager).

Also, probably minor, but if a test fails, these checks will probably fail in weird ways and might mask the original error... I'd try to insert a failure in a test and seeing what happens.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squito had the same concern. I purposefully leaked some memory at the end of leakedPageMemoryIsDetected and got this very helpful trace:

[info] Test org.apache.spark.memory.TaskMemoryManagerSuite.leakedPageMemoryIsDetected started
[error] Test org.apache.spark.memory.TaskMemoryManagerSuite.leakedPageMemoryIsDetected failed: java.lang.AssertionError: expected:<0> but was:<4096>, took 0.007 sec
[error] at org.apache.spark.memory.TaskMemoryManagerSuite.after(TaskMemoryManagerSuite.java:34)
[error] ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like this assert triggering, not the actual test failing?

I'm talking about is leaking memory and inserting a fail("blah") in a test and seeing what's reported.

Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
manager = null;
}

@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
manager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
1),
0);
final MemoryConsumer c = new TestMemoryConsumer(manager);
manager.allocatePage(4096, c); // leak memory
final MemoryBlock block = manager.allocatePage(4096, c); // leak memory
Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
manager.freePage(block, c);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really needed? Why isn't the call above freeing this memory?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should. We can check for that instead with:
Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());

}

@Test
public void encodePageNumberAndOffsetOffHeap() {
final SparkConf conf = new SparkConf()
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
final MemoryBlock dataPage = manager.allocatePage(256, c);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
Expand All @@ -58,7 +69,7 @@ public void encodePageNumberAndOffsetOffHeap() {

@Test
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
manager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
final MemoryBlock dataPage = manager.allocatePage(256, c);
Expand All @@ -71,7 +82,7 @@ public void encodePageNumberAndOffsetOnHeap() {
public void cooperativeSpilling() {
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
memoryManager.limit(100);
final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
manager = new TaskMemoryManager(memoryManager, 0);

TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
Expand Down Expand Up @@ -106,14 +117,13 @@ public void cooperativeSpilling() {

c1.free(0);
c2.free(100);
Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
}

@Test
public void cooperativeSpilling2() {
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
memoryManager.limit(100);
final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
manager = new TaskMemoryManager(memoryManager, 0);

TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
Expand Down Expand Up @@ -141,14 +151,13 @@ public void cooperativeSpilling2() {
c1.free(0);
c2.free(80);
c3.free(10);
Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
}

@Test
public void shouldNotForceSpillingInDifferentModes() {
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
memoryManager.limit(100);
final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
manager = new TaskMemoryManager(memoryManager, 0);

TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
Expand All @@ -170,7 +179,7 @@ public void offHeapConfigurationBackwardsCompatibility() {
final SparkConf conf = new SparkConf()
.set("spark.unsafe.offHeap", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode);
}

Expand Down