Commit f02ffad

KnightChess authored and yihua committed
[HUDI-4910] fix unknown variable or type "Cast"
1 parent 7cc614c · commit f02ffad

2 files changed: +43 -2 lines changed


hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala

Lines changed: 3 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.avro.AvroSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, LeafExpression, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, Expression, GenericInternalRow, LeafExpression, UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen.RECORD_NAME
 import org.apache.spark.sql.types.{DataType, Decimal}
@@ -122,7 +122,8 @@ object ExpressionCodeGen extends Logging {
       classOf[IndexedRecord].getName,
       classOf[AvroSerializer].getName,
       classOf[GenericRecord].getName,
-      classOf[GenericInternalRow].getName
+      classOf[GenericInternalRow].getName,
+      classOf[Cast].getName
     )
     evaluator.setImplementedInterfaces(Array(classOf[IExpressionEvaluator]))
     try {
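
The change registers Spark's Cast class with the generated evaluator so that generated code referencing Cast compiles instead of failing with unknown variable or type "Cast". A Cast node typically enters the expression tree when the analyzer has to coerce types, for example when a string source value is assigned to a double column. A minimal sketch of that coercion using plain Catalyst APIs (illustration only, not Hudi's codegen; assumes a Spark 3.x classpath):

  // Illustration: the analyzer's implicit string-to-double conversion is a Cast node.
  import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
  import org.apache.spark.sql.types.DoubleType

  val castExpr = Cast(Literal("10.1"), DoubleType)
  println(castExpr.eval())   // prints 10.1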

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala

Lines changed: 40 additions & 0 deletions
@@ -673,4 +673,44 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test ("Test Merge into with String cast to Double") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a mor partitioned table.
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           | ) using hudi
+           | tblproperties (
+           |  type = 'cow',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(dt)
+           | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+      // Insert data
+      spark.sql(s"insert into $tableName select 1, 'a1', 10.0, 999, '2021-03-21'")
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 'a1' as name, 1 as id, '10.1' as price, 1000 as ts, '2021-03-21' as dt
+           | ) as s0
+           | on t0.id = s0.id
+           | when matched then update set t0.price = s0.price, t0.ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin
+      )
+      checkAnswer(s"select id,name,price,dt from $tableName")(
+        Seq(1, "a1", 10.1, "2021-03-21")
+      )
+    }
+  }
 }
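
The new test exercises exactly that coercion end to end: the MERGE source exposes price as the string '10.1' while the target column is double, so the update assignment t0.price = s0.price requires an implicit string-to-double cast, and checkAnswer verifies the row comes back as 10.1. The same conversion can be sanity-checked outside of Hudi (illustration only; assumes an active SparkSession named spark):

  // Standalone equivalent of the cast the MERGE update relies on.
  spark.sql("select cast('10.1' as double) as price").show()
  // expected output: a single row with price = 10.1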
