diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index f00596fa46e8..4ed57716603a 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -129,6 +129,15 @@ default RewriteDataFiles sort(SortOrder sortOrder) { throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework"); } + /** + * Choose Z-ORDER as a strategy for this rewrite operation with a specified list of columns to use. + * @param columns Columns to be used to generate Z-Values + * @return this for method chaining + */ + default RewriteDataFiles zOrder(String... columns) { + throw new UnsupportedOperationException("Z-ORDER Rewrite Strategy not implemented for this framework"); + } + + /** + * A user provided filter for determining which files will be considered by the rewrite strategy. This will be used + * in addition to whatever rules the rewrite strategy generates. For example this would be used for providing a
diff --git a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java index 213b222dc507..4a5001018da6 100644 --- a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java +++ b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class ByteBuffers { @@ -46,6 +47,16 @@ public static byte[] toByteArray(ByteBuffer buffer) { } } + public static ByteBuffer reuse(ByteBuffer reuse, int length) { + Preconditions.checkArgument(reuse.hasArray(), "Cannot reuse a buffer not backed by an array"); + Preconditions.checkArgument(reuse.arrayOffset() == 0, "Cannot reuse a buffer whose array offset is not 0"); + Preconditions.checkArgument(reuse.capacity() == length, + "Cannot use a buffer whose capacity (%s) is not equal to the requested length (%s)", reuse.capacity(), length); + reuse.position(0); + reuse.limit(length); + return reuse; + } + public static ByteBuffer copy(ByteBuffer buffer) { if (buffer == null) { return null;
diff --git a/core/src/jmh/java/org/apache/iceberg/util/ZOrderByteUtilsBenchmark.java b/core/src/jmh/java/org/apache/iceberg/util/ZOrderByteUtilsBenchmark.java new file mode 100644 index 000000000000..77f66f12cff3 --- /dev/null +++ b/core/src/jmh/java/org/apache/iceberg/util/ZOrderByteUtilsBenchmark.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; +import org.openjdk.jmh.infra.Blackhole; + +@Fork(1) +@State(Scope.Benchmark) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +@Timeout(time = 1000, timeUnit = TimeUnit.HOURS) +public class ZOrderByteUtilsBenchmark { + + private static final int NUM_ENTRIES = 10000000; + + private byte[][][] fourColumnInput; + private byte[][][] threeColumnInput; + private byte[][][] twoColumnInput; + + @Setup + public void setupBench() { + Random rand = new Random(42); + fourColumnInput = new byte[NUM_ENTRIES][][]; + threeColumnInput = new byte[NUM_ENTRIES][][]; + twoColumnInput = new byte[NUM_ENTRIES][][]; + for (int i = 0; i < NUM_ENTRIES; i++) { + fourColumnInput[i] = new byte[4][]; + threeColumnInput[i] = new byte[3][]; + twoColumnInput[i] = new byte[2][]; + for (int j = 0; j < 4; j++) { + byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(rand.nextLong()).array(); + if (j < 2) { + twoColumnInput[i][j] = value; + } + if (j < 3) { + threeColumnInput[i][j] = value; + } + fourColumnInput[i][j] = value; + } + } + } + + @Benchmark + @Threads(1) + public void interleaveValuesFourColumns(Blackhole blackhole) { + int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 4; + ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); + + for (int i = 0; i < fourColumnInput.length; i++) { + byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer); + blackhole.consume(interleavedBytes); + } + } + + @Benchmark + @Threads(1) + public void interleaveValuesThreeColumns(Blackhole blackhole) { + int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 3; + ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); + + for (int i = 0; i < fourColumnInput.length; i++) { + byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(threeColumnInput[i], outputSize, outputBuffer); + blackhole.consume(interleavedBytes); + } + } + + @Benchmark + @Threads(1) + public void interleaveValuesTwoColumns(Blackhole blackhole) { + int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 2; + ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); + + for (int i = 0; i < fourColumnInput.length; i++) { + byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(twoColumnInput[i], outputSize, outputBuffer); + blackhole.consume(interleavedBytes); + } + } + + @Benchmark + @Threads(1) + public void interleaveValuesFourColumns8ByteOutput(Blackhole blackhole) { + int outputSize = 8; + ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize); + + for (int i = 0; i < fourColumnInput.length; i++) { + byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer); + blackhole.consume(interleavedBytes); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java new file mode 100644 index 000000000000..8a1b419a3bb0 --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Within Z-Ordering the byte representations of objects being compared must be ordered; + * this requires several types to be transformed when converted to bytes. The goal is to + * map objects whose byte representations are not lexicographically ordered into representations + * that are lexicographically ordered. Bytes produced should be compared lexicographically as + * unsigned bytes, big-endian. + *
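+ * For example (illustrative only): {@link #intToOrderedBytes(int, ByteBuffer)} maps -1 to 0x7FFFFFFFFFFFFFFF
+ * and 0 to 0x8000000000000000, so negatives sort before positives when the bytes are compared as unsigned.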

+ * All types except for String are stored within an 8 Byte Buffer + *

+ * Most of these techniques are derived from + * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/ + *

+ * Some implementation is taken from + * https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java + */ +public class ZOrderByteUtils { + + public static final int PRIMITIVE_BUFFER_SIZE = 8; + + private ZOrderByteUtils() { + } + + static ByteBuffer allocatePrimitiveBuffer() { + return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE); + } + + /** + * Signed ints do not have their bytes in magnitude order because of the sign bit. + * To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially + * shifts the 0 value so that we don't break our ordering when we cross the new 0 value. + */ + public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(val ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * IEEE 754: + * “If two floating-point numbers in the same format are ordered (say, x {@literal <} y), + * they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.” + * + * Which means floats can be treated as sign-magnitude integers which can then be converted into lexicographically + * comparable bytes. + */ + public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + long lval = Double.doubleToLongBits(val); + lval ^= ((lval >> (Long.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lval); + return bytes; + } + + /** + * Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)} + */ + public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + long lval = Double.doubleToLongBits(val); + lval ^= ((lval >> (Long.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lval); + return bytes; + } + + /** + * Strings are lexicographically sortable, but differing byte array lengths will + * ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time.) + * This implementation just uses a fixed size for all output byte representations, truncating longer strings + * and right-padding shorter strings with 0.
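+ * For example (illustrative only): with length 3 and a UTF-8 encoder, "abcd" becomes {'a', 'b', 'c'}
+ * and "a" becomes {'a', 0x00, 0x00}, so the column contributes exactly 3 bytes either way.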
+ */ + public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse, CharsetEncoder encoder) { + Preconditions.checkArgument(encoder.charset().equals(StandardCharsets.UTF_8), + "Cannot use an encoder not using UTF_8 as it's Charset"); + + ByteBuffer bytes = ByteBuffers.reuse(reuse, length); + Arrays.fill(bytes.array(), 0, length, (byte) 0x00); + if (val != null) { + CharBuffer inputBuffer = CharBuffer.wrap(val); + encoder.encode(inputBuffer, bytes, true); + } + return bytes; + } + + /** + * Return a bytebuffer with the given bytes truncated to length, or filled with 0's to length depending on whether + * the given bytes are larger or smaller than the given length. + */ + public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, length); + if (val.length < length) { + bytes.put(val, 0, val.length); + Arrays.fill(bytes.array(), val.length, length, (byte) 0x00); + } else { + bytes.put(val, 0, length); + } + return bytes; + } + + static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) { + return interleaveBits(columnsBinary, interleavedSize, ByteBuffer.allocate(interleavedSize)); + } + + /** + * Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is + * required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all + * columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the + * interleaving. + * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered + * @param interleavedSize the number of bytes to use in the output + * @return the columnbytes interleaved + */ + public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) { + byte[] interleavedBytes = reuse.array(); + Arrays.fill(interleavedBytes, 0, interleavedSize, (byte) 0x00); + + int sourceColumn = 0; + int sourceByte = 0; + int sourceBit = 7; + int interleaveByte = 0; + int interleaveBit = 7; + + while (interleaveByte < interleavedSize) { + // Take the source bit from source byte and move it to the output bit position + interleavedBytes[interleaveByte] |= + (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit; + --interleaveBit; + + // Check if an output byte has been completed + if (interleaveBit == -1) { + // Move to the next output byte + interleaveByte++; + // Move to the highest order bit of the new output byte + interleaveBit = 7; + } + + // Check if the last output byte has been completed + if (interleaveByte == interleavedSize) { + break; + } + + // Find the next source bit to interleave + do { + // Move to next column + ++sourceColumn; + if (sourceColumn == columnsBinary.length) { + // If the last source column was used, reset to next bit of first column + sourceColumn = 0; + --sourceBit; + if (sourceBit == -1) { + // If the last bit of the source byte was used, reset to the highest bit of the next byte + sourceByte++; + sourceBit = 7; + } + } + } while (columnsBinary[sourceColumn].length <= sourceByte); + } + return interleavedBytes; + } + +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java new file mode 100644 index 000000000000..1a2174b679ba --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java @@ -0,0 
+1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.iceberg.relocated.com.google.common.primitives.UnsignedBytes; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestZOrderByteUtil { + private static final byte IIIIIIII = (byte) 255; + private static final byte IOIOIOIO = (byte) 170; + private static final byte OIOIOIOI = (byte) 85; + private static final byte OOOOIIII = (byte) 15; + private static final byte OOOOOOOI = (byte) 1; + private static final byte OOOOOOOO = (byte) 0; + + private static final int NUM_TESTS = 100000; + private static final int NUM_INTERLEAVE_TESTS = 1000; + + private final Random random = new Random(42); + + private String bytesToString(byte[] bytes) { + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0')); + } + return result.toString(); + } + + /** + * Returns a non-0 length byte array + */ + private byte[] generateRandomBytes() { + int length = Math.abs(random.nextInt(100) + 1); + return generateRandomBytes(length); + } + + /** + * Returns a byte array of a specified length + */ + private byte[] generateRandomBytes(int length) { + byte[] result = new byte[length]; + random.nextBytes(result); + return result; + } + + /** + * Test method to ensure correctness of byte interleaving code + */ + private String interleaveStrings(String[] strings) { + StringBuilder result = new StringBuilder(); + int totalLength = Arrays.stream(strings).mapToInt(String::length).sum(); + int substringIndex = 0; + int characterIndex = 0; + while (characterIndex < totalLength) { + for (String str : strings) { + if (substringIndex < str.length()) { + result.append(str.charAt(substringIndex)); + characterIndex++; + } + } + substringIndex++; + } + return result.toString(); + } + + /** + * Compares the result of a string based interleaving algorithm implemented above + * versus the binary bit-shifting algorithm used in ZOrderByteUtils. Either both + * algorithms are identically wrong or are both identically correct. 
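+ * For example (illustrative only): interleaving the bit strings "1100" and "10" character by character
+ * yields "111000", which is also what bit-by-bit interleaving of the same inputs should produce.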
+ */ + @Test + public void testInterleaveRandomExamples() { + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + int numByteArrays = Math.abs(random.nextInt(6)) + 1; + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + + int zOrderSize = Arrays.stream(testBytes).mapToInt(column -> column.length).sum(); + byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes, zOrderSize); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals("String interleave didn't match byte interleave", stringResult, byteResultAsString); + } + } + + @Test + public void testReuseInterleaveBuffer() { + int numByteArrays = 2; + int colLength = 16; + ByteBuffer interleaveBuffer = ByteBuffer.allocate(numByteArrays * colLength); + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(colLength); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + + byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes, numByteArrays * colLength, interleaveBuffer); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals("String interleave didn't match byte interleave", stringResult, byteResultAsString); + } + } + + @Test + public void testInterleaveEmptyBits() { + byte[][] test = new byte[4][10]; + byte[] expected = new byte[40]; + + Assert.assertArrayEquals("Should combine empty arrays", + expected, ZOrderByteUtils.interleaveBits(test, 40)); + } + + @Test + public void testInterleaveFullBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{IIIIIIII, IIIIIIII}; + test[1] = new byte[]{IIIIIIII}; + test[2] = new byte[0]; + test[3] = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII}; + byte[] expected = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII}; + + Assert.assertArrayEquals("Should combine full arrays", + expected, ZOrderByteUtils.interleaveBits(test, 6)); + } + + @Test + public void testInterleaveMixedBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII}; + test[1] = new byte[]{OOOOOOOI, OOOOOOOO, IIIIIIII}; + test[2] = new byte[]{OOOOOOOI}; + test[3] = new byte[]{OOOOOOOI}; + byte[] expected = new byte[]{ + OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, + IOIOIOIO, IOIOIOIO, + OIOIOIOI, OIOIOIOI, + OOOOIIII}; + Assert.assertArrayEquals("Should combine mixed byte arrays", + expected, ZOrderByteUtils.interleaveBits(test, 9)); + } + + @Test + public void testIntOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + int aInt = random.nextInt(); + int bInt = random.nextInt(); + int intCompare = Integer.signum(Integer.compare(aInt, bInt)); + byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + 
Assert.assertEquals(String.format( + "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aInt, bInt, intCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + intCompare, byteCompare); + } + } + + @Test + public void testLongOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + long aLong = random.nextInt(); + long bLong = random.nextInt(); + int longCompare = Integer.signum(Long.compare(aLong, bLong)); + byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aLong, bLong, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testShortOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aShort, bShort)); + byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aShort, bShort, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testTinyOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aByte, bByte)); + byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aByte, bByte, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testFloatOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + float aFloat = random.nextFloat(); + float bFloat = random.nextFloat(); + int floatCompare = Integer.signum(Float.compare(aFloat, bFloat)); + byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer).array(); + int byteCompare = 
Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aFloat, bFloat, floatCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + floatCompare, byteCompare); + } + } + + @Test + public void testDoubleOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + double aDouble = random.nextDouble(); + double bDouble = random.nextDouble(); + int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble)); + byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aDouble, bDouble, doubleCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + doubleCompare, byteCompare); + } + } + + @Test + public void testStringOrdering() { + CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder(); + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) { + String aString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + String bString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + int stringCompare = Integer.signum(aString.compareTo(bString)); + byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer, encoder).array(); + byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer, encoder).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aString, bString, stringCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + stringCompare, byteCompare); + } + } + + @Test + public void testByteTruncateOrFill() { + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) { + byte[] aBytesRaw = (byte[]) RandomUtil.generatePrimitive(Types.BinaryType.get(), random); + byte[] bBytesRaw = (byte[]) RandomUtil.generatePrimitive(Types.BinaryType.get(), random); + int stringCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytesRaw, bBytesRaw)); + byte[] aBytes = ZOrderByteUtils.byteTruncateOrFill(aBytesRaw, 128, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.byteTruncateOrFill(bBytesRaw, 128, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aBytesRaw, bBytesRaw, stringCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + stringCompare, byteCompare); + } + } +} diff --git a/jmh.gradle b/jmh.gradle index d458ae2c5903..538fd96af406 100644 --- a/jmh.gradle +++ b/jmh.gradle @@ -23,7 +23,7 @@ if (jdkVersion != '8' && jdkVersion != '11') { def sparkVersions = 
(System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")).split(",") def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") -def jmhProjects = [] +def jmhProjects = [project(":iceberg-core")] if (jdkVersion == '8' && sparkVersions.contains("2.4")) { jmhProjects.add(project(":iceberg-spark:iceberg-spark-2.4")) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java new file mode 100644 index 000000000000..8c205037f56e --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.action; + +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.relocated.com.google.common.io.Files; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataTypes; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.col; +import static 
org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +@Fork(1) +@State(Scope.Benchmark) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.SingleShotTime) +@Timeout(time = 1000, timeUnit = TimeUnit.HOURS) +public class IcebergSortCompactionBenchmark { + + private static final String[] NAMESPACE = new String[] {"default"}; + private static final String NAME = "sortbench"; + private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME); + private static final int NUM_FILES = 8; + private static final long NUM_ROWS = 7500000L; + private static final long UNIQUE_VALUES = NUM_ROWS / 4; + + private final Configuration hadoopConf = initHadoopConf(); + private SparkSession spark; + + @Setup + public void setupBench() { + setupSpark(); + } + + @TearDown + public void teardownBench() { + tearDownSpark(); + } + + @Setup(Level.Iteration) + public void setupIteration() { + initTable(); + appendData(); + } + + @TearDown(Level.Iteration) + public void cleanUpIteration() throws IOException { + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void sortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + 
@Benchmark + @Threads(1) + public void sortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("timestampCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("longCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2", "intCol3") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2", "intCol3", "intCol4") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol", "intCol", "dateCol", "doubleCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol") + .execute(); + } + + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected final void initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "intCol2", Types.IntegerType.get()), + required(4, "intCol3", Types.IntegerType.get()), + required(5, "intCol4", Types.IntegerType.get()), + required(6, "floatCol", Types.FloatType.get()), + optional(7, "doubleCol", Types.DoubleType.get()), + optional(8, "dateCol", Types.DateType.get()), + optional(9, "timestampCol", Types.TimestampType.withZone()), + optional(10, "stringCol", Types.StringType.get())); + + SparkSessionCatalog catalog; + try { + catalog = (SparkSessionCatalog) + Spark3Util.catalogAndIdentifier(spark(), "spark_catalog").catalog(); + catalog.dropTable(IDENT); + catalog.createTable(IDENT, SparkSchemaUtil.convert(schema), new Transform[0], Collections.emptyMap()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void appendData() { + Dataset df = spark().range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES) + .drop("id") + .withColumn("longCol", new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply()) + .withColumn( + "intCol", + new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.IntegerType)) + .withColumn( + "intCol2", + new 
RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.IntegerType)) + .withColumn( + "intCol3", + new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.IntegerType)) + .withColumn( + "intCol4", + new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.IntegerType)) + .withColumn( + "floatCol", + new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.FloatType)) + .withColumn( + "doubleCol", + new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply().cast(DataTypes.DoubleType)) + .withColumn("dateCol", date_add(current_date(), col("intCol").mod(NUM_FILES))) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", new RandomGeneratingUDF(UNIQUE_VALUES).randomString().apply()); + writeData(df); + } + + private void writeData(Dataset df) { + df.write().format("iceberg").mode(SaveMode.Append).save(NAME); + } + + protected final Table table() { + try { + return Spark3Util.loadIcebergTable(spark(), NAME); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected final SparkSession spark() { + return spark; + } + + protected String getCatalogWarehouse() { + String location = Files.createTempDir().getAbsolutePath() + "/" + UUID.randomUUID() + "/"; + return location; + } + + protected void cleanupFiles() throws IOException { + spark.sql("DROP TABLE IF EXISTS " + NAME); + } + + protected void setupSpark() { + SparkSession.Builder builder = + SparkSession.builder() + .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) + .master("local[*]"); + spark = builder.getOrCreate(); + Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); + hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue())); + } + + protected void tearDownSpark() { + spark.stop(); + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java new file mode 100644 index 000000000000..cfbd9d4fb3f6 --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.action; + +import java.io.Serializable; +import java.util.Random; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.RandomUtil; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.types.DataTypes; + +import static org.apache.spark.sql.functions.udf; + +class RandomGeneratingUDF implements Serializable { + private final long uniqueValues; + private Random rand = new Random(); + + RandomGeneratingUDF(long uniqueValues) { + this.uniqueValues = uniqueValues; + } + + UserDefinedFunction randomLongUDF() { + return udf(() -> rand.nextLong() % (uniqueValues / 2), DataTypes.LongType).asNondeterministic().asNonNullable(); + } + + UserDefinedFunction randomString() { + return udf(() -> (String) RandomUtil.generatePrimitive(Types.StringType.get(), rand), DataTypes.StringType) + .asNondeterministic().asNonNullable(); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java index 5350e729c8ea..3a8d8a81fb86 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -130,6 +130,12 @@ public RewriteDataFiles sort() { return this; } + @Override + public RewriteDataFiles zOrder(String... columnNames) { + this.strategy = zOrderStrategy(columnNames); + return this; + } + @Override public RewriteDataFiles filter(Expression expression) { filter = Expressions.and(filter, expression); @@ -429,6 +435,10 @@ private SortStrategy sortStrategy() { return new SparkSortStrategy(table, spark()); } + private SortStrategy zOrderStrategy(String... columnNames) { + return new SparkZOrderStrategy(table, spark(), Lists.newArrayList(columnNames)); + } + @VisibleForTesting static class RewriteExecutionContext { private final Map numGroupsByPartition; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java index 832ff255579c..6c8f8c027dba 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java @@ -155,4 +155,16 @@ protected SparkSession spark() { protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) { return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf); } + + protected double sizeEstimateMultiple() { + return sizeEstimateMultiple; + } + + protected FileScanTaskSetManager manager() { + return manager; + } + + protected FileRewriteCoordinator rewriteCoordinator() { + return rewriteCoordinator; + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java new file mode 100644 index 000000000000..cdd47fe31372 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteStrategy; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SortOrderUtil; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.connector.distributions.Distribution; +import org.apache.spark.sql.connector.distributions.Distributions; +import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SparkZOrderStrategy extends SparkSortStrategy { + private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderStrategy.class); + + private static final String Z_COLUMN = "ICEZVALUE"; + private static final Schema Z_SCHEMA = new Schema(NestedField.required(0, Z_COLUMN, Types.BinaryType.get())); + private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA) + .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST) + .build(); + + /** + * Controls the amount of bytes interleaved in the ZOrder Algorithm. Default is all bytes being interleaved. + */ + private static final String MAX_OUTPUT_SIZE_KEY = "max-output-size"; + private static final int DEFAULT_MAX_OUTPUT_SIZE = Integer.MAX_VALUE; + + /** + * Controls the number of bytes considered from an input column of a type with variable length (String, Binary). 
+ * Default is to use the same size as primitives {@link ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE} + */ + private static final String VAR_LENGTH_CONTRIBUTION_KEY = "var-length-contribution"; + private static final int DEFAULT_VAR_LENGTH_CONTRIBUTION = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE; + + private final List zOrderColNames; + + private int maxOutputSize; + private int varLengthContribution; + + @Override + public Set validOptions() { + return ImmutableSet.builder() + .addAll(super.validOptions()) + .add(VAR_LENGTH_CONTRIBUTION_KEY) + .add(MAX_OUTPUT_SIZE_KEY) + .build(); + } + + @Override + public RewriteStrategy options(Map options) { + super.options(options); + + varLengthContribution = PropertyUtil.propertyAsInt(options, VAR_LENGTH_CONTRIBUTION_KEY, + DEFAULT_VAR_LENGTH_CONTRIBUTION); + Preconditions.checkArgument(varLengthContribution > 0, + "Cannot use less than 1 byte for variable length types with zOrder, %s was set to %s", + VAR_LENGTH_CONTRIBUTION_KEY, varLengthContribution); + + maxOutputSize = PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE); + Preconditions.checkArgument(maxOutputSize > 0, + "Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s", + MAX_OUTPUT_SIZE_KEY, maxOutputSize); + + return this; + } + + public SparkZOrderStrategy(Table table, SparkSession spark, List zOrderColNames) { + super(table, spark); + + Preconditions.checkArgument(zOrderColNames != null && !zOrderColNames.isEmpty(), + "Cannot ZOrder when no columns are specified"); + + Stream identityPartitionColumns = table.spec().fields().stream() + .filter(f -> f.transform().isIdentity()) + .map(PartitionField::name); + List partZOrderCols = identityPartitionColumns + .filter(zOrderColNames::contains) + .collect(Collectors.toList()); + + if (!partZOrderCols.isEmpty()) { + LOG.warn("Cannot ZOrder on an Identity partition column as these values are constant within a partition " + + "and will be removed from the ZOrder expression: {}", partZOrderCols); + zOrderColNames.removeAll(partZOrderCols); + Preconditions.checkArgument(!zOrderColNames.isEmpty(), + "Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used."); + } + + this.zOrderColNames = zOrderColNames; + } + + @Override + public String name() { + return "Z-ORDER"; + } + + @Override + protected void validateOptions() { + // Ignore SortStrategy validation + return; + } + + @Override + public Set rewriteFiles(List filesToRewrite) { + SparkZOrderUDF zOrderUDF = new SparkZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize); + + String groupID = UUID.randomUUID().toString(); + boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table().spec()); + + SortOrder[] ordering; + if (requiresRepartition) { + ordering = SparkDistributionAndOrderingUtil.convert(SortOrderUtil.buildSortOrder(table(), sortOrder())); + } else { + ordering = SparkDistributionAndOrderingUtil.convert(sortOrder()); + } + + Distribution distribution = Distributions.ordered(ordering); + + try { + manager().stageTasks(table(), groupID, filesToRewrite); + + // Disable Adaptive Query Execution as this may change the output partitioning of our write + SparkSession cloneSession = spark().cloneSession(); + cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + + // Reset Shuffle Partitions for our sort + long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple())); + 
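+ // Each shuffle partition becomes roughly one output file, so match the partition count to the planned file count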
cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles)); + + Dataset scanDF = cloneSession.read().format("iceberg") + .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID) + .load(table().name()); + + Column[] originalColumns = Arrays.stream(scanDF.schema().names()) + .map(n -> functions.col(n)) + .toArray(Column[]::new); + + List zOrderColumns = zOrderColNames.stream() + .map(scanDF.schema()::apply) + .collect(Collectors.toList()); + + Column zvalueArray = functions.array(zOrderColumns.stream().map(colStruct -> + zOrderUDF.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType()) + ).toArray(Column[]::new)); + + Dataset zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray)); + + SQLConf sqlConf = cloneSession.sessionState().conf(); + LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf); + Dataset sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder()); + sortedDf + .select(originalColumns) + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) + .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .mode("append") + .save(table().name()); + + return rewriteCoordinator().fetchNewDataFiles(table(), groupID); + } finally { + manager().removeTasks(table(), groupID); + rewriteCoordinator().clearRewrite(table(), groupID); + } + } + + @Override + protected org.apache.iceberg.SortOrder sortOrder() { + return Z_SORT_ORDER; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderUDF.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderUDF.java new file mode 100644 index 000000000000..eea3689211e2 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderUDF.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.TimestampType; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +class SparkZOrderUDF implements Serializable { + private static final byte[] PRIMITIVE_EMPTY = new byte[ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE]; + + /** + * Every Spark task runs iteratively on a rows in a single thread so ThreadLocal should protect from + * concurrent access to any of these structures. + */ + private transient ThreadLocal outputBuffer; + private transient ThreadLocal inputHolder; + private transient ThreadLocal inputBuffers; + private transient ThreadLocal encoder; + + private final int numCols; + + private int inputCol = 0; + private int totalOutputBytes = 0; + private final int varTypeSize; + private final int maxOutputSize; + + SparkZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) { + this.numCols = numCols; + this.varTypeSize = varTypeSize; + this.maxOutputSize = maxOutputSize; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]); + inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]); + outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes)); + encoder = ThreadLocal.withInitial(() -> StandardCharsets.UTF_8.newEncoder()); + } + + private ByteBuffer inputBuffer(int position, int size) { + ByteBuffer buffer = inputBuffers.get()[position]; + if (buffer == null) { + buffer = ByteBuffer.allocate(size); + inputBuffers.get()[position] = buffer; + } + return buffer; + } + + byte[] interleaveBits(Seq scalaBinary) { + byte[][] columnsBinary = JavaConverters.seqAsJavaList(scalaBinary).toArray(inputHolder.get()); + return ZOrderByteUtils.interleaveBits(columnsBinary, totalOutputBytes, outputBuffer.get()); + } + + private UserDefinedFunction tinyToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Byte value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.tinyintToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, DataTypes.BinaryType).withName("TINY_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction shortToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Short value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + 
+  private UserDefinedFunction shortToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Short value) -> {
+      if (value == null) {
+        return PRIMITIVE_EMPTY;
+      }
+      return ZOrderByteUtils.shortToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE))
+          .array();
+    }, DataTypes.BinaryType).withName("SHORT_ORDERED_BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+
+    return udf;
+  }
+
+  private UserDefinedFunction intToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Integer value) -> {
+      if (value == null) {
+        return PRIMITIVE_EMPTY;
+      }
+      return ZOrderByteUtils.intToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE))
+          .array();
+    }, DataTypes.BinaryType).withName("INT_ORDERED_BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+
+    return udf;
+  }
+
+  private UserDefinedFunction longToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Long value) -> {
+      if (value == null) {
+        return PRIMITIVE_EMPTY;
+      }
+      return ZOrderByteUtils.longToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE))
+          .array();
+    }, DataTypes.BinaryType).withName("LONG_ORDERED_BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+
+    return udf;
+  }
+
+  private UserDefinedFunction floatToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Float value) -> {
+      if (value == null) {
+        return PRIMITIVE_EMPTY;
+      }
+      return ZOrderByteUtils.floatToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE))
+          .array();
+    }, DataTypes.BinaryType).withName("FLOAT_ORDERED_BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+
+    return udf;
+  }
+
+  private UserDefinedFunction doubleToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Double value) -> {
+      if (value == null) {
+        return PRIMITIVE_EMPTY;
+      }
+      return ZOrderByteUtils.doubleToOrderedBytes(value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE))
+          .array();
+    }, DataTypes.BinaryType).withName("DOUBLE_ORDERED_BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+
+    return udf;
+  }
+
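Editor's note: putting the pieces together — each selected column is first mapped to ordered bytes by one of the UDFs above (the boolean, string and binary variants follow below), and the per-row byte arrays are then interleaved into a single Z-value. A small, hypothetical end-to-end sketch using the same ZOrderByteUtils calls this class delegates to (buffers are allocated fresh here for clarity, whereas the UDF class reuses ThreadLocal buffers):

// Hedged sketch: computing one Z-value for a row with two int columns.
int c1 = 5;
int c2 = 13;
byte[][] columns = new byte[][] {
    ZOrderByteUtils.intToOrderedBytes(c1, ByteBuffer.allocate(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)).array(),
    ZOrderByteUtils.intToOrderedBytes(c2, ByteBuffer.allocate(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)).array()
};
int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 2;
byte[] zValue = ZOrderByteUtils.interleaveBits(columns, outputSize, ByteBuffer.allocate(outputSize));
// Sorting rows by zValue clusters nearby (c1, c2) pairs into the same data files.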
+  private UserDefinedFunction booleanToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((Boolean value) -> {
+      ByteBuffer buffer = inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+      buffer.put(0, (byte) (value ? -127 : 0));
+      return buffer.array();
+    }, DataTypes.BinaryType).withName("BOOLEAN-LEXICAL-BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
+    return udf;
+  }
+
+  private UserDefinedFunction stringToOrderedBytesUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((String value) ->
+        ZOrderByteUtils.stringToOrderedBytes(
+            value,
+            varTypeSize,
+            inputBuffer(position, varTypeSize),
+            encoder.get()).array(), DataTypes.BinaryType)
+        .withName("STRING-LEXICAL-BYTES");
+
+    this.inputCol++;
+    increaseOutputSize(varTypeSize);
+
+    return udf;
+  }
+
+  private UserDefinedFunction bytesTruncateUDF() {
+    int position = inputCol;
+    UserDefinedFunction udf = functions.udf((byte[] value) ->
+        ZOrderByteUtils.byteTruncateOrFill(value, varTypeSize, inputBuffer(position, varTypeSize)).array(),
+        DataTypes.BinaryType)
+        .withName("BYTE-TRUNCATE");
+
+    this.inputCol++;
+    increaseOutputSize(varTypeSize);
+
+    return udf;
+  }
+
+  private final UserDefinedFunction interleaveUDF =
+      functions.udf((Seq<byte[]> arrayBinary) -> interleaveBits(arrayBinary), DataTypes.BinaryType)
+          .withName("INTERLEAVE_BYTES");
+
+  Column interleaveBytes(Column arrayBinary) {
+    return interleaveUDF.apply(arrayBinary);
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  Column sortedLexicographically(Column column, DataType type) {
+    if (type instanceof ByteType) {
+      return tinyToOrderedBytesUDF().apply(column);
+    } else if (type instanceof ShortType) {
+      return shortToOrderedBytesUDF().apply(column);
+    } else if (type instanceof IntegerType) {
+      return intToOrderedBytesUDF().apply(column);
+    } else if (type instanceof LongType) {
+      return longToOrderedBytesUDF().apply(column);
+    } else if (type instanceof FloatType) {
+      return floatToOrderedBytesUDF().apply(column);
+    } else if (type instanceof DoubleType) {
+      return doubleToOrderedBytesUDF().apply(column);
+    } else if (type instanceof StringType) {
+      return stringToOrderedBytesUDF().apply(column);
+    } else if (type instanceof BinaryType) {
+      return bytesTruncateUDF().apply(column);
+    } else if (type instanceof BooleanType) {
+      return booleanToOrderedBytesUDF().apply(column);
+    } else if (type instanceof TimestampType) {
+      return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType));
+    } else if (type instanceof DateType) {
+      return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType));
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Cannot use column %s of type %s in ZOrdering, the type is unsupported", column, type));
+    }
+  }
+
+  private void increaseOutputSize(int bytes) {
+    totalOutputBytes = Math.min(totalOutputBytes + bytes, maxOutputSize);
+  }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 1d8695053123..4137bded9404 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -62,6 +62,7 @@
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
 import
org.apache.iceberg.io.CloseableIterable; @@ -95,6 +96,10 @@ import org.mockito.Mockito; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; @@ -1032,6 +1037,85 @@ public void testCommitStateUnknownException() { shouldHaveSnapshots(table, 2); // Commit actually Succeeded } + @Test + public void testZOrderSort() { + int originalFiles = 20; + Table table = createTable(originalFiles); + shouldHaveLastCommitUnsorted(table, "c2"); + shouldHaveFiles(table, originalFiles); + + List originalData = currentData(); + double originalFilesC2 = percentFilesRequired(table, "c2", "foo23"); + double originalFilesC3 = percentFilesRequired(table, "c3", "bar21"); + double originalFilesC2C3 = percentFilesRequired(table, new String[]{"c2", "c3"}, new String[]{"foo23", "bar23"}); + + Assert.assertTrue("Should require all files to scan c2", originalFilesC2 > 0.99); + Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 0.99); + + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder("c2", "c3") + .option(SortStrategy.MAX_FILE_SIZE_BYTES, Integer.toString((averageFileSize(table) / 2) + 2)) + // Divide files in 2 + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / 2)) + .option(SortStrategy.MIN_INPUT_FILES, "1") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedFiles()); + Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + + double filesScannedC2 = percentFilesRequired(table, "c2", "foo23"); + double filesScannedC3 = percentFilesRequired(table, "c3", "bar21"); + double filesScannedC2C3 = percentFilesRequired(table, new String[]{"c2", "c3"}, new String[]{"foo23", "bar23"}); + + Assert.assertTrue("Should have reduced the number of files required for c2", + filesScannedC2 < originalFilesC2); + Assert.assertTrue("Should have reduced the number of files required for c3", + filesScannedC3 < originalFilesC3); + Assert.assertTrue("Should have reduced the number of files required for a c2,c3 predicate", + filesScannedC2C3 < originalFilesC2C3); + } + + @Test + public void testZOrderAllTypesSort() { + Table table = createTypeTestTable(); + shouldHaveFiles(table, 10); + + List originalRaw = spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List originalData = rowsToJava(originalRaw); + + // TODO add in UUID when it is supported in Spark + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder("longCol", "intCol", "floatCol", "doubleCol", "dateCol", "timestampCol", "stringCol", "binaryCol", + "booleanCol") + .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SortStrategy.REWRITE_ALL, "true") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = 
Iterables.size(table.currentSnapshot().addedFiles()); + Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal); + + table.refresh(); + + List postRaw = spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List postRewriteData = rowsToJava(postRaw); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @Test public void testInvalidAPIUsage() { Table table = createTable(1); @@ -1327,6 +1411,39 @@ protected Table createTablePartitioned(int partitions, int files) { return createTablePartitioned(partitions, files, SCALE, Maps.newHashMap()); } + private Table createTypeTestTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "dateCol", Types.DateType.get()), + optional(6, "timestampCol", Types.TimestampType.withZone()), + optional(7, "stringCol", Types.StringType.get()), + optional(8, "booleanCol", Types.BooleanType.get()), + optional(9, "binaryCol", Types.BinaryType.get())); + + Map options = Maps.newHashMap(); + Table table = TABLES.create(schema, PartitionSpec.unpartitioned(), options, tableLocation); + + spark.range(0, 10, 1, 10) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + return table; + } + protected int averageFileSize(Table table) { table.refresh(); return (int) Streams.stream(table.newScan().planFiles()).mapToLong(FileScanTask::length).average().getAsDouble(); @@ -1412,6 +1529,21 @@ private Set cacheContents(Table table) { .build(); } + private double percentFilesRequired(Table table, String col, String value) { + return percentFilesRequired(table, new String[]{col}, new String[]{value}); + } + + private double percentFilesRequired(Table table, String[] cols, String[] values) { + Preconditions.checkArgument(cols.length == values.length); + Expression restriction = Expressions.alwaysTrue(); + for (int i = 0; i < cols.length; i++) { + restriction = Expressions.and(restriction, Expressions.equal(cols[i], values[i])); + } + int totalFiles = Iterables.size(table.newScan().planFiles()); + int filteredFiles = Iterables.size(table.newScan().filter(restriction).planFiles()); + return (double) filteredFiles / (double) totalFiles; + } + class GroupInfoMatcher implements ArgumentMatcher { private final Set groupIDs;