api/src/main/java/org/apache/iceberg/expressions/Literals.java (11 additions, 0 deletions)
@@ -39,6 +39,7 @@
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.NaNUtil;
 
 class Literals {
@@ -599,6 +600,11 @@ protected Type.TypeID typeId() {
     Object writeReplace() throws ObjectStreamException {
       return new SerializationProxies.FixedLiteralProxy(value());
     }
+
+    @Override
+    public String toString() {
+      return "0x" + ByteBuffers.encodeHexString(value());
+    }
   }
 
   static class BinaryLiteral extends BaseLiteral<ByteBuffer> {
@@ -639,5 +645,10 @@ Object writeReplace() throws ObjectStreamException {
     protected Type.TypeID typeId() {
       return Type.TypeID.BINARY;
     }
+
+    @Override
+    public String toString() {
+      return "0x" + ByteBuffers.encodeHexString(value());
+    }
   }
 }
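For reference, a minimal sketch of the new rendering (assumptions: Literal.of(ByteBuffer) produces a BinaryLiteral, and Guava's BaseEncoding.base16() uses an uppercase alphabet):

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.expressions.Literal;

class LiteralToStringDemo {
  public static void main(String[] args) {
    // BinaryLiteral now renders as hex instead of ByteBuffer's default toString
    Literal<ByteBuffer> binary = Literal.of(ByteBuffer.wrap(new byte[] {0x11, 0x11}));
    System.out.println(binary);  // 0x1111
  }
}
```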
api/src/main/java/org/apache/iceberg/util/ByteBuffers.java (6 additions, 0 deletions)
@@ -21,6 +21,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
 
 public class ByteBuffers {

@@ -58,6 +59,11 @@ public static ByteBuffer copy(ByteBuffer buffer) {
     return ByteBuffer.wrap(copyArray);
   }
 
+  public static String encodeHexString(ByteBuffer buffer) {
+    byte[] bytes = toByteArray(buffer);
+    return BaseEncoding.base16().encode(bytes);
+  }
+
   private ByteBuffers() {
   }
 }
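A quick usage sketch of the new helper; Guava's base16() encodes with an uppercase alphabet, so the hex digits come out as A-F:

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;

class EncodeHexDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {0x1a, 0x2b});
    // toByteArray copies the buffer contents, so the buffer's position is untouched
    System.out.println(ByteBuffers.encodeHexString(buf));  // 1A2B
  }
}
```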
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.spark;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -636,17 +635,17 @@ public <T> String predicate(UnboundPredicate<T> pred) {
       case NOT_NAN:
         return "not_nan(" + pred.ref().name() + ")";
       case LT:
-        return pred.ref().name() + " < " + sqlString(pred.literal());
+        return pred.ref().name() + " < " + pred.literal();
       case LT_EQ:
-        return pred.ref().name() + " <= " + sqlString(pred.literal());
+        return pred.ref().name() + " <= " + pred.literal();
       case GT:
-        return pred.ref().name() + " > " + sqlString(pred.literal());
+        return pred.ref().name() + " > " + pred.literal();
       case GT_EQ:
-        return pred.ref().name() + " >= " + sqlString(pred.literal());
+        return pred.ref().name() + " >= " + pred.literal();
       case EQ:
-        return pred.ref().name() + " = " + sqlString(pred.literal());
+        return pred.ref().name() + " = " + pred.literal();
       case NOT_EQ:
-        return pred.ref().name() + " != " + sqlString(pred.literal());
+        return pred.ref().name() + " != " + pred.literal();

kbendick (Contributor) commented on Nov 3, 2021:
Why did we get rid of the sqlString function here? Is there a change that removes the need to quote strings, for example?

Also, since the representation of ByteBuffer is potentially engine-specific, would it be better to add the conversion to sqlString here for Spark (e.g. using the leading capital X, like X'123456')?

This would allow other engines to handle it themselves (e.g. if Flink doesn't use the leading-X format).

A Contributor replied:
@kbendick is right. This should not remove the sqlString function; it converts a literal into a form that is usable by Spark. Instead, you should update that function to produce Spark's binary literal format.
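A sketch of what that update could look like (hypothetical, not part of this PR as shown): keep the per-literal sqlString and add a ByteBuffer branch emitting Spark's X'...' syntax via the new ByteBuffers.encodeHexString helper.

```java
private static String sqlString(org.apache.iceberg.expressions.Literal<?> lit) {
  if (lit.value() instanceof String) {
    return "'" + lit.value() + "'";
  } else if (lit.value() instanceof ByteBuffer) {
    // Spark's binary literal syntax uses a leading capital X, e.g. X'1234'
    return "X'" + ByteBuffers.encodeHexString((ByteBuffer) lit.value()) + "'";
  } else {
    return lit.value().toString();
  }
}
```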

izchen (Contributor, Author) replied on Nov 16, 2021:
sqlString is implemented and used in the DescribeExpressionVisitor class.

I think this class is just for generating description information, and the description is displayed in logs and the Spark UI for debugging. So in my opinion, using Literal.toString directly is acceptable, and it keeps the description consistent with the style of the other logs.

In fact, except for StringLiteral's single versus double quotes, the current sqlString and Literal.toString produce the same output.

But if you think keeping sqlString is better, I can modify the code. There are also some differences from Spark's own description format; for example, Spark's string literals are not quoted at all. If you think the output should match Spark's behavior, I can modify the code as well.
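For concreteness, the quote-style difference described above (assuming Iceberg's StringLiteral.toString wraps the value in double quotes, as the single/double-quote remark implies):

```java
import org.apache.iceberg.expressions.Literal;

class DescribeStyleDemo {
  public static void main(String[] args) {
    Literal<CharSequence> s = Literal.of("a");
    System.out.println(s);  // "a"  (Literal.toString, matching Iceberg's log style)
    // The removed sqlString would have rendered:  'a'
    // Spark's own describe output would render the bare value:  a
  }
}
```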

       case STARTS_WITH:
         return pred.ref().name() + " LIKE '" + pred.literal() + "%'";
       case IN:
@@ -659,17 +658,7 @@ public <T> String predicate(UnboundPredicate<T> pred) {
   }
 
   private static <T> String sqlString(List<org.apache.iceberg.expressions.Literal<T>> literals) {
-    return literals.stream().map(DescribeExpressionVisitor::sqlString).collect(Collectors.joining(", "));
-  }
-
-  private static String sqlString(org.apache.iceberg.expressions.Literal<?> lit) {
-    if (lit.value() instanceof String) {
-      return "'" + lit.value() + "'";
-    } else if (lit.value() instanceof ByteBuffer) {
-      throw new IllegalArgumentException("Cannot convert bytes to SQL literal: " + lit);
-    } else {
-      return lit.value().toString();
-    }
+    return literals.stream().map(Object::toString).collect(Collectors.joining(", "));
A Contributor commented:
This seems possibly incorrect, or a deviation from the previous behavior.

It looks like this code is calling Object.toString on Literal<T>, whereas before, the toString call came from Literal<T>::value. Is that the intended change?

izchen (Contributor, Author) replied:
Thanks for your review. After testing, I think this is correct.

   }
 }
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark;
 
+import java.lang.reflect.Array;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -118,6 +119,8 @@ private Object[] toJava(Row row) {
       return row.getList(pos);
     } else if (value instanceof scala.collection.Map) {
       return row.getJavaMap(pos);
+    } else if (value.getClass().isArray() && value.getClass().getComponentType().isPrimitive()) {
+      return IntStream.range(0, Array.getLength(value)).mapToObj(i -> Array.get(value, i)).toArray();
izchen (Contributor, Author) commented:
I think this is correct. The toJava method assumes that the elements of the returned array are all wrapper types, which is a false assumption: Spark's Java API can return an array of primitive types.
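The reflective copy is what does the boxing here; a direct cast to Object[] fails for a primitive component type. A standalone sketch of the failure mode and the fix:

```java
import java.lang.reflect.Array;
import java.util.stream.IntStream;

class PrimitiveArrayDemo {
  public static void main(String[] args) {
    Object value = new byte[] {0x11, 0x11};  // e.g. what Spark returns for a binary column
    // ((Object[]) value) would throw ClassCastException: byte[] is not an Object[]
    Object[] boxed = IntStream.range(0, Array.getLength(value))
        .mapToObj(i -> Array.get(value, i))  // Array.get boxes each byte to a Byte
        .toArray();
    System.out.println(boxed[0] instanceof Byte);  // true
  }
}
```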

     } else {
       return value;
     }
@@ -37,6 +37,8 @@ public class TestSelect extends SparkCatalogTestBase {
   private int scanEventCount = 0;
   private ScanEvent lastScanEvent = null;
 
+  private final String binaryTableName = tableName("binary_table");
+
   public TestSelect(String catalogName, String implementation, Map<String, String> config) {
     super(catalogName, implementation, config);
 
@@ -52,13 +54,17 @@ public void createTables() {
     sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg", tableName);
     sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", tableName);
 
+    sql("CREATE TABLE %s (id bigint, binary binary) USING iceberg", binaryTableName);
+    sql("INSERT INTO %s VALUES (1, X''), (2, X'11'), (3, X'1111')", binaryTableName);
+
     this.scanEventCount = 0;
     this.lastScanEvent = null;
   }
 
   @After
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", binaryTableName);
   }
 
   @Test
@@ -120,4 +126,12 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test
+  public void testFilterBinary() {
+    List<Object[]> expected = ImmutableList.of(row(3L, new Byte[]{0x11, 0x11}));
+
+    assertEquals("Should return all expected rows", expected,
+        sql("SELECT * FROM %s where binary > X'1101'", binaryTableName));
+  }
 }
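For context, a hedged sketch of how a binary predicate is now described rather than rejected (assuming the expression reaches DescribeExpressionVisitor via Spark3Util.describe):

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.Spark3Util;

class DescribeBinaryDemo {
  public static void main(String[] args) {
    Expression expr = Expressions.greaterThan("binary", ByteBuffer.wrap(new byte[] {0x11, 0x01}));
    // Previously: IllegalArgumentException "Cannot convert bytes to SQL literal: ..."
    System.out.println(Spark3Util.describe(expr));  // roughly: binary > 0x1101
  }
}
```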
}