Skip to content
Closed
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/ByteBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class ByteBuffers {

Expand All @@ -46,6 +47,15 @@ public static byte[] toByteArray(ByteBuffer buffer) {
}
}

public static ByteBuffer reuse(ByteBuffer reuse, int length) {
Preconditions.checkArgument(reuse.hasArray() && reuse.arrayOffset() == 0 && reuse.capacity() == length,
"Cannot reuse buffer: Should be an array %s, should have an offset of 0 %s, should be of size %s was %s",
reuse.hasArray(), reuse.arrayOffset(), length, reuse.capacity());
reuse.position(0);
reuse.limit(length);
return reuse;
}

public static ByteBuffer copy(ByteBuffer buffer) {
if (buffer == null) {
return null;
Expand Down
84 changes: 50 additions & 34 deletions core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,35 @@ private ZOrderByteUtils() {
* To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially
* shifts the 0 value so that we don't break our ordering when we cross the new 0 value.
*/
public static byte[] intToOrderedBytes(int val) {
ByteBuffer bytes = ByteBuffer.allocate(Integer.BYTES);
public static byte[] intToOrderedBytes(int val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Integer.BYTES);
bytes.putInt(val ^ 0x80000000);
return bytes.array();
}

/**
* Signed longs are treated the same as the signed ints
* Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] longToOrderBytes(long val) {
ByteBuffer bytes = ByteBuffer.allocate(Long.BYTES);
public static byte[] longToOrderedBytes(long val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Long.BYTES);
bytes.putLong(val ^ 0x8000000000000000L);
return bytes.array();
}

/**
* Signed shorts are treated the same as the signed ints
* Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] shortToOrderBytes(short val) {
ByteBuffer bytes = ByteBuffer.allocate(Short.BYTES);
public static byte[] shortToOrderedBytes(short val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Short.BYTES);
bytes.putShort((short) (val ^ (0x8000)));
return bytes.array();
}

/**
* Signed tiny ints are treated the same as the signed ints
* Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
*/
public static byte[] tinyintToOrderedBytes(byte val) {
ByteBuffer bytes = ByteBuffer.allocate(Byte.BYTES);
public static byte[] tinyintToOrderedBytes(byte val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Byte.BYTES);
bytes.put((byte) (val ^ (0x80)));
return bytes.array();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return the ByteBuffer instead since we're passing in a ByteBuffer?

}
Expand All @@ -85,19 +85,19 @@ public static byte[] tinyintToOrderedBytes(byte val) {
* Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically
* comparable bytes
*/
public static byte[] floatToOrderedBytes(float val) {
ByteBuffer bytes = ByteBuffer.allocate(Integer.BYTES);
public static byte[] floatToOrderedBytes(float val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Float.BYTES);
int ival = Float.floatToIntBits(val);
ival ^= ((ival >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
bytes.putInt(ival);
return bytes.array();
}

/**
* Doubles are treated the same as floats
* Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)}
*/
public static byte[] doubleToOrderedBytes(double val) {
ByteBuffer bytes = ByteBuffer.allocate(Long.BYTES);
public static byte[] doubleToOrderedBytes(double val, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, Double.BYTES);
long lng = Double.doubleToLongBits(val);
lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
bytes.putLong(lng);
Expand All @@ -108,54 +108,70 @@ public static byte[] doubleToOrderedBytes(double val) {
* Strings are lexicographically sortable BUT if different byte array lengths will
* ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time).
* This implementation just uses a set size to for all output byte representations. Truncating longer strings
* and right padding 0 for shorter strings.
* and right padding 0 for shorter strings. Requires UTF8 (or ASCII) encoding for ordering guarantees to hold.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't add a comment. Just replace String.getBytes() with String.getBytes(java.nio.charset.StandardCharsets#UTF_8).

Of course, i don't know the rules of the project, but otherwise i don't see why we would want to rely on JVM's default encoding. This is z-order, not sorting, but z-order should be consistent with sorting, and varchar value ordering is codepoint-based (in the absence of collation which doesn't exist in Iceberg world). The UTF-8 encoding produces bytes that, if treated as unsigned, lead to the same order as codepoint-based lexicographical ordering.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. We should ensure that we use UTF-8 to convert to bytes.

*/
public static byte[] stringToOrderedBytes(String val, int length) {
ByteBuffer bytes = ByteBuffer.allocate(length);
public static byte[] stringToOrderedBytes(String val, int length, ByteBuffer reuse) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the length going to be calculated in practice?
Is it going to be a fixed prefix, like 128 or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debate we were having before was wether to limit it to the max common length of columns, or whether to let it go beyond that.

Like with (A, B, CC, DDD)

Do you return

A. : All bytes that are available
ABCDCDD

B. : All bytes that can be interleaved with at least one other column
ABCDCD

Or

C. : All bytes that can be interleaved with all other columns
ABCD

My current implementation in Spark just does A which is the sum of all column lengths, but we could do B and save some space at the cost of losing a bit of single column ordering. I don't think C actually makes a lot of sense unless we do some hind of bin hashing and actually generate bytes all of the same size.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the our Spark implementation (see the follow up to this pr) we just set a length for the moment. Mostly to save us from unbounded strings increasing the size of our z-order algorithm and making sure the column always contributes the same number of bytes.

In the future I hope we can use statistics to do something smarter, or maybe not even use truncation and use some binning function or something like that based on the range of values in the column.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A. : All bytes that are available

i think this is what we should do (Maybe with some max length for safety) because

  1. z-order on 1 column should be consistent with sort on that column
  2. adding a low-cardinality, short column (like boolean, or tinyint, or varchar(1) if we had it), should not reduce z-order sorting strength; so one short columns should not stop us from ordering longer columns

ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
if (val != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you update this to accept a reused buffer, then we need to remember to zero out the bytes.

int maxLength = Math.min(length, val.length());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val.length() is not "encoding-aware". Maybe add a comment that it doesn't matter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, don't add a comment. We must take byte-length into account, otherwise we would be truncating short non-all-ASCII string values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... an example would be String "ą", single character, so length=1.
We would copy only the first byte of the two-byte sequence (0xC4 0x85) for the letter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH sorry I thought you were talking about something else, yeah let me switch this to the byte length of the encoded string

// We may truncate mid-character
bytes.put(val.getBytes(), 0, maxLength);
Copy link
Member

@findepi findepi Feb 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The encoding must be fixed, and it must be UTF-8, if we want the ordering of the produced bytes be coherent with value ordering.

Also byte order will match string/varchar order only if bytes are treated as unsigned.
I guess he unsignedness applies to other types, so maybe it's documented already, i didn't notice.

}
return bytes.array();
}

/**
* Interleave bits using a naive loop.
* @param columnsBinary an array of byte arrays, none of which are empty
* @return their bits interleaved
* Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is
* required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all
* columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the
* interleaving.
* @param columnsBinary an array of ordered byte representations of the columns being ZOrdered
* @return the columnbytes interleaved
*/
public static byte[] interleaveBits(byte[][] columnsBinary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mention it at first because it is an optimization but an alternative to this is a builder pattern that has specific methods for int/long/float/double/byte[] and a predertmined number of dimensions and a bit-width for each dimension. then the addFloat/addInt methods could be called in the correct sequence. I don't know if the JVM is smart enough to compile the ByteBuffer.of(...).put pattern to simple casts, but with the builder methods these could be avoided.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be a good thing for a future improvement. If you see the Spark request has some benchmarks and even with wrapping all these functions in UDFS and applying them to rows that way, it's still only about 2~3 x slower than sort with as many expressions. So I think the perf is probably ok to start with.

int interleavedSize = Arrays.stream(columnsBinary).mapToInt(a -> a.length).sum();
byte[] interleavedBytes = new byte[interleavedSize];
int sourceBit = 7;
int sourceByte = 0;
int sourceColumn = 0;
int interleaveBit = 7;
int sourceByte = 0;
int sourceBit = 7;
int interleaveByte = 0;
while (interleaveByte < interleavedSize) {
// Take what we have, Get the source Bit of the source Byte, move it to the interleaveBit position
interleavedBytes[interleaveByte] =
(byte) (interleavedBytes[interleaveByte] |
(columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >> sourceBit << interleaveBit);
int interleaveBit = 7;

while (interleaveByte < interleavedSize) {
// Take the source bit from source byte and move it to the output bit position
interleavedBytes[interleaveByte] |=
(columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit;
--interleaveBit;

// Check if an output byte has been completed
if (interleaveBit == -1) {
// Finished a byte in our interleave byte array start a new byte
// Move to the next output byte
interleaveByte++;
// Move to the highest order bit of the new output byte
interleaveBit = 7;
}

// Find next column with a byte we can use
// Check if the last output byte has been completed
if (interleaveByte == interleavedSize) {
break;
}

// Find the next source bit to interleave
do {
// Move to next column
++sourceColumn;
if (sourceColumn == columnsBinary.length) {
// If the last source column was used, reset to next bit of first column
sourceColumn = 0;
if (--sourceBit == -1) {
--sourceBit;
if (sourceBit == -1) {
// If the last bit of the source byte was used, reset to the highest bit of the next byte
sourceByte++;
sourceBit = 7;
}
}
} while (columnsBinary[sourceColumn].length <= sourceByte && interleaveByte < interleavedSize);
} while (columnsBinary[sourceColumn].length <= sourceByte);
}
return interleavedBytes;
}
Expand Down
50 changes: 33 additions & 17 deletions core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.iceberg.util;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import org.apache.iceberg.relocated.com.google.common.primitives.UnsignedBytes;
Expand All @@ -36,6 +37,7 @@ public class TestZOrderByteUtil {
private static final byte OOOOOOOO = (byte) 0;

private static final int NUM_TESTS = 100000;
private static final int NUM_INTERLEAVE_TESTS = 1000;

private final Random random = new Random(42);

Expand Down Expand Up @@ -84,7 +86,7 @@ private String interleaveStrings(String[] strings) {
*/
@Test
public void testInterleaveRandomExamples() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

takes 17-18 seconds to run on my local machine, just wondering whether we want to have such long running unit tests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably @Ignore this test, but it's good to see it working.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can cut down the number of example :) I just really wanted to have some kind of fuzzing since I know we are going to want to optimize the interleave example with fancier byte tricks in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the number of trials run so now this takes a few hundred ms. The longest test in the suite now is the String compare test which is around 1 second on my 2019 MBP

for (int test = 0; test < NUM_TESTS; test++) {
for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) {
int numByteArrays = Math.abs(random.nextInt(6)) + 1;
byte[][] testBytes = new byte[numByteArrays][];
String[] testStrings = new String[numByteArrays];
Expand Down Expand Up @@ -141,12 +143,14 @@ public void testInterleaveMixedBits() {

@Test
public void testIntOrdering() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good tests.

ByteBuffer aBuffer = ByteBuffer.allocate(Integer.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Integer.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
int aInt = random.nextInt();
int bInt = random.nextInt();
int intCompare = Integer.signum(Integer.compare(aInt, bInt));
byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt);
byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt);
byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer);
byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -158,12 +162,14 @@ public void testIntOrdering() {

@Test
public void testLongOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(Long.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Long.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
long aLong = random.nextInt();
long bLong = random.nextInt();
int longCompare = Integer.signum(Long.compare(aLong, bLong));
byte[] aBytes = ZOrderByteUtils.longToOrderBytes(aLong);
byte[] bBytes = ZOrderByteUtils.longToOrderBytes(bLong);
byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer);
byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -175,12 +181,14 @@ public void testLongOrdering() {

@Test
public void testShortOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(Short.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Short.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
int longCompare = Integer.signum(Long.compare(aShort, bShort));
byte[] aBytes = ZOrderByteUtils.longToOrderBytes(aShort);
byte[] bBytes = ZOrderByteUtils.longToOrderBytes(bShort);
byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer);
byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -192,12 +200,14 @@ public void testShortOrdering() {

@Test
public void testTinyOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(Byte.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Byte.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
long aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
long bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
int longCompare = Integer.signum(Long.compare(aByte, bByte));
byte[] aBytes = ZOrderByteUtils.longToOrderBytes(aByte);
byte[] bBytes = ZOrderByteUtils.longToOrderBytes(bByte);
byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer);
byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -209,12 +219,14 @@ public void testTinyOrdering() {

@Test
public void testFloatOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(Float.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Float.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
float aFloat = random.nextFloat();
float bFloat = random.nextFloat();
int floatCompare = Integer.signum(Float.compare(aFloat, bFloat));
byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat);
byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat);
byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer);
byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -226,12 +238,14 @@ public void testFloatOrdering() {

@Test
public void testDoubleOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(Double.BYTES);
ByteBuffer bBuffer = ByteBuffer.allocate(Double.BYTES);
for (int i = 0; i < NUM_TESTS; i++) {
double aDouble = random.nextDouble();
double bDouble = random.nextDouble();
int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble));
byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble);
byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble);
byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer);
byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand All @@ -243,12 +257,14 @@ public void testDoubleOrdering() {

@Test
public void testStringOrdering() {
ByteBuffer aBuffer = ByteBuffer.allocate(128);
ByteBuffer bBuffer = ByteBuffer.allocate(128);
for (int i = 0; i < NUM_TESTS; i++) {
String aString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random);
String bString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random);
int stringCompare = Integer.signum(aString.compareTo(bString));
byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128);
byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128);
byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer);
byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer);
int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));

Assert.assertEquals(String.format(
Expand Down