Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,13 @@ public long spill() throws IOException {
}
allocatedPages.clear();
}

// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
logger.warn("released {} from {}", released, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a little strong. should be info

return released;
}
}
Expand All @@ -489,10 +496,6 @@ public void loadNext() throws IOException {
}
upstream = nextUpstream;
nextUpstream = null;

assert(inMemSorter != null);
inMemSorter.free();
inMemSorter = null;
}
numRecords--;
upstream.loadNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public UnsafeInMemorySorter(
*/
public void free() {
consumer.freeArray(array);
array = null;
}

public void reset() {
Expand Down Expand Up @@ -160,28 +161,22 @@ public void insertRecord(long recordPointer, long keyPrefix) {
pos++;
}

public static final class SortedIterator extends UnsafeSorterIterator {
public final class SortedIterator extends UnsafeSorterIterator {

private final TaskMemoryManager memoryManager;
private final int sortBufferInsertPosition;
private final LongArray sortBuffer;
private int position = 0;
private final int numRecords;
private int position;
private Object baseObject;
private long baseOffset;
private long keyPrefix;
private int recordLength;

private SortedIterator(
TaskMemoryManager memoryManager,
int sortBufferInsertPosition,
LongArray sortBuffer) {
this.memoryManager = memoryManager;
this.sortBufferInsertPosition = sortBufferInsertPosition;
this.sortBuffer = sortBuffer;
private SortedIterator(int numRecords) {
this.numRecords = numRecords;
this.position = 0;
}

public SortedIterator clone () {
SortedIterator iter = new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer);
SortedIterator iter = new SortedIterator(numRecords);
iter.position = position;
iter.baseObject = baseObject;
iter.baseOffset = baseOffset;
Expand All @@ -192,21 +187,21 @@ public SortedIterator clone () {

@Override
public boolean hasNext() {
return position < sortBufferInsertPosition;
return position / 2 < numRecords;
}

public int numRecordsLeft() {
return (sortBufferInsertPosition - position) / 2;
return numRecords - position / 2;
}

@Override
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = sortBuffer.get(position);
final long recordPointer = array.get(position);
baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4);
keyPrefix = sortBuffer.get(position + 1);
keyPrefix = array.get(position + 1);
position += 2;
}

Expand All @@ -229,6 +224,6 @@ public void loadNext() {
*/
public SortedIterator getSortedIterator() {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(memoryManager, pos, array);
return new SortedIterator(pos / 2);
}
}