-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22000][SQL] Address missing Upcast in JavaTypeInference.deserializerFor #23854
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
acd5d55
443e74f
d869bba
60ca088
9eeb391
d4c2060
9c040e2
2007452
4d564ab
e01bfe6
8cbad26
7ff01db
24a1b19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,15 @@ | |
| import java.io.Serializable; | ||
| import java.util.*; | ||
|
|
||
| import org.apache.spark.sql.Row; | ||
| import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; | ||
| import org.apache.spark.sql.catalyst.expressions.GenericRow; | ||
| import org.apache.spark.sql.catalyst.expressions.UnsafeRow; | ||
| import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverterSuite; | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; | ||
| import org.apache.spark.sql.types.DataTypes; | ||
| import org.apache.spark.sql.types.StructType; | ||
| import org.apache.spark.unsafe.types.UTF8String; | ||
| import org.junit.*; | ||
|
|
||
| import org.apache.spark.sql.Dataset; | ||
|
|
@@ -115,6 +124,70 @@ public void testBeanWithMapFieldsDeserialization() { | |
| Assert.assertEquals(records, MAP_RECORDS); | ||
| } | ||
|
|
||
| private static final List<Row> ROWS_SPARK_22000 = new ArrayList<>(); | ||
|
||
| private static final List<RecordSpark22000> RECORDS_SPARK_22000 = new ArrayList<>(); | ||
|
|
||
| private static Row createRecordSpark22000Row(Long index) { | ||
| Object[] values = new Object[] { | ||
| index.shortValue(), | ||
| index.intValue(), | ||
| index, | ||
| index.floatValue(), | ||
| index.doubleValue(), | ||
| String.valueOf(index), | ||
| index % 2 == 0, | ||
| new java.sql.Timestamp(System.currentTimeMillis()) | ||
| }; | ||
| return new GenericRow(values); | ||
| } | ||
|
|
||
| private static RecordSpark22000 createRecordSpark22000(Row recordRow) { | ||
| RecordSpark22000 record = new RecordSpark22000(); | ||
| record.setShortField(String.valueOf(recordRow.getShort(0))); | ||
| record.setIntField(String.valueOf(recordRow.getInt(1))); | ||
| record.setLongField(String.valueOf(recordRow.getLong(2))); | ||
| record.setFloatField(String.valueOf(recordRow.getFloat(3))); | ||
| record.setDoubleField(String.valueOf(recordRow.getDouble(4))); | ||
| record.setStringField(recordRow.getString(5)); | ||
| record.setBooleanField(String.valueOf(recordRow.getBoolean(6))); | ||
| record.setTimestampField(String.valueOf(recordRow.getTimestamp(7).getTime() * 1000)); | ||
| return record; | ||
| } | ||
|
|
||
| static { | ||
| for (long idx = 0 ; idx < 5 ; idx++) { | ||
| Row row = createRecordSpark22000Row(idx); | ||
| ROWS_SPARK_22000.add(row); | ||
| RECORDS_SPARK_22000.add(createRecordSpark22000(row)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testSpark22000() { | ||
| // Here we try to convert the fields, from any types to string. | ||
| // Before applying SPARK-22000, Spark called toString() against variable which type might be primitive. | ||
| // SPARK-22000 it calls String.valueOf() which finally calls toString() but handles boxing | ||
| // if the type is primitive. | ||
| Encoder<RecordSpark22000> encoder = Encoders.bean(RecordSpark22000.class); | ||
|
|
||
| StructType schema = new StructType() | ||
| .add("shortField", DataTypes.ShortType) | ||
| .add("intField", DataTypes.IntegerType) | ||
| .add("longField", DataTypes.LongType) | ||
| .add("floatField", DataTypes.FloatType) | ||
| .add("doubleField", DataTypes.DoubleType) | ||
| .add("stringField", DataTypes.StringType) | ||
| .add("booleanField", DataTypes.BooleanType) | ||
| .add("timestampField", DataTypes.TimestampType); | ||
|
|
||
| Dataset<Row> dataFrame = spark.createDataFrame(ROWS_SPARK_22000, schema); | ||
| Dataset<RecordSpark22000> dataset = dataFrame.as(encoder); | ||
|
|
||
| List<RecordSpark22000> records = dataset.collectAsList(); | ||
|
|
||
| Assert.assertEquals(RECORDS_SPARK_22000, records); | ||
| } | ||
|
|
||
| public static class ArrayRecord { | ||
|
|
||
| private int id; | ||
|
|
@@ -252,4 +325,116 @@ public String toString() { | |
| return String.format("[%d,%d]", startTime, endTime); | ||
| } | ||
| } | ||
|
|
||
| public static class RecordSpark22000 { | ||
|
||
| private String shortField; | ||
| private String intField; | ||
| private String longField; | ||
| private String floatField; | ||
| private String doubleField; | ||
| private String stringField; | ||
| private String booleanField; | ||
| private String timestampField; | ||
|
|
||
| public RecordSpark22000() { } | ||
|
|
||
| public String getShortField() { | ||
| return shortField; | ||
| } | ||
|
|
||
| public void setShortField(String shortField) { | ||
| this.shortField = shortField; | ||
| } | ||
|
|
||
| public String getIntField() { | ||
| return intField; | ||
| } | ||
|
|
||
| public void setIntField(String intField) { | ||
| this.intField = intField; | ||
| } | ||
|
|
||
| public String getLongField() { | ||
| return longField; | ||
| } | ||
|
|
||
| public void setLongField(String longField) { | ||
| this.longField = longField; | ||
| } | ||
|
|
||
| public String getFloatField() { | ||
| return floatField; | ||
| } | ||
|
|
||
| public void setFloatField(String floatField) { | ||
| this.floatField = floatField; | ||
| } | ||
|
|
||
| public String getDoubleField() { | ||
| return doubleField; | ||
| } | ||
|
|
||
| public void setDoubleField(String doubleField) { | ||
| this.doubleField = doubleField; | ||
| } | ||
|
|
||
| public String getStringField() { | ||
| return stringField; | ||
| } | ||
|
|
||
| public void setStringField(String stringField) { | ||
| this.stringField = stringField; | ||
| } | ||
|
|
||
| public String getBooleanField() { | ||
| return booleanField; | ||
| } | ||
|
|
||
| public void setBooleanField(String booleanField) { | ||
| this.booleanField = booleanField; | ||
| } | ||
|
|
||
| public String getTimestampField() { | ||
| return timestampField; | ||
| } | ||
|
|
||
| public void setTimestampField(String timestampField) { | ||
| this.timestampField = timestampField; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| RecordSpark22000 that = (RecordSpark22000) o; | ||
| return Objects.equals(shortField, that.shortField) && | ||
| Objects.equals(intField, that.intField) && | ||
| Objects.equals(longField, that.longField) && | ||
| Objects.equals(floatField, that.floatField) && | ||
| Objects.equals(doubleField, that.doubleField) && | ||
| Objects.equals(stringField, that.stringField) && | ||
| Objects.equals(booleanField, that.booleanField) && | ||
| Objects.equals(timestampField, that.timestampField); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(shortField, intField, longField, floatField, doubleField, stringField, | ||
| booleanField, timestampField); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need toString? I understand hashCode and equals
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will help to compare expected and actual when test fails. Otherwise they would've seen as |
||
| return com.google.common.base.Objects.toStringHelper(this) | ||
| .add("shortField", shortField) | ||
| .add("intField", intField) | ||
| .add("longField", longField) | ||
| .add("floatField", floatField) | ||
| .add("doubleField", doubleField) | ||
| .add("stringField", stringField) | ||
| .add("booleanField", booleanField) | ||
| .add("timestampField", timestampField) | ||
| .toString(); | ||
| } | ||
HeartSaVioR marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
ScalaReflectiondoes the same thing, do we have a problem there too?AFAIK the
pathshould be a string type column, and it's always safe to callUTF8String.toString. My gut feeling is, we miss to addUpcastsomewhere inJavaTypeInference.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sample code in JIRA issue tried to bind IntegerType column to String field in Java bean, which looks to break your expectation. (I guess ScalaReflection would not encounter this case.)
Spark doesn't throw error for this case though - actually Spark would show undefined behaviors, compilation failures on generated code, even might be possible to throw runtime exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In scala, we can also do this and Spark will add
Upcast. e.g.spark.range(1).as[String].collectworks fine.I did a quick search and
JavaTypeInferencehas noUpcast. We should fix it and followScalaReflectionThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah OK. I'll check and address it. Maybe it would be a separate PR if it doesn't fix the new test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah your suggestion seems to work nicely! I left comment to ask which approach to choose: please compare both approach and comment. Thanks!