Spark: Fix IllegalArgumentException when filtering on BinaryType column #3460
Conversation
Related issue: #2934
kbendick left a comment
Thanks for your contribution @izchen!
Left some feedback. I have some concerns around the removal of the sqlString function for literals, as I don't see any corresponding changes for strings.
Eventually, we'll need to update more than just Spark v3.2. Additionally, it would be nice if Flink and even MR could be handled in this PR. I'm especially interested in Flink, as its documentation suggests it uses a constructor-looking syntax. I admit I'm not familiar enough with using byte literals in Flink SQL to be sure, though.
```java
byte[] binary = new byte[value().remaining()];
value().duplicate().get(binary);
```
Same note as above.
```java
public String toString() {
  byte[] binary = new byte[value().remaining()];
  value().duplicate().get(binary);
  return "0x" + BaseEncoding.base16().encode(binary);
```
I'm not sure if we have any other places where we convert a ByteBuffer to a string.
If there are any, it seems like potentially we should add the stringify function to the ByteBuffer utility class and then try to use it uniformly everywhere. You don't necessarily need to update the other places to use them in this PR, but would you mind taking a look and seeing if you find any?
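A minimal sketch of what such a shared helper could look like, assuming it sits in a ByteBuffers-style utility class (the name stringify and the plain Guava import are assumptions, not code from this PR):

```java
import java.nio.ByteBuffer;
import com.google.common.io.BaseEncoding;

public class ByteBufferStrings {
  // Hypothetical shared helper: hex-encode a buffer without consuming it
  public static String stringify(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate so the caller's position is untouched
    return "0x" + BaseEncoding.base16().encode(bytes);
  }
}
```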
Follow-up question: are there cases where people might not use base16 for the representation, or where the leading 0x might not be used? I don't think so for Spark, but I'm less sure about Flink, for example.
For Spark, looking at the documentation, it seems to be just X'...', without the 0x prefix: https://spark.apache.org/docs/latest/sql-ref-literals.html#binary-literal
I'm not as sure about Trino, Flink, and other systems though.
Trino also looks to start with a capital X (no 0x): https://trino.io/docs/current/language/types.html#varbinary
Looking it up for Flink, it may actually be represented as BINARY(n) or VARBINARY(n); this is the one I'm least sure about: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#binary
This should just be for debugging. If we are converting a literal to a string and back, then that's a problem.
This should just be for debugging.
In the base class, the toString method returns String.valueOf(value) (in StringLiteral, it is "\"" + value() + "\"").
For other literal types, the return value of toString is clear, but for a ByteBuffer it is something like java.nio.HeapByteBuffer[pos=0 lim=16 cap=16].
So I implemented toString for the ByteBuffer-typed literal, for logging.
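For illustration, a small standalone sketch of the difference (the 0x formatting mirrors the diff above; BaseEncoding is Guava's, which Iceberg may import under a relocated package):

```java
import java.nio.ByteBuffer;
import com.google.common.io.BaseEncoding;

public class ToStringDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {0x11, 0x0F});
    // Default Buffer#toString: java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
    System.out.println(buf);
    // With the override from the diff above: 0x110F
    byte[] binary = new byte[buf.remaining()];
    buf.duplicate().get(binary);
    System.out.println("0x" + BaseEncoding.base16().encode(binary));
  }
}
```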
```diff
   return pred.ref().name() + " = " + pred.literal();
 case NOT_EQ:
-  return pred.ref().name() + " != " + sqlString(pred.literal());
+  return pred.ref().name() + " != " + pred.literal();
```
Why did we get rid of the sqlString function here? Is there a change that removes the need to quote strings for example?
Also, since the representation of ByteBuffer is potentially engine specific, would it be better to add the conversion to the sqlString here for Spark (e.g. using the leading capital X, like X'123456')?
This would allow other engines to handle it themselves (e.g. if Flink doesn't use leading X format).
@kbendick is right. This should not remove the sqlString function. That converts a literal into a form that is usable by Spark. Instead, you should update that function to produce Spark's binary literal format.
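A sketch of that suggested update to sqlString, reusing the ByteBuffers utility mentioned elsewhere in this review (imports omitted; the method shape follows the else branch quoted later in this thread, not final code):

```java
private static String sqlString(org.apache.iceberg.expressions.Literal<?> lit) {
  if (lit.value() instanceof String) {
    return "'" + lit.value() + "'"; // keep quoting string literals
  } else if (lit.value() instanceof ByteBuffer) {
    byte[] bytes = ByteBuffers.toByteArray((ByteBuffer) lit.value());
    // Spark's binary literal syntax, e.g. X'110F'
    return "X'" + BaseEncoding.base16().encode(bytes) + "'";
  } else {
    return lit.value().toString();
  }
}
```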
sqlString is implemented and used in the DescribeExpressionVisitor class.
I think this class is just for generating description information, and the description is displayed in logs and the Spark UI for debugging. So in my opinion it is acceptable to use Literal.toString directly, which keeps the description consistent with the style of other logs.
In fact, apart from StringLiteral using single versus double quotes, the current sqlString and Literal.toString are the same.
But if you think keeping sqlString is the better way, I can modify the code.
In addition, the description format here differs somewhat from Spark's; for example, Spark's string literals have no quotation marks. If you think this should match Spark's behavior, I can also modify the code.
```java
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.internal.ExactComparisonCriteria;
```
Is it normal to use a class that has internal in its package name? Do we have to worry about behavioral changes if we upgrade JUnit?
Thanks for your review, I have modified the code
```java
byte[] binary = new byte[value().remaining()];
value().duplicate().get(binary);
```
There's an existing ByteBuffers utility class, org.apache.iceberg.util.ByteBuffers, which you can use here. Can you please use that for consistency? It's also more efficient in some cases (doesn't always necessarily allocate, handles offsets more thoroughly, etc).
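A sketch of the suggested substitution (assuming the utility's toByteArray(ByteBuffer) signature, which matches its use later in this PR):

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;

class BinaryValueExample {
  private final ByteBuffer value = ByteBuffer.wrap(new byte[] {0x11, 0x0F});

  byte[] binary() {
    // Replaces the manual allocate/duplicate/get pattern quoted above;
    // per the review comment, the utility can also skip the copy for
    // fully array-backed buffers
    return ByteBuffers.toByteArray(value);
  }
}
```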
Thanks for your review, I have modified the code
```diff
-} else {
-  return lit.value().toString();
-}
+return literals.stream().map(Object::toString).collect(Collectors.joining(", "));
```
This seems possibly incorrect or a deviation from the behavior before.
It looks like this code is calling Object.toString on Literal<T>, whereas before, the toString call came from Literal<T>::value. Is that the intended change?
Thanks for your review. After testing, I think this is correct.
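To make the before/after distinction concrete, a hedged sketch (the surrounding class is illustrative; Literal is org.apache.iceberg.expressions.Literal):

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Literal;

class JoinLiterals {
  static String describe(List<Literal<?>> literals) {
    // Before: stringified the raw value, so a ByteBuffer rendered as
    //   java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
    // return literals.stream()
    //     .map(lit -> lit.value().toString())
    //     .collect(Collectors.joining(", "));

    // After: stringify the Literal itself, which this PR teaches to
    // render binary values as 0x110F
    return literals.stream().map(Object::toString).collect(Collectors.joining(", "));
  }
}
```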
```java
new ExactComparisonCriteria().arrayEquals(newContext, expectedValue, actualValue);
} else {
  assertEquals(newContext, (Object[]) expectedValue, (Object[]) actualValue);
}
```
Why was this needed?
Thanks for your review, I have modified the code
| sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg", tableName); | ||
| sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", tableName); | ||
| sql("CREATE TABLE %s (id bigint, data string, float float, binary binary) USING iceberg", tableName); | ||
| sql("INSERT INTO %s VALUES (1, 'a', 1.0, X''), (2, 'b', 2.0, X'11'), (3, 'c', float('NaN'), X'1111')", tableName); |
Rather than repurposing old test cases, can you create new ones? We want to avoid mixing tests together in confusing ways that look like other failures.
I'd also add that repurposing existing test cases makes it harder for people who maintain forks and might not cherry-pick every commit.
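A sketch of what a dedicated test could look like instead, following the sql(...) harness from the diff above (the method name, the row(...) helper, and the assertion shape are assumptions based on the surrounding test style):

```java
@Test
public void testFilterOnBinaryColumn() {
  sql("CREATE TABLE %s (id bigint, binary binary) USING iceberg", tableName);
  sql("INSERT INTO %s VALUES (1, X''), (2, X'11'), (3, X'1111')", tableName);

  // This predicate previously threw IllegalArgumentException when the
  // filter was pushed down to Iceberg
  assertEquals(
      "Should filter on a binary column without error",
      ImmutableList.of(row(2L)),
      sql("SELECT id FROM %s WHERE binary = X'11'", tableName));
}
```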
Thanks for your review, I have modified the code
Force-pushed from 24e6298 to e78abd7.
```java
}

public static String encodeHexString(ByteBuffer buffer) {
  byte[] bytes = toByteArray(buffer.duplicate());
```
No need for duplicate here because toByteArray won't modify the buffer.
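A sketch of the simplified version (assuming, per the comment above, that toByteArray reads through its own duplicate or absolute indexing, so the caller's buffer position is never moved):

```java
public static String encodeHexString(ByteBuffer buffer) {
  // toByteArray does not modify the buffer, so no defensive duplicate() is needed
  return BaseEncoding.base16().encode(toByteArray(buffer));
}
```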
Thanks for your review, I have modified the code
Flink: Executing this SQL test case in Flink reports an error. It seems to be an internal Flink error that has nothing to do with Iceberg. I can add the following code to Flink to solve the problem (I will try to submit a PR to the Flink community):

```scala
case BINARY =>
  // convert to byte[]
  literal.getValueAs(classOf[Array[Byte]])
```

After applying this fix to Flink, the SQL test case runs normally, returns the correct result, and the SQL description string in the Flink UI is the correct X'110F'. I think this problem does not exist in the Flink runtime.

Hive: According to the Hive documentation, Hive has no binary-type literal, so I think this problem does not exist in the Hive runtime.

Trino: I have not actually used Trino. Judging from the Trino code, Trino may not have the problem this PR fixes, but I do not have a Trino test environment, and we cannot modify the Trino runtime code in this PR.
I found a related Trino community PR via the mailing list. That PR's unit tests rely on the current behavior of Spark with Iceberg.
```java
} else if (value instanceof scala.collection.Map) {
  return row.getJavaMap(pos);
} else if (value.getClass().isArray() && value.getClass().getComponentType().isPrimitive()) {
  return IntStream.range(0, Array.getLength(value)).mapToObj(i -> Array.get(value, i)).toArray();
```
I think this is correct. The toJava method assumed that the elements of the returned array are all wrapper types, which is a false assumption: Spark's Java API can return an array of primitive types.
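A standalone sketch of why the per-element boxing is needed (names are illustrative):

```java
import java.lang.reflect.Array;
import java.util.stream.IntStream;

public class BoxPrimitiveArray {
  static Object[] box(Object array) {
    // A byte[] or int[] cannot be cast to Object[], so (Object[]) array would
    // throw ClassCastException; java.lang.reflect.Array boxes element by element
    return IntStream.range(0, Array.getLength(array))
        .mapToObj(i -> Array.get(array, i))
        .toArray();
  }

  public static void main(String[] args) {
    Object binaryColumn = new byte[] {0x11, 0x0F}; // e.g. from Spark's Java API
    Object[] boxed = box(binaryColumn);            // two Byte elements: 17, 15
    System.out.println(boxed.length);              // prints 2
  }
}
```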
This is fixed by #3728, which I have been reviewing lately and just committed. I didn't realize this was a separate PR until yesterday, so sorry that I hadn't come back to this one (I actually thought I was reviewing the same one!). In any case, the fix is nearly identical, so this should be resolved by that other PR. Thanks, @izchen!
The test case reproducing the exception:
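(The original snippet did not survive extraction here; the following is an illustrative reconstruction in the style of the tests above, with a hypothetical table and literal.)

```java
// A pushed-down filter on a binary column of roughly this shape triggered
// the IllegalArgumentException before this fix:
sql("SELECT id FROM %s WHERE binary = X'11'", tableName);
```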
For SQL like this, BinaryLiteral(ByteBuffer) is not an illegal value.
In addition, the BinaryLiteral string format in this PR is the same as Spark's:
https://github.com/apache/spark/blob/b874bf5dca4f1b7272f458350eb153e7b272f8c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L347