
Commit 4a4e62e

The spark.unsafe.Platform interface has changed: BufferHolder is replaced by MemoryBlockHolder, OffHeapMemoryBlock uses a DirectByteBuffer for off-heap memory allocation, and UnsafeRow and related classes now hold their memory in MemoryBlocks instead of untyped Objects.
1 parent f9f2776 commit 4a4e62e
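For orientation, a minimal usage sketch of the new calling convention (not part of this commit): callers that used to hand UnsafeRow a raw byte[] plus Platform.BYTE_ARRAY_OFFSET now wrap the bytes in a ByteArrayMemoryBlock and pass the block's own base offset. Only ByteArrayMemoryBlock.fromByteArray, getBaseOffset, and the new pointTo(MemoryBlock, long, int) signature are taken from the diff below; the surrounding class and method names are hypothetical.

// Hypothetical usage sketch, assembled only from signatures visible in this commit's diff.
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;

public class MemoryBlockUsageSketch {
  // Previously: row.pointTo(rowBytes, Platform.BYTE_ARRAY_OFFSET, rowBytes.length)
  // Now: wrap the bytes in a typed MemoryBlock and use the block's base offset.
  static UnsafeRow pointRowAt(byte[] rowBytes, int numFields) {
    ByteArrayMemoryBlock block = ByteArrayMemoryBlock.fromByteArray(rowBytes);
    UnsafeRow row = new UnsafeRow(numFields);
    row.pointTo(block, block.getBaseOffset(), rowBytes.length);
    return row;
  }
}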

23 files changed: +286 -342 lines changed


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java

Lines changed: 12 additions & 6 deletions
@@ -26,6 +26,8 @@
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -49,7 +51,7 @@
 // todo: there is a lof of duplicated code between UnsafeRow and UnsafeArrayData.
 public class UnsafeArrayData extends ArrayData {
 
-  private Object baseObject;
+  private MemoryBlock baseObject;
   private long baseOffset;
 
   // The number of elements in this array
@@ -101,7 +103,7 @@ public UnsafeArrayData() { }
    * @param baseOffset the offset within the base object
    * @param sizeInBytes the size of this array's backing data, in bytes
    */
-  public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
+  public void pointTo(MemoryBlock baseObject, long baseOffset, int sizeInBytes) {
     // Read the number of elements from the first 4 bytes.
     final int numElements = Platform.getInt(baseObject, baseOffset);
     assert numElements >= 0 : "numElements (" + numElements + ") should >= 0";
@@ -314,7 +316,11 @@ public boolean equals(Object other) {
       return false;
     }
 
-  public void writeToMemory(Object target, long targetOffset) {
+  public void writeToMemory(byte[] target, long targetOffset) {
+    Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
+  }
+
+  public void writeToMemory(MemoryBlock target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
 
@@ -330,10 +336,10 @@ public void writeTo(ByteBuffer buffer) {
   @Override
   public UnsafeArrayData copy() {
     UnsafeArrayData arrayCopy = new UnsafeArrayData();
-    final byte[] arrayDataCopy = new byte[sizeInBytes];
+    ByteArrayMemoryBlock newBlock = ByteArrayMemoryBlock.fromByteArray(new byte[sizeInBytes]);
     Platform.copyMemory(
-      baseObject, baseOffset, arrayDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
-    arrayCopy.pointTo(arrayDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
+      baseObject, baseOffset, newBlock, newBlock.getBaseOffset(), sizeInBytes);
+    arrayCopy.pointTo(newBlock, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
     return arrayCopy;
   }
 }

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java

Lines changed: 11 additions & 5 deletions
@@ -21,6 +21,8 @@
 
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
 
 /**
  * An Unsafe implementation of Map which is backed by raw memory instead of Java objects.
@@ -32,15 +34,15 @@
 // TODO: Use a more efficient format which doesn't depend on unsafe array.
 public class UnsafeMapData extends MapData {
 
-  private Object baseObject;
+  private MemoryBlock baseObject;
   private long baseOffset;
 
   // The size of this map's backing data, in bytes.
   // The 4-bytes header of key array `numBytes` is also included, so it's actually equal to
   // 4 + key array numBytes + value array numBytes.
   private int sizeInBytes;
 
-  public Object getBaseObject() { return baseObject; }
+  public MemoryBlock getBaseObject() { return baseObject; }
   public long getBaseOffset() { return baseOffset; }
   public int getSizeInBytes() { return sizeInBytes; }
 
@@ -64,7 +66,7 @@ public UnsafeMapData() {
    * @param baseOffset the offset within the base object
    * @param sizeInBytes the size of this map's backing data, in bytes
    */
-  public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
+  public void pointTo(MemoryBlock baseObject, long baseOffset, int sizeInBytes) {
     // Read the numBytes of key array from the first 4 bytes.
     final int keyArraySize = Platform.getInt(baseObject, baseOffset);
     final int valueArraySize = sizeInBytes - keyArraySize - 4;
@@ -96,7 +98,11 @@ public UnsafeArrayData valueArray() {
     return values;
   }
 
-  public void writeToMemory(Object target, long targetOffset) {
+  public void writeToMemory(byte[] target, long targetOffset) {
+    Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
+  }
+
+  public void writeToMemory(MemoryBlock target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
 
@@ -112,7 +118,7 @@ public void writeTo(ByteBuffer buffer) {
   @Override
   public UnsafeMapData copy() {
     UnsafeMapData mapCopy = new UnsafeMapData();
-    final byte[] mapDataCopy = new byte[sizeInBytes];
+    final ByteArrayMemoryBlock mapDataCopy = ByteArrayMemoryBlock.fromByteArray(new byte[sizeInBytes]);
     Platform.copyMemory(
       baseObject, baseOffset, mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
     mapCopy.pointTo(mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 36 additions & 41 deletions
@@ -36,6 +36,8 @@
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -111,7 +113,7 @@ public static boolean isMutable(DataType dt) {
   // Private fields and methods
   //////////////////////////////////////////////////////////////////////////////
 
-  private Object baseObject;
+  private MemoryBlock baseObject;
   private long baseOffset;
 
   /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
@@ -150,35 +152,19 @@ public UnsafeRow(int numFields) {
   // for serializer
   public UnsafeRow() {}
 
-  public Object getBaseObject() { return baseObject; }
+  public MemoryBlock getBaseObject() { return baseObject; }
   public long getBaseOffset() { return baseOffset; }
   public int getSizeInBytes() { return sizeInBytes; }
 
   @Override
   public int numFields() { return numFields; }
 
-  /**
-   * Update this UnsafeRow to point to different backing data.
-   *
-   * @param baseObject the base object
-   * @param baseOffset the offset within the base object
-   * @param sizeInBytes the size of this row's backing data, in bytes
-   */
-  public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
+  public void pointTo( MemoryBlock aBlock, long anOffset, int aSizeInBytes ) {
     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
-    this.baseObject = baseObject;
-    this.baseOffset = baseOffset;
-    this.sizeInBytes = sizeInBytes;
-  }
+    this.baseObject = aBlock;
+    this.baseOffset = anOffset;
+    this.sizeInBytes = aSizeInBytes;
 
-  /**
-   * Update this UnsafeRow to point to the underlying byte array.
-   *
-   * @param buf byte array to point to
-   * @param sizeInBytes the number of bytes valid in the byte array
-   */
-  public void pointTo(byte[] buf, int sizeInBytes) {
-    pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
   }
 
   public void setTotalSize(int sizeInBytes) {
@@ -500,15 +486,15 @@ public UnsafeMapData getMap(int ordinal) {
   @Override
   public UnsafeRow copy() {
     UnsafeRow rowCopy = new UnsafeRow(numFields);
-    final byte[] rowDataCopy = new byte[sizeInBytes];
+    ByteArrayMemoryBlock block = ByteArrayMemoryBlock.fromByteArray(new byte[sizeInBytes]);
     Platform.copyMemory(
       baseObject,
       baseOffset,
-      rowDataCopy,
-      Platform.BYTE_ARRAY_OFFSET,
+      block,
+      block.getBaseOffset(),
       sizeInBytes
     );
-    rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
+    rowCopy.pointTo(block, block.getBaseOffset(), sizeInBytes);
     return rowCopy;
   }
 
@@ -518,7 +504,8 @@ public UnsafeRow copy() {
    */
   public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
     final UnsafeRow row = new UnsafeRow(numFields);
-    row.pointTo(new byte[numBytes], numBytes);
+    ByteArrayMemoryBlock block = ByteArrayMemoryBlock.fromByteArray(new byte[numBytes]);
+    row.pointTo(block, block.getBaseOffset(), numBytes);
     return row;
   }
 
@@ -528,13 +515,15 @@ public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
    */
   public void copyFrom(UnsafeRow row) {
     // copyFrom is only available for UnsafeRow created from byte array.
-    assert (baseObject instanceof byte[]) && baseOffset == Platform.BYTE_ARRAY_OFFSET;
+    assert (baseObject instanceof ByteArrayMemoryBlock);
     if (row.sizeInBytes > this.sizeInBytes) {
       // resize the underlying byte[] if it's not large enough.
-      this.baseObject = new byte[row.sizeInBytes];
+      this.baseObject = ByteArrayMemoryBlock.fromByteArray(new byte[row.sizeInBytes]);
     }
     Platform.copyMemory(
-      row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
+      row.baseObject, row.baseOffset,
+      this.baseObject, this.baseOffset,
+      row.sizeInBytes );
     // update the sizeInBytes.
     this.sizeInBytes = row.sizeInBytes;
   }
@@ -548,9 +537,9 @@ public void copyFrom(UnsafeRow row) {
    * buffer will not be used and may be null.
    */
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
-    if (baseObject instanceof byte[]) {
-      int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
-      out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
+    if (baseObject instanceof ByteArrayMemoryBlock) {
+      int startIndex = (int) (baseOffset - baseObject.getBaseOffset());
+      out.write((byte[])baseObject.getBaseObject(), startIndex, sizeInBytes);
     } else {
       int dataRemaining = sizeInBytes;
       long rowReadPosition = baseOffset;
@@ -585,9 +574,11 @@ public boolean equals(Object other) {
    * Returns the underlying bytes for this UnsafeRow.
    */
   public byte[] getBytes() {
-    if (baseObject instanceof byte[] && baseOffset == Platform.BYTE_ARRAY_OFFSET
-        && (((byte[]) baseObject).length == sizeInBytes)) {
-      return (byte[]) baseObject;
+    if ( baseObject instanceof ByteArrayMemoryBlock
+        && baseOffset == Platform.BYTE_ARRAY_OFFSET
+        && (((byte[]) baseObject.getBaseObject()).length == sizeInBytes) )
+    {
+      return (byte[]) baseObject.getBaseObject();
     } else {
       byte[] bytes = new byte[sizeInBytes];
       Platform.copyMemory(baseObject, baseOffset, bytes, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
@@ -617,7 +608,11 @@ public boolean anyNull() {
    * The target memory address must already been allocated, and have enough space to hold all the
    * bytes in this string.
    */
-  public void writeToMemory(Object target, long targetOffset) {
+  public void writeToMemory(byte[] target, long targetOffset) {
+    Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
+  }
+
+  public void writeToMemory(MemoryBlock target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
 
@@ -665,8 +660,8 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
     this.sizeInBytes = in.readInt();
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
-    this.baseObject = new byte[sizeInBytes];
-    in.readFully((byte[]) baseObject);
+    this.baseObject = ByteArrayMemoryBlock.fromByteArray(new byte[sizeInBytes]);
+    in.readFully((byte[]) baseObject.getBaseObject());
   }
 
   @Override
@@ -683,7 +678,7 @@ public void read(Kryo kryo, Input in) {
     this.sizeInBytes = in.readInt();
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
-    this.baseObject = new byte[sizeInBytes];
-    in.read((byte[]) baseObject);
+    this.baseObject = ByteArrayMemoryBlock.fromByteArray(new byte[sizeInBytes]);
+    in.read((byte[]) baseObject.getBaseObject());
   }
 }

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java

Lines changed: 0 additions & 80 deletions
This file was deleted.
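BufferHolder managed a growable byte[] buffer for the unsafe writers; the commit message says it is superseded by a MemoryBlockHolder, whose source is not part of this diff. The sketch below is therefore speculative: it only illustrates the idea of holding a growable ByteArrayMemoryBlock instead of a raw byte[], using MemoryBlock calls that do appear in the hunks above; every name and the growth policy are hypothetical.

// Speculative sketch of a MemoryBlock-based holder; MemoryBlockHolder itself is not shown in this diff.
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;

class MemoryBlockHolderSketch {
  private ByteArrayMemoryBlock block = ByteArrayMemoryBlock.fromByteArray(new byte[64]);
  private int capacity = 64;   // bytes currently allocated
  private int cursor = 0;      // bytes currently written

  MemoryBlock block() { return block; }

  // Grow the backing block when needed, copying the bytes written so far into the larger block.
  void grow(int neededSize) {
    if (cursor + neededSize > capacity) {
      int newCapacity = Math.max(capacity * 2, cursor + neededSize);
      ByteArrayMemoryBlock bigger = ByteArrayMemoryBlock.fromByteArray(new byte[newCapacity]);
      Platform.copyMemory(block, block.getBaseOffset(), bigger, bigger.getBaseOffset(), cursor);
      block = bigger;
      capacity = newCapacity;
    }
  }
}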
