[SPARK-11593][SQL] Replace catalyst converter with RowEncoder in ScalaUDF #9565
Closed
Changes from 8 commits
35 commits
942dad7 (viirya) Replace catalyst converter with RowEncoder.
39f6c26 (viirya) Add UserDefinedType to RowEncoder.
75ffaeb (viirya) Fix scala style.
1e13ff9 (viirya) Call serialize on udt instead of user class.
07ff97a (viirya) Add getField for UserDefinedType.
ecf01bf (viirya) Move outputEncoder outside of eval and add calling copy().
5186777 (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
39c0b7a (viirya) Replace catalyst converter with RowEncoder for the generated ScalaUDF.
c910e6e (viirya) Fix scala style.
1234515 (viirya) Fix scala style.
5c18c0c (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
934f2cc (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
fc882ca (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
2c85714 (viirya) Use reflection to call function for interpreted version. Add more com…
f806755 (viirya) Move reflection code outside eval function.
26b4d85 (viirya) Process exception thrown in UDF.
693a6fe (viirya) Try to solve failed tests.
1ca2efc (viirya) Try again.
b8f3cce (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
2fcbe69 (viirya) Fix scala style.
5da1c13 (viirya) Try it.
60f4ca0 (viirya) Try to fix test.
7a046fa (viirya) Make createTransformFunc as val.
898acfa (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
dd43918 (viirya) Make createTransformFunc as lazy val.
8dbc551 (viirya) Pass Transformer into UDF to get the updated param values.
5f987c0 (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
597c971 (viirya) Fix MiMa problem.
405e8b0 (viirya) Fix passing null into ScalaUDF.
648c7b2 (viirya) Check PrimitiveType.
10a9f91 (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
2a0c319 (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
21a2af2 (viirya) Merge remote-tracking branch 'upstream/master' into rowencoder-scalaudf
884a176 (viirya) Remove ScalaUDF non code-generated evaluation support.
30a867e (viirya) Remove unnecessary import.
@@ -50,6 +50,14 @@ object RowEncoder {
     case BooleanType | ByteType | ShortType | IntegerType | LongType |
          FloatType | DoubleType | BinaryType => inputObject

+    case udt: UserDefinedType[_] =>
+      val obj = NewInstance(
+        udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(),
+        Nil,
+        false,
+        dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()))
+      Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+
     case TimestampType =>
       StaticInvoke(
         DateTimeUtils,
@@ -109,11 +117,16 @@ object RowEncoder {

     case StructType(fields) =>
       val convertedFields = fields.zipWithIndex.map { case (f, i) =>
+        val method = if (f.dataType.isInstanceOf[StructType]) {
+          "getStruct"
+        } else {
+          "get"
+        }
         If(
           Invoke(inputObject, "isNullAt", BooleanType, Literal(i) :: Nil),
           Literal.create(null, f.dataType),
           extractorsFor(
-            Invoke(inputObject, "get", externalDataTypeFor(f.dataType), Literal(i) :: Nil),
+            Invoke(inputObject, method, externalDataTypeFor(f.dataType), Literal(i) :: Nil),
             f.dataType))
       }
       CreateStruct(convertedFields)
@@ -129,6 +142,7 @@ object RowEncoder {
     case _: ArrayType => ObjectType(classOf[scala.collection.Seq[_]])
     case _: MapType => ObjectType(classOf[scala.collection.Map[_, _]])
     case _: StructType => ObjectType(classOf[Row])
+    case udt: UserDefinedType[_] => ObjectType(udt.userClass)
   }

   private def constructorFor(schema: StructType): Expression = {
@@ -147,6 +161,14 @@ object RowEncoder {
     case BooleanType | ByteType | ShortType | IntegerType | LongType |
          FloatType | DoubleType | BinaryType => input

+    case udt: UserDefinedType[_] =>
+      val obj = NewInstance(
+        udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(),
+        Nil,
+        false,
+        dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()))
+      Invoke(obj, "deserialize", ObjectType(udt.userClass), input :: Nil)
+
     case TimestampType =>
       StaticInvoke(
         DateTimeUtils,
@@ -234,5 +256,7 @@ object RowEncoder {
       Invoke(row, "getArray", dataType, Literal(ordinal) :: Nil)
     case _: MapType =>
       Invoke(row, "getMap", dataType, Literal(ordinal) :: Nil)
+    case udt: UserDefinedType[_] =>
+      getField(row, ordinal, udt.sqlType)
   }
 }
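For readers unfamiliar with the expression tree in the two `UserDefinedType` cases, the following is a minimal sketch of what those expressions appear to describe at runtime: create the UDT object through a no-argument constructor, then call `serialize` on the way in and `deserialize` on the way out. It does not use Spark; `MiniUDT`, `Point`, `PointUDT`, and `UdtRoundTrip` are hypothetical names invented for illustration, and the `SQLUserDefinedType` annotation lookup from the diff is replaced by passing the class directly.

```scala
// Hypothetical stand-in for a user-defined type; not Spark's UserDefinedType API.
trait MiniUDT[T] {
  def serialize(obj: T): Seq[Double]      // user object -> storage form
  def deserialize(datum: Seq[Double]): T  // storage form -> user object
}

final case class Point(x: Double, y: Double)

// Needs a public no-arg constructor so it can be instantiated reflectively.
class PointUDT extends MiniUDT[Point] {
  def serialize(p: Point): Seq[Double] = Seq(p.x, p.y)
  def deserialize(d: Seq[Double]): Point = Point(d(0), d(1))
}

object UdtRoundTrip {
  // Loosely mirrors NewInstance(udtClass, Nil, ...): build the UDT with no arguments.
  def newUdt[T](udtClass: Class[_ <: MiniUDT[T]]): MiniUDT[T] =
    udtClass.getDeclaredConstructor().newInstance()

  // Loosely mirrors Invoke(obj, "serialize", ...) then Invoke(obj, "deserialize", ...).
  def roundTrip(p: Point): Point = {
    val udt = newUdt[Point](classOf[PointUDT])
    val stored = udt.serialize(p)
    udt.deserialize(stored)
  }
}
```

Under these assumptions, `UdtRoundTrip.roundTrip(Point(1.0, 2.0))` gives back an equal `Point`, which is the property the serialize and deserialize cases need to preserve.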
We use `schemaFor` to get a Catalyst `DataType` for a UDF's return type. For a `Product` type, we now return a `StructType`. That causes a problem in `RowEncoder`, because `RowEncoder` will try to get a `Row`, not a `Product`, for a field of `StructType`. You will get a casting exception if your UDF returns something like `(1, 2)`.

The problem is that a field of `StructType` in a `Row` can be either a `Product` or a `Row`. I modified the `getStruct` method in `Row` so that it returns a `Row` when the field value is a `Product`.
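The failure described in this comment can be reproduced without Spark. The sketch below is only an illustration of the idea, not the actual change to `Row.getStruct`; `MiniRow`, `getStructStrict`, and `ProductFieldDemo` are hypothetical names. It shows a plain cast failing on a tuple field, and a lenient accessor that wraps any `Product` in a row instead.

```scala
// Hypothetical miniature of a row; not Spark's Row class.
final class MiniRow(private val values: Seq[Any]) {
  def get(i: Int): Any = values(i)

  // Plain cast: throws ClassCastException if the field holds a tuple or case class.
  def getStructStrict(i: Int): MiniRow = get(i).asInstanceOf[MiniRow]

  // Lenient accessor: accepts a MiniRow as-is and converts any Product into one.
  def getStruct(i: Int): MiniRow = get(i) match {
    case r: MiniRow => r
    case p: Product => new MiniRow(p.productIterator.toList)
    case other      => throw new ClassCastException(s"not a struct: $other")
  }

  def toSeq: Seq[Any] = values
}

object ProductFieldDemo {
  // A row whose only field is a tuple, as when a UDF returns (1, 2).
  val row = new MiniRow(Seq((1, 2)))

  // True when the plain cast fails on the tuple field.
  def strictFails: Boolean =
    try { row.getStructStrict(0); false }
    catch { case _: ClassCastException => true }

  // Field values recovered through the lenient accessor.
  def lenient: Seq[Any] = row.getStruct(0).toSeq
}
```

This is also why the diff above selects `"getStruct"` rather than `"get"` for `StructType` fields: the conversion has to happen in the accessor, before the nested extractors run.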
I think we also need to update the Javadoc of `Row` to say that `Product` is also a valid value type for `StructType`.
ok.