-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark: Spark3 ZOrder Rewrite Strategy #3983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 25 commits
7cb94b2
92516f7
ef1d214
545e373
1374247
2c48f0c
55fa4c8
8c7eef7
62a74b9
0bdabea
1e7e660
41d855c
2dfad57
a3e8543
fa2add8
6974f45
ba43cae
a50b496
82bfb07
e96b020
14637da
2e68428
859c558
46b1a16
a6981c8
ecf04d8
e5fdd4b
f4a100d
49a9703
bec34e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
|
|
||
| package org.apache.iceberg.util; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Random; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.Setup; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.Threads; | ||
| import org.openjdk.jmh.annotations.Timeout; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| @Fork(1) | ||
| @State(Scope.Benchmark) | ||
| @Measurement(iterations = 5) | ||
| @BenchmarkMode(Mode.SingleShotTime) | ||
| @Timeout(time = 1000, timeUnit = TimeUnit.HOURS) | ||
| public class ZOrderByteUtilsBenchmark { | ||
|
|
||
| private static final int NUM_ENTRIES = 10000000; | ||
|
|
||
| private byte[][][] fourColumnInput; | ||
| private byte[][][] threeColumnInput; | ||
| private byte[][][] twoColumnInput; | ||
|
|
||
| @Setup | ||
| public void setupBench() { | ||
| Random rand = new Random(42); | ||
| fourColumnInput = new byte[NUM_ENTRIES][][]; | ||
| threeColumnInput = new byte[NUM_ENTRIES][][]; | ||
| twoColumnInput = new byte[NUM_ENTRIES][][]; | ||
| for (int i = 0; i < NUM_ENTRIES; i++) { | ||
| fourColumnInput[i] = new byte[4][]; | ||
| threeColumnInput[i] = new byte[3][]; | ||
| twoColumnInput[i] = new byte[2][]; | ||
| for (int j = 0; j < 4; j++) { | ||
| byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(rand.nextLong()).array(); | ||
| if (j < 2) { | ||
| twoColumnInput[i][j] = value; | ||
| } | ||
| if (j < 3) { | ||
| threeColumnInput[i][j] = value; | ||
| } | ||
| fourColumnInput[i][j] = value; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Benchmark | ||
| @Threads(1) | ||
| public void interleaveValuesFourColumns(Blackhole blackhole) { | ||
| int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 4; | ||
| ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); | ||
|
|
||
| for (int i = 0; i < fourColumnInput.length; i++) { | ||
| byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer); | ||
| blackhole.consume(interleavedBytes); | ||
| } | ||
| } | ||
|
|
||
| @Benchmark | ||
| @Threads(1) | ||
| public void interleaveValuesThreeColumns(Blackhole blackhole) { | ||
| int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 3; | ||
| ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); | ||
|
|
||
| for (int i = 0; i < fourColumnInput.length; i++) { | ||
| byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(threeColumnInput[i], outputSize, outputBuffer); | ||
| blackhole.consume(interleavedBytes); | ||
| } | ||
| } | ||
|
|
||
| @Benchmark | ||
| @Threads(1) | ||
| public void interleaveValuesTwoColumns(Blackhole blackhole) { | ||
| int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 2; | ||
| ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); | ||
|
|
||
| for (int i = 0; i < fourColumnInput.length; i++) { | ||
| byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(twoColumnInput[i], outputSize, outputBuffer); | ||
| blackhole.consume(interleavedBytes); | ||
| } | ||
| } | ||
|
|
||
| @Benchmark | ||
| @Threads(1) | ||
| public void interleaveValuesFourColumns8ByteOutput(Blackhole blackhole) { | ||
| int outputSize = 8; | ||
| ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); | ||
|
|
||
| for (int i = 0; i < fourColumnInput.length; i++) { | ||
| byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer); | ||
| blackhole.consume(interleavedBytes); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,214 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this relate to #3960?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see comment below about splitting it out.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep sorry, this pr is built on top of 3960 so hopefully we can get things in faster. |
||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.util; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.nio.CharBuffer; | ||
| import java.nio.charset.CharsetEncoder; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Arrays; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
|
||
| /** | ||
| * Within Z-Ordering the byte representations of objects being compared must be ordered; | ||
| * this requires several types to be transformed when converted to bytes. The goal is to | ||
| * map objects whose byte representations are not lexicographically ordered into representations | ||
| * that are lexicographically ordered. Bytes produced should be compared lexicographically as | ||
| * unsigned bytes, big-endian. | ||
| * <p> | ||
| * All types except for String are stored within an 8 Byte Buffer | ||
| * <p> | ||
| * Most of these techniques are derived from | ||
| * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/ | ||
| * <p> | ||
| * Some implementation is taken from | ||
| * https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java | ||
| */ | ||
| public class ZOrderByteUtils { | ||
|
|
||
| public static final int PRIMITIVE_BUFFER_SIZE = 8; | ||
|
|
||
| private ZOrderByteUtils() { | ||
|
|
||
|
||
| } | ||
|
|
||
| static ByteBuffer allocatePrimitiveBuffer() { | ||
| return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE); | ||
| } | ||
|
|
||
| /** | ||
| * Signed ints do not have their bytes in magnitude order because of the sign bit. | ||
| * To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially | ||
| * shifts the 0 value so that we don't break our ordering when we cross the new 0 value. | ||
| */ | ||
| public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| bytes.putLong(((long) val) ^ 0x8000000000000000L); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} | ||
| */ | ||
| public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| bytes.putLong(val ^ 0x8000000000000000L); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} | ||
| */ | ||
| public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| bytes.putLong(((long) val) ^ 0x8000000000000000L); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} | ||
| */ | ||
| public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| bytes.putLong(((long) val) ^ 0x8000000000000000L); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * IEEE 754 : | ||
| * “If two floating-point numbers in the same format are ordered (say, x {@literal <} y), | ||
| * they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.” | ||
| * | ||
| * Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically | ||
| * comparable bytes | ||
| */ | ||
| public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| long lval = Double.doubleToLongBits(val); | ||
| lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE); | ||
| bytes.putLong(lval); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)} | ||
| */ | ||
| public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); | ||
| long lval = Double.doubleToLongBits(val); | ||
| lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE); | ||
| bytes.putLong(lval); | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Strings are lexicographically sortable BUT if different byte array lengths will | ||
| * ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time). | ||
| * This implementation just uses a set size to for all output byte representations. Truncating longer strings | ||
| * and right padding 0 for shorter strings. | ||
| */ | ||
| public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse, CharsetEncoder encoder) { | ||
| Preconditions.checkArgument(encoder.charset().equals(StandardCharsets.UTF_8), | ||
| "Cannot use an encoder not using UTF_8 as it's Charset"); | ||
|
|
||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, length); | ||
| Arrays.fill(bytes.array(), 0, length, (byte) 0x00); | ||
| if (val != null) { | ||
| CharBuffer inputBuffer = CharBuffer.wrap(val); | ||
| encoder.encode(inputBuffer, bytes, true); | ||
| } | ||
| return bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Return a bytebuffer with the given bytes truncated to length, or filled with 0's to length depending on whether | ||
| * the given bytes are larger or smaller than the given length. | ||
| */ | ||
| public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) { | ||
| ByteBuffer bytes = ByteBuffers.reuse(reuse, length); | ||
| if (val.length < length) { | ||
| bytes.put(val, 0, val.length); | ||
| Arrays.fill(bytes.array(), val.length, length, (byte) 0x00); | ||
| } else { | ||
| bytes.put(val, 0, length); | ||
| } | ||
| return bytes; | ||
| } | ||
|
|
||
| static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) { | ||
| return interleaveBits(columnsBinary, interleavedSize, ByteBuffer.allocate(interleavedSize)); | ||
| } | ||
|
|
||
| /** | ||
| * Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is | ||
| * required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all | ||
| * columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the | ||
| * interleaving. | ||
| * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered | ||
| * @param interleavedSize the number of bytes to use in the output | ||
| * @return the columnbytes interleaved | ||
| */ | ||
| public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) { | ||
| byte[] interleavedBytes = reuse.array(); | ||
| int sourceColumn = 0; | ||
| int sourceByte = 0; | ||
| int sourceBit = 7; | ||
| int interleaveByte = 0; | ||
| int interleaveBit = 7; | ||
|
|
||
| while (interleaveByte < interleavedSize) { | ||
| // Take the source bit from source byte and move it to the output bit position | ||
| interleavedBytes[interleaveByte] |= | ||
| (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit; | ||
| --interleaveBit; | ||
|
|
||
| // Check if an output byte has been completed | ||
| if (interleaveBit == -1) { | ||
| // Move to the next output byte | ||
| interleaveByte++; | ||
| // Move to the highest order bit of the new output byte | ||
| interleaveBit = 7; | ||
| } | ||
|
|
||
| // Check if the last output byte has been completed | ||
| if (interleaveByte == interleavedSize) { | ||
| break; | ||
| } | ||
|
|
||
| // Find the next source bit to interleave | ||
| do { | ||
| // Move to next column | ||
| ++sourceColumn; | ||
| if (sourceColumn == columnsBinary.length) { | ||
| // If the last source column was used, reset to next bit of first column | ||
| sourceColumn = 0; | ||
| --sourceBit; | ||
| if (sourceBit == -1) { | ||
| // If the last bit of the source byte was used, reset to the highest bit of the next byte | ||
| sourceByte++; | ||
| sourceBit = 7; | ||
| } | ||
| } | ||
| } while (columnsBinary[sourceColumn].length <= sourceByte); | ||
| } | ||
| return interleavedBytes; | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember in the original design zOrder is considered a part of the sort order, so it makes more sense to me for the input here to be
SortOrder instead of String... columns. I guess this will be a shortcut for something like SortOrder.builderFor(schema).zOrder().columns(a,b,c).bitWidth(10) when that interface is out? But how do people define things like string column width to cutoff in this case?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
End users would not define any of the inner parameters for z ordering, Ideally in a follow up version we take some basic statistics from the data set and use that to develop our z ordering. I think providing any additional user config here is essentially just giving users a bunch of very difficult knobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you have any current thoughts about how this would be determined by statistics? I always imagined some human decision is still needed to tune the tradeoffs among different configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For most types my goal would be to bound and the condense the byte representation domain. So for example a column which has many values that translate to
We want to shift to
This involves finding min and max values, binning within that range and shifting left
At least those are my current thoughts