Skip to content
Closed
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
f427ca2
introduce ByteArrayMemoryBlock, IntArrayMemoryBlock, LongArrayMemoryB…
kiszk Sep 13, 2017
5d7ccdb
OffHeapColumnVector uses UnsafeMemoryAllocator
kiszk Sep 13, 2017
251fa09
UTF8String uses UnsafeMemoryAllocator
kiszk Sep 13, 2017
790bbe7
Platform.copymemory() in UsafeInMemorySorter uses new MemoryBlock
kiszk Sep 13, 2017
93a792e
address review comments
kiszk Sep 14, 2017
0beab03
fix test failures (e.g. String in UnsafeArrayData)
kiszk Sep 14, 2017
fcf764c
fix failures
kiszk Sep 18, 2017
d2d2e50
minor update of UTF8String constructor
kiszk Sep 21, 2017
f5e10bb
rename method name
kiszk Sep 22, 2017
1905e8c
remove unused code
kiszk Sep 22, 2017
7778e58
update arrayEquals
kiszk Sep 22, 2017
4f96c82
rebase master
kiszk Sep 22, 2017
d1d6ae9
make more methods final
kiszk Sep 22, 2017
914dcd1
make fill method final in MemoryBlock
kiszk Sep 22, 2017
336e4b7
fix test failures
kiszk Sep 23, 2017
5be9ccb
add testsuite
kiszk Sep 24, 2017
43e6b57
pass concrete type to the first argument of Platform.get*/put* to get…
kiszk Sep 24, 2017
05f024e
rename methods related to hash
kiszk Sep 28, 2017
9071cf6
added methods for MemoryBlock
kiszk Sep 28, 2017
37ee9fa
rebase with master
kiszk Sep 28, 2017
d0b5d59
fix scala style error
kiszk Sep 28, 2017
5cdad44
use MemoryBlock in Murmur3 for performance reason
kiszk Oct 14, 2017
91028fa
fix typo in comment
kiszk Oct 14, 2017
0210bd1
address review comment
kiszk Oct 29, 2017
df6dad3
rebase with master
kiszk Nov 28, 2017
1fa47a8
fix failures
kiszk Feb 20, 2018
01f9c8e
fix failures in ArrowColumnVectorSuite and FeatureHasherSuite
kiszk Feb 20, 2018
2ed8f82
address review comments
kiszk Feb 21, 2018
5e3afd1
address review comments
kiszk Feb 23, 2018
95fbdee
fix test failures
kiszk Feb 24, 2018
9b4975b
fix test failures
kiszk Feb 25, 2018
9e2697c
remove MemoryBlock offset at caller site for allocate() and copyMemory()
kiszk Feb 25, 2018
8cd4853
provide generic copyFrom() and writeTo() in MemoryBlock
kiszk Feb 25, 2018
1bed048
address review comments
kiszk Feb 26, 2018
c79585f
address review comments
kiszk Feb 27, 2018
77cdb81
fix indent
kiszk Feb 27, 2018
ee5a798
remove assert for performance
kiszk Feb 27, 2018
c9f401a
address review comments
kiszk Mar 2, 2018
cf2d532
fix compilation failure
kiszk Mar 2, 2018
eb0cc6d
address review comment
kiszk Mar 2, 2018
6f57994
fix test failures
kiszk Mar 2, 2018
3a93d61
fix test failures
kiszk Mar 3, 2018
abf6ba0
reduce duplicated code in hash
kiszk Mar 3, 2018
4567781
address review comments
kiszk Mar 3, 2018
95ffd0f
address review comments
kiszk Mar 3, 2018
9cbb876
add review comment
kiszk Mar 4, 2018
291203c
add review comments
kiszk Mar 4, 2018
a62770b
address review comments
kiszk Mar 6, 2018
f9bc4d6
address review comments
kiszk Mar 7, 2018
c6d45ea
address review comments
kiszk Mar 16, 2018
5284593
address review comment
kiszk Mar 17, 2018
b1750a1
address review comments
kiszk Mar 20, 2018
38ddf48
address review comment
kiszk Mar 20, 2018
4e46a18
make MemoryBlock.length mutable for reuse
kiszk Mar 22, 2018
511d58d
address review comment
kiszk Mar 26, 2018
1939fda
include MemoryBlock.offset in get/putXXX
kiszk Mar 26, 2018
cb4b30b
Merge branch 'SPARK-10399' of github.com:kiszk/spark into SPARK-10399
kiszk Mar 26, 2018
c53b6b8
address review comments
kiszk Mar 27, 2018
45aa736
fix test failure
kiszk Mar 27, 2018
8690e43
address review comment
kiszk Mar 29, 2018
59fd393
fix test failures
kiszk Mar 29, 2018
b69cb64
address yesterday's review comment
kiszk Mar 29, 2018
94c9648
address review comment
kiszk Mar 29, 2018
e4a7016
address review comments
kiszk Apr 5, 2018
3d03f92
Address review comments
kiszk Apr 5, 2018
50326ca
Address review comments
kiszk Apr 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* Simulates Hive's hashing function from Hive v1.2.1
Expand All @@ -38,12 +39,18 @@ public static int hashLong(long input) {
return (int) ((input >>> 32) ^ input);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
public static int hashUnsafeBytesBlock(MemoryBlock mb) {
long offset = mb.getBaseOffset();
int lengthInBytes = (int)mb.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overflow check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we just use long here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us use long.

assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
for (int i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) Platform.getByte(base, offset + i);
result = (result * 31) + (int) mb.getByte(offset + i);
}
return result;
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,6 @@ public static void freeMemory(long address) {
_UNSAFE.freeMemory(address);
}

public static long reallocateMemory(long address, long oldSize, long newSize) {
long newMemory = _UNSAFE.allocateMemory(newSize);
copyMemory(null, address, null, newMemory, oldSize);
freeMemory(address);
return newMemory;
}

/**
* Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
Expand Down Expand Up @@ -187,7 +180,7 @@ public static void setMemory(long address, byte value, long size) {
}

public static void copyMemory(
Object src, long srcOffset, Object dst, long dstOffset, long length) {
Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy
// forward or backwards. This is necessary in case src and dst overlap.
if (dstOffset < srcOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

public class ByteArrayMethods {

Expand Down Expand Up @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;

private static final boolean unaligned = Platform.unaligned();
/**
* MemoryBlock equality check for MemoryBlocks.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEqualsBlock(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should have new doc instead of using arrayEquals's.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no one is calling it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is called from UTF8String class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this only works for ByteArrayMemoryBlock, doesn't it?

MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any semantic difference between this method and one right below ? Can we do getBaseObject() over the leftBase and rightBase and call the method below at L#85 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you missed this. I still see the code duplication. This entire method block could be replaced as:

arrayEquals(
  leftBase.getBaseObject(),
  leftOffset,
  rightBase.getBaseObject(),
  rightOffset,
  length
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I missed this.

return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
}

/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
Expand All @@ -56,7 +67,7 @@ public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;

// check if stars align and we can get both offsets to be aligned
// check if starts align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
Expand All @@ -33,16 +32,12 @@ public final class LongArray {
private static final long WIDTH = 8;

private final MemoryBlock memory;
private final Object baseObj;
private final long baseOffset;

private final long length;

public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}

Expand All @@ -51,11 +46,11 @@ public MemoryBlock memoryBlock() {
}

public Object getBaseObject() {
return baseObj;
return memory.getBaseObject();
}

public long getBaseOffset() {
return baseOffset;
return memory.getBaseOffset();
}

/**
Expand All @@ -69,8 +64,9 @@ public long size() {
* Fill this all with 0L.
*/
public void zeroOut() {
long baseOffset = memory.getBaseOffset();
for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the off should starts with 0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. These three unchanges seems to lead to failures.

Platform.putLong(baseObj, off, 0);
memory.putLong(off, 0);
}
}

Expand All @@ -80,7 +76,7 @@ public void zeroOut() {
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update it to use 0-based offset.

memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the future, shall we change the usage to memory.putLong(index * WIDTH, value)? One concern might be, under the hood, we need to do an add operation for each call, while we can avoid it by using a loop, e.g. https://github.com/apache/spark/pull/19222/files#diff-ecd88449f17ed41b7ce71d70a09429abR67

Copy link
Member Author

@kiszk kiszk Mar 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan good point. We will want use memory.putLong(index * WIDTH, value).
I expect that JIT compiler could move loop invariants out of a loop or could map the sequence to a scaled index instruction move targetreg, [basereg + scalereg * constant + offsetreg] well.

I will investigate what happens in the native code using the example that you pointed out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan sorry for my delay. I was busy with several stuffs.
I ran a benchmark program for LongArray.zeroOut with two version. One is current implementation. The other is future implementation like memory.putLong(index * WIDTH, value). I got almost the same performance using two versions.
WDYT?

OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Platform MemoryAccess:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
zeroOutCurrent                                1066 / 1071       1259.4           0.8       1.0X
zeroOutFuture                                 1061 / 1063       1265.6           0.8       1.0X
class MemoryBlockLoopAccessBenchmark extends SparkFunSuite {
  test("benchmark") {
    val N = 128 * 1024 * 1024
    val iters = 2
    val M = 5
    val benchmark = new Benchmark("Platform MemoryAccess", 1L * M * N * iters,
      minNumIters = 20)

    val array = new Array[Long](N)
    val mb = OnHeapMemoryBlock.fromArray(array)
    val la = new LongArray(mb)

    benchmark.addCase("zeroOutCurrent") { _: Int =>
      for (_ <- 0L until iters) {
        var i = 0
        while (i < M) {
          la.zeroOut()
          i += 1
        }
      }
    }

    benchmark.addCase("zeroOutFuture") { _: Int =>
      for (_ <- 0L until iters) {
        var i = 0
        while (i < M) {
          la.zeroOutFuture()
          i += 1
        }
      }
    }

    benchmark.run()
  }
}

public final class LongArray {
  ...
  public void zeroOut() {
    long baseOffset = memory.getBaseOffset();
    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
      memory.putLong(off, 0);
    }
  }

  public void zeroOutFuture() {
    for (long off = 0; off < length * WIDTH; off += WIDTH) {
      memory.putLong2(off, 0);
    }
  }
  ...
}

public final class OnHeapMemoryBlock extends MemoryBlock {
  ...
  @Override
  public final void putLong(long offset, long value) {
    Platform.putLong(array, offset, value);
  }
  @Override
  public final void putLong2(long ofs, long value) {
    Platform.putLong(array, ofs + offset, value);
  }
  ...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your data! I think it proves that we should make the API clearer and hide these offsets stuff in the implementation. If we could fix this in this patch, I think we can encourage more people to use the new way to operate memory. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can encourage more people to use the new memory management and its API.
On the other hand, it would be good to take two-step approach:

  1. Use new MemoryBlock with the current explicit offset usage
  2. Hide offset stuff in the implementation.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is that, the semantic of the offset argument is pretty important to the MemoryBlock component. If we don't do it right at the first version, we bring a bad history to MemoryBlock, which may confuse other people.

}

/**
Expand All @@ -89,6 +85,6 @@ public void set(int index, long value) {
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
return Platform.getLong(baseObj, baseOffset + index * WIDTH);
return memory.getLong(memory.getBaseOffset() + index * WIDTH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.bitset;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary import?


/**
* Methods for working with fixed-size uncompressed bitsets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.hash;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
Expand Down Expand Up @@ -49,55 +50,81 @@ public static int hashInt(int input, int seed) {
}

public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
return hashUnsafeWords(base, offset, lengthInBytes, seed);
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
int lengthInBytes = (int)base.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

@kiszk kiszk Mar 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this requires int, I will add overflow check.

assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
int h1 = hashBytesByIntBlock(base, seed);
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need this, it's checked in hashUnsafeWordsBlock

int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
int h1 = hashBytesByIntBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more consistent to call hashUnsafeWordsBlock here.

return fmix(h1, lengthInBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this line now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for good catch. I realized that current UTs do not test hashUnsafeWords well.

}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
// This is not compatible with original and another implementations.
// But remain it for backward compatibility for the components existing before 2.3.
long offset = base.getBaseOffset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to leave a comment why we have this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just call hashUnsafeBytes with the block's baseObject, baseOffset and size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to leave a comment at every method with ...ByteBlock(?

Copy link
Member Author

@kiszk kiszk Mar 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am considering to change to call hashUnsafeBytesBlock() from hashUnsafeBytes(). It can also reduce the duplication. Does it make sense?
This is because hashUnsafeBytesBlock() is faster than hashUnsafeBytes(). Also, we will remove hashUnsafeBytes() in another PR.

Would you please share your motivation to call hashUnsafeBytes from hashUnsafeBytesBlock?
`

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems they are somehow duplicate.

Copy link
Member Author

@kiszk kiszk Mar 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, to eliminate duplication, we will call hashUnsafeBytesBlock() from hashUnsafeBytes().
cc: @cloud-fan

int lengthInBytes = (int)base.size();
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int halfWord = Platform.getByte(base, offset + i);
int halfWord = base.getByte(offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just call hashUnsafeBytes2 with the baseObject, baseOffset and size of base?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// This is compatible with original and another implementations.
// Use this method for new components after Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
long offset = base.getBaseOffset();
int lengthInBytes = (int)base.size();
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
k1 ^= (base.getByte(offset + i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}

private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just call hashBytesByInt(base.getBaseObject(), base.getBaseOffset(), (int)base.size())?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

long offset = base.getBaseOffset();
int lengthInBytes = (int)base.size();
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = Platform.getInt(base, offset + i);
int halfWord = base.getInt(offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return h1;
}

private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
return hashBytesByIntBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public int hashLong(long input) {
return hashLong(input, seed);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;

import org.apache.spark.unsafe.Platform;

/**
* A consecutive block of memory with a byte array on Java heap.
*/
public final class ByteArrayMemoryBlock extends MemoryBlock {

private final byte[] array;

public ByteArrayMemoryBlock(byte[] obj, long offset, long length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for convenient we can add

public ByteArrayMemoryBlock(int size) {
  this(new byte[size], Platform.BYTE_ARRAY_OFFSET, size)
}

super(obj, offset, (long)length);
this.array = obj;
}

public ByteArrayMemoryBlock(long length) {
this(new byte[(int)length], Platform.BYTE_ARRAY_OFFSET, length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we check length overflow?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wanted to check. Now, I remember good check method Ints.checkedCast().

}

@Override
public MemoryBlock subBlock(long offset, long size) {
return new ByteArrayMemoryBlock(array, this.offset + offset, size);
}

public byte[] getByteArray() { return array; }

/**
* Creates a memory block pointing to the memory used by the byte array.
*/
public static ByteArrayMemoryBlock fromArray(final byte[] array) {
return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
}

@Override
public final int getInt(long offset) {
return Platform.getInt(array, offset);
}

@Override
public final void putInt(long offset, int value) {
Platform.putInt(array, offset, value);
}

@Override
public final boolean getBoolean(long offset) {
return Platform.getBoolean(array, offset);
}

@Override
public final void putBoolean(long offset, boolean value) {
Platform.putBoolean(array, offset, value);
}

@Override
public final byte getByte(long offset) {
return array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)];
}

@Override
public final void putByte(long offset, byte value) {
array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)] = value;
}

@Override
public final short getShort(long offset) {
return Platform.getShort(array, offset);
}

@Override
public final void putShort(long offset, short value) {
Platform.putShort(array, offset, value);
}

@Override
public final long getLong(long offset) {
return Platform.getLong(array, offset);
}

@Override
public final void putLong(long offset, long value) {
Platform.putLong(array, offset, value);
}

@Override
public final float getFloat(long offset) {
return Platform.getFloat(array, offset);
}

@Override
public final void putFloat(long offset, float value) {
Platform.putFloat(array, offset, value);
}

@Override
public final double getDouble(long offset) {
return Platform.getDouble(array, offset);
}

@Override
public final void putDouble(long offset, double value) {
Platform.putDouble(array, offset, value);
}
}
Loading